Python 3 爬虫|第4章:多进程并发下载

  • 原创
  • Madman
  • /
  • /
  • 0
  • 13940 次阅读

Python 3 爬虫-min.png

Synopsis: I/O 密集型适合使用多线程,CPU 密集型适合使用多进程。当然,我们还是可以利用多进程将下载速度有一定的提升。Python 3 中可以使用 multiprocessing 模块和 concurrent.futures.ProcessPoolExecutor 进程池模块,实现多进程

代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎 star

1. 使用multiprocessing

承接 Python 3 爬虫|第3章:同步阻塞下载 ,我们试一下用 多进程 的效果会怎样?

1.1 Pool.apply_async()

创建 processpool.py 模块:

import time
from multiprocessing import Pool
from common import setup_down_path, get_links, download_one
from logger import logger


def download_many():
    '''多进程,按进程数 并行 下载所有图片
    使用multiprocessing.Pool.apply_async()
    '''
    down_path = setup_down_path()
    links = get_links()

    p = Pool(4)  # 指定进程池中的进程数
    for linkno, link in enumerate(links, 1):
        image = {
            'path': down_path,
            'linkno': linkno,
            'link': link
        }
        p.apply_async(download_one, args=(image,))

    logger.info('Waiting for all subprocesses done...')
    p.close()  # 关闭进程池
    p.join()  # 主进程等待进程池中的所有子进程结束
    logger.info('All subprocesses done.')

    return len(links)


if __name__ == '__main__':
    t0 = time.time()
    count = download_many()
    msg = '{} flags downloaded in {} seconds.'
    logger.info(msg.format(count, time.time() - t0))

我的 Win10 主机有 4 颗 CPU 核心:

In [1]: import os

In [2]: os.cpu_count()
Out[2]: 4

多进程 的测试结果:

进程线 用时 备注
4 13.86秒 p = Pool(4)
8 8.88秒 p = Pool(8)
16 5.90秒 p = Pool(16)
32 7.48秒 p = Pool(32)
64 12.48秒 p = Pool(64)
128 23.08秒 p = Pool(128)

上一篇文章中,依序下载平均用时 50 秒,理论上 4 个进程应该是 12.5 秒,而测试的结果是 13.86 秒,原因是,创建进程需要时间,比如使用 128 个进程测试,你会发现前面要等待很长一段时间才会有进程真正开始下载。另外,当进程数大于 CPU 核心数时,必然会发生 进程间切换(其实就算只有 4 个进程也会有进程切换,因为系统开机后就运行了很多进程,也就是说你开 4 个进程来下载,同一时刻也并不是会有 4 个下载进程都在 4 核 CPU 上同时运行),开销非常大,所以你会发现 并不是 启动越多进程效率越高

一般服务器资源是有限的,操作系统在稳定运行的前提下,可以同时处理的进程数在数十个到数百个规模。如果进程数量规模更大,系统运行将不稳定,而且可用内存资源往往也会不足

1.2 Pool.map()

Pool.map() 类似于内置的 map() 函数,它将 可迭代的 序列映射到调用的函数上,注意: 调用的函数只能接受一个参数

def download_many_1():
    '''多进程,按进程数 并行 下载所有图片
    使用multiprocessing.Pool.map(download_one, images)
    注意Pool.map()限制了download_one()只能接受一个参数,所以images是字典构成的列表
    '''
    down_path = setup_down_path()
    links = get_links()

    images = []
    for linkno, link in enumerate(links, 1):
        image = {
            'path': down_path,
            'linkno': linkno,
            'link': link
        }
        images.append(image)

    with Pool(4) as p:
        p.map(download_one, images)  # 将images序列依次映射给download_one()函数

    logger.info('Waiting for all subprocesses done...')
    # p.close()  # 使用with语句和Pool.map()后,会自动调用Pool.close()和Pool.join()
    # p.join()
    logger.info('All subprocesses done.')

    return len(links)

1.3 Pool.starmap()

Python 3.3 添加了 Pool.starmap() 方法,它可以将 元组 组成的序列依次映射给调用的函数上,自动 解包元组 给调用函数的多个参数

先在 common.py 模块中增加一个接受多个参数的 download_one_1(path, linkno, link) 函数:

def download_one_1(path, linkno, link):
    '''下载一张图片
    :param path: 图片的保存目录
    :param linkno: 图片的序号
    :param link: 图片的URL
    '''
    logger.info('Downloading No.{} [{}]'.format(linkno, link))
    t0 = time.time()

    resp = requests.get(link)
    filename = os.path.split(link)[1]
    with open(os.path.join(path, filename), 'wb') as f:
        f.write(resp.content)

    t1 = time.time()
    logger.info('Task No.{} [{}] runs {} seconds.'.format(linkno, link, t1 - t0))

然后在 processpool.py 模块中:

from common import download_one_1


def download_many_2():
    '''多进程,按进程数 并行 下载所有图片
    使用multiprocessing.Pool.starmap(download_one_1, images),它是Python-3.3添加的
    可以给download_one_1()函数传元组组成的序列,会自动解包元组给函数的多个参数
    '''
    down_path = setup_down_path()
    links = get_links()

    images = []
    for linkno, link in enumerate(links, 1):
        images.append((down_path, linkno, link))

    with Pool(4) as p:
        p.starmap(download_one_1, images)  # 链接带序号

    logger.info('Waiting for all subprocesses done...')
    # p.close()
    # p.join()
    logger.info('All subprocesses done.')

    return len(links)

