实现事件循环
先从一个问题开始:如果一个 callback 里又安排了新的 callback,要不要马上执行?
如果马上执行,很容易一层套一层,最后整个循环被一串 callback 占住。所以教学版事件循环采用更简单的做法:先放进队列,等下一轮再跑。
EventLoop 只处理三类等待:
- ready callback。
- timer callback。
- socket I/O callback。
数据结构
python
self._ready = collections.deque()
self._scheduled = []
self._selector = selectors.DefaultSelector()| 字段 | 结构 | 作用 |
|---|---|---|
_ready | deque | 现在可以运行的 callback |
_scheduled | heap | 还没到时间的 timer |
_selector | DefaultSelector | 等 socket 可读或可写 |
call_soon
python
def call_soon(self, callback, *args):
self._ready.append((callback, args))这行代码要留意:它只入队,不执行。Task 创建后,也是把 Task._step 放到这里,等事件循环下一轮取出来跑。
call_later
python
def call_later(self, delay, callback, *args):
return self.call_at(self.time() + delay, callback, *args)timer 进入一个按时间排序的堆。每轮 _run_once() 开始时,loop 只看堆顶:如果到时间了,就把它挪到 ready 队列;没到时间,就继续等。
_run_once
教学版主循环:
这里还有一个小细节:只运行本轮开始时 ready 队列里已有的 callback。
python
ready_count = len(self._ready)
for _ in range(ready_count):
callback, args = self._ready.popleft()
callback(*args)这样做是为了防止一个 callback 不停创建新 callback,把本轮循环占满。新加入的工作留到下一轮,其他 timer 和 I/O 也有机会被处理。
selector I/O
socket 不能用 while True: sock.recv() 硬等。没数据时这么写会浪费 CPU,甚至把整个事件循环堵住。
sock_recv() 的做法是:先创建一个 Future,然后告诉 selector,“这个 socket 可读时叫我”。
python
future = self.create_future()
self.add_reader(sock, on_ready)
return future等 selector 告诉 loop 这个 socket 可读,on_ready 才调用 sock.recv()。读到数据后,把数据写进 Future,等待这个 Future 的 Task 就能继续了。
小练习
阅读 sock_send_all(),回答:
- 为什么它使用
memoryview? - 为什么
BlockingIOError不算失败? - 为什么完成后要
remove_writer(sock)?