Python 3 爬虫|第7章:协程 Coroutines
Synopsis: 生成器可以作为协程(coroutine)使用,称为 "基于生成器的协程"。协程和生成器类似,都是定义体中包含 yield 关键字的函数。但它们也有本质区别,生成器用于 "生成" 供迭代的数据,next() 方法只允许调用方从生成器中获取数据; 而协程与迭代无关,协程是数据的消费者,调用方会把数据推送给协程。PEP 342 给生成器增加了 send() 方法,允许调用方和协程之间双向交换数据。PEP 380 允许生成器中可以 return 返回值,并新增了 yield from 语法结构,打开了调用方和子协程的双向通道。PEP 492 新增了 async 和 await 关键字,实现了 "原生协程",以便于跟生成器进行区分。协程不等于异步编程,所以将在下一篇博客中介绍 asyncio 模块,它提供了事件循环,利用 Coroutines、Tasks、Futures 一起才能实现异步 I/O(底层基于 selectors 模块,请回头查看本爬虫系列的第一篇博客 I/O Models 中的 I/O multiplexing)
代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎 star
1. 基于生成器的协程
2001 年,Python 2.2 通过了 PEP 255 -- Simple Generators,引入了 yield 关键字实现了 生成器函数,yield 包含 产出 和 让步 两个含义: 生成器 中 yield x 这行代码会 产出 一个值,提供给 next(...) 的调用方; 此外,还会作出 让步,暂停执行 生成器,让调用方继续工作,直到需要使用另一个值时再调用 next(...)
2005 年,Python 2.5 通过了 PEP 342 - "Coroutines via Enhanced Generators",给 生成器 增加了 .send()、.throw() 和 .close() 方法,第一次实现了 基于生成器的协程函数(generator-based coroutines),详情请查看 Python 3 爬虫|第6章:可迭代对象 / 迭代器 / 生成器
协程(coroutine)可以在执行期间暂停(suspend),这样就可以在等待外部数据处理完成之后(例如,等待网络 I/O 数据),从之前暂停的地方恢复执行(resume)
1.1 协程最简单的使用演示
coroutine 同 generator 一样,经常用同一个词来表示两个不同的概念:
- 表示
协程函数(coroutine function) - 表示
协程对象(coroutine object)
In [1]: def simple_coroutine(): ...: print('-> coroutine started') ...: x = yield # 该协程只需从调用方那里接收数据,所以yield关键字右边没有表达式,默认产出None ...: print('-> coroutine received: ', x) ...: In [2]: import inspect In [3]: inspect.iscoroutinefunction(simple_coroutine) Out[3]: False In [4]: inspect.isgeneratorfunction(simple_coroutine) Out[4]: True In [5]: my_coro = simple_coroutine() # 调用协程函数后并不会立即执行定义体里的代码,只是返回一个协程对象 In [6]: my_coro # 基于生成器的协程,其本质还是生成器,只不过是PEP 342加强了生成器功能,增加了send()方法 Out[6]: <generator object simple_coroutine at 0x7f21912c02b0> In [7]: inspect.iscoroutine(my_coro) Out[7]: False In [8]: inspect.isgenerator(my_coro) Out[8]: True In [9]: next(my_coro) # 首先需要启动协程,Python表达式会先执行等号的右边,所以x = yield中只会先执行到yield,根据生成器的语法,调用next()方法会执行yield语句后暂停(这里,由于yield后面没有值,所以输出None后暂停) -> coroutine started # 可以看到这一行会输出 In [10]: my_coro.send(18) # 调用send(18)方法,给yield表达式发送数值18。协程会恢复执行,x = yield 中现在右边整体是18,根据Python赋值语句的语法,现在会把18赋值给等号左边的变量x。再继续执行后面的打印语句,所以会看到输出-> coroutine received: 18。此时,协程运行完成了,根据生成器语法,正常结束后会抛出StopIteration异常 -> coroutine received: 18 --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-10-5532319df1a8> in <module>() ----> 1 my_coro.send(18) StopIteration:
从语法上看,协程 和 生成器 类似,都是定义体中包含 yield 关键字的函数。但是,协程 中 yield 关键字通常出现在等号的右边,比如 c = yield a + b,等号右边的 yield a + b 称为 yield表达式(yield-expression)。协程 的 调用方 可以通过新增的 .send(value) 方法给 协程 发送数据,发送的数据会成为 yield表达式(yield-expression) 的值。比如执行 .send(10),则 yield表达式 会接收到数值 10,即 yield a + b 等于 10。根据 Python 赋值语句的语法,等号右边的值会被赋值给等号左边的变量,即变量 c 等于 10
注意:
协程可以产出值,即yield关键字后面有表达式,比如x = yield 'Hello',启动协程后会产出字符串 Hello; 也可以不产出值,即yield关键字后面没有表达式,比如x = yield(其实是产出 None)
1.2 协程与生成器的对比
- generators are data producers:
生成器用于生成供迭代的数据,next()方法只允许调用方从生成器中获取数据 - coroutines are data consumers:
协程是数据的消费者,调用方会把数据推送给协程。send()方法允许调用方和协程之间双向交换数据。注意:协程也可以产出值,但这与迭代无关
从根本上把 yield 视作控制流程的方式,这样就好理解 协程 了。使用它可以实现 协作式多任务: 协程 可以把控制权让步给中心调度程序,从而激活其他的 协程
1.3 用作协程的生成器的基本行为
(1) 协程的状态
可以使用 inspect.getgeneratorstate(...) 函数查看 协程 的当前状态:
'GEN_CREATED': 等待开始执行'GEN_RUNNING': 正在被解释器执行。只有在多线程应用中才能看到这个状态'GEN_SUSPENDED': 在yield表达式处暂停'GEN_CLOSED': 执行结束
(2) 剖析协程的执行过程
协程 中 yield 关键字通常出现在等号的右边,比如 c = yield a + b。在赋值语句中,等号右边的代码在赋值之前执行。因此,协程 首先会执行 yield a + b,产出 表达式 a + b 的值,然后 协程 会在 yield 关键字所在的位置 暂停(suspend)。等到 调用方 执行 .send(10) 时,协程 会从之前暂停的地方 恢复执行(resume),而且 .send(10) 方法的参数 10 会成为暂停的 yield 表达式的值。所以 yield a + b 整体等于 10,然后再赋值给变量 c:
In [1]: def simple_coro2(a): # 产出两个值的协程 ...: print('--> Started: a =', a) ...: b = yield a ...: print('--> Received: b =', b) ...: c = yield a + b ...: print('--> Received: c =', c) ...: In [2]: my_coro2 = simple_coro2(14) In [3]: my_coro2 Out[3]: <generator object simple_coro at 0x0441E240> In [4]: from inspect import getgeneratorstate In [5]: getgeneratorstate(my_coro2) Out[5]: 'GEN_CREATED' # 协程未启动 In [6]: next(my_coro2) # 预激协程 --> Started: a = 14 # 执行语句 print('--> Started: a =', a) Out[6]: 14 # 执行赋值语句中等号右边的 yield a ,第一次 产出 a的值14,并且暂停,等待为b赋值 In [7]: getgeneratorstate(my_coro2) Out[7]: 'GEN_SUSPENDED' # 协程在 yield 表达式处暂停 In [8]: my_coro2.send(28) # 调用方把值28发给暂停的协程,计算yield表达式(yield a)得到28,赋值给b --> Received: b = 28 # 执行语句 print('--> Received: b =', b) Out[8]: 42 # 执行赋值语句中等号右边的 yield a + b ,第二次 产出 a+b的值42,并且暂停,等待为c赋值 In [9]: my_coro2.send(99) # 调用方把值99发给暂停的协程,计算yield表达式(yield a + b)得到99,赋值给c --> Received: c = 99 # 执行语句 print('--> Received: c =', c) --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-9-faabc476fd4f> in <module>() ----> 1 my_coro2.send(99) StopIteration: # 已到达结尾处,协程终止,导致生成器对象抛出 StopIteration 异常 In [10]: getgeneratorstate(my_coro2) Out[10]: 'GEN_CLOSED' # 表明协程执行结束

