Python3爬虫系列01 (理论) - I/O Models 阻塞 非阻塞 同步 异步

  • 原创
  • Madman
  • /
  • /
  • 0
  • 2259 次阅读

spider 01-min.png

Synopsis: Richard Stevens所著的《UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking》一书中,6.2章节"IO Models"列出了五种IO模型,本文将详细介绍这几种IO模型,并说明阻塞(blocking)和非阻塞(non-blocking)的区别、同步(synchronous)和异步(asynchronous)的区别

代码已上传到 https://github.com/wangy8961/python3-concurrency/tree/master/io-models ,欢迎star

1. 预备知识

1.1 CPU Ring

对于操作系统而言,稳定且可靠地运行是最重要的。现行技术方案是将内核与用户进程、用户进程与用户进程之间进行分离,内核可以管理用户进程,但是用户进程之间不能互相干扰,更不能"侵入"内核,即使用户的程序崩溃了,内核也不会受影响。

为了提升计算机安全,避免恶意操作,CPU的硬件机制中一般将使用权划分为四个特权级别:

cpu ring

Ring 0级别最高,可以执行一切指令,包括像清空内存、磁盘I/O操作等特权指令(privilege instruction)和其它非特权指令,内核代码运行在这个模式下;Ring 3级别最低,只能执行非特权指令,用户进程运行在这个模式下。所以CPU模式(CPU models)可以划分为:

  • kernel mode,也叫内核态
  • user mode,也叫用户态

计算机开机启动后,首先会加载内核,由于占了先机,操作系统内核将自己设置为最高级别,而之后创建的用户进程都设置为最低级别。这样内核就能控制CPU、内存、磁盘等一切资源,而用户进程不能直接使用这些资源。例如,如果用户进程可以直接使用磁盘,就没必要在内核中实现一套文件系统的权限管理了

1.2 Kernel space vs. User space

不管是内核代码还是用户程序代码都需要加载到内存中,如果不对内存进行管理,就会出现用户代码之间、用户代码与内核之间出现被覆盖的情况,所以内核将内存划分成两部分:

  • 内核空间(kernel space): 内核代码的运行空间
  • 用户空间(user space): 用户应用程序代码的运行空间

kernel-space-vs-user-space

