Streams、Queue 与同步原语
会用 Task 之后,就可以写 I/O 程序了。asyncio 有几类常用工具:Streams 写网络连接,Queue 分发工作,同步原语保护共享状态。
Streams
写 TCP 客户端或服务端,先看 Streams:
python
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
writer.write(b"hello\n")
await writer.drain()
line = await reader.readline()
writer.close()
await writer.wait_closed()StreamReader 管读缓冲,StreamWriter 管写入和关闭。用了 Streams,应用代码通常不用直接碰 callback 风格的 Transport/Protocol。
drain 不能省
writer.write(data) 通常只是把数据交给内部传输对象,可能进入内部缓冲区。await writer.drain() 是流控点:当缓冲区到达高水位时,它会暂停当前 Task,等缓冲区降到低水位再继续。
高吞吐服务里如果忘记 drain(),写缓冲可能把内存撑大。
Queue 工作池
Queue 很适合写工作池。生产者只管把任务放进去,消费者慢慢取出来处理:
典型写法:
python
queue = asyncio.Queue(maxsize=100)
async def worker():
while True:
item = await queue.get()
try:
await handle(item)
finally:
queue.task_done()maxsize 用来限制队列长度。生产者太快时,await queue.put(item) 会暂停,避免内存里越堆越多。
Python 3.13 为 asyncio.Queue 增加了 shutdown() 和 QueueShutDown,用于更明确地结束队列。本教程前面的 Demo 仍然用 sentinel 值结束 worker,因为它能直接展示 put()、get()、task_done() 和 join() 是怎么配合的;读完这条主线后,再看 shutdown() 会更容易。
同步原语
asyncio 的同步原语类似 threading,但有两个差别先要记住:
- 它们不是线程安全工具,别拿来同步 OS 线程。
- 方法通常不带
timeout参数,需要用asyncio.wait_for()或asyncio.timeout()包裹。
常见工具:
| 原语 | 用途 |
|---|---|
Lock | 保护一段共享状态,公平唤醒等待者 |
Event | 一次性通知多个 Task 某个条件已经发生 |
Condition | 条件变量,适合复杂状态等待 |
Semaphore | 限制并发数量,例如最多同时 10 个请求 |
Barrier | 等一组 Task 都到达某个阶段 |
Semaphore 限流
python
sem = asyncio.Semaphore(10)
async def fetch(url):
async with sem:
return await real_fetch(url)它限制的是同一时间能进入这段代码的 Task 数量。API 调用、数据库连接、文件打开数量都可以用这种方式加上限。
小练习
运行:
bash
python3 examples/asyncio_demos/08_queue_basics.py
python3 examples/asyncio_demos/09_queue_worker_pool.py
python3 examples/asyncio_demos/10_stream_echo.py然后修改 09_queue_worker_pool.py:
- 把
Queue(maxsize=4)改成Queue(maxsize=1),观察生产速度。 - 把 worker 数量从 3 改成 1,比较总耗时。