Python 3 爬虫|第4章:多进程并发下载
Synopsis: I/O 密集型适合使用多线程,CPU 密集型适合使用多进程。当然,我们还是可以利用多进程将下载速度有一定的提升。Python 3 中可以使用 multiprocessing 模块和 concurrent.futures.ProcessPoolExecutor 进程池模块,实现多进程
代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎 star
1. 使用multiprocessing
承接 Python 3 爬虫|第3章:同步阻塞下载 ,我们试一下用 多进程
的效果会怎样?
1.1 Pool.apply_async()
创建 processpool.py
模块:
import time from multiprocessing import Pool from common import setup_down_path, get_links, download_one from logger import logger def download_many(): '''多进程,按进程数 并行 下载所有图片 使用multiprocessing.Pool.apply_async() ''' down_path = setup_down_path() links = get_links() p = Pool(4) # 指定进程池中的进程数 for linkno, link in enumerate(links, 1): image = { 'path': down_path, 'linkno': linkno, 'link': link } p.apply_async(download_one, args=(image,)) logger.info('Waiting for all subprocesses done...') p.close() # 关闭进程池 p.join() # 主进程等待进程池中的所有子进程结束 logger.info('All subprocesses done.') return len(links) if __name__ == '__main__': t0 = time.time() count = download_many() msg = '{} flags downloaded in {} seconds.' logger.info(msg.format(count, time.time() - t0))
我的 Win10 主机有 4 颗 CPU 核心:
多进程
的测试结果:
进程线 | 用时 | 备注 |
---|---|---|
4 | 13.86秒 | p = Pool(4) |
8 | 8.88秒 | p = Pool(8) |
16 | 5.90秒 | p = Pool(16) |
32 | 7.48秒 | p = Pool(32) |
64 | 12.48秒 | p = Pool(64) |
128 | 23.08秒 | p = Pool(128) |
上一篇文章中,依序下载平均用时 50 秒
,理论上 4
个进程应该是 12.5 秒
,而测试的结果是 13.86 秒
,原因是,创建进程需要时间,比如使用 128 个进程测试,你会发现前面要等待很长一段时间才会有进程真正开始下载。另外,当进程数大于 CPU 核心数时,必然会发生 进程间切换
(其实就算只有 4 个进程也会有进程切换,因为系统开机后就运行了很多进程,也就是说你开 4 个进程来下载,同一时刻也并不是会有 4 个下载进程都在 4 核 CPU 上同时运行),开销非常大,所以你会发现 并不是 启动越多进程效率越高
一般服务器资源是有限的,操作系统在稳定运行的前提下,可以同时处理的进程数在数十个到数百个规模。如果进程数量规模更大,系统运行将不稳定,而且可用内存资源往往也会不足
1.2 Pool.map()
Pool.map()
类似于内置的 map()
函数,它将 可迭代的
序列映射到调用的函数上,注意: 调用的函数只能接受一个参数
def download_many_1(): '''多进程,按进程数 并行 下载所有图片 使用multiprocessing.Pool.map(download_one, images) 注意Pool.map()限制了download_one()只能接受一个参数,所以images是字典构成的列表 ''' down_path = setup_down_path() links = get_links() images = [] for linkno, link in enumerate(links, 1): image = { 'path': down_path, 'linkno': linkno, 'link': link } images.append(image) with Pool(4) as p: p.map(download_one, images) # 将images序列依次映射给download_one()函数 logger.info('Waiting for all subprocesses done...') # p.close() # 使用with语句和Pool.map()后,会自动调用Pool.close()和Pool.join() # p.join() logger.info('All subprocesses done.') return len(links)
1.3 Pool.starmap()
Python 3.3 添加了 Pool.starmap()
方法,它可以将 元组
组成的序列依次映射给调用的函数上,自动 解包元组
给调用函数的多个参数
先在 common.py
模块中增加一个接受多个参数的 download_one_1(path, linkno, link)
函数:
def download_one_1(path, linkno, link): '''下载一张图片 :param path: 图片的保存目录 :param linkno: 图片的序号 :param link: 图片的URL ''' logger.info('Downloading No.{} [{}]'.format(linkno, link)) t0 = time.time() resp = requests.get(link) filename = os.path.split(link)[1] with open(os.path.join(path, filename), 'wb') as f: f.write(resp.content) t1 = time.time() logger.info('Task No.{} [{}] runs {} seconds.'.format(linkno, link, t1 - t0))
然后在 processpool.py
模块中:
from common import download_one_1 def download_many_2(): '''多进程,按进程数 并行 下载所有图片 使用multiprocessing.Pool.starmap(download_one_1, images),它是Python-3.3添加的 可以给download_one_1()函数传元组组成的序列,会自动解包元组给函数的多个参数 ''' down_path = setup_down_path() links = get_links() images = [] for linkno, link in enumerate(links, 1): images.append((down_path, linkno, link)) with Pool(4) as p: p.starmap(download_one_1, images) # 链接带序号 logger.info('Waiting for all subprocesses done...') # p.close() # p.join() logger.info('All subprocesses done.') return len(links)
由于下载每张图片时的保存目录都相同,可以使用 functools.partial()
固定住这个参数:
def download_many_3(): '''多进程,按进程数 并行 下载所有图片 使用multiprocessing.Pool.starmap(download_one_1, images),它是Python-3.3添加的 可以给download_one_1()函数传元组组成的序列,会自动解包元组给函数的多个参数 由于下载每张图片时的保存目录都相同,可以使用functools.partial()固定住这个参数 ''' down_path = setup_down_path() links = get_links() # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数 download_one_1_partial = partial(download_one_1, down_path) images = [] for linkno, link in enumerate(links, 1): images.append((linkno, link)) # 每个元组将不包含保存的目录 with Pool(4) as p: p.starmap(download_one_1_partial, images) # 链接带序号 logger.info('Waiting for all subprocesses done...') # p.close() # p.join() logger.info('All subprocesses done.') return len(links)
2. 使用 ProcessPoolExecutor
从 Python 3.2 开始提供了 concurrent.futures
模块,它为 Python 多进程和多线程提供了统一格式的接口,多进程使用 concurrent.futures.ProcessPoolExecutor
,多线程使用 concurrent.futures.ThreadPoolExecutor
,让多任务变得更简单。当然,如果你想灵活设置多任务,还是可以使用 multiprocessing
(多进程模块)和 threading
(多线程模块)
2.1 Executor.map()
(1) 每个进程调用的函数接受一个参数
from concurrent import futures def download_many_4(): '''多进程,按进程数 并行 下载所有图片 使用concurrent.futures.ProcessPoolExecutor() Executor.map()使用Future而不是返回Future,它返回迭代器, 迭代器的__next__()方法调用各个Future的result()方法,因此我们得到的是各个Future的结果,而非Future本身 注意Executor.map()限制了download_one()只能接受一个参数,所以images是字典构成的列表 ''' down_path = setup_down_path() links = get_links() images = [] for linkno, link in enumerate(links, 1): image = { 'path': down_path, 'linkno': linkno, 'link': link } images.append(image) # with语句将调用executor.__exit__()方法,而这个方法会调用executor.shutdown(wait=True)方法,它会在所有进程都执行完毕前阻塞主进程 with futures.ProcessPoolExecutor(max_workers=16) as executor: # 不指定max_workers时,进程池中进程个数默认为os.cpu_count() # executor.map()效果类似于内置函数map(),但download_one()函数会在多个进程中并行调用 # 它的返回值res是一个迭代器<itertools.chain object>,我们后续可以迭代获取各个被调用函数的返回值 res = executor.map(download_one, images) # 传一个序列 return len(list(res)) # 如果有进程抛出异常,异常会在这里抛出,类似于迭代器中隐式调用next()的效果
(2) 每个进程调用的函数接受多个参数
from functools import partial from concurrent import futures def download_many_5(): '''多进程,按进程数 并行 下载所有图片 使用concurrent.futures.ProcessPoolExecutor() Executor.map()中的调用函数如果要接受多个参数,可以给Executor.map()传多个序列 参考:https://yuanjiang.space/threadpoolexecutor-map-method-with-multiple-parameters ''' down_path = setup_down_path() links = get_links() # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数 download_one_1_partial = partial(download_one_1, down_path) # 创建包含所有linkno的序列 linknos = [i for i in range(len(links))] with futures.ProcessPoolExecutor(max_workers=16) as executor: res = executor.map(download_one_1_partial, linknos, links) # 给Executor.map()传多个序列 return len(list(res))
2.2 Executor.submit() 和 concurrent.futures.as_completed()
为了演示 Executor.map()
内部是怎么工作的:
def download_many_6(): '''多进程,按进程数 并行 下载所有图片 使用concurrent.futures.ProcessPoolExecutor() 不使用Executor.map(),而使用Executor.submit()和concurrent.futures.as_completed() Executor.submit()方法会返回Future,而Executor.map()是使用Future ''' down_path = setup_down_path() links = get_links() # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数 download_one_1_partial = partial(download_one_1, down_path) with futures.ProcessPoolExecutor(max_workers=16) as executor: to_do = [] # 创建并且排定Future for linkno, link in enumerate(links, 1): # 链接带序号 future = executor.submit(download_one_1_partial, linkno, link) to_do.append(future) logger.debug('Scheduled for No.{} {}: {}'.format(linkno, link, future)) results = [] # 获取Future的结果,futures.as_completed(to_do)的参数是Future列表,返回迭代器, # 只有当有Future运行结束后,才产出future for future in futures.as_completed(to_do): # future变量表示已完成的Future对象,所以后续future.result()绝不会阻塞 res = future.result() results.append(res) logger.debug('{} result: {!r}'.format(future, res)) return len(results)
前面介绍过,像网络 I/O 密集型最适合使用 多线程
,所以下一篇将介绍 多线程
下载
代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎 star
0 条评论
评论者的用户名
评论时间暂时还没有评论.