定时器、I/O 与 Queue
有了 EventLoop、Future、Task,还差一件事:等待什么?这一章接三种等待源:时间、socket、队列。
sleep
python
def sleep(delay, result=None):
loop = get_running_loop()
future = loop.create_future()
loop.call_later(delay, future.set_result, result)
return futuresleep 返回的是 Future。当前 Task await 它以后会暂停;timer 到点后,这个 Future 完成,Task 再继续。
socket I/O
sock_recv 只做一件事:socket 可读时再 recv。
python
def on_ready():
try:
data = sock.recv(nbytes)
except BlockingIOError:
return
self.remove_reader(sock)
future.set_result(data)非阻塞 socket 暂时没数据时会抛 BlockingIOError。这不是失败,只是说明“现在还读不到,等下一次可读通知”。
sock_send_all 的思路是:能写多少先写多少,写不完就继续等可写。
AsyncQueue
Queue 这里没有线程锁。它做的是维护等待者列表:有人等数据,就把它的 Future 存起来;后来有人放数据,就完成这个 Future。
queue.join() 也是同样思路:如果 unfinished 计数不为 0,就创建一个 Future 等待;task_done() 把计数减到 0 时完成这些 Future。
为什么它们都能叫醒 Task
timer、socket、queue 表面上不一样,最后都走同一条路:完成某个 Future。
| 等待源 | Future 什么时候完成 |
|---|---|
sleep | timer 到期 |
sock_recv | socket 可读并成功 recv |
sock_send_all | 所有 bytes 都写完 |
queue.get | 队列里已有 item 或后续 put 唤醒 |
queue.join | unfinished 计数归零 |
Task 不关心结果是 timer 到点来的,还是 socket 读到的,还是 queue 放进来的。它只关心自己 await 的 Future 什么时候完成。
小练习
运行:
bash
python3 examples/mini_asyncio_runtime/demos/03_queue_workers.py
python3 examples/mini_asyncio_runtime/demos/04_socket_echo.py然后在 AsyncQueue.put_nowait() 里打印 _getters 和 _items 的长度,观察 worker 等待和被唤醒的过程。