Python3爬虫系列07 (理论) - 协程

  • 原创
  • Madman
  • /
  • 2018-10-07 09:40
  • /
  • 0
  • 398 次阅读

spider 07-min.jpg

Synopsis: 生成器可以作为协程(coroutine)使用,称为 "基于生成器的协程"。协程和生成器类似,都是定义体中包含 yield 关键字的函数。但它们也有本质区别,生成器用于 生成 供迭代的数据,next()方法只允许调用方从生成器中获取数据; 而协程与迭代无关,协程是数据的消费者,调用方会把数据推送给协程。PEP 342给生成器增加了 send() 方法,允许调用方和协程之间双向交换数据。PEP 380允许生成器中可以return返回值,并新增了 yield from 语法结构,打开了调用方和子协程的双向通道。PEP 492新增了async和await关键字,实现了 "原生协程",以便于跟生成器进行区分。协程不等于异步编程,所以将在下一篇博客中介绍 asyncio 模块,它提供了事件循环,利用Coroutines、Tasks、Futures一起才能实现异步I/O(底层基于selectors模块,请回头查看本爬虫系列的第一篇博客I/O models中的I/O multiplexing)

代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎star

1. 基于生成器的协程

2001年,Python 2.2 通过了 PEP 255 -- Simple Generators ,引入了 yield 关键字实现了生成器函数yield包含 产出让步 两个含义: 生成器yield x 这行代码会 产出 一个值,提供给 next(...) 的调用方; 此外,还会作出 让步,暂停执行 生成器,让调用方继续工作,直到需要使用另一个值时再调用 next(...)

2005年,Python 2.5 通过了 PEP 342 - "Coroutines via Enhanced Generators",给生成器增加了.send().throw().close()方法,第一次实现了基于生成器的协程函数(generator-based coroutines),详情请查看上一篇博客 http://www.madmalls.com/blog/post/iterable-iterator-and-generator-in-python/#35-pep-342

协程(coroutine)可以在执行期间暂停(suspend),这样就可以在等待外部数据处理完成之后(例如,等待网络I/O数据),从之前暂停的地方恢复执行(resume)

1.1 协程最简单的使用演示

coroutinegenerator 一样,经常用同一个词来表示两个不同的概念:

  • 表示 协程函数(coroutine function)
  • 表示 协程对象(coroutine object)
In [1]: def simple_coroutine():
   ...:     print('-> coroutine started')
   ...:     x = yield  # 该协程只需从调用方那里接收数据,所以yield关键字右边没有表达式,默认产出None
   ...:     print('-> coroutine received: ', x)
   ...:     

In [2]: import inspect

In [3]: inspect.iscoroutinefunction(simple_coroutine)
Out[3]: False

In [4]: inspect.isgeneratorfunction(simple_coroutine)
Out[4]: True

In [5]: my_coro = simple_coroutine()  # 调用协程函数后并不会立即执行定义体里的代码,只是返回一个协程对象

In [6]: my_coro  # 基于生成器的协程,其本质还是生成器,只不过是PEP 342加强了生成器功能,增加了send()方法
Out[6]: <generator object simple_coroutine at 0x7f21912c02b0>

In [7]: inspect.iscoroutine(my_coro)
Out[7]: False

In [8]: inspect.isgenerator(my_coro)
Out[8]: True

In [9]: next(my_coro)  # 首先需要启动协程,Python表达式会先执行等号的右边,所以x = yield中只会先执行到yield,根据生成器的语法,调用next()方法会执行yield语句后暂停(这里,由于yield后面没有值,所以输出None后暂停)
-> coroutine started  # 可以看到这一行会输出

In [10]: my_coro.send(18)  # 调用send(18)方法,给yield表达式发送数值18。协程会恢复执行,x = yield 中现在右边整体是18,根据Python赋值语句的语法,现在会把18赋值给等号左边的变量x。再继续执行后面的打印语句,所以会看到输出-> coroutine received:  18。此时,协程运行完成了,根据生成器语法,正常结束后会抛出StopIteration异常
-> coroutine received:  18
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-10-5532319df1a8> in <module>()
----> 1 my_coro.send(18)

