Python3爬虫系列08 (理论) - 使用asyncio模块实现并发

  • 原创
  • Madman
  • /
  • 2018-10-08 15:39
  • /
  • 0
  • 801 次阅读

spider 08-min.jpg

Synopsis: asyncio 模块于Python 3.4添加到标准库中,它在单线程中使用事件循环来驱动协程从而实现并发。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多。各个暂停的协程是要消耗内存,但是比线程消耗的内存数量级小。而且,协程能避免可怕的"回调地狱"。使用 asyncio 包时,我们编写的协程被包装成Task对象(相当于调用方),并且在我们编写的协程中,会通过调用 await 或 yield from 来使用由 asyncio 模块或第三方库(如aiohttp)所提供的协程(即委派生成器),而生成器最终把职责委托给Future对象,这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我们编写的协程)驱动执行低层异步 I/O 操作的库函数

代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎star

1. asyncio概述

Python 3.4的标准库中添加了asyncio模块,它在单线程中使用事件循环(event loop)来驱动协程(coroutines)从而实现并发(concurrency)。此模块的主要组件和概念包括:

  • Event Loop: 每个线程中只能有一个事件循环,每次只能运行一个任务(Task)事件循环会不断地重复 "监听事件发生 -- 处理该事件" 这一过程。当Task A在等待Future对象运行完成时,会把控制权交还事件循环(通过yield关键字),这样事件循环就可以运行Task B。一段时间后,Task A所等待的Future对象完成了,事件循环会唤醒Task A继续运行,从而实现并发
  • Coroutines: 协程可以在执行期间暂停(suspend),这样就可以在等待外部的处理(例如,等待网络I/O数据)完成之后,从之前暂停的地方恢复执行(resume)
  • FuturesFuture对象是一个"占位符",用来保存尚未完成的操作的结果(或异常)。在asyncio模块中,所有需要异步完成的操作都对应一个Future对象,当操作完成时,将数据设置为Future对象的结果。Python 3.4时实现了__iter__方法,可以在协程中使用yield from future等待Future对象的返回值; Python 3.5+实现了__await__方法,可以在协程中使用await futureyield from future(有一行代码: __iter__ = __await__)等待Future对象的返回值
  • TasksTask类是Future的子类,用来包装我们写的协程,当事件循环启动后,Task会自动调用result = coro.send(None)驱动协程的运行,相当于 http://www.madmalls.com/blog/post/coroutine-in-python/#16-yield-from 中的调用方,而Future相当于end-point(sink)Future__await__方法中的yield self会直接将Future对象本身传递给Task中的result并把控制权交还事件循环。同时,Task会向事件循环注册事件: 当Future对象完成时,请执行我的__wakeup()方法唤醒我。所以,当Future对象结束时,Task被唤醒,再次执行coro.send(None),最终获取到Future对象的结果(可能被协程链中的其它协程处理过),并设置为Task的结果,详情见章节5的总结