用户进程只能访问用户空间,而内核可以访问所有内存。因为内核已将用户进程设置为最低级别,它只能运行在CPU的Ring 3上,所以如果用户进程要进行磁盘I/O或网络I/O时,只能通过系统调用(system call)将请求发给内核,由内核代为执行相应的指令(CPU模式由用户态转成内核态),数据会先缓存到内核空间中,然后内核将数据从内核空间拷贝到用户空间中,之后用户进程才能继续处理数据(CPU模式由内核态转成用户态

system call

Linux System Call Table

1.3 blocking vs. non-blocking

  • 阻塞(blocking): 用户进程在等待某个操作完成期间,自身无法继续干别的事情,则称进程在该操作上是阻塞的。Blocking I/O means that the calling system does not return control to the caller until the operation is finished
  • 非阻塞(non-blocking): 用户进程在等待某个操作完成期间,自身可以继续干别的事情,则称进程在该操作上是非阻塞的。A non-blocking synchronous call returns control to the caller immediately. The caller is not made to wait, and the invoked system immediately returns one of two responses: If the call was executed and the results are ready, then the caller is told of that. Alternatively, the invoked system can tell the caller that the system has no resources (no data in the socket) to perform the requested action. In that case, it is the responsibility of the caller may repeat the call until it succeeds. For example, a read() operation on a socket in non-blocking mode may return the number of read bytes or a special return code -1 with errno set to EWOULBLOCK/EAGAIN, meaning "not ready; try again later."

1.4 Synchronous I/O vs. Asynchronous I/O

  • A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes.
  • An asynchronous I/O operation does not cause the requesting process to be blocked.

如果用户进程因为I/O操作而阻塞的话,那就是同步I/O,否则是异步I/O。后续要介绍的blocking I/Ononblocking I/OI/O multiplexingsignal driven I/O这四种I/O模型都是同步I/O,因为第二阶段中的真正I/O操作(比如,recvfrom)会阻塞用户进程,只有asynchronous I/O模型才是异步I/O

2. Unix中五种I/O模型

  • blocking I/O
  • nonblocking I/O
  • I/O multiplexing (select, poll, epoll, kqueque)
  • signal driven I/O (SIGIO)
  • asynchronous I/O (the POSIX aio_read functions)

例如,用户进程要读取网络数据或磁盘数据时(Input),数据会经过两个阶段:

  • 内核空间等待数据准备完成。Waiting for the data to be ready
  • 将数据从内核空间拷贝到用户空间中。Copying the data from the kernel to the process

网络套接字的输入操作第一步是等待网络数据包(对CPU来说耗时特别久),当数据包到达时,它先被复制到内核的缓冲区中,第二步是将这些数据从内核的缓冲区复制到我们的应用程序缓冲区中(速度快)。上述五种I/O模型的区别就在于I/O经历的两个阶段的不同上

2.1 blocking I/O

默认情况下,所有套接字都是阻塞的。下图中我们使用UDP的数据报套接字来说明网络I/O的两个阶段:

blocking io

首先是我们的用户进程运行(左边),当需要获取网络数据报(datagram)时,用户进程只能通过recvfrom系统调用将请求发给内核,然后在内核中运行(右边)

用户进程在两个阶段都是阻塞的,这期间不能做任何其它事情,直到数据被拷贝到用户空间(或发生错误,如系统调用被信号中断)后,我们的应用程序才能够继续处理数据报。即用户进程从调用recvfrom到有数据返回的整个时间内,进程都是被阻塞的,所以它是同步I/O

举例来说,如果要下载1000张图片,用阻塞I/O(blocking I/O)模型的话,必须依序下载,在等待第1张图片数据时,整个用户进程被阻塞,只有第1张图片数据到达内核,并复制到用户空间后,才能保存到本地磁盘,然后依次类推,下载其它图片

(1) 单进程TCP Server

如果Web服务器采用这种模式的话,那么一次只能为一个客户服务(注意:当服务器为这个客户服务的时候,只要服务器的listen队列还有空闲,那么当其它新的客户端发起连接后,服务器就会为新客户端建立连接,并且新客户端也可以发送数据,但服务器还不会处理。只有当第1个客户关闭连接后,服务器才会一次性将第2个客户发送的所有数据接收完,并继续只为第2个客户服务,依次类推):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server,单进程,阻塞 blocking I/O
import socket


# 创建监听socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# socket默认不支持地址复用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)

# socket默认是主动连接,调用listen()函数将socket变为被动连接,这样就可以接收客户端连接了
server_sock.listen(5)

try:
    while True:
        print('Main Process, waiting for client connection...')

        # client_sock是专为这个客户端服务的socket,client_addr是包含客户端IP和端口的元组
        client_sock, client_addr = server_sock.accept()
        print('Client {} is connected'.format(client_addr))

        try:
            while True:
                # 接收客户端发来的数据,阻塞,直到有数据到来
                # 事实上,除非当前客户端关闭后,才会跳转到外层的while循环,即一次只能服务一个客户
                # 如果客户端关闭了连接,data是空字符串
                data = client_sock.recv(4096)
                if data:
                    print('Received {}({} bytes) from {}'.format(data, len(data), client_addr))
                    # 返回响应数据,将客户端发送来的数据原样返回
                    client_sock.send(data)
                    print('Sent {} to {}'.format(data, client_addr))
                else:
                    print('Client {} is closed'.format(client_addr))
                    break
        finally:
            # 关闭为这个客户端服务的socket
            client_sock.close()
finally:
    # 关闭监听socket,不再响应其它客户端连接
    server_sock.close()

TCP客户端测试代码如下,可以观察如果指定50个客户端时,服务端输出结果和客户端输出结果:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from datetime import datetime
import socket


server_ip = input('Please enter the TCP server ip: ')
server_port = int(input('Enter the TCP server port: '))
client_num = int(input('Enter the TCP clients count: '))

# 保存所有已成功连接的客户端TCP socket
client_socks = []

for i in range(client_num):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((server_ip, server_port))
    client_socks.append(sock)
    print('Client {}[ID: {}] has connected to {}'.format(sock, i, (server_ip, server_port)))

while True:
    for s in client_socks:
        data = str(datetime.now()).encode('utf-8')
        s.send(data)
        print('Client {} has sent {} to {}'.format(s, data, (server_ip, server_port)))
    # 睡眠3秒后,继续让每个客户端连接向TCP Server发送数据
    time.sleep(3)

Windows平台也可以下载 Hercules SETUP utility,先打开一个Hercules使用TCP Client去连接服务器,如果再打开一个Hercules,可以发现,也能够连接且可以发送数据,但服务器不会处理数据也就不会返回(此时,在Linux服务器上watch -n 1 'netstat|grep tcp查看TCP连接的状态有很多SYN_RECV

(2) 多进程TCP Server

由于上面单进程版本中,client_sock.recv(4096)会一直阻塞,所以实际上并不能跳转到外层while循环中去为其它新的客户端创建socket,只能一次为一个客户服务。这根本满足不了实际应用需要,为了实现并发处理多个客户端请求,可以使用多进程,应用程序的主进程只负责为每一个新的客户端连接创建socket,然后为每个客户创建一个子进程,用来分别处理每个客户的数据:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server,多进程,阻塞 blocking I/O
import os
import socket
from multiprocessing import Process


def client_handler(client_sock, client_addr):
    
                                
                            
  • picawen
  • stevenszhou20
  • xiyao
  • 高压锅嚼起来咯牙
  • 794754074
  • luciferlg
  • moya
  • 习惯过了敏
  • x00miya
  • CME11030
  • 大伟123
  • YLcharmyl
未经允许不得转载: LIFE & SHARE - 王颜公子 » Python3爬虫系列01 (理论) - I/O Models 阻塞 非阻塞 同步 异步

分享

作者

作者头像

Madman

如需 Linux / Python 相关问题付费解答,请按如下方式联系我

0 条评论

暂时还没有评论.