Skip to content

实现事件循环

先从一个问题开始:如果一个 callback 里又安排了新的 callback,要不要马上执行?

如果马上执行,很容易一层套一层,最后整个循环被一串 callback 占住。所以教学版事件循环采用更简单的做法:先放进队列,等下一轮再跑。

EventLoop 只处理三类等待:

  • ready callback。
  • timer callback。
  • socket I/O callback。

数据结构

python
self._ready = collections.deque()
self._scheduled = []
self._selector = selectors.DefaultSelector()
字段结构作用
_readydeque现在可以运行的 callback
_scheduledheap还没到时间的 timer
_selectorDefaultSelector等 socket 可读或可写

call_soon

python
def call_soon(self, callback, *args):
    self._ready.append((callback, args))

这行代码要留意:它只入队,不执行。Task 创建后,也是把 Task._step 放到这里,等事件循环下一轮取出来跑。

call_later

python
def call_later(self, delay, callback, *args):
    return self.call_at(self.time() + delay, callback, *args)

timer 进入一个按时间排序的堆。每轮 _run_once() 开始时,loop 只看堆顶:如果到时间了,就把它挪到 ready 队列;没到时间,就继续等。

_run_once

教学版主循环:

这里还有一个小细节:只运行本轮开始时 ready 队列里已有的 callback。

python
ready_count = len(self._ready)
for _ in range(ready_count):
    callback, args = self._ready.popleft()
    callback(*args)

这样做是为了防止一个 callback 不停创建新 callback,把本轮循环占满。新加入的工作留到下一轮,其他 timer 和 I/O 也有机会被处理。

selector I/O

socket 不能用 while True: sock.recv() 硬等。没数据时这么写会浪费 CPU,甚至把整个事件循环堵住。

sock_recv() 的做法是:先创建一个 Future,然后告诉 selector,“这个 socket 可读时叫我”。

python
future = self.create_future()
self.add_reader(sock, on_ready)
return future

等 selector 告诉 loop 这个 socket 可读,on_ready 才调用 sock.recv()。读到数据后,把数据写进 Future,等待这个 Future 的 Task 就能继续了。

小练习

阅读 sock_send_all(),回答:

  1. 为什么它使用 memoryview
  2. 为什么 BlockingIOError 不算失败?
  3. 为什么完成后要 remove_writer(sock)

面向学习目的的 Python asyncio 中文教程与 mini_asyncio 教学运行时。