StopIteration: 

从语法上看,协程生成器类似,都是定义体中包含yield关键字的函数。但是,协程yield关键字通常出现在等号的右边,比如c = yield a + b,等号右边的yield a + b称为yield表达式(yield-expression)协程调用方可以通过新增的.send(value)方法给协程发送数据,发送的数据会成为yield表达式(yield-expression)的值。比如执行.send(10),则yield表达式会接收到数值10,即yield a + b等于10。根据Python赋值语句的语法,等号右边的值会被赋值给等号左边的变量,即变量c等于10

注意: 协程可以产出值,即yield关键字后面有表达式,比如x = yield 'Hello',启动协程后会产出字符串 Hello;也可以不产出值,即yield关键字后面没有表达式,比如x = yield(其实是产出None)

1.2 协程与生成器的对比

  • generators are data producers生成器用于生成迭代的数据,next()方法只允许调用方从生成器中获取数据
  • coroutines are data consumers协程是数据的消费者,调用方会把数据推送给协程。send()方法允许调用方和协程之间双向交换数据。注意: 协程也可以产出值,但这与迭代无关

从根本上把 yield 视作控制流程的方式,这样就好理解协程了。使用它可以实现协作式多任务协程可以把控制权让步给中心调度程序,从而激活其他的协程

1.3 用作协程的生成器的基本行为

(1) 协程的状态

可以使用 inspect.getgeneratorstate(...) 函数查看协程的当前状态:

  • 'GEN_CREATED': 等待开始执行
  • 'GEN_RUNNING': 正在被解释器执行。只有在多线程应用中才能看到这个状态
  • 'GEN_SUSPENDED': 在yield表达式处暂停
  • 'GEN_CLOSED': 执行结束

(2) 剖析协程的执行过程

协程yield关键字通常出现在等号的右边,比如c = yield a + b。在赋值语句中,等号右边的代码在赋值之前执行。因此,协程首先会执行yield a + b产出表达式a + b的值,然后协程会在yield关键字所在的位置暂停(suspend)。等到调用方执行.send(10)时,协程会从之前暂停的地方恢复执行(resume)而且.send(10) 方法的参数10会成为暂停的yield表达式的值。所以yield a + b整体等于10,然后再赋值给变量c:

In [1]: def simple_coro2(a):  # 产出两个值的协程
   ...:     print('--> Started: a =', a)
   ...:     b = yield a
   ...:     print('--> Received: b =', b)
   ...:     c = yield a + b
   ...:     print('--> Received: c =', c)
   ...:

In [2]: my_coro2 = simple_coro2(14)

In [3]: my_coro2
Out[3]: <generator object simple_coro at 0x0441E240>

In [4]: from inspect import getgeneratorstate

In [5]: getgeneratorstate(my_coro2)
Out[5]: 'GEN_CREATED'  # 协程未启动

In [6]: next(my_coro2)  # 预激协程
--> Started: a = 14  # 执行语句 print('--> Started: a =', a)
Out[6]: 14  # 执行赋值语句中等号右边的 yield a ,第一次 产出 a的值14,并且暂停,等待为b赋值

In [7]: getgeneratorstate(my_coro2)
Out[7]: 'GEN_SUSPENDED'  # 协程在 yield 表达式处暂停

In [8]: my_coro2.send(28)  # 调用方把值9发给暂停的协程,计算yield表达式(yield a)得到28,赋值给b
--> Received: b = 28  # 执行语句 print('--> Received: b =', b)
Out[8]: 42  # 执行赋值语句中等号右边的 yield a + b ,第二次 产出 a+b的值42,并且暂停,等待为c赋值

In [9]: my_coro2.send(99)  # 调用方把值10发给暂停的协程,计算yield表达式(yield a + b)得到99,赋值给c
--> Received: c = 99  # 执行语句 print('--> Received: c =', c)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-9-faabc476fd4f> in <module>()
----> 1 my_coro2.send(99)