因为 .send() 方法的参数会成为暂停的 yield 表达式的值,所以,仅当 协程 处于 暂停 状态时才能调用 .send() 方法。如果 协程 还 没激活(即状态是 'GEN_CREATED'),把 None 之外的值发给它会抛出异常:
In [11]: my_coro3 = simple_coro2(3) In [12]: getgeneratorstate(my_coro3) Out[12]: 'GEN_CREATED' In [13]: my_coro3.send(5) --------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-14-c26f18faf2b5> in <module>() ----> 1 my_coro3.send(5) TypeError: can't send non-None value to a just-started generator
最先调用 next(my_coro3) 函数这一步通常称为 预激(prime) 协程(即,让 协程 向前执行到第一个 yield 表达式,准备好作为活跃的 协程 使用):
In [15]: next(my_coro3) # 预激协程,相当于 my_coro3.send(None) 或者 my_coro3.__next__() --> Started: a = 3 Out[15]: 3 In [16]: my_coro3.send(5) --> Received: b = 5 Out[16]: 8
(3) 预激协程的装饰器
使用 协程 之前必须 预激(prime)(如果不预激 协程,那么不能调用 .send() 方法发送 None 之外的值给 协程),可是这一步容易忘记。为了避免忘记,可以在 协程 上使用一个特殊的装饰器(装饰器 请参考 http://www.madmalls.com/blog/post/closure-and-decorator-in-python/ ):
'''test.py模块''' from functools import wraps def coroutine(func): @wraps(func) def primer(*args, **kwargs): gen = func(*args, **kwargs) # 调用被装饰的函数,获取生成器对象 next(gen) # 预激生成器 return gen # 返回生成器 return primer @coroutine def averager(): '''计算移动平均值''' total = 0.0 count = 0 average = None while True: term = yield average total += term count += 1 average = total / count
测试:
In [1]: from test import averager In [2]: coro_avg = averager() # 调用 averager() 函数创建一个生成器对象,在 coroutine 装饰器的 primer 函数中已经预激了这个生成器 In [3]: from inspect import getgeneratorstate In [4]: getgeneratorstate(coro_avg) Out[4]: 'GEN_SUSPENDED' # 处于 GEN_SUSPENDED 状态,因此这个协程已经准备好,可以接收值了 In [5]: coro_avg.send(10) Out[5]: 10.0 In [6]: coro_avg.send(20) Out[6]: 15.0 In [7]: coro_avg.send(30) Out[7]: 20.0
(4) 终止协程和异常处理
协程 中未处理的 异常 会向上冒泡,传给 next() 函数或 .send() 方法的 调用方(即触发 协程 的对象):
In [1]: from test import averager In [2]: coro_avg = averager() In [3]: coro_avg.send(10) # 使用 @coroutine 装饰器装饰的 averager 协程,可以立即开始发送值 Out[3]: 10.0 In [4]: coro_avg.send(20) Out[4]: 15.0 In [5]: coro_avg.send('Hi') # 发送的值不是数字,导致协程内部有异常抛出 --------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-5-822668dd0208> in <module>() ----> 1 coro_avg.send('Hi') D:\python-
0 条评论
评论者的用户名
评论时间暂时还没有评论.