后面可以加什么
mini_asyncio 已经能跑通最小版本,但离标准库还很远。想继续做,可以按下面顺序扩展。
1. 实现 wait_for
要做的效果:
python
await aio.wait_for(fetch(), timeout=1)1
思路:
- 创建内部 Task。
- 创建 timer,到期后取消内部 Task。
- 内部 Task 成功时取消 timer。
- 超时时向外抛
TimeoutError。
这里会遇到一个很实际的问题:timer 到点和任务正常完成,谁先发生?另一个要怎么取消?
2. 实现 Semaphore
要做的效果:
python
sem = aio.Semaphore(10)
async with sem:
await fetch()1
2
3
2
3
思路和 Queue 很像:没有可用配额时创建 waiter Future,释放配额时唤醒最早等待者。
3. 给 Queue 增加 maxsize
当前 AsyncQueue 是无界队列。加入 maxsize 后,需要同时处理两类等待:
- 等待 item 的 getters。
- 等待空间的 putters。
task_done()和join()的 unfinished 计数。
做完以后,你就能看见“生产者太快时先停一停”是怎么写出来的。
4. 实现 TaskGroup
如果你想继续补齐这套教学运行时,下一步可以做 TaskGroup。
先做一个最小版本:
async with TaskGroup()进入后允许创建子任务。- 退出时等待全部子任务。
- 任一子任务失败时取消其他子任务。
- 把异常聚合后抛出。
做完这一步,你会更清楚为什么一组任务应该放在一个明确的作用域里。
5. 增加 debug 信息
可以记录:
- Task 创建位置。
- Task 当前等待哪个 Future。
- callback 执行耗时。
- 未取走的异常。
标准库 debug 模式做的也是这类事情。
6. 实现一个小型 HTTP 客户端
这里不用做完整 HTTP。先用 sock_connect、sock_send_all、sock_recv 发出这段请求:
text
GET / HTTP/1.0
Host: example.com1
2
2
这个小项目会把 socket I/O、超时、并发限制、响应解析放到同一份代码里。
源码阅读顺序
继续读标准库源码时,可以按这个顺序:
tasks.py的sleep和gather。futures.py里结果是怎么保存的。base_events.py的_run_once。queues.py的Queue。streams.py的 reader/writer。taskgroups.py的取消和异常聚合。
每读一段,都回到 mini_asyncio 问一句:标准库多出来的代码,是在处理哪个我们省略掉的情况?