StopIteration:  # 已到达结尾处,协程终止,导致生成器对象抛出 StopIteration 异常

In [10]: getgeneratorstate(my_coro2)
Out[10]: 'GEN_CLOSED'  # 表明协程执行结束

simple coroutine

因为.send()方法的参数会成为暂停的yield表达式的值,所以,仅当协程处于暂停状态时才能调用.send()方法。如果协程没激活(即状态是'GEN_CREATED'),把None之外的值发给它会抛出异常:

In [11]: my_coro3 = simple_coro2(3)

In [12]: getgeneratorstate(my_coro3)
Out[12]: 'GEN_CREATED'

In [13]: my_coro3.send(5)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-14-c26f18faf2b5> in <module>()
----> 1 my_coro3.send(5)

TypeError: can't send non-None value to a just-started generator

最先调用 next(my_coro3) 函数这一步通常称为 预激(prime)协程(即,让协程向前执行到第一个yield表达式,准备好作为活跃的协程使用):

In [15]: next(my_coro3)  # 预激协程,相当于 my_coro3.send(None) 或者 my_coro3.__next__()
--> Started: a = 3
Out[15]: 3

In [16]: my_coro3.send(5)
--> Received: b = 5
Out[16]: 8

(3) 预激协程的装饰器

使用协程之前必须预激(prime)(如果不预激协程,那么不能调用 .send() 方法发送 None 之外的值给协程),可是这一步容易忘记。为了避免忘记,可以在协程上使用一个特殊的装饰器(装饰器请参考 http://www.madmalls.com/blog/post/closure-and-decorator-in-python/ ):

'''test.py模块'''
from functools import wraps

def coroutine(func):
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)  # 调用被装饰的函数,获取生成器对象
        next(gen)   # 预激生成器
        return gen  # 返回生成器
    return primer

@coroutine
def averager():
    '''计算移动平均值'''
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count

测试:

In [1]: from test import averager

In [2]: coro_avg = averager()  # 调用 averager() 函数创建一个生成器对象,在 coroutine 装饰器的 primer 函数中已经预激了这个生成器

In [3]: from inspect import getgeneratorstate

In [4]: getgeneratorstate(coro_avg)
Out[4]: 'GEN_SUSPENDED'  # 处于 GEN_SUSPENDED 状态,因此这个协程已经准备好,可以接收值了

In [5]: coro_avg.send(10)
Out[5]: 10.0

In [6]: coro_avg.send(20)
Out[6]: 15.0

In [7]: coro_avg.send(30)
Out[7]: 20.0

(4) 终止协程和异常处理

协程中未处理的异常会向上冒泡,传给next()函数或.send()方法的调用方(即触发协程的对象):

In [1]: from test import averager

In [2]: coro_avg = averager()

In [3]: coro_avg.send(10)  # 使用 @coroutine 装饰器装饰的 averager 协程,可以立即开始发送值
Out[3]: 10.0

In [4]: coro_avg.send(20)
Out[4]: 15.0

In [5]: coro_avg.send('Hi')  # 发送的值不是数字,导致协程内部有异常抛出
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-5-822668dd0208> in <module>()
----> 1 coro_avg.send('Hi')

D:\python-code\test.py in averager()
     18     while True:
     19         term = yield average
---> 20         total += term
     21         count += 1
     22         average = total / count

TypeError: unsupported operand type(s) for +=: 'float' and 'str'

In [6]: coro_avg.send(30)  # 由于在协程内没有处理异常,协程会终止。如果试图重新激活协程,会抛出 StopIteration 异常
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-6-a92524f121be> in <module>()
----> 1 coro_avg.send(30)

StopIteration:

出错的原因是,发送给协程的字符串 'Hi' 不能加到 total 变量上。从 Python 2.5 开始(PEP 342),调用方可以调用 throw()方法显式地把异常发给协程,调用close()方法来关闭协程

  • generator.throw(exc_type[, exc_value[, traceback]]): 使协程在暂停的yield 表达式处抛出指定的异常。如果协程处理了抛出的异常,代码会向前执行到下一个yield 表达式,而产出的值会返回给generator.throw()方法的调用方。如果协程没有处理抛出的异常异常会向上冒泡,传到调用方的上下文中
  • generator.close(): 使协程在暂停的yield 表达式处抛出GeneratorExit异常,正常终止协程

