Python3爬虫系列01 (理论) - I/O Models 阻塞 非阻塞 同步 异步

  • 原创
  • Madman
  • /
  • 2018-10-01 14:42
  • /
  • 0
  • 758 次阅读

spider 01-min.png

Synopsis: Richard Stevens所著的《UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking》一书中,6.2章节"IO Models"列出了五种IO模型,本文将详细介绍这几种IO模型,并说明阻塞(blocking)和非阻塞(non-blocking)的区别、同步(synchronous)和异步(asynchronous)的区别

代码已上传到 https://github.com/wangy8961/python3-concurrency/tree/master/io-models ,欢迎star

1. 预备知识

1.1 CPU Ring

对于操作系统而言,稳定且可靠地运行是最重要的。现行技术方案是将内核与用户进程、用户进程与用户进程之间进行分离,内核可以管理用户进程,但是用户进程之间不能互相干扰,更不能"侵入"内核,即使用户的程序崩溃了,内核也不会受影响。

为了提升计算机安全,避免恶意操作,CPU的硬件机制中一般将使用权划分为四个特权级别:

cpu ring

Ring 0级别最高,可以执行一切指令,包括像清空内存、磁盘I/O操作等特权指令(privilege instruction)和其它非特权指令,内核代码运行在这个模式下;Ring 3级别最低,只能执行非特权指令,用户进程运行在这个模式下。所以CPU模式(CPU models)可以划分为:

  • kernel mode,也叫内核态
  • user mode,也叫用户态

计算机开机启动后,首先会加载内核,由于占了先机,操作系统内核将自己设置为最高级别,而之后创建的用户进程都设置为最低级别。这样内核就能控制CPU、内存、磁盘等一切资源,而用户进程不能直接使用这些资源。例如,如果用户进程可以直接使用磁盘,就没必要在内核中实现一套文件系统的权限管理了

1.2 Kernel space vs. User space

不管是内核代码还是用户程序代码都需要加载到内存中,如果不对内存进行管理,就会出现用户代码之间、用户代码与内核之间出现被覆盖的情况,所以内核将内存划分成两部分:

  • 内核空间(kernel space): 内核代码的运行空间
  • 用户空间(user space): 用户应用程序代码的运行空间

kernel-space-vs-user-space

