Python 3 爬虫|第5章:多线程并发下载
Synopsis: I/O 密集型最适合使用多线程,当然包括网络 I/O。我们要下载多张图片,每次去下载一张图片,就是发起一次 HTTP 请求(使用 TCP 协议),客户端首先通过 socket.socket() 创建一个套接字,然后调用 connect() 方法经过三次握手与服务端建立 TCP 连接,这个过程是阻塞的。建立连接后,客户端将请求(要访问图片资源)发送给服务端,然后服务端返回响应,客户端用 recv() 方法每次接收一定数量的字节,客户端在每个响应报文(一张图片有多个数据包)到达操作系统内核时,是阻塞的。网络 I/O 对于 CPU 来说是无比漫长的,如果是依序下载,CPU 就要一直阻塞到第 1 张图片的字节全部下载完成后,才能下载第 2 张,这些等待的时间(对 CPU 来说,比它处理数据的时间多出无数倍)就白白浪费了。为了合理利用 CPU 资源,可以使用多线程,每个线程去下载一张图片,当下载第 1 张图片的任务阻塞时,CPU 切换到第 2 个线程,它开始下载第 2 张图片,依次类推,当第 1 张图片有响应报文到达时,等其它线程阻塞后,CPU 又会切换到下载第 1 张图片的那个线程
代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎star
1. threading 和 Queue
threading
模块可以实现多线程,Queue
模块创建线程级安全的队列,各线程从队列中取任务并执行
import time from queue import Queue from threading import Thread from common import setup_down_path, get_links, download_one_1 from logger import logger class ThreadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: down_path, linkno, link = self.queue.get() download_one_1(down_path, linkno, link) self.queue.task_done() def download_many(): '''多线程,按线程数 并发(非并行) 下载所有图片''' down_path = setup_down_path() links = get_links() # 创建队列 queue = Queue() # 创建多个线程 for i in range(4): worker = ThreadWorker(queue) worker.daemon = True # 如果工作线程在等待更多的任务时阻塞了,主线程也可以正常退出 worker.start() # 启动线程 # 往队列中投放任务 for linkno, link in enumerate(links, 1): # 链接带序号 logger.info('Queueing No.{} {}'.format(linkno, link)) queue.put((down_path, linkno, link)) logger.info('Waiting for all subthread done...') # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logger.info('All subthread done.') return len(links) if __name__ == '__main__': t0 = time.time() count = download_many() msg = '{} flags downloaded in {} seconds.' logger.info(msg.format(count, time.time() - t0))
多线程
的测试结果:
线程数 | 用时 | 备注 |
---|---|---|
4 | 12.12秒 | for i in range(4) |
8 | 6.15秒 | for i in range(8) |
16 | 3.23秒 | for i in range(16) |
32 | 1.88秒 | for i in range(32) |
64 | 1.58秒 | for i in range(64) |
128 | 1.60秒 | for i in range(128) |
依序下载是 50 秒
,多进程最好的结果是 5.90 秒
,而多线程只需要 1.58 秒
,比依序下载效率提升了 30
多倍!
现在我们来分析一下,为什么 多线程
会提升效率?
多进程
时,进程间切换
的开销非常大,而多线程也会有切换,但这种系统开销变得很小。从运行时间上看,多线程似乎已经解决了切换开销大的问题。而且可支持的任务数量规模,也变成了数百个到数千个
有人可能会问了,我们默认使用的 CPython
解释器本身就不是线程安全的,因此有 GIL(Global Interpreter Lock,全局解释器锁)
的限制(这是 CPython 解释器的局限,与 Python 语言本身无关,Jython 和 IronPython 就没有这种限制。不过,目前最快的 Python 解释器 PyPy 也有 GIL
),一次只允许使用一个线程执行 Python 字节码,因此,一个 Python 进程通常不能同时使用多个 CPU 核心
问题1
: 多线程脚本和依序下载脚本都是使用一个 CPU 核心(一个进程),只不过多线程脚本创建了多个线程,但是 Python 线程受 GIL 的限制,任何时候都只允许运行一个线程,那么为什么多线程脚本会比依序下载脚本快 30 倍?
答案
: Python 标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放 GIL
。这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益: 一个 Python 线程在等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行另一个线程。因此,尽管有 GIL,Python 线程还是能在 I/O 密集型应用中发挥作用。(备注: time.sleep()
函数也会释放 GIL,因此在多线程中它并不会阻塞其它线程)
问题2
: 处理 CPU 密集型任务时,使用 ProcessPoolExecutor
模块能绕开 GIL,利用所有可用的 CPU 核心。但是在 I/O 密集型中,多进程为什么没有多线程效果好?
答案
: Python 3 爬虫|第4章:多进程并发下载 其实已经说明了: 1. 创建进程比创建线程开销大 2. 当进程数大于 CPU 核心数时,必然出现进程间切换,而进程切换比线程切换开销也大
多进程脚本最少也要 5.90 秒(16 个进程),而多线程只需要 1.58 秒,主要原因可能是,我的电脑用的是 4 核 CPU,因此限制只能有 4 个并发下载(其它 12 个进程需要等待进程切换才能使用 CPU),而使用线程池的版本有 64 个工作的线程
2. ThreadPoolExecutor
2.1 Executor.map()
(1) 每个线程调用的函数接受一个参数
from concurrent import futures def download_many_1(): '''多线程,按线程数 并发(非并行) 下载所有图片 使用concurrent.futures.ThreadPoolExecutor() Executor.map()使用Future而不是返回Future,它返回迭代器, 迭代器的__next__()方法调用各个Future的result()方法,因此我们得到的是各个Future的结果,而非Future本身 注意Executor.map()限制了download_one()只能接受一个参数,所以images是字典构成的列表 ''' down_path = setup_down_path() links = get_links() images = [] for linkno, link in enumerate(links, 1): image = { 'path': down_path, 'linkno': linkno, 'link': link } images.append(image) workers = min(64, len(links)) # 保证线程池中的线程不会多于总的下载任务数 # with语句将调用executor.__exit__()方法,而这个方法会调用executor.shutdown(wait=True)方法,它会在所有进程都执行完毕前阻塞主进程 with futures.ThreadPoolExecutor(workers) as executor: # executor.map()效果类似于内置函数map(),但download_one()函数会在多个线程中并发调用 # 它的返回值res是一个迭代器<itertools.chain object>,我们后续可以迭代获取各个被调用函数的返回值 res = executor.map(download_one, images) # 传一个序列 return len(list(res)) # 如果有进程抛出异常,异常会在这里抛出,类似于迭代器中隐式调用next()的效果
(2) 每个线程调用的函数接受多个参数
from functools import partial from concurrent import futures def download_many_2(): '''多线程,按线程数 并发(非并行) 下载所有图片 使用concurrent.futures.ThreadPoolExecutor() Executor.map()中的调用函数如果要接受多个参数,可以给Executor.map()传多个序列 参考:https://yuanjiang.space/threadpoolexecutor-map-method-with-multiple-parameters ''' down_path = setup_down_path() links = get_links() # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数 download_one_1_partial = partial(download_one_1, down_path) # 创建包含所有linkno的序列 linknos = [i for i in range(len(links))] workers = min(64, len(links)) # 保证线程池中的线程不会多于总的下载任务数 with futures.ThreadPoolExecutor(workers) as executor: res = executor.map(download_one_1_partial, linknos, links) # 给Executor.map()传多个序列 return len(list(res))
2.2 Executor.submit() 和 concurrent.futures.as_completed()
为了演示 Executor.map()
内部是怎么工作的:
def download_many_3(): '''多线程,按线程数 并发(非并行) 下载所有图片 使用concurrent.futures.ThreadPoolExecutor() 不使用Executor.map(),而使用Executor.submit()和concurrent.futures.as_completed() Executor.submit()方法会返回Future,而Executor.map()是使用Future ''' down_path = setup_down_path() links = get_links() # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数 download_one_1_partial = partial(download_one_1, down_path) workers = min(64, len(links)) # 保证线程池中的线程不会多于总的下载任务数 with futures.ThreadPoolExecutor(workers) as executor: to_do = [] # 创建并排定Future for linkno, link in enumerate(links, 1): # 链接带序号 future = executor.submit(download_one_1_partial, linkno, link) to_do.append(future) logger.debug('Scheduled for No.{} {}: {}'.format(linkno, link, future)) results = [] # 获取Future的结果,futures.as_completed(to_do)的参数是Future列表,返回迭代器, # 只有当有Future运行结束后,才产出future for future in futures.as_completed(to_do): # future变量表示已完成的Future对象,所以后续future.result()绝不会阻塞 res = future.result() results.append(res) logger.debug('{} result: {!r}'.format(future, res)) return len(results)
代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎 star
0 条评论
评论者的用户名
评论时间暂时还没有评论.