现在我们来演示如何使用 throw()close() 方法控制协程

'''test.py模块'''
class DemoException(Exception):
    """An exception type for the demonstration."""

def demo_finally():
    print('-> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:  # 特别处理 DemoException 异常
                print('*** DemoException handled. Continuing...')
            else:  # 如果没有异常,那么显示接收到的值
                print('-> coroutine received: {!r}'.format(x))
    finally:  # 不管协程如何结束都想做些清理工作
        print('-> coroutine ending')

如果把DemoException异常传入demo_finally协程,它会处理,然后继续运行:

In [1]: from test import demo_finally

In [2]: coro_exc = demo_finally()

In [3]: next(coro_exc)
-> coroutine started

In [4]: coro_exc.send(10)
-> coroutine received: 10

In [5]: from test import DemoException

In [6]: coro_exc.throw(DemoException)
*** DemoException handled. Continuing...

In [7]: from inspect import getgeneratorstate

In [8]: getgeneratorstate(coro_exc)
Out[8]: 'GEN_SUSPENDED'

In [9]: coro_exc.send(20)
-> coroutine received: 20

但是,如果传入协程的异常没有处理,协程会终止,即状态变成 'GEN_CLOSED':

In [10]: coro_exc.throw(ZeroDivisionError)
-> coroutine ending
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
<ipython-input-11-f06bd2f1e23e> in <module>()
----> 1 coro_exc.throw(ZeroDivisionError)

D:\python-code\test.py in demo_finally()
      8         while True:
      9             try:
---> 10                 x = yield
     11             except DemoException:
     12                 print('*** DemoException handled. Continuing...')

ZeroDivisionError:

In [11]: getgeneratorstate(coro_exc)
Out[11]: 'GEN_CLOSED'

使用close()可以正常终止协程,没有异常:

In [12]: coro_exc2 = demo_finally()

In [13]: next(coro_exc2)
-> coroutine started

In [14]: coro_exc2.send(30)
-> coroutine received: 30

In [15]: coro_exc2.send(40)
-> coroutine received: 40

In [16]: coro_exc2.close()
-> coroutine ending

In [17]: getgeneratorstate(coro_exc2)
Out[17]: 'GEN_CLOSED'

1.4 让协程return值

Python 3.3 通过了 PEP 380 -- Syntax for Delegating to a Subgenerator ,允许在协程中有return expr表达式,如果执行到该语句,则协程运行结束。同时会抛出StopIteration异常,而返回值就在该异常对象的value属性上。如果协程中有return语句,则返回return后面的表达式的值,否则返回None

在Python 3.3之前,如果协程中有return expr表达式,Python解释器会抛出SyntaxError语法错误

有些协程在被激活后,每次驱动(drive)协程时,不会产出值,而是在最后(协程正常终止时)返回一个值(通常是某种累加值)。下面修改之前的计算移动平均值的协程,每次驱动协程时不会产出移动平均值,最后返回的结果是一个namedtuple对象,两个字段分别是项数 count 和平均值 average :

from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:  # 为了返回值,协程必须正常终止。这里有个条件判断,以便退出累计循环
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)  # 返回一个 namedtuple,包含 count 和 average 两个字段。在 Python 3.3 之前,如果生成器返回值,解释器会报句法错误

测试使用新版的 averager 协程

In [1]: from test import averager

In [2]: coro_avg = averager()

In [3]: next(coro_avg)

In [4]: coro_avg.send(10)  # 每次驱动协程时,不会产出值

In [5]: coro_avg.send(20)

In [6]: coro_avg.send(30)

In [7]: coro_avg.send(None)  # 发送 None 会终止循环,导致协程结束,返回结果
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-7-a9c80bbec98f> in <module>()
----> 1 coro_avg.send(None)

