Skip to content

定时器、I/O 与 Queue

有了 EventLoop、Future、Task,还差一件事:等待什么?这一章接三种等待源:时间、socket、队列。

sleep

python
def sleep(delay, result=None):
    loop = get_running_loop()
    future = loop.create_future()
    loop.call_later(delay, future.set_result, result)
    return future

sleep 返回的是 Future。当前 Task await 它以后会暂停;timer 到点后,这个 Future 完成,Task 再继续。

socket I/O

sock_recv 只做一件事:socket 可读时再 recv

python
def on_ready():
    try:
        data = sock.recv(nbytes)
    except BlockingIOError:
        return
    self.remove_reader(sock)
    future.set_result(data)

非阻塞 socket 暂时没数据时会抛 BlockingIOError。这不是失败,只是说明“现在还读不到,等下一次可读通知”。

sock_send_all 的思路是:能写多少先写多少,写不完就继续等可写。

AsyncQueue

Queue 这里没有线程锁。它做的是维护等待者列表:有人等数据,就把它的 Future 存起来;后来有人放数据,就完成这个 Future。

queue.join() 也是同样思路:如果 unfinished 计数不为 0,就创建一个 Future 等待;task_done() 把计数减到 0 时完成这些 Future。

为什么它们都能叫醒 Task

timer、socket、queue 表面上不一样,最后都走同一条路:完成某个 Future。

等待源Future 什么时候完成
sleeptimer 到期
sock_recvsocket 可读并成功 recv
sock_send_all所有 bytes 都写完
queue.get队列里已有 item 或后续 put 唤醒
queue.joinunfinished 计数归零

Task 不关心结果是 timer 到点来的,还是 socket 读到的,还是 queue 放进来的。它只关心自己 await 的 Future 什么时候完成。

小练习

运行:

bash
python3 examples/mini_asyncio_runtime/demos/03_queue_workers.py
python3 examples/mini_asyncio_runtime/demos/04_socket_echo.py

然后在 AsyncQueue.put_nowait() 里打印 _getters_items 的长度,观察 worker 等待和被唤醒的过程。

面向学习目的的 Python asyncio 中文教程与 mini_asyncio 教学运行时。