这篇文章上次修改于 427 天前,可能其部分内容已经发生变化,如有疑问可询问作者。

Python 异步基础以及原理深层理解

概述

python的异步是基于python的生成器机制,利用生成器的特性,可以完成在多个函数之间互相切换。我们可以关注到生成器的一个特性,就是在next了之后,这个函数属于一个暂停状态,如果让多个函数在多次暂停下互相切换,就可以做到异步的效果。

asyncio 异步基础

关于python异步有几个常见的类型

  1. 异步函数:通过async def定义的函数,不能直接运行,并且一旦调用就会出现RuntimeWarning,他会返回一个coroutine协程对象。
  2. 协程对象(coroutine):通过异步函数调用获取,包含着程序的行为操作。
  3. 任务对象 (Task):包装了协程对象,便于管理和查看协程对象的执行状态,和控制它的执行,使得协程更容易被管理和监控。
  4. Future 对象:是用于表示异步操作结果的对象。它是 asyncio 模块中的一个核心概念,用于表示尚未完成的异步操作

关于await的机制

await后面可以跟上coroutine或者task或者Future,我们现在主要讨论的就是await面对他们进行的行为,我们先讨论两种不同的情况。

async def example1():
    print("start")
    await asyncio.sleep(1)
    print("end")


async def example2():
    print("start")
    task = asyncio.create_task(asyncio.sleep(1))
    await task
    print("end")

他们有同样的运行结果,通过viztracor,我们可以发现他们的不同。以下分别是example1和example2

example1.png

example2.png

可以发现,example1在await之后并没有交出控制权,而是像等待生成器一样等待asyncio结果。而example2则是在await后结束了,并在之后进行sleep,再回到example2。

何为协程的本质,如何做到中间停止

至于await的具体行为就需要从底层入手

通过dis可见

 9           0 LOAD_GLOBAL              0 (print)
             2 LOAD_CONST               1 ('start')
             4 CALL_FUNCTION            1
             6 POP_TOP

10           8 LOAD_GLOBAL              1 (asyncio)
            10 LOAD_METHOD              2 (sleep)
            12 LOAD_CONST               2 (1e-06)
            14 CALL_METHOD              1
            16 GET_AWAITABLE
            18 LOAD_CONST               0 (None)
            20 YIELD_FROM
            22 POP_TOP

11          24 LOAD_GLOBAL              0 (print)
            26 LOAD_CONST               3 ('end')
            28 CALL_FUNCTION            1
            30 POP_TOP
            32 LOAD_CONST               0 (None)
            34 RETURN_VALUE

说明遇到await的时候,运行了GET_AWAITABLE字节码到YIELD_FROM字节码

关于字节码GET_AWAITABLE

先看看GET_AWAITABLE的源码

case TARGET(GET_AWAITABLE): {
    PREDICTED(GET_AWAITABLE);
    PyObject *iterable = TOP();
    PyObject *iter = _PyCoro_GetAwaitableIter(iterable);
	// 如果参数是一个`coroutine`,返回coro本身,如果不是则运行并返回`__await__()`函数的值
    if (iter == NULL) {
        int opcode_at_minus_3 = 0;
        if ((next_instr - first_instr) > 2) {
            opcode_at_minus_3 = _Py_OPCODE(next_instr[-3]);
        }
        format_awaitable_error(tstate, Py_TYPE(iterable),
                               opcode_at_minus_3,
                               _Py_OPCODE(next_instr[-2]));
    }

    Py_DECREF(iterable);

    if (iter != NULL && PyCoro_CheckExact(iter)) {
        PyObject *yf = _PyGen_yf((PyGenObject*)iter);
        if (yf != NULL) {
            /* `iter` is a coroutine object that is being
                       awaited, `yf` is a pointer to the current awaitable
                       being awaited on. */
            Py_DECREF(yf);
            Py_CLEAR(iter);
            _PyErr_SetString(tstate, PyExc_RuntimeError,
                             "coroutine is being awaited already");
            /* The code below jumps to `error` if `iter` is NULL. */
        }
    }

    SET_TOP(iter); /* Even if it's NULL */

    if (iter == NULL) {
        goto error;
    }

    PREDICT(LOAD_CONST);
    DISPATCH();
}

其中的关键函数_PyCoro_GetAwaitableIter的注释说明了:

如果参数是一个coroutine,返回它本身,如果不是则运行并返回__await__函数的值存入iter

然后使用了SET_TOP(iter);将其放在了栈顶,以便下次使用。

什么是__await__函数

class Future:
    """其他代码略"""
    def __await__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

task继承于Future,他们都有这个__await__函数

  1. 首先,它检查Future对象是否已经完成(即是否具有结果)。如果Future对象已经完成,它会立即返回结果,允许程序继续执行下一步操作。
  2. 如果Future对象尚未完成,它将设置self._asyncio_future_blocking属性为True,这是为了告诉任务(Task)对象,当前任务需要等待这个Future对象的完成。然后,它使用yield关键字将自身(即Future对象)返回,暂时挂起当前的协程任务,等待Future对象的完成。
  3. 一旦Future对象完成,协程将从之前的挂起点继续执行。在这个时候,Future对象已经完成,self.done()返回True,因此代码将继续执行。
  4. 最后,它返回Future对象的结果,如果有的话。

可见,__await__可以保证已经完成的任务不再被执行,总之就是做了一个前置的检查,查看任务是否已经完成。

关于字节码YIELD_FROM

现在开始具体分析YIELD_FROM(已手动添加注释)