由于asyncio模块使用单线程,任何阻塞操作都会阻塞整个事件循环,所以任何阻塞型操作都要换成非阻塞型的异步操作。标准库中的socket模块,所有的方法比如socket.accept()socket.recv()等都是是阻塞型操作,所以要换成非阻塞型的异步操作比如loop.sock_accept()loop.sock_recv()等。同时,asyncio模块的事件循环基于Python 3.4中新增的selectors模块,使用操作系统底层提供的I/O multiplexing(参考: http://www.madmalls.com/blog/post/io-models/#23-io-multiplexing )来同时监听多个socket上的 可读/可写 事件,从而实现了异步网络I/O(参考: http://www.madmalls.com/blog/post/concurrent-programming-for-python/#4-io

2. Event Loop

Event Loop - wiki

event loop

asyncio实现了两种事件循环对象:

  • asyncio.SelectorEventLoop: (默认使用)基于Python 3.4中添加的selectors模块,它会根据OS自动选择最优的I/O multiplexing接口,比如在Linux中会使用epoll,在BSD中使用Kqueue
  • asyncio.ProactorEventLoop: 只能用于Windows系统,使用IOCP(I/O Completion Ports),参考MSDN documentation on I/O Completion Ports

asyncio event loop UML

2.1 获取事件循环

使用asyncio.get_event_loop()获取当前的事件循环

# For Linux
In [1]: import asyncio

In [2]: loop = asyncio.get_event_loop()

In [3]: loop
Out[3]: <_UnixSelectorEventLoop running=False closed=False debug=False>
# For Windows
In [1]: import asyncio

In [2]: loop = asyncio.get_event_loop()

In [3]: loop
Out[3]: <_WindowsSelectorEventLoop running=False closed=False debug=False>

2.2 设置事件循环

使用asyncio.set_event_loop(loop)设置事件循环,Windows中默认使用asyncio.windows_events._WindowsSelectorEventLoop,只支持sockets,不支持Pipes和subprocesses,我们可以更换为asyncio.windows_events.ProactorEventLoop,更详细的区别参考: https://docs.python.org/3/library/asyncio-eventloops.html#windows

In [1]: import sys

In [2]: import asyncio

In [3]: asyncio.get_event_loop()
Out[3]: <_WindowsSelectorEventLoop running=False closed=False debug=False>

In [4]: if sys.platform == 'win32':
   ...:     loop = asyncio.ProactorEventLoop()
   ...:     asyncio.set_event_loop(loop)
   ...:

In [5]: asyncio.get_event_loop()
Out[5]: <ProactorEventLoop running=False closed=False debug=False>

设置asyncio.SelectorEventLoop使用的I/O multiplexing接口:

在Mac OS 10.6、10.7和10.8中,默认的事件循环是asyncio.SelectorEventLoop,它会自动选择selectors.KqueueSelector,使用操作系统底层提供的Kquque接口实现I/O multiplexing,但是它不支持字符设备,可以更换为selectors.SelectSelector()

import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

2.3 管理事件循环策略

  • asyncio.get_event_loop_policy(): 获取当前事件循环策略。Get the current event loop policy.
  • asyncio.set_event_loop_policy(policy): 更换事件循环策略。Set the current event loop policy. If policy is None, the default policy is restored.

可以替换asyncio中默认的事件循环策略,比如使用uvloop,它用Cython编写,基于libuvuvloop可以让asyncio更快,测试表明比tornadocuriogevent等快两倍,几乎接近于Go程序的速度

(1) 安装uvloop

# pip install uvloop

(2) 使用uvloop.EventLoopPolicy()

In [1]: import asyncio

In [2]: import uvloop

In [3]: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

In [4]: asyncio.get_event_loop()
Out[4]: <uvloop.Loop running=False closed=False debug=False>

2.4 运行事件循环

事件循环需要启动才会不断循环监视事件并处理事件,常用方法:

  • AbstractEventLoop.run_until_complete(future): 运行事件循环,直到传入的 asyncio.Future 对象完成(如果传入协程,首先会自动将协程包装成Task对象,后面会细讲),返回Future对象的结果或抛出异常
  • AbstractEventLoop.run_forever(): 一直运行事件循环,直到调用 AbstractEventLoop.stop() 后才会停止
  • AbstractEventLoop.stop(): 停止正在运行的事件循环
  • AbstractEventLoop.close(): 关闭事件循环,The loop must not be running. Pending callbacks will be lost. This clears the queues and shuts down the executor, but does not wait for the executor to finish. 一旦关闭了事件循环后,就不能再调用 run_until_complete 等方法

2.5 基于内部时钟调度callback

asyncio的默认事件循环中有一个内部时钟,不同于time.time(),可以用AbstractEventLoop.time()获取当前的内部时间

(1) call_soon - 立即调用

运行AbstractEventLoop.call_soon(callback, *args, context=None)方法后,会立即将callback回调函数注册到事件循环上,如果其它地方也执行了call_soon()函数,将按照类似FIFO队列的方式依次执行对应的callback,当callback执行完成后,控制权交还给事件循环

call_soon()返回asyncio.Handle对象,可以调用该对象的cancel()方法取消执行callback

import time
import asyncio

def hello_world(loop):
    print('When calling hello_world in event loop is: {}'.format(loop.time()))
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()
print('The current time of time.time() is {}'.format(time.time()))
print('The current time in event loop is: {}'.format(loop.time()))

# Arrange for a callback to be called as soon as possible.
h = loop.call_soon(hello_world, loop)
print(type(h))
print(h)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()


# Output:
The current time of time.time() is 1533901306.6052473
The current time in event loop is: 3821.5
<class 'asyncio.events.Handle'>
<Handle hello_world(<_WindowsSele...e debug=False>) at d:/python-code/test_asyncio.py:4>
When calling hello_world in event loop is: 3821.5
Hello World

call_soon()不是线程安全的,call_soon_threadsafe()方法跟call_soon功能类似,只不过它是线程安全的

(2) call_later - 延迟调用

运行AbstractEventLoop.call_later(delay, callback, *args, context=None)方法后,将在delay秒后执行callback

call_later()返回asyncio.TimeHandle对象,可以调用该对象的cancel()方法取消执行callback

import time
import asyncio

def hello_world(loop):
    print('When calling hello_world in event loop is: {}'.format(loop.time()))
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()
print('The current time of time.time() is {}'.format(time.time()))
print('The current time in event loop is: {}'.format(loop.time()))

# Arrange for the callback to be called after the given delay seconds (either an int or float).
h = loop.call_later(1, hello_world, loop)
print(type(h))
print(h)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()


# Output:
The current time of time.time() is 1533902066.982436
The current time in event loop is: 4581.875
<class 'asyncio.events.TimerHandle'>
<TimerHandle when=4582.875 hello_world(<_WindowsSele...e debug=False>) at d:/python-code/test_asyncio.py:4>
When calling hello_world in event loop is: 4582.875
Hello World

(3) call_at - 具体时刻调用

运行AbstractEventLoop.call_at(when, callback, *args, context=None)方法后,将在When(可以是用int或float表示的asyncio内部时钟的timestamp)指定的时刻执行callback

call_at()返回asyncio.TimeHandle对象,可以调用该对象的cancel()方法取消执行callback

import time
import asyncio

def say_number(n, loop):
    print('Callback say_number output {} at {}'.format(n, loop.time()))

loop = asyncio.get_event_loop()
print('The current time of time.time() is {}'.format(time.time()))
print('The current time in event loop is: {}'.format(loop.time()))

# Arrange for the callback to be called at the given absolute timestamp when (an int or float)
loop.call_at(loop.time() + 1, say_number, 1, loop)  # 当前时间加1秒的时刻执行say_number(1, loop)
loop.call_at(loop.time() + 2, say_number, 2, loop)
loop.call_soon(say_number, 3, loop)  # 立即执行say_number(3, loop)
loop.call_later(5, lambda: loop.stop())  # 5秒执行匿名函数,函数体会关闭事件循环

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()


# Output:
The current time of time.time() is 1533903313.8902044
The current time in event loop is: 5828.781
Callback say_number output 3 at 5828.781
Callback say_number output 1 at 5829.781
Callback say_number output 2 at 5830.796

如果阅读源码,其实call_later(self, delay, callback, *args, context=None)里面是调用了self.call_at(self.time() + delay, callback, *args, context=context)

2.6 监视并处理Unix signals

使用AbstractEventLoop.add_signal_handler(signum, callback, *args)方法为Unix信号(比如,signum为2时表示SIGINT,signum为15时表示SIGTERM)注册处理函数,当事件循环接收到该Unix signal时,执行callback

# asyncio_signal_handler.py
import asyncio
import functools
import os
import signal

def ask_exit(signame):
    print("got signal %s: exit" % signame)
    loop.stop()

loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
    loop.add_signal_handler(getattr(signal, signame),
                            functools.partial(ask_exit, signame))

print("Event loop running forever, press Ctrl+C to interrupt.")
print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())
try:
    loop.run_forever()