用户进程只能访问用户空间,而内核可以访问所有内存。因为内核已将用户进程设置为最低级别,它只能运行在CPU的Ring 3上,所以如果用户进程要进行磁盘I/O或网络I/O时,只能通过系统调用(system call)将请求发给内核,由内核代为执行相应的指令(CPU模式由用户态转成内核态),数据会先缓存到内核空间中,然后内核将数据从内核空间拷贝到用户空间中,之后用户进程才能继续处理数据(CPU模式由内核态转成用户态

system call

Linux System Call Table

1.3 blocking vs. non-blocking

  • 阻塞(blocking): 用户进程在等待某个操作完成期间,自身无法继续干别的事情,则称进程在该操作上是阻塞的。Blocking I/O means that the calling system does not return control to the caller until the operation is finished
  • 非阻塞(non-blocking): 用户进程在等待某个操作完成期间,自身可以继续干别的事情,则称进程在该操作上是非阻塞的。A non-blocking synchronous call returns control to the caller immediately. The caller is not made to wait, and the invoked system immediately returns one of two responses: If the call was executed and the results are ready, then the caller is told of that. Alternatively, the invoked system can tell the caller that the system has no resources (no data in the socket) to perform the requested action. In that case, it is the responsibility of the caller may repeat the call until it succeeds. For example, a read() operation on a socket in non-blocking mode may return the number of read bytes or a special return code -1 with errno set to EWOULBLOCK/EAGAIN, meaning "not ready; try again later."

1.4 Synchronous I/O vs. Asynchronous I/O

  • A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes.
  • An asynchronous I/O operation does not cause the requesting process to be blocked.

如果用户进程因为I/O操作而阻塞的话,那就是同步I/O,否则是异步I/O。后续要介绍的blocking I/Ononblocking I/OI/O multiplexingsignal driven I/O这四种I/O模型都是同步I/O,因为第二阶段中的真正I/O操作(比如,recvfrom)会阻塞用户进程,只有asynchronous I/O模型才是异步I/O

2. Unix中五种I/O模型

  • blocking I/O
  • nonblocking I/O
  • I/O multiplexing (select, poll, epoll, kqueque)
  • signal driven I/O (SIGIO)
  • asynchronous I/O (the POSIX aio_read functions)

例如,用户进程要读取网络数据或磁盘数据时(Input),数据会经过两个阶段:

  • 内核空间等待数据准备完成。Waiting for the data to be ready
  • 将数据从内核空间拷贝到用户空间中。Copying the data from the kernel to the process

网络套接字的输入操作第一步是等待网络数据包(对CPU来说耗时特别久),当数据包到达时,它先被复制到内核的缓冲区中,第二步是将这些数据从内核的缓冲区复制到我们的应用程序缓冲区中(速度快)。上述五种I/O模型的区别就在于I/O经历的两个阶段的不同上

2.1 blocking I/O

默认情况下,所有套接字都是阻塞的。下图中我们使用UDP的数据报套接字来说明网络I/O的两个阶段:

blocking io

首先是我们的用户进程运行(左边),当需要获取网络数据报(datagram)时,用户进程只能通过recvfrom系统调用将请求发给内核,然后在内核中运行(右边)

用户进程在两个阶段都是阻塞的,这期间不能做任何其它事情,直到数据被拷贝到用户空间(或发生错误,如系统调用被信号中断)后,我们的应用程序才能够继续处理数据报。即用户进程从调用recvfrom到有数据返回的整个时间内,进程都是被阻塞的,所以它是同步I/O

举例来说,如果要下载1000张图片,用阻塞I/O(blocking I/O)模型的话,必须依序下载,在等待第1张图片数据时,整个用户进程被阻塞,只有第1张图片数据到达内核,并复制到用户空间后,才能保存到本地磁盘,然后依次类推,下载其它图片

(1) 单进程TCP Server

如果Web服务器采用这种模式的话,那么一次只能为一个客户服务(注意:当服务器为这个客户服务的时候,只要服务器的listen队列还有空闲,那么当其它新的客户端发起连接后,服务器就会为新客户端建立连接,并且新客户端也可以发送数据,但服务器还不会处理。只有当第1个客户关闭连接后,服务器才会一次性将第2个客户发送的所有数据接收完,并继续只为第2个客户服务,依次类推):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server,单进程,阻塞 blocking I/O
import socket


# 创建监听socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# socket默认不支持地址复用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)

# socket默认是主动连接,调用listen()函数将socket变为被动连接,这样就可以接收客户端连接了
server_sock.listen(5)

try:
    while True:
        print('Main Process, waiting for client connection...')

        # client_sock是专为这个客户端服务的socket,client_addr是包含客户端IP和端口的元组
        client_sock, client_addr = server_sock.accept()
        print('Client {} is connected'.format(client_addr))

        try:
            while True:
                # 接收客户端发来的数据,阻塞,直到有数据到来
                # 事实上,除非当前客户端关闭后,才会跳转到外层的while循环,即一次只能服务一个客户
                # 如果客户端关闭了连接,data是空字符串
                data = client_sock.recv(4096)
                if data:
                    print('Received {}({} bytes) from {}'.format(data, len(data), client_addr))
                    # 返回响应数据,将客户端发送来的数据原样返回
                    client_sock.send(data)
                    print('Sent {} to {}'.format(data, client_addr))
                else:
                    print('Client {} is closed'.format(client_addr))
                    break
        finally:
            # 关闭为这个客户端服务的socket
            client_sock.close()
finally:
    # 关闭监听socket,不再响应其它客户端连接
    server_sock.close()

TCP客户端测试代码如下,可以观察如果指定50个客户端时,服务端输出结果和客户端输出结果:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from datetime import datetime
import socket


server_ip = input('Please enter the TCP server ip: ')
server_port = int(input('Enter the TCP server port: '))
client_num = int(input('Enter the TCP clients count: '))

# 保存所有已成功连接的客户端TCP socket
client_socks = []

for i in range(client_num):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((server_ip, server_port))
    client_socks.append(sock)
    print('Client {}[ID: {}] has connected to {}'.format(sock, i, (server_ip, server_port)))

while True:
    for s in client_socks:
        data = str(datetime.now()).encode('utf-8')
        s.send(data)
        print('Client {} has sent {} to {}'.format(s, data, (server_ip, server_port)))
    # 睡眠3秒后,继续让每个客户端连接向TCP Server发送数据
    time.sleep(3)

Windows平台也可以下载 Hercules SETUP utility,先打开一个Hercules使用TCP Client去连接服务器,如果再打开一个Hercules,可以发现,也能够连接且可以发送数据,但服务器不会处理数据也就不会返回(此时,在Linux服务器上watch -n 1 'netstat|grep tcp查看TCP连接的状态有很多SYN_RECV

(2) 多进程TCP Server

由于上面单进程版本中,client_sock.recv(4096)会一直阻塞,所以实际上并不能跳转到外层while循环中去为其它新的客户端创建socket,只能一次为一个客户服务。这根本满足不了实际应用需要,为了实现并发处理多个客户端请求,可以使用多进程,应用程序的主进程只负责为每一个新的客户端连接创建socket,然后为每个客户创建一个子进程,用来分别处理每个客户的数据:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server,多进程,阻塞 blocking I/O
import os
import socket
from multiprocessing import Process


def client_handler(client_sock, client_addr):
    '''接收各个客户端发来的数据,并原样返回'''
    try:
        while True:
            # 接收客户端发来的数据,阻塞,直到有数据到来
            # 如果客户端关闭了连接,data是空字符串
            data = client_sock.recv(4096)
            if data:
                print('Child Process [PID: {}], received {}({} bytes) from {}'.format(os.getpid(), data, len(data), client_addr))
                # 返回响应数据,将客户端发送来的数据原样返回
                client_sock.send(data)
                print('Child Process [PID: {}], sent {} to {}'.format(os.getpid(), data, client_addr))
            else:
                print('Child Process [PID: {}], client {} is closed'.format(os.getpid(), client_addr))
                break
    except:
        # 如果客户端强制关闭连接,会报异常: ConnectionResetError: [Errno 104] Connection reset by peer
        pass
    finally:
        # 关闭为这个客户端服务的socket
        client_sock.close()


# 创建监听socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# socket默认不支持地址复用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)

# socket默认是主动连接,调用listen()函数将socket变为被动连接,这样就可以接收客户端连接了
server_sock.listen(5)

try:
    while True:
        print('Main Process [PID: {}], waiting for client connection...'.format(os.getpid()))

        # 主进程只用来负责监听新的客户连接
        # client_sock是专为这个客户端服务的socket,client_addr是包含客户端IP和端口的元组
        client_sock, client_addr = server_sock.accept()
        print('Main Process [PID: {}], client {} is connected'.format(os.getpid(), client_addr))

        # 为每个新的客户连接创建一个子进程,用来处理客户数据
        client = Process(target=client_handler, args=(client_sock, client_addr))
        client.start()
        # 子进程已经复制了一份client_sock,所以主进程中可以关闭此client_sock
        client_sock.close()
finally:
    # 关闭监听socket,不再响应其它客户端连接
    server_sock.close()

(3) 多线程TCP Server

上面多进程版本的问题在于,为每个客户端连接都分别创建一个进程,如果同时有10000个客户连接,操作系统不可能创建10000个进程,那样系统开销会非常大,内存会被耗尽,导致系统崩溃。就算没有崩溃,使用了虚拟内存,那么性能将急剧下降。同时,这么多个进程,CPU进行进程间切换(上下文切换)的代价也无比巨大,最终的结果就是大部分时间都花在进程切换上了,而为客户提供服务的时间几乎没有

虽然可以使用进程池concurrent.futures.ProcessPoolExecutor创建固定数量的进程,一旦有客户端关闭了连接后,对应的进程就可以重新为下一个新的客户连接服务,但是多进程间的上下文切换的代价还是太大

多线程版本比多进程版本的系统开销小几个数量级,操作系统可以同时开启更多的线程,而线程间的调度切换比多进程也小很多:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server,多线程,阻塞 blocking I/O
import socket
import threading


def client_handler(client_sock, client_addr):
    '''接收各个客户端发来的数据,并原样返回'''
    try:
        while True:
            # 接收客户端发来的数据,阻塞,直到有数据到来
            # 如果客户端关闭了连接,data是空字符串
            data = client_sock.recv(4096)
            if data:
                print('Child Thread [{}], received {}({} bytes) from {}'.format(threading.current_thread().name, data, len(data), client_addr))
                # 返回响应数据,将客户端发送来的数据原样返回
                client_sock.send(data)
                print('Child Thread [{}], sent {} to {}'.format(threading.current_thread().name, data, client_addr))
            else:
                print('Child Thread [{}], client {} is closed'.format(threading.current_thread().name, client_addr))
                break
    except:
        # 如果客户端强制关闭连接,会报异常: ConnectionResetError: [Errno 104] Connection reset by peer
        pass
    finally:
        # 关闭为这个客户端服务的socket
        client_sock.close()


# 创建监听socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# socket默认不支持地址复用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)

# socket默认是主动连接,调用listen()函数将socket变为被动连接,这样就可以接收客户端连接了
server_sock.listen(5)

try:
    while True:
        print('Main Thread [{}], waiting for client connection...'.format(threading.current_thread().name))

        # 主进程只用来负责监听新的客户连接
        # client_sock是专为这个客户端服务的socket,client_addr是包含客户端IP和端口的元组
        client_sock, client_addr = server_sock.accept()
        print('Main Thread [{}], client {} is connected'.format(threading.current_thread().name, client_addr))

        # 为每个新的客户连接创建一个线程,用来处理客户数据
        client = threading.Thread(target=client_handler, args=(client_sock, client_addr))
        client.start()

        # 因为主线程与子线程共享client_sock,所以在主线程中不能关闭client_sock
        # client_sock.close()
finally:
    # 关闭监听socket,不再响应其它客户端连接
    server_sock.close()

也可以使用线程池concurrent.futures.ThreadPoolExecutor实现

2.2 nonblocking I/O

当我们将socket设置为nonblocking时,相当于告诉内核:when an I/O operation that I request cannot be completed without putting the process to sleep, do not put the process to sleep, but return an error instead.

下图中,前三次系统调用时,数据报还没有到达,所以内核立即返回一个叫EWOULDBLOCK的错误。第四次系统调用时,一个数据报已经到达,所以内核不会立即返回,当数据报从内核空间复制到用户空间后,recvfrom系统调用返回成功信息给用户进程,然后用户进程就能处理数据了:

nonblocking io

像这种在 nonblocking socket 上重复调用recvfrom被称为轮询(polling)用户进程不断轮询内核以查看某些操作是否已准备就绪(忙等待,busy-waiting),这通常很浪费CPU时间(一般网络I/O的第一阶段数据到达内核前会非常慢,如果用户进程频繁且不必要的向内核发起系统调用,CPU就会不断地在用户态内核态之间切换,即用户进程不断地睡眠、被唤醒,这将极大浪费CPU资源)。如果有1000个socket,就算平均每个socket你第4次系统调用时告知数据到达,你也要进行4000次系统调用,太恐怖了,所以一般不会使用这种I/O模型

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server,单进程,非阻塞 nonblocking I/O
import socket


# 用来保存所有已成功连接的客户端,每个列表元素是client_sock和client_addr组成的元组
clients = []

# 创建监听socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# socket默认不支持地址复用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)

# socket默认是主动连接,调用listen()函数将socket变为被动连接,这样就可以接收客户端连接了
server_sock.listen(5)

# 将监听用的server_sock设置为非阻塞
server_sock.setblocking(False)

print('Main Process, waiting for client connection...')

try:
    while True:
        try:
            # client_sock是专为这个客户端服务的socket,client_addr是包含客户端IP和端口的元组
            client_sock, client_addr = server_sock.accept()
        except:
            # server_sock设置为非堵塞后,如果accept时,恰巧没有客户端connect,那么accept会产生一个异常
            pass
        else:
            print('Client {} is connected'.format(client_addr))
            # 将新的客户端连接socket也设置为非阻塞
            client_sock.setblocking(False)
            # 添加到client_socks列表中
            clients.append((client_sock, client_addr))

        # 循环处理每个客户端连接
        for client_sock, client_addr in clients:
            try:
                data = client_sock.recv(4096)
                if data:
                    print('Received {}({} bytes) from {}'.format(data, len(data), client_addr))
                    # 返回响应数据,将客户端发送来的数据原样返回
                    client_sock.send(data)
                    print('Sent {} to {}'.format(data, client_addr))
                else:
                    print('Client {} is closed'.format(client_addr))
                    # 关闭为这个客户端服务的socket
                    client_sock.close()
                    # 从列表中删除
                    clients.remove((client_sock, client_addr))
            except:
                # client_sock设置为非堵塞后,如果recv时,恰巧客户端没有发送数据过来,将会产生一个异常
                pass
finally:
    # 关闭监听socket,不再响应其它客户端连接
    server_sock.close()

上面的代码中,while循环先在监听socket上accept(),如果有数据表示此时有新的客户端连接,不管怎样都要进行一次系统调用。然后,for循环依次遍历各个客户连接socket,查看是否有哪个客户发来数据,有多少个客户就发起多少次系统调用。所以,这种I/O模型非常浪费CPU时间

第一阶段不会阻塞,但数据到达内核空间后,第二阶段从内核空间复制到用户空间时会阻塞,所以它是同步I/O

2.3 I/O multiplexing

blocking I/O使用多线程技术,当并发客户端达到千万级时,操作系统不可能同时创建这么多线程。nonblocking I/O中用户进程不断polling(轮询)内核,浪费CPU资源。

每个网络socket在类Unix操作系统中都是一个文件描述符(FD, file descriptor),如果操作系统能提供一种机制,只需要单个线程就可以同时监视多个文件描述符,当一个或多个描述符就绪(一般是读就绪或写就绪)时,就通知应用程序进行相应的读写操作的话,那么效率将大幅提升

于是,现代操作系统底层陆续实现了各自的I/O多路复用(I/O multiplexing)机制,multiplexing是指在单个线程中调用一次select/poll/epoll等函数,可以同时监视多个文件描述符上的可读取事件或可写入事件,一旦这些事件发生,内核通知用户进程,然后用户进程调用recvfrom等函数进行读写(真正的I/O操作),这样就能实现在单个线程中对多个文件描述符进行并发读写(I/O multiplexing也叫event driven I/O - 事件驱动I/O),Multiplexing Wiki: https://en.wikipedia.org/wiki/Multiplexing,可以参考电气工程中的 "时分复用" 图进行理解:

mux demux

下图中,第一阶段阻塞在select系统调用上,第二阶段阻塞在recvfrom系统调用上,所以它是同步I/O

io multiplexing

(1) select

I/O多路复用概念被提出以后,1983年左右,在BSD里面最早实现了select,目前几乎所有的操作系统都提供了select函数,具体实现可以参考:http://man7.org/linux/man-pages/man2/select.2.html 和 《UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking》的6.8章节示例

select的缺点:

  • 能够监视的文件描述符最大数量有限制,在Linux中默认是1024个,定义在FD_SETSIZE
  • 当一个或多个文件描述就绪后,select函数返回已就绪的文件描述符的数目(超时返回0,出错返回-1),用户并不知道哪些描述符可读、哪些描述符可写,还要通过FD_ISSET()宏去轮询所有的文件描述符(假设描述符3可读,内核会修改fd_set *readset中描述符3对应的位bit为1,那么FD_ISSET(3, &readset)会返回1,表示描述符3可读),才能知道哪个描述符可读写。如果是最后一个文件描述符就绪,也会从头开始线性遍历至结尾,时间复杂度为O(n),这样会很浪费CPU资源
  • 因为内核会修改文件描述符集合的位bit,所以用户进程每次调用select函数,都要重新复制初始化过的文件描述符集合给内核,当描述符数据量大时,效率低

(2) poll

http://man7.org/linux/man-pages/man2/poll.2.html

因为它使用链表数据结构pollfd表示待监视的文件描述符,所以没有1024个最大文件描述符的限制。但是,poll返回后,还是要通过轮询所有文件描述符才能获取哪些描述符可读、哪些描述符可写,时间复杂度为O(n)。随着文件描述符的增加,性能会线性下降

(3) epoll

http://man7.org/linux/man-pages/man7/epoll.7.html

selectpoll都有一个缺点,需要将要监视的文件描述符集合在用户空间内核空间之间来回复制,当有描述符就绪后,又需要遍历整个描述符集合才能得知哪些可读写,当描述符很多时,性能就越差。epoll于Linux 2.6内核开始引入,是为了处理大批量文件描述符而作了改进的poll(只支持Linux)

epoll的相关函数:

  1. int epoll_create(int size): 创建一个epoll实例,返回代表这个实例的文件描述符。自从Linux 2.6.8之后,size参数被忽略
  2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)epoll事件注册函数,它可以动态修改要监视的文件描述符及对应的事件。第一个参数是epoll_create()的返回值;第二个参数表示注册处理动作,用三个宏来表示:EPOLL_CTL_ADD(注册新的fd到epfd中)、EPOLL_CTL_MOD(修改已经注册的fd的监听事件)、EPOLL_CTL_DEL(从epfd中删除一个fd);第三个参数是需要监听的fd;第四个参数是告诉内核需要监听什么事,比如EPOLLIN(可读)、EPOLLOUT(可写)
  3. int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout): 用户进程调用epoll_wait()函数后会被阻塞,直到要监听的I/O事件发生,返回就绪的文件描述符数目(超时返回0)。这个步骤相当于调用selectpoll,但是不需要传递描述符集合给内核,因为内核已经在步骤2中通过epoll_ctl()拿到了要监视的描述符列表。epoll采用基于事件的就绪通知方式,一旦某个文件描述符就绪时,内核会采用类似callback的回调机制,将就绪的文件描述符放到就绪链表里面,当用户进程调用epoll_wait()时便得到通知。另外,epoll使用了Linux的mmap(内存映射技术),内核不需要复制就绪链表给用户空间,用户进程可以直接读取就绪链表中的文件描述符