由于下载每张图片时的保存目录都相同,可以使用 functools.partial() 固定住这个参数:

def download_many_3():
    '''多进程,按进程数 并行 下载所有图片
    使用multiprocessing.Pool.starmap(download_one_1, images),它是Python-3.3添加的
    可以给download_one_1()函数传元组组成的序列,会自动解包元组给函数的多个参数
    由于下载每张图片时的保存目录都相同,可以使用functools.partial()固定住这个参数
    '''
    down_path = setup_down_path()
    links = get_links()

    # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数
    download_one_1_partial = partial(download_one_1, down_path)

    images = []
    for linkno, link in enumerate(links, 1):
        images.append((linkno, link))  # 每个元组将不包含保存的目录

    with Pool(4) as p:
        p.starmap(download_one_1_partial, images)  # 链接带序号

    logger.info('Waiting for all subprocesses done...')
    # p.close()
    # p.join()
    logger.info('All subprocesses done.')

    return len(links)

2. 使用 ProcessPoolExecutor

从 Python 3.2 开始提供了 concurrent.futures 模块,它为 Python 多进程和多线程提供了统一格式的接口,多进程使用 concurrent.futures.ProcessPoolExecutor,多线程使用 concurrent.futures.ThreadPoolExecutor,让多任务变得更简单。当然,如果你想灵活设置多任务,还是可以使用 multiprocessing(多进程模块)和 threading(多线程模块)

2.1 Executor.map()

(1) 每个进程调用的函数接受一个参数

from concurrent import futures


def download_many_4():
    '''多进程,按进程数 并行 下载所有图片
    使用concurrent.futures.ProcessPoolExecutor()
    Executor.map()使用Future而不是返回Future,它返回迭代器,
    迭代器的__next__()方法调用各个Future的result()方法,因此我们得到的是各个Future的结果,而非Future本身

    注意Executor.map()限制了download_one()只能接受一个参数,所以images是字典构成的列表
    '''
    down_path = setup_down_path()
    links = get_links()

    images = []
    for linkno, link in enumerate(links, 1):
        image = {
            'path': down_path,
            'linkno': linkno,
            'link': link
        }
        images.append(image)

    # with语句将调用executor.__exit__()方法,而这个方法会调用executor.shutdown(wait=True)方法,它会在所有进程都执行完毕前阻塞主进程
    with futures.ProcessPoolExecutor(max_workers=16) as executor:  # 不指定max_workers时,进程池中进程个数默认为os.cpu_count()
        # executor.map()效果类似于内置函数map(),但download_one()函数会在多个进程中并行调用
        # 它的返回值res是一个迭代器<itertools.chain object>,我们后续可以迭代获取各个被调用函数的返回值
        res = executor.map(download_one, images)  # 传一个序列

    return len(list(res))  # 如果有进程抛出异常,异常会在这里抛出,类似于迭代器中隐式调用next()的效果

(2) 每个进程调用的函数接受多个参数

from functools import partial
from concurrent import futures


def download_many_5():
    '''多进程,按进程数 并行 下载所有图片
    使用concurrent.futures.ProcessPoolExecutor()
    Executor.map()中的调用函数如果要接受多个参数,可以给Executor.map()传多个序列
    参考:https://yuanjiang.space/threadpoolexecutor-map-method-with-multiple-parameters
    '''
    down_path = setup_down_path()
    links = get_links()

    # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数
    download_one_1_partial = partial(download_one_1, down_path)

    # 创建包含所有linkno的序列
    linknos = [i for i in range(len(links))]

    with futures.ProcessPoolExecutor(max_workers=16) as executor:
        res = executor.map(download_one_1_partial, linknos, links)  # 给Executor.map()传多个序列

    return len(list(res))

2.2 Executor.submit() 和 concurrent.futures.as_completed()

为了演示 Executor.map() 内部是怎么工作的:

def download_many_6():
    '''多进程,按进程数 并行 下载所有图片
    使用concurrent.futures.ProcessPoolExecutor()
    不使用Executor.map(),而使用Executor.submit()和concurrent.futures.as_completed()
    Executor.submit()方法会返回Future,而Executor.map()是使用Future
    '''
    down_path = setup_down_path()
    links = get_links()

    # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数
    download_one_1_partial = partial(download_one_1, down_path)

    with futures.ProcessPoolExecutor(max_workers=16) as executor:
        to_do = []
        # 创建并且排定Future
        for linkno, link in enumerate(links, 1):  # 链接带序号
            future = executor.submit(download_one_1_partial, linkno, link)
            to_do.append(future)
            logger.debug('Scheduled for No.{} {}: {}'.format(linkno, link, future))

        results = []
        # 获取Future的结果,futures.as_completed(to_do)的参数是Future列表,返回迭代器,
        # 只有当有Future运行结束后,才产出future
        for future in futures.as_completed(to_do):  # future变量表示已完成的Future对象,所以后续future.result()绝不会阻塞
            res = future.result()
            results.append(res)
            logger.debug('{} result: {!r}'.format(future, res))

    return len(results)

前面介绍过,像网络 I/O 密集型最适合使用 多线程,所以下一篇将介绍 多线程 下载

代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎 star

未经允许不得转载: LIFE & SHARE - 王颜公子 » Python 3 爬虫|第4章:多进程并发下载

分享

作者

作者头像

Madman

如需 Linux / Python 相关问题付费解答,请按如下方式联系我

0 条评论

暂时还没有评论.