这篇文章上次修改于 427 天前,可能其部分内容已经发生变化,如有疑问可询问作者。
Python 异步基础以及原理深层理解
概述
python的异步是基于python的生成器机制,利用生成器的特性,可以完成在多个函数之间互相切换。我们可以关注到生成器的一个特性,就是在next
了之后,这个函数属于一个暂停状态,如果让多个函数在多次暂停下互相切换,就可以做到异步的效果。
asyncio 异步基础
关于python异步有几个常见的类型
- 异步函数:通过
async def
定义的函数,不能直接运行,并且一旦调用就会出现RuntimeWarning
,他会返回一个coroutine
协程对象。 - 协程对象(
coroutine
):通过异步函数调用获取,包含着程序的行为操作。 - 任务对象 (
Task
):包装了协程对象,便于管理和查看协程对象的执行状态,和控制它的执行,使得协程更容易被管理和监控。 Future
对象:是用于表示异步操作结果的对象。它是asyncio
模块中的一个核心概念,用于表示尚未完成的异步操作
关于await
的机制
await后面可以跟上coroutine
或者task
或者Future
,我们现在主要讨论的就是await面对他们进行的行为,我们先讨论两种不同的情况。
async def example1():
print("start")
await asyncio.sleep(1)
print("end")
async def example2():
print("start")
task = asyncio.create_task(asyncio.sleep(1))
await task
print("end")
他们有同样的运行结果,通过viztracor,我们可以发现他们的不同。以下分别是example1和example2
可以发现,example1在await之后并没有交出控制权,而是像等待生成器一样等待asyncio结果。而example2则是在await后结束了,并在之后进行sleep,再回到example2。
何为协程的本质,如何做到中间停止
至于await的具体行为就需要从底层入手
通过dis可见
9 0 LOAD_GLOBAL 0 (print)
2 LOAD_CONST 1 ('start')
4 CALL_FUNCTION 1
6 POP_TOP
10 8 LOAD_GLOBAL 1 (asyncio)
10 LOAD_METHOD 2 (sleep)
12 LOAD_CONST 2 (1e-06)
14 CALL_METHOD 1
16 GET_AWAITABLE
18 LOAD_CONST 0 (None)
20 YIELD_FROM
22 POP_TOP
11 24 LOAD_GLOBAL 0 (print)
26 LOAD_CONST 3 ('end')
28 CALL_FUNCTION 1
30 POP_TOP
32 LOAD_CONST 0 (None)
34 RETURN_VALUE
说明遇到await
的时候,运行了GET_AWAITABLE
字节码到YIELD_FROM
字节码
关于字节码GET_AWAITABLE
先看看GET_AWAITABLE
的源码
case TARGET(GET_AWAITABLE): {
PREDICTED(GET_AWAITABLE);
PyObject *iterable = TOP();
PyObject *iter = _PyCoro_GetAwaitableIter(iterable);
// 如果参数是一个`coroutine`,返回coro本身,如果不是则运行并返回`__await__()`函数的值
if (iter == NULL) {
int opcode_at_minus_3 = 0;
if ((next_instr - first_instr) > 2) {
opcode_at_minus_3 = _Py_OPCODE(next_instr[-3]);
}
format_awaitable_error(tstate, Py_TYPE(iterable),
opcode_at_minus_3,
_Py_OPCODE(next_instr[-2]));
}
Py_DECREF(iterable);
if (iter != NULL && PyCoro_CheckExact(iter)) {
PyObject *yf = _PyGen_yf((PyGenObject*)iter);
if (yf != NULL) {
/* `iter` is a coroutine object that is being
awaited, `yf` is a pointer to the current awaitable
being awaited on. */
Py_DECREF(yf);
Py_CLEAR(iter);
_PyErr_SetString(tstate, PyExc_RuntimeError,
"coroutine is being awaited already");
/* The code below jumps to `error` if `iter` is NULL. */
}
}
SET_TOP(iter); /* Even if it's NULL */
if (iter == NULL) {
goto error;
}
PREDICT(LOAD_CONST);
DISPATCH();
}
其中的关键函数_PyCoro_GetAwaitableIter
的注释说明了:
如果参数是一个coroutine
,返回它本身,如果不是则运行并返回__await__
函数的值存入iter
然后使用了SET_TOP(iter);
将其放在了栈顶,以便下次使用。
什么是__await__
函数
class Future:
"""其他代码略"""
def __await__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
if not self.done():
raise RuntimeError("await wasn't used with future")
return self.result() # May raise too.
task
继承于Future
,他们都有这个__await__
函数
- 首先,它检查Future对象是否已经完成(即是否具有结果)。如果Future对象已经完成,它会立即返回结果,允许程序继续执行下一步操作。
- 如果Future对象尚未完成,它将设置
self._asyncio_future_blocking
属性为True,这是为了告诉任务(Task)对象,当前任务需要等待这个Future对象的完成。然后,它使用yield
关键字将自身(即Future对象)返回,暂时挂起当前的协程任务,等待Future对象的完成。 - 一旦Future对象完成,协程将从之前的挂起点继续执行。在这个时候,Future对象已经完成,
self.done()
返回True,因此代码将继续执行。 - 最后,它返回Future对象的结果,如果有的话。
可见,__await__
可以保证已经完成的任务不再被执行,总之就是做了一个前置的检查,查看任务是否已经完成。
关于字节码YIELD_FROM
现在开始具体分析YIELD_FROM
(已手动添加注释)
case TARGET(YIELD_FROM): {
PyObject *v = POP();
PyObject *receiver = TOP(); // 上次GET_AWAITABLE获取的coro
int err;
if (PyGen_CheckExact(receiver) || PyCoro_CheckExact(receiver)) {
// PyGen_CheckExact判断是否是生成器,PyCoro_CheckExact判断是否是coroutine
retval = _PyGen_Send((PyGenObject *)receiver, v);
// 发送一个send,并将yield返回得到的值存入retval
} else {
_Py_IDENTIFIER(send);
if (v == Py_None)
retval = Py_TYPE(receiver)->tp_iternext(receiver);
else
retval = _PyObject_CallMethodIdOneArg(receiver, &PyId_send, v);
}
// retval在结束或者出现StopIteration异常就会为NULL
Py_DECREF(v);
if (retval == NULL) {
// 此处为该coro结束了或者是该coro进行了return
PyObject *val;
if (tstate->c_tracefunc != NULL
&& _PyErr_ExceptionMatches(tstate, PyExc_StopIteration))
call_exc_trace(tstate->c_tracefunc, tstate->c_traceobj, tstate, f);
err = _PyGen_FetchStopIterationValue(&val); // 获取StopIteration内容的值
if (err < 0)
goto error;
Py_DECREF(receiver);
SET_TOP(val);
DISPATCH(); // 结束这个字节码
}
// retval 并非NULL,说明生成了一个值现在存在retval中
/* receiver remains on stack, retval is value to be yielded */
f->f_stacktop = stack_pointer;
/* and repeat... */
assert(f->f_lasti >= (int)sizeof(_Py_CODEUNIT)); // 此时的counter应当还没有结束
f->f_lasti -= sizeof(_Py_CODEUNIT); // 调整counter回到上一句指令,下次调用还会从推出位置开始
goto exiting; // 后面的代码省略,大致就是把yield出来得到的值 返回回去
}
以上说明了一个coroutine
是如何让程序在运行到一半的时候停住,实际上和生成器的原理是一样的
并且有个需要注意的语法糖,在协程函数和生成器函数中出现的return
的实际原理是raise了一个StopIteration
并且把return的值存入这个异常的内容。
这里具体讲述了当await时,发生的具体行为
如何做到并行和相互依赖
class Task(futures._PyFuture):
_log_destroy_pending = True
def __init__(self, coro, *, loop=None, name=None):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
if not coroutines.iscoroutine(coro):
self._log_destroy_pending = False
raise TypeError(f"a coroutine was expected, got {coro!r}")
if name is None:
self._name = f'Task-{_task_name_counter()}'
else:
self._name = str(name)
self._must_cancel = False
self._fut_waiter = None
self._coro = coro
self._context = contextvars.copy_context()
self._loop.call_soon(self.__step, context=self._context) #
_register_task(self)
当一个task被创建的时候可见self._loop.call_soon(self.__step, context=self._context)
要求事件循环运行函数__step
def __step(self, exc=None):
if self.done():
raise exceptions.InvalidStateError(
f'_step(): already done: {self!r}, {exc!r}')
if self._must_cancel:
if not isinstance(exc, exceptions.CancelledError):
exc = self._make_cancelled_error()
self._must_cancel = False
coro = self._coro
self._fut_waiter = None
_enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None).
try:
if exc is None:
# We use the `send` method directly, because coroutines
# don't have `__iter__` and `__next__` methods.
result = coro.send(None) # 只会有两种情况,一种是yield了值,此处的值应当是await了一个卡住的任务
else: # result即被卡住的任务
result = coro.throw(exc)
except StopIteration as exc: # 另一种是 没有卡住的任务了(没有await)结束并返回值
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
super().cancel(msg=self._cancel_message)
else:
super().set_result(exc.value) # 设置返回的结果
"""省略一大段代码"""
elif blocking:
if result is self:
new_exc = RuntimeError(
f'Task cannot await on itself: {self!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
else: # 如果result存放了我需要await的任务
result._asyncio_future_blocking = False
result.add_done_callback(
self.__wakeup, context=self._context) # 给任务添加回调,希望任务结束之后再唤醒自己
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel(
msg=self._cancel_message):
self._must_cancel = False
"""省略一大段代码"""
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs. # 离开了loop,只有被唤醒才会再被加入
举例子,在A运行的时候await了B,此时的result就是B,A让B完成了之后再叫A,这个标记就是add_done_callback
def add_done_callback(self, fn, *, context=None):
"""Add a callback to be run when the future becomes done.
The callback is called with a single argument - the future object. If
the future is already done when this is called, the callback is
scheduled with call_soon.
"""
if self._state != _PENDING:
self._loop.call_soon(fn, self, context=context)
else:
if context is None:
context = contextvars.copy_context()
self._callbacks.append((fn, context))
在完成的时候源码会有这样一行
super().set_result(exc.value) # 设置返回的结果
负责设置返回结果,同时,这个coroutine也结束了,
def set_result(self, result):
"""Mark the future done and set its result.
If the future is already done when this method is called, raises
InvalidStateError.
"""
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
结尾调用了self.__schedule_callbacks()
def __schedule_callbacks(self):
"""Internal: Ask the event loop to call all callbacks.
The callbacks are scheduled to be called as soon as possible. Also
clears the callback list.
"""
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback, ctx in callbacks:
self._loop.call_soon(callback, self, context=ctx)
此处就是运行了我们add_done_callback
加入的__wake
,唤醒了被卡住的那个coroutine。
至此也就解释了互相依赖的原理。