epoll的优点:

  • 没有最大文件描述符的限制,1GB内存大约支持10万左右的连接数,可运行cat /proc/sys/fs/file-max进行查看
  • I/O效率不随fd数目增加而线性下降,比如有10万个连接,由于长连接和网络传输延时等原因,同一时刻可能只有少部分是"活跃"的(有数据可读写),如果采用selectpoll,每次都从头至尾线性扫描全部描述符集合。而epoll只关心"活跃"的连接,而跟连接总数无关(因为内核只会为就绪的描述符调用callback),时间复杂度为O(1)
  • 使用mmap技术,加速内核与用户空间的消息传递,调用epoll_create后,内核就已经在内核态开始准备帮你存储要监控的文件描述符了,每次调用epoll_ctl只是在往内核的数据结构里塞入新的描述符

(4) select vs. poll vs. epoll

select poll epoll
最大连接数 1024(x86)或 2048(x64) 无上限 无上限
存储fds 数组 链表 红黑树
传递fds 每次调用select,都需要将fds在用户空间和内核空间复制2次 每次调用poll,都需要将fds在用户空间和内核空间复制2次 调用epoll_ctl时注册fd到内核中红黑树上,之后每次epoll_wait无需复制fds
获取就绪的描述符 遍历 遍历 回调函数
I/O效率 每次调用select都进行线性遍历,时间复杂度为O(n) 每次调用poll都进行线性遍历,时间复杂度为O(n) 基于事件通知方式,每当fd就绪,内核注册的回调函数就会被调用,将就绪fd放到就绪链表里,时间复杂度为O(1)