finally:
    loop.close()

首先在Linux一个会话中,执行python3 asyncio_signal_handler.py,会一直运行:

[root@CentOS IO_models]# python3 asyncio_signal_handler.py 
Event loop running forever, press Ctrl+C to interrupt.
pid 20009: send SIGINT or SIGTERM to exit.

Ctrl+C键可以终止它,也可以给该进程发送SIGINTSIGTERM信号。先另启一个会话,查找脚本运行的进程PID:

[root@CentOS ~]# ps -ef | grep 'asyncio' | grep -v 'grep'
root     20009 19890  0 19:51 pts/0    00:00:00 python3 asyncio_signal_handler.py

再发送Unix signal即可:

[root@CentOS ~]# kill -15 20009

2.7 监视文件描述符的读/写事件

  • AbstractEventLoop.add_reader(fd, callback, *args): 开始监视fd上是否有可读事件,如果可读事件发生,执行callback
  • AbstractEventLoop.add_writer(fd, callback, *args): 开始监视fd上是否有可写事件,如果可写事件发生,执行callback
  • AbstractEventLoop.remove_reader(fd): 停止监视fd上的可读事件
  • AbstractEventLoop.remove_writer(fd): 停止监视fd上的可写事件
import asyncio
from socket import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

def reader():
    '''当rsock有可读事件时,执行该函数收取数据'''
    data = rsock.recv(100)
    print("Received:", data.decode())
    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)
    # Stop the event loop
    loop.stop()

