Skip to content

接口怎么定

这里开始写 mini_asyncio。先定接口,不急着写事件循环。接口只保留前面练习用过的几件事:启动入口、创建任务、等待时间、收集结果、Queue、socket 读写。

对外 API

python
import mini_asyncio as aio

aio.run(main())
aio.create_task(coro)
await aio.sleep(1)
await aio.gather(coro1, coro2)

queue = aio.AsyncQueue()
await queue.put(item)
item = await queue.get()
queue.task_done()
await queue.join()

事件循环里会用到的 API:

python
loop = aio.get_running_loop()
future = loop.create_future()
loop.call_soon(callback)
loop.call_later(1, callback)
data = await loop.sock_recv(sock, 1024)
await loop.sock_send_all(sock, data)

为什么需要 run

aio.run(main()) 做三件事:

  1. 创建新的 EventLoop
  2. main() 包装成 Task 并运行到完成。
  3. 关闭 loop,清理全局引用。

对应实现:

python
def run(awaitable):
    loop = new_event_loop()
    set_event_loop(loop)
    try:
        return loop.run_until_complete(awaitable)
    finally:
        loop.close()
        set_event_loop(None)

这和标准库 asyncio.run() 的教学版思路一致,但省略了异步生成器、executor、SIGINT 等收尾细节。

为什么 create_task 只能在 running loop 里调用

python
def create_task(coro):
    return get_running_loop().create_task(coro)

Task 必须绑定正在运行的事件循环。否则它不知道把 _step callback 放到哪里,也不知道 Future 完成后应该由哪个 loop 恢复它。

为什么 Future 有 await

python
def __await__(self):
    if not self._done:
        yield self
    return self.result()

这段代码让 Future 可以被 await。Task 运行 coroutine 时,如果看到 coroutine yield 出一个 Future,就给它注册回调,等 Future 完成后再恢复 Task。

教学版 gather 做了什么

教学版 gather 只做三件事:

  1. 把 coroutine 包装成 Task。
  2. 给每个 Task 加 done callback。
  3. 全部成功后按输入顺序返回结果。

它不处理标准库 gather 的所有异常和取消细节。这里先把主线写清楚:数一数还有几个任务没结束,保存每个任务的结果,最后完成外层 Future。

小练习

打开 examples/mini_asyncio_runtime/demos/02_gather.py,把 fetch() 的 delay 调整成不同顺序,验证返回列表仍然按输入顺序排列。

面向学习目的的 Python asyncio 中文教程与 mini_asyncio 教学运行时。