(5) Python 3.4+ selectors模块

Python select模块提供了select/poll/epoll/kqueue等函数,它是底层模块,用户可以更精细地自主选择使用哪个I/O multiplexing接口。而Python 3.4版本加入了selectors高级模块,它基于select,但提供了统一的接口,会自动选择当前操作系统中最优化的I/O多路复用接口,比如在Linux系统中,它会优先使用epoll,在BSD中使用kqueque

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import selectors
import socket

# 自动选择当前OS中最优的I/O multiplexing接口,Linux中会使用selectors.EpollSelector
sel = selectors.DefaultSelector()

def accept(sock, mask):
    '''监听套接字创建新的客户端连接'''
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)  # 将新的客户端socket注册到epoll实例上,并监听读事件

def read(conn, mask):
    '''接收客户端数据,并原样返回'''
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('', 9090))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

2.4 signal driven I/O

我们也可以使用信号,告诉内核在描述符就绪时用SIGIO信号通知我们,我们称之为信号驱动的I/O,如下图所示:

signal driven io

应用程序首先创建socket,并提供信号处理器(signal handler),然后发起sigaction系统调用,该调用会立即返回,用户进程不会被阻塞,可以继续往下执行。当socket准备就绪可以读取数据时,内核产生SIGIO信号通知signal handler去调用recvfrom,当数据被复制到用户空间后,告诉用户进程主循环,你要的数据已经准备好你可以直接处理了。或者,signal handler只是告诉用户进程主循环数据到达内核空间了,你可以自己调用recvfrom随时去复制