StopIteration: Result(count=3, average=20.0)  # 生成器对象的常规行为: 耗尽时会抛出 StopIteration 异常。异常对象的 value 属性保存着返回的值

捕获StopIteration异常,获取 averager 返回的值:

In [8]: coro_avg = averager()

In [9]: next(coro_avg)

In [10]: coro_avg.send(10)

In [11]: coro_avg.send(20)

In [12]: coro_avg.send(30)

In [13]: try:
    ...:     coro_avg.send(None)
    ...: except StopIteration as exc:
    ...:     result = exc.value
    ...:

In [14]: result
Out[14]: Result(count=3, average=20.0)

1.5 链式多协程

可以将多个协程管道(pipelines)一样链接起来使用,注意: 链式多协程中必须存在一个end-point(sink),它接收数据后不再传递给另一个协程。比如在Linux中,可以使用tail -f /var/log/nginx/access.log |grep 'python'来监视Nginx日志的新内容是否包含 'python',如果包含则输出该行:

coroutines pipelines

from functools import wraps
import time

# A decorator function that takes care of starting a coroutine automatically on call
def coroutine(func):
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return primer

# A data source.  This is not a coroutine, but it sends data into one (target)
def follow(thefile, target):
    thefile.seek(0, 2)      # Go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)    # Sleep briefly
            continue
        target.send(line)

# A filter
@coroutine
def grep(pattern, target):
    while True:
        line = yield           # Receive a line
        if pattern in line:
            target.send(line)  # Send to next stage

# A sink.  A coroutine that receives data
@coroutine
def printer():
    while True:
        line = yield
        print(line)

# Example use
if __name__ == '__main__':
    f = open('/var/log/nginx/access.log')
    follow(f, grep('python', printer()))

1.6 yield from: 调用方和子协程的双向通道

2012年,在Python 3.3中实现了PEP 380 - "Syntax for Delegating to a Subgenerator"(把职责委托给子生成器的语法),引入了 yield from 语法结构,yield from表达式的值等于子生成器终止时的返回值,如果子生成器中没有return语句,将返回None。详情见上一篇博客:http://www.madmalls.com/blog/post/iterable-iterator-and-generator-in-python/#2-yield-from-subgenerator

yield from subcoroutine

from collections import namedtuple

Result = namedtuple('Result', 'count average')

# the subgenerator
def averager():  # 与章节1.4中的 averager 协程一样,这里作为协程使用的生成器
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield  # main 函数中的客户代码发送的各个值绑定到这里的 term 变量上
        if term is None:  # 至关重要的终止条件。如果不这么做,使用 yield from 调用这个协程的生成器(委派生成器grouper)会永远阻塞
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)  # 返回的 Result 会成为 grouper 函数中 yield from 表达式的值

# the delegating generator
def grouper(results, key):  # grouper 是委派生成器
    while True:  # 这个循环每次迭代时会新建一个 averager 实例;每个实例都是作为协程使用的生成器对象
        # main 函数中的客户代码发送的各个值都会经由 yield from 处理,通过管道传给 averager 实例。 
        # grouper 会在 yield from 表达式处暂停,等待 averager 实例处理完客户端发来的所有值。
        # averager 实例运行完毕后,返回的值绑定到 results[key] 上。 
        # while 循环会不断创建 averager 实例,处理更多的值
        results[key] = yield from averager()  

# the client code, a.k.a. the caller
def main(data):  # main 函数是客户端代码,用 PEP 380 定义的术语来说,是"调用方"。这是驱动一切的函数
    results = {}
    for key, values in data.items():
        group = grouper(results, key)  # group 是调用 grouper 函数得到的生成器对象,传给 grouper 函数的第一个参数是results,用于收集结果;第二个参数是某个键。 group 作为协程使用
        next(group)  # 预激 group 协程,同时 grouper 里面的 yield from 会自动预激各个 averager
        for value in values:
            group.send(value)  # 把各个 value 传给 grouper。传入的值最终到达 averager 函数中 term = yield 那一行; grouper 永远不知道传入的值是什么
        group.send(None)  # important! 把 None 传入 grouper,导致当前的 averager 实例终止,也让 grouper 继续运行,再创建一个 averager 实例,处理下一组值

    # print(results)  # uncomment to debug
    report(results)