# Register the file descriptor for read event
loop.add_reader(rsock, reader)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()

3. Futures

Future对象是一个"占位符",用来保存尚未完成的操作的结果(或异常)。在asyncio模块中,所有需要异步完成的操作都对应一个Future对象,当操作完成时,将数据设置为Future对象的结果

class Future:
    ...
    _asyncio_future_blocking = False
    ...
    def __await__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.

常用的方法:

  • future = asyncio.Future()future = loop.create_future(): 创建Future对象
  • future.set_result(result): 将Future对象标记为已完成,并设置它的结果
  • future.done(): 此方法不会阻塞,调用后立即返回布尔值,指明Future对象是否已经运行完成
  • future.add_done_callback(callback, *, context=None)Future对象运行结束后会调用callback
  • future.result(): 此方法没有参数,因此不能指定超时时间。如果调用此方法时Future对象还没运行完成,那么此方法不会阻塞用户进程去等待结果,而是抛出 asyncio.InvalidStateError 异常
import asyncio

def slow_operation(future):
    future.set_result('Future is done!')

def got_result(future):
    print('Result: {}'.format(future.result()))
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()  # 创建Future对象,或者: future = loop.create_future()
loop.call_later(3, slow_operation, future)  # 模拟3秒后,Future对象完成了
future.add_done_callback(got_result)  # Future对象一旦完成,就会执行callback
try:
    loop.run_forever()
finally:
    loop.close()

# Output:
Result: Future is done!  # 3秒后输出结果

4. Coroutines, Tasks

asyncio模块是在Python 3.4首次加入标准库的,那个时候还只有基于生成器的协程,如果你的Python版本刚好是Python 3.4的话,需要按如下方式使用:

import asyncio

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)  # 遇到异步的延迟操作时,协程暂停运行,且将控制权交还给事件循环
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
coro1 = print_sum(1, 2)
coro2 = print_sum(3, 4)
loop.run_until_complete(asyncio.gather(coro1, coro2))
loop.close()

# Output:
Compute 1 + 2 ...  # 立即输出这两行
Compute 3 + 4 ...  # 立即输出这两行
1 + 2 = 3          # 1秒后,同时输出计算结果
3 + 4 = 7          # 1秒后,同时输出计算结果

Python 3.5增加了新语法async defawait,称为原生协程(native coroutine),请参考上一篇博客 Python3爬虫系列07 (理论) - 协程 。本博客后续内容都只使用async def

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)  # 遇到异步的延迟操作时,协程暂停运行,且将控制权交还给事件循环
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
coro1 = print_sum(1, 2)
coro2 = print_sum(3, 4)
loop.run_until_complete(asyncio.gather(coro1, coro2))
loop.close()

4.1 协程能干嘛

协程(coroutine)可以在执行期间暂停(suspend),这样就可以在等待外部数据处理完成之后(例如,等待网络I/O数据),从之前暂停的地方恢复执行(resume)

  1. result = await coroutine or result = yield from coroutine: 在协程里面使用awaityield from获取另一个协程返回值或异常
  2. result = await future or result = yield from future: 在协程里面使用awaityield from等待一个Future对象运行结束,并获取它的返回值或异常
  3. return expression: 在协程里面使用return语句,返回值传递给另一个使用了awaityield from协程(等待结果)
  4. raise exception: 在协程里面使用raise语句,异常抛给另一个使用了awaityield from协程(等待结果)

4.2 Task驱动协程的运行

asyncio模块中,我们不会手动调用.send()方法来驱动协程的运行,而是创建一个Task对象(它是Future的子类),它包装了协程对象,通过__step()方法自动驱动协程。当协程awaityiled from等待的数据准备好了,事件循环会通过callback调用Task中的__wakeup()方法重新唤醒该Task对象:

class Task(futures.Future):
    ...
    def __init__(self, coro, *, loop=None):
        ...
        self._loop.call_soon(self.__step, context=self._context)

     def __step(self, exc=None):
         ...
         # Call either coro.throw(exc) or coro.send(None).
         try:
             if exc is None:
                 result = coro.send(None)  # Task相当于调用方,激活我们自己写的协程(委派协程),然后到asyncio提供的协程中,比如sleep、sock_recv等,前进到return await future,继续进入Future类的__await__方法里,因为此时future对象没完成,所以会: 1.设置_asyncio_future_blocking为True 2.yield self,产出future对象本身并直接传给了Task,即这里的result是future对象
             else:
                 result = coro.throw(exc)
         except StopIteration as exc:  # 如果我们写的协程运行完成时
             super().set_result(exc.value)  # 设置Task对象的值为 exc.value,即我们的协程的返回值
         else:  # 如果我们的协程没结束,也没异常
             # result是future,它是Future类型,刚才设置了_asyncio_future_blocking为True,即blocking = True
             blocking = getattr(result, '_asyncio_future_blocking', None)
             if blocking:
                 ...
                 result._asyncio_future_blocking = False  # 先把标志位设置为False,不阻塞等待future完成
                 # 给事件循环中,添加一个回调事件: 如果Future完成,唤醒Task继续执行
                 result.add_done_callback(self.__wakeup, context=self._context)
                 ...

    def __wakeup(self, future):
        try:
            # Task被重新唤醒之后,尝试去获取future的结果
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # 如果没抛出异常,就再执行__step(),再次发送send(None)后,我们写的协程运行结果,抛出StopIteration异常,上面的代码会捕获异常并设置Task的结果
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.

注意: CoroutinesTasks只有在事件循环已经运行的情况下,才会启动

5. 剖析asyncio内部运行原理

import asyncio

async def hi(name, t, loop):
    print('Task {}: say "hi" at {}'.format(name, loop.time()))
    await asyncio.sleep(t)
    print('Task {}: say "hello" at {}'.format(name, loop.time()))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    hi('A', 3, loop), hi('B', 5, loop)
))
loop.close()

# Output:
Task A: say "hi" at 27737.328
Task B: say "hi" at 27737.328
Task A: say "hello" at 27740.328  # 3秒后,Task A完成
Task B: say "hello" at 27742.328  # 又过去2秒,Task B也完成,总耗时5秒,如果是同步模式则要8秒

asyncio.sleep()asyncio模块中实现的协程

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_event_loop()
    future = loop.create_future()  # 创建Future对象
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)  # 向事件循环中添加事件,delay秒后,设置future的结果为result,默认是None
    try:
        return await future  # 会暂停sleep协程,直到future返回结果。await future会执行Future.__await__方法
    finally:
        h.cancel()

asyncio howto

事件循环运行后,会创建各个Task,自动调用.send(None)方法驱动协程的运行,对应图片中红色线条的部分。当耗时的异步操作完成时,事件循环通过callback唤醒Task,继续执行协程的下一步,又发送.send(None),对应图片中黑色线条的部分

上面的示例中,使用了asyncio.sleep()只是为了模拟耗时操作,实际开发中,asyncio模块要实现异步网络I/O,提供了sock_recvsock_sendall非阻塞版的协程函数

async def sock_recv(self, sock, n):
    """Receive data from the socket.

    The return value is a bytes object representing the data received.
    The maximum amount of data to be received at once is specified by
    nbytes.
    """
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    fut = self.create_future()  # 创建Future对象
    self._sock_recv(fut, None, sock, n)  # 通过selectors模块,监视多个socket上的可读事件
    return await fut

def _sock_recv(self, fut, registered_fd, sock, n):
    # _sock_recv() can add itself as an I/O callback if the operation can't
    # be done immediately. Don't use it directly, call sock_recv().
    if registered_fd is not None:
        # Remove the callback early.  It should be rare that the
        # selector says the fd is ready but the call still returns
        # EAGAIN, and I am willing to take a hit in that case in
        # order to simplify the common case.
        self.remove_reader(registered_fd)
    if fut.cancelled():
        return
    try:
        data = sock.recv(n)
    except (BlockingIOError, InterruptedError):
        fd = sock.fileno()
        self.add_reader(fd, self._sock_recv, fut, fd, sock, n)  # 监视socket上的可读事件,参考章节2.7
    except Exception as exc:
        fut.set_exception(exc)
    else:
        fut.set_result(data)  # 如果data = sock.recv(n)没有异常说明数据准备好了,设置Future对象的结果

其实,跟asyncio.sleep()类似,也是会创建Future对象,当数据准备好了以后,再设置Future对象的结果

总结:

  1. asyncio只需要单线程就能实现并发,每个线程中只能有一个事件循环事件循环使用协作式调度(cooperative scheduling),一次只运行一个Task(如果你在其它线程中也启动了事件循环,则可以并行运行多个Task)。当它启动后,会依次从任务队列获取Task并执行它
  2. 每个Task会自动执行协程的下一步,即发送result = coro.send(None)
  3. 如果协程通过awaityield from调用了其它协程,则当前协程暂停(suspend)并进行协程的上下文切换(保存当前协程的variables、state等,然后导入其它协程的上下文 )
  4. 如果其它协程中也使用了awaityield from调用了其它协程,则重复步骤3,直到某个协程中调用了await futureyield from future,则执行Future对象的__await____iter__方法,通过yield self直接返回Future对象本身给Task中的result并把控制权交还事件循环Task会向事件循环注册事件: 当Future对象完成时,请执行我的__wakeup()方法唤醒
  5. 事件循环重复步骤1,继续调度执行其它任务
  6. 当某个Future对象(代表耗时的异步操作,如网络I/O)运行完成后,会被设置结果。同时,事件循环唤醒对应的Task继续运行,又发送result = coro.send(None)让协程链上的各个协程依次恢复执行(resume)。此时,Future对象会将它的结果作为返回值通过StopIteration异常发送给等待它的协程,协程链上的各个协程依次返回值(可能会再处理),最终Task会获取经过各协程处理后的数据,并设置为它的结果,至此,该Task运行结束

6. 牛刀小试

6.1 创建单个Task

不要直接实例化asyncio.Task,而应该:

  • asyncio.create_task(coro): (首选)Python 3.7新增的方法,返回一个Task对象
  • AbstractEventLoop.create_task(coro): 返回一个Task对象
  • asyncio.ensure_future(coro_or_future, *, loop=None): 将一个协程对象或awaitable对象包装成Future对象,返回一个Task对象。如果直接传入Future对象,将原样返回

6.2 聚合多个Task

AbstractEventLoop.run_until_complete(future)只能接受一个参数,如果直接输入协程对象,则会自动被包装成Task。但是,我们要实现并发,必须是多个Task,那么如何给run_until_complete()方法传入多个Task呢?

(1) asyncio.gather()

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)会将多个协程对象或Future 对象聚合成一个 Future 对象:

import asyncio

async def hi(name, t, loop):
    print('Task {}: say "hi" at {}'.format(name, loop.time()))
    await asyncio.sleep(t)
    print('Task {}: say "hello" at {}'.format(name, loop.time()))

loop = asyncio.get_event_loop()
coro1 = hi('A', 1, loop)
coro2 = hi('B', 2, loop)
# 或者传入列表并解包:loop.run_until_complete(asyncio.gather(*[coro1, coro2]))
loop.run_until_complete(asyncio.gather(coro1, coro2))
loop.close()

(2) asyncio.wait()

asyncio.wait(fs, *, loop=None, timeout=None, return_when='ALL_COMPLETED')的参数fs是由Future 对象或协程对象构成的序列,不能为空序列。Wait for the Futures and coroutines given by fs to complete. 由于它本身是一个协程函数,因为调用它后将返回一个协程对象,所以可以像done, pending = await asyncio.wait(fs)这样使用,而返回值是已完成的和尚未完成的对象组成的元组(done, pending)

asyncio.wait()返回协程对象,所以可以传入run_until_complete()方法:

import asyncio

async def hi(name, t, loop):
    print('Task {}: say "hi" at {}'.format(name, loop.time()))
    await asyncio.sleep(t)
    print('Task {}: say "hello" at {}'.format(name, loop.time()))

loop = asyncio.get_event_loop()
coro1 = hi('A', 1, loop)
coro2 = hi('B', 2, loop)
loop.run_until_complete(asyncio.wait([coro1, coro2]))
loop.close()

Python 3.7增加了asyncio.run(coro, *, debug=False)方法,它跟run_until_complete()方法类似,也只接收一个参数,但是它会自动管理事件循环的启动与停止,所以可以简化为:

import asyncio

async def hi(name, t):
    print('Task {}: say "hi"'.format(name))
    await asyncio.sleep(t)
    print('Task {}: say "hello"'.format(name))

coro1 = hi('A', 1)
coro2 = hi('B', 2)
wait_coro = asyncio.wait([coro1, coro2])  # 调用asyncio.wait()方法,返回一个协程对象
asyncio.run(wait_coro)

7. TCP Server

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    await writer.drain()

    print("Close the client socket")
    writer.close()

loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

8. TCP Client

import asyncio

async def tcp_echo_client(message, loop):
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
                                                   loop=loop)

    print('Send: %r' % message)
    writer.write(message.encode())

    data = await reader.read(100)
    print('Received: %r' % data.decode())

    print('Close the socket')
    writer.close()

message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()

代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎star

未经允许不得转载: LIFE & SHARE - 王颜公子 » Python3爬虫系列08 (理论) - 使用asyncio模块实现并发

分享

作者

作者头像

Madman

如果博文内容有误或其它任何问题,欢迎留言评论,我会尽快回复; 或者通过QQ、微信等联系我

0 条评论

暂时还没有评论.

发表评论前请先登录

专题