Skip to content

Streams、Queue 与同步原语

会用 Task 之后,就可以写 I/O 程序了。asyncio 有几类常用工具:Streams 写网络连接,Queue 分发工作,同步原语保护共享状态。

Streams

写 TCP 客户端或服务端,先看 Streams:

python
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
writer.write(b"hello\n")
await writer.drain()
line = await reader.readline()
writer.close()
await writer.wait_closed()

StreamReader 管读缓冲,StreamWriter 管写入和关闭。用了 Streams,应用代码通常不用直接碰 callback 风格的 Transport/Protocol。

drain 不能省

writer.write(data) 通常只是把数据交给内部传输对象,可能进入内部缓冲区。await writer.drain() 是流控点:当缓冲区到达高水位时,它会暂停当前 Task,等缓冲区降到低水位再继续。

高吞吐服务里如果忘记 drain(),写缓冲可能把内存撑大。

Queue 工作池

Queue 很适合写工作池。生产者只管把任务放进去,消费者慢慢取出来处理:

典型写法:

python
queue = asyncio.Queue(maxsize=100)

async def worker():
    while True:
        item = await queue.get()
        try:
            await handle(item)
        finally:
            queue.task_done()

maxsize 用来限制队列长度。生产者太快时,await queue.put(item) 会暂停,避免内存里越堆越多。

Python 3.13 为 asyncio.Queue 增加了 shutdown()QueueShutDown,用于更明确地结束队列。本教程前面的 Demo 仍然用 sentinel 值结束 worker,因为它能直接展示 put()get()task_done()join() 是怎么配合的;读完这条主线后,再看 shutdown() 会更容易。

同步原语

asyncio 的同步原语类似 threading,但有两个差别先要记住:

  • 它们不是线程安全工具,别拿来同步 OS 线程。
  • 方法通常不带 timeout 参数,需要用 asyncio.wait_for()asyncio.timeout() 包裹。

常见工具:

原语用途
Lock保护一段共享状态,公平唤醒等待者
Event一次性通知多个 Task 某个条件已经发生
Condition条件变量,适合复杂状态等待
Semaphore限制并发数量,例如最多同时 10 个请求
Barrier等一组 Task 都到达某个阶段

Semaphore 限流

python
sem = asyncio.Semaphore(10)

async def fetch(url):
    async with sem:
        return await real_fetch(url)

它限制的是同一时间能进入这段代码的 Task 数量。API 调用、数据库连接、文件打开数量都可以用这种方式加上限。

小练习

运行:

bash
python3 examples/asyncio_demos/08_queue_basics.py
python3 examples/asyncio_demos/09_queue_worker_pool.py
python3 examples/asyncio_demos/10_stream_echo.py

然后修改 09_queue_worker_pool.py

  1. Queue(maxsize=4) 改成 Queue(maxsize=1),观察生产速度。
  2. 把 worker 数量从 3 改成 1,比较总耗时。

面向学习目的的 Python asyncio 中文教程与 mini_asyncio 教学运行时。