case TARGET(YIELD_FROM): {
    PyObject *v = POP();
    PyObject *receiver = TOP(); // 上次GET_AWAITABLE获取的coro
    int err;
    if (PyGen_CheckExact(receiver) || PyCoro_CheckExact(receiver)) {
        // PyGen_CheckExact判断是否是生成器,PyCoro_CheckExact判断是否是coroutine
        retval = _PyGen_Send((PyGenObject *)receiver, v);
        // 发送一个send,并将yield返回得到的值存入retval
    } else {
        _Py_IDENTIFIER(send);
        if (v == Py_None)
            retval = Py_TYPE(receiver)->tp_iternext(receiver);
        else
            retval = _PyObject_CallMethodIdOneArg(receiver, &PyId_send, v);
    }
    // retval在结束或者出现StopIteration异常就会为NULL
    Py_DECREF(v);
    if (retval == NULL) {
        // 此处为该coro结束了或者是该coro进行了return
        PyObject *val;
        if (tstate->c_tracefunc != NULL
            && _PyErr_ExceptionMatches(tstate, PyExc_StopIteration))
            call_exc_trace(tstate->c_tracefunc, tstate->c_traceobj, tstate, f);
        err = _PyGen_FetchStopIterationValue(&val); // 获取StopIteration内容的值
        if (err < 0)
            goto error;
        Py_DECREF(receiver);
        SET_TOP(val);
        DISPATCH(); // 结束这个字节码
    }
    // retval 并非NULL,说明生成了一个值现在存在retval中
    
    /* receiver remains on stack, retval is value to be yielded */
    f->f_stacktop = stack_pointer;
    /* and repeat... */
    assert(f->f_lasti >= (int)sizeof(_Py_CODEUNIT)); // 此时的counter应当还没有结束
    f->f_lasti -= sizeof(_Py_CODEUNIT); // 调整counter回到上一句指令,下次调用还会从推出位置开始
    goto exiting; // 后面的代码省略,大致就是把yield出来得到的值 返回回去
}

以上说明了一个coroutine是如何让程序在运行到一半的时候停住,实际上和生成器的原理是一样的

并且有个需要注意的语法糖,在协程函数和生成器函数中出现的return的实际原理是raise了一个StopIteration并且把return的值存入这个异常的内容。

这里具体讲述了当await时,发生的具体行为

如何做到并行和相互依赖

class Task(futures._PyFuture):
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)  # 
        _register_task(self)

当一个task被创建的时候可见self._loop.call_soon(self.__step, context=self._context)

要求事件循环运行函数__step

def __step(self, exc=None):
    if self.done():
        raise exceptions.InvalidStateError(
            f'_step(): already done: {self!r}, {exc!r}')
    if self._must_cancel:
        if not isinstance(exc, exceptions.CancelledError):
            exc = self._make_cancelled_error()
        self._must_cancel = False
    coro = self._coro
    self._fut_waiter = None

    _enter_task(self._loop, self)
    # Call either coro.throw(exc) or coro.send(None).
    try:
        if exc is None:
            # We use the `send` method directly, because coroutines
            # don't have `__iter__` and `__next__` methods.
            result = coro.send(None)  # 只会有两种情况,一种是yield了值,此处的值应当是await了一个卡住的任务
        else:  # result即被卡住的任务
            result = coro.throw(exc)
    except StopIteration as exc:  # 另一种是 没有卡住的任务了(没有await)结束并返回值
        if self._must_cancel:
            # Task is cancelled right before coro stops.
            self._must_cancel = False
            super().cancel(msg=self._cancel_message)
        else:
            super().set_result(exc.value)  # 设置返回的结果
		"""省略一大段代码"""
            elif blocking:
                if result is self:
                    new_exc = RuntimeError(
                        f'Task cannot await on itself: {self!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                else:  # 如果result存放了我需要await的任务
                    result._asyncio_future_blocking = False
                    result.add_done_callback(
                        self.__wakeup, context=self._context)  # 给任务添加回调,希望任务结束之后再唤醒自己
                    self._fut_waiter = result
                    if self._must_cancel:
                        if self._fut_waiter.cancel(
                                msg=self._cancel_message):
                            self._must_cancel = False
		"""省略一大段代码"""
        _leave_task(self._loop, self)
        self = None  # Needed to break cycles when an exception occurs.  # 离开了loop,只有被唤醒才会再被加入
        

举例子,在A运行的时候await了B,此时的result就是B,A让B完成了之后再叫A,这个标记就是add_done_callback

def add_done_callback(self, fn, *, context=None):
    """Add a callback to be run when the future becomes done.

    The callback is called with a single argument - the future object. If
    the future is already done when this is called, the callback is
    scheduled with call_soon.
    """
    if self._state != _PENDING:
        self._loop.call_soon(fn, self, context=context)
    else:
        if context is None:
            context = contextvars.copy_context()
        self._callbacks.append((fn, context))

在完成的时候源码会有这样一行

super().set_result(exc.value) # 设置返回的结果

负责设置返回结果,同时,这个coroutine也结束了,

def set_result(self, result):
    """Mark the future done and set its result.

    If the future is already done when this method is called, raises
    InvalidStateError.
    """
    if self._state != _PENDING:
        raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
    self._result = result
    self._state = _FINISHED
    self.__schedule_callbacks()

结尾调用了self.__schedule_callbacks()

def __schedule_callbacks(self):
    """Internal: Ask the event loop to call all callbacks.

    The callbacks are scheduled to be called as soon as possible. Also
    clears the callback list.
    """
    callbacks = self._callbacks[:]
    if not callbacks:
        return

    self._callbacks[:] = []
    for callback, ctx in callbacks:
        self._loop.call_soon(callback, self, context=ctx)

此处就是运行了我们add_done_callback加入的__wake,唤醒了被卡住的那个coroutine。

至此也就解释了互相依赖的原理。