Skip to content

后面可以加什么

mini_asyncio 已经能跑通最小版本,但离标准库还很远。想继续做,可以按下面顺序扩展。

1. 实现 wait_for

要做的效果:

python
await aio.wait_for(fetch(), timeout=1)

思路:

  1. 创建内部 Task。
  2. 创建 timer,到期后取消内部 Task。
  3. 内部 Task 成功时取消 timer。
  4. 超时时向外抛 TimeoutError

这里会遇到一个很实际的问题:timer 到点和任务正常完成,谁先发生?另一个要怎么取消?

2. 实现 Semaphore

要做的效果:

python
sem = aio.Semaphore(10)
async with sem:
    await fetch()

思路和 Queue 很像:没有可用配额时创建 waiter Future,释放配额时唤醒最早等待者。

3. 给 Queue 增加 maxsize

当前 AsyncQueue 是无界队列。加入 maxsize 后,需要同时处理两类等待:

  • 等待 item 的 getters。
  • 等待空间的 putters。
  • task_done()join() 的 unfinished 计数。

做完以后,你就能看见“生产者太快时先停一停”是怎么写出来的。

4. 实现 TaskGroup

如果你想继续补齐这套教学运行时,下一步可以做 TaskGroup

先做一个最小版本:

  1. async with TaskGroup() 进入后允许创建子任务。
  2. 退出时等待全部子任务。
  3. 任一子任务失败时取消其他子任务。
  4. 把异常聚合后抛出。

做完这一步,你会更清楚为什么一组任务应该放在一个明确的作用域里。

5. 增加 debug 信息

可以记录:

  • Task 创建位置。
  • Task 当前等待哪个 Future。
  • callback 执行耗时。
  • 未取走的异常。

标准库 debug 模式做的也是这类事情。

6. 实现一个小型 HTTP 客户端

这里不用做完整 HTTP。先用 sock_connectsock_send_allsock_recv 发出这段请求:

text
GET / HTTP/1.0
Host: example.com

这个小项目会把 socket I/O、超时、并发限制、响应解析放到同一份代码里。

源码阅读顺序

继续读标准库源码时,可以按这个顺序:

  1. tasks.pysleepgather
  2. futures.py 里结果是怎么保存的。
  3. base_events.py_run_once
  4. queues.pyQueue
  5. streams.py 的 reader/writer。
  6. taskgroups.py 的取消和异常聚合。

每读一段,都回到 mini_asyncio 问一句:标准库多出来的代码,是在处理哪个我们省略掉的情况?

面向学习目的的 Python asyncio 中文教程与 mini_asyncio 教学运行时。