# output report
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
              result.count, group, result.average, unit))

data = {
    'girls;kg':
        [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    'girls;m':
        [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    'boys;kg':
        [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    'boys;m':
        [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

if __name__ == '__main__':
    main(data)

# Output:
 9 boys  averaging 40.42kg
 9 boys  averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m

2. 原生协程API: async和await

2015年,Python 3.5 通过了 PEP 492 - Coroutines with async and await syntax ,使用async def来定义协程,同时不再使用yield from从子协程中获取返回值,而使用新关键字await(至于为什么已经有基于生成器的协程了还要实现新API呢,请参考:https://www.python.org/dev/peps/pep-0492/#rationale-and-goals)

2.1 新的协程声明语法

In [1]: async def read_data():
   ...:     pass
   ...: 

In [2]: import inspect

In [3]: inspect.iscoroutinefunction(read_data)
Out[3]: True

In [4]: coro = read_data()  # 调用协程函数,返回一个协程对象

In [5]: coro
Out[5]: <coroutine object read_data at 0x7fd7b2f60150>

In [6]: inspect.iscoroutine(coro)
Out[6]: True
  • 不管函数定义体里面有没有await表达式,async def定义的函数永远是协程函数,被称为原生协程(native coroutine)
  • 如果async def定义的协程中混用了yieldyield from,解释器将抛出SyntaxError异常

新语法的底层增加了两个code object flags

  • CO_COROUTINE: 用来标记native coroutines
  • CO_ITERABLE_COROUTINE: 用来标记generator-based coroutines,以便于兼容native coroutines,即在async def定义的协程中await关键字后面可以使用generator-based coroutines,同时在generator-based coroutines的定义体中yield from后面可以使用native coroutines(此时,基于生成器的协程需要使用@types.coroutine装饰器:https://www.python.org/dev/peps/pep-0492/#types-coroutine)

2.2 await表达式

async def read_data(db):
    data = await db.fetch('SELECT ...')
    ...

该示例中的await表达式用来获取db.fetch('SELECT ...')这个子协程运行完成后的返回值awaityield from类似,它会暂停(suspend)执行read_data协程,直到后面的db.fetch运行并返回值之后,才恢复(resume)执行read_data协程,并把db.fetch返回值赋值给data变量

但是,await后面只接受awaitable类型的对象,如果不是awaitable对象则会抛出TypeError异常。awaitable对象包括:

  1. 调用native coroutine function后,返回的native coroutine object
  2. 调用generator-based coroutine function(必须使用@types.coroutine装饰器)后,返回的generator-based coroutine object
  3. 实现了__await__方法,并返回一个迭代器(iterator)的对象,如果__await__方法不返回迭代器,解释器将抛出TypeError异常。下一篇博客将要介绍的asyncio模块中的Future类就实现了__await__方法,实现了__await__方法的对象也叫Future-like objects

如果在async def函数定义体外面使用await,解释器将抛出SyntaxError异常(跟在def函数定义体外使用yield也会抛出SyntaxError异常一样)

2.3 Asynchronous Context Managers 与 async with

异步上下文管理器(Asynchronous Context Managers)是一种上下文管理器,能够在其 enter methods 和 exit methods 中暂停执行,为此新增了两个魔术方法__aenter____aexit__,这两个方法都必须返回awaitable对象,比如下面的示例:

class AsyncContextManager:
    async def __aenter__(self):
        await log('entering context')

    async def __aexit__(self, exc_type, exc, tb):
        await log('exiting context')

同时,新增了async with语法结构来使用异步上下文管理器(Asynchronous Context Managers)

async with EXPR as VAR:
    BLOCK

如果async with后面的上下文管理器中没有实现__aenter____aexit__方法,解释器将直接报错。同样地,不允许在async def函数定义体外面使用async with(否则将抛出SyntaxError异常)

使用异步上下文管理器,可以轻松地在协程中实现数据库事务管理器:

async def commit(session, data):
    ...

    async with session.transaction():
        ...
        await session.update(data)
        ...

同时,要异步获取数据库锁的代码也由:

with (yield from lock):
    ...

变成:

async with lock:
    ...

2.4 Asynchronous Iterators 与 async for

异步可迭代的对象(Asynchronous iterable)必须实现__aiter__方法,并且要返回一个异步迭代器(asynchronous iterator object),可以使用isinstance(x, collections.abc.AsyncIterable)来判断是否为异步可迭代的对象

异步迭代器(Asynchronous Iterator)必须实现:

  • 实现__aiter__方法,返回异步迭代器本身(self)。因为异步迭代器肯定是异步可迭代的对象,所以它要实现该方法
  • 实现__anext__方法,并且要返回一个awaitable对象

可以使用isinstance(x, collections.abc.AsyncIterator)来判断是否为异步迭代器

class AsyncIterable:
    def __aiter__(self):
        return self

    async def __anext__(self):
        data = await self.fetch_data()
        if data:
            return data
        else:
            raise StopAsyncIteration

    async def fetch_data(self):
        ...

同样地,新增了async for来使用异步迭代器(异步迭代):

async for TARGET in ITER:
    BLOCK
else:
    BLOCK2

如果上面的ITER中没有实现__aiter__方法,解释器将抛出TypeError异常。同样地,不允许在async def函数定义体外面使用async for(否则将抛出SyntaxError异常)

2.5 原生协程与生成器的对比

  • native coroutine object不再实现__iter____next__方法,因为协程迭代无关! 也就是说原生协程不能再被iter()list()tuple()等方法迭代,也不能用于for...in循环,否则会报错:TypeError: 'coroutine' object is not iterable
  • 普通的生成器(没有使用@types.coroutine装饰器哦)中不能让yield from后面跟native coroutines,否则会报错:TypeError: cannot 'yield from' a coroutine object in a non-coroutine generator
  • 基于生成器的协程(使用了@types.coroutine装饰器,如果是asyncio模块中使用,则必须使用@asyncio.coroutine,其实asyncio.coroutine代码中也一行coro = types.coroutine(coro))中可以让yield from后面跟native coroutines
  • 对于native coroutine object来说,inspect.isgenerator()inspect.isgeneratorfunction()函数返回False,而inspect.iscoroutine()inspect.iscoroutinefunction函数返回True

注意: native coroutines在Python语法底层还是基于generators实现的,所以原生协程也有send()throw()close()方法,同时StopIterationGeneratorExit扮演相同的角色`

2.6 新的标准库函数

  • types.coroutine(gen),参考: https://www.python.org/dev/peps/pep-0492/#types-coroutine
  • inspect.iscoroutine(obj): 如果 obj 是native coroutine object,则返回True
  • inspect.iscoroutinefunction(obj): 如果 obj 是native coroutine function,则返回True
  • inspect.isawaitable(obj): 如果 obj 是一个awaitable对象,则返回True
  • inspect.getcoroutinestate(coro): 返回native coroutine object的当前状态(类似于inspect.getfgeneratorstate(gen)
  • inspect.getcoroutinelocals(coro): 返回native coroutine objectlocal variables和各变量的对应值

2.7 新的抽象基类

  • collections.abc.AwaitableFuture-like对象的抽象基类,定义了__await__方法
  • collections.abc.Coroutinenative coroutine对象的抽象基类,定义了send()throw()close()__await__方法

注意: 有CO_ITERABLE_COROUTINE标志的generator-based coroutines(即使用了@types.coroutine装饰器),由于它没有实现__await__方法,所以它不是collections.abc.Awaitablecollections.abc.Coroutine的实例对象

In [1]: import types, collections, inspect

In [2]: @types.coroutine
   ...: def gencoro():
   ...:     yield
   ...:     

In [3]: isinstance(gencoro(), collections.abc.Awaitable)
Out[3]: False

In [4]: isinstance(gencoro(), collections.abc.Coroutine)
Out[4]: False

In [5]: inspect.isawaitable(gencoro())  # 使用了@types.coroutine装饰器的generator-based coroutine是awaitable对象,章节2.2中有说明
Out[5]: True

为了方便测试一个对象是否支持异步迭代,新增了两个抽象基类:

  • collections.abc.AsyncIterable: 测试是否有__aiter__方法
  • collections.abc.AsyncIterator: 测试是否有__aiter____anext__方法

2.8 牛刀小试

将章节1.6中的示例改写为async defawait,需要注意的是,由于链式多协程中,最后必须有一个协程只有yield关键字,用来接收调用方发送的值,并在自己终止时把返回值传给await表达式,这个协程也叫end-point(sink)。由于它必须使用yield关键字,所以不能由async def来定义函数体。同时它又必须被await使用,所以它只能是使用了@types.coroutine装饰器的 generator-based coroutine function。另外,由于native coroutine迭代无关,所以没有__next__方法,调用方要激活原生协程,只能使用.send(None),而不能使用next()

from collections import namedtuple
import types


Result = namedtuple('Result', 'count average')

# generator-based coroutine function必须使用@types.coroutine装饰器后,才能被 await 使用
# 否则会报错,TypeError: object generator can't be used in 'await' expression
@types.coroutine
def averager():  # 子协程,是协程链中的end-point (sink)
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield  # main 函数中的客户代码发送的各个值绑定到这里的 term 变量上
        if term is None:  # 至关重要的终止条件。如果不这么做,使用 await 调用这个协程的 "委派协程" - grouper 会永远阻塞
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)  # 返回的 Result 会成为 grouper 函数中 await 表达式的值

async def grouper(results, key):  # 委派协程
    while True:  # 这个循环每次迭代时会新建一个 averager 实例;每个实例都是作为协程使用的生成器对象
        # main 函数中的客户代码发送的各个值都会经由 await 处理,通过管道传给 averager 实例。
        # grouper 会在 await 表达式处暂停,等待 averager 实例处理完客户端发来的所有值。
        # averager 实例运行完毕后,返回的值绑定到 results[key] 上。
        # while 循环会不断创建 averager 实例,处理更多的值
        results[key] = await averager()  # 阻塞grouper,直到averager返回值

# the client code, a.k.a. the caller
def main(data):  # main 函数是客户端代码,用 PEP 380 定义的术语来说,是"调用方"。这是驱动一切的函数
    results = {}
    for key, values in data.items():
        group = grouper(results, key)  # group 是调用 grouper 函数得到的生成器对象,传给 grouper 函数的第一个参数是results,用于收集结果;第二个参数是某个键。 group 作为协程使用
        group.send(None)  # native coroutine跟迭代无关,所以没有__next__方法,要激活原生协程,只能使用group.send(None),不能使用next(group)
        for value in values:
            group.send(value)  # 把各个 value 传给 grouper。传入的值最终到达 averager 函数中 term = yield 那一行; grouper 永远不知道传入的值是什么
        group.send(None)  # important! 把 None 传入 grouper,导致当前的 averager 实例终止,也让 grouper 继续运行,再创建一个 averager 实例,处理下一组值

    # print(results)  # uncomment to debug
    report(results)

# output report
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
              result.count, group, result.average, unit))

data = {
    'girls;kg':
        [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    'girls;m':
        [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    'boys;kg':
        [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    'boys;m':
        [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

if __name__ == '__main__':
    main(data)

# Output:
 9 boys  averaging 40.42kg
 9 boys  averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m

代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎star

未经允许不得转载: LIFE & SHARE - 王颜公子 » Python3爬虫系列07 (理论) - 协程

分享

作者

作者头像

Madman

如果博文内容有误或其它任何问题,欢迎留言评论,我会尽快回复; 或者通过QQ、微信等联系我

0 条评论

暂时还没有评论.

发表评论前请先登录

专题