接口怎么定
这里开始写 mini_asyncio。先定接口,不急着写事件循环。接口只保留前面练习用过的几件事:启动入口、创建任务、等待时间、收集结果、Queue、socket 读写。
对外 API
python
import mini_asyncio as aio
aio.run(main())
aio.create_task(coro)
await aio.sleep(1)
await aio.gather(coro1, coro2)
queue = aio.AsyncQueue()
await queue.put(item)
item = await queue.get()
queue.task_done()
await queue.join()事件循环里会用到的 API:
python
loop = aio.get_running_loop()
future = loop.create_future()
loop.call_soon(callback)
loop.call_later(1, callback)
data = await loop.sock_recv(sock, 1024)
await loop.sock_send_all(sock, data)为什么需要 run
aio.run(main()) 做三件事:
- 创建新的
EventLoop。 - 把
main()包装成 Task 并运行到完成。 - 关闭 loop,清理全局引用。
对应实现:
python
def run(awaitable):
loop = new_event_loop()
set_event_loop(loop)
try:
return loop.run_until_complete(awaitable)
finally:
loop.close()
set_event_loop(None)这和标准库 asyncio.run() 的教学版思路一致,但省略了异步生成器、executor、SIGINT 等收尾细节。
为什么 create_task 只能在 running loop 里调用
python
def create_task(coro):
return get_running_loop().create_task(coro)Task 必须绑定正在运行的事件循环。否则它不知道把 _step callback 放到哪里,也不知道 Future 完成后应该由哪个 loop 恢复它。
为什么 Future 有 await
python
def __await__(self):
if not self._done:
yield self
return self.result()这段代码让 Future 可以被 await。Task 运行 coroutine 时,如果看到 coroutine yield 出一个 Future,就给它注册回调,等 Future 完成后再恢复 Task。
教学版 gather 做了什么
教学版 gather 只做三件事:
- 把 coroutine 包装成 Task。
- 给每个 Task 加 done callback。
- 全部成功后按输入顺序返回结果。
它不处理标准库 gather 的所有异常和取消细节。这里先把主线写清楚:数一数还有几个任务没结束,保存每个任务的结果,最后完成外层 Future。
小练习
打开 examples/mini_asyncio_runtime/demos/02_gather.py,把 fetch() 的 delay 调整成不同顺序,验证返回列表仍然按输入顺序排列。