总之,此模型的优势在于第一阶段不会被阻塞(等待数据到达内核空间),The main loop can continue executing and just wait to be notified by the signal handler that either the data is ready to process or the datagram is ready to be read.

第一阶段不会阻塞,第二阶段调用recvfrom会阻塞,所以是同步I/O

2.5 asynchronous I/O

asynchronous I/O是指用户进程发起I/O读、写操作后,不会被阻塞,当I/O操作真正完成后(数据已被复制到用户空间,可以直接处理),内核使用信号或回调函数进行异步通知用户进程。第一阶段和第二阶段都是是非阻塞的,是真正的异步I/O

asynchronous io

Linux AIO API

  • POSIX AIO API (glibc): aio_read/aio_write/aio_fsync/lio_listio/aio_cancel/aio_suspend/aio_return/aio_error
  • Native Linux AIO API (libaio): io_setup/io_destroy/io_submit/io_getevents/io_cancel

Linux AIO不够成熟,Windows中的IOCP是成熟的异步I/O

2.6 五种I/O模型的比较

下图是五种不同I/O模型的比较。它表明前四个模型之间的主要区别是第一个阶段,因为前四个模型中的第二个阶段是相同的:当数据从内核复制到调用者的缓冲区时,在调用recvfrom时阻塞进程。根据1.4的定义,前四个I/O模型都是同步的,因为真正的I/O操作recvfrom会阻塞进程。但是,asynchronous I/O处理的两个阶段,与前四个模型不同,两个阶段都不会阻塞进程,所以只有它是异步的

Comparison of the I/O Models

代码已上传到 https://github.com/wangy8961/python3-concurrency/tree/master/io-models ,欢迎star

未经允许不得转载: LIFE & SHARE - 王颜公子 » Python3爬虫系列01 (理论) - I/O Models 阻塞 非阻塞 同步 异步

分享

作者

作者头像

Madman

如果博文内容有误或其它任何问题,欢迎留言评论,我会尽快回复; 或者通过QQ、微信等联系我

0 条评论

暂时还没有评论.

发表评论前请先登录