Python3爬虫系列04 (实验) - 多进程并发下载

  • 原创
  • Madman
  • /
  • 2018-10-04 14:16
  • /
  • 0
  • 598 次阅读

spider 04-min.jpg

Synopsis: I/O密集型适合使用多线程,CPU密集型适合使用多进程。当然,我们还是可以利用多进程将下载速度有一定的提升。Python3中可以使用multiprocessing模块和concurrent.futures.ProcessPoolExecutor进程池模块,实现多进程

代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎star

1. 使用multiprocessing

承接 3 实验 - 同步阻塞下载 ,我们试一下用多进程的效果会怎样?

1.1 Pool.apply_async()

创建processpool.py模块:

import time
from multiprocessing import Pool
from common import setup_down_path, get_links, download_one
from logger import logger


def download_many():
    '''多进程,按进程数 并行 下载所有图片
    使用multiprocessing.Pool.apply_async()
    '''
    down_path = setup_down_path()
    links = get_links()

    p = Pool(4)  # 指定进程池中的进程数
    for linkno, link in enumerate(links, 1):
        image = {
            'path': down_path,
            'linkno': linkno,
            'link': link
        }
        p.apply_async(download_one, args=(image,))

    logger.info('Waiting for all subprocesses done...')
    p.close()  # 关闭进程池
    p.join()  # 主进程等待进程池中的所有子进程结束
    logger.info('All subprocesses done.')

    return len(links)


if __name__ == '__main__':
    t0 = time.time()
    count = download_many()
    msg = '{} flags downloaded in {} seconds.'
    logger.info(msg.format(count, time.time() - t0))

我的Win10主机有4颗CPU核心:

In [1]: import os

In [2]: os.cpu_count()
Out[2]: 4

多进程的测试结果:

进程线 用时 备注
4 13.86秒 p = Pool(4)
8 8.88秒 p = Pool(8)
16 5.90秒 p = Pool(16)
32 7.48秒 p = Pool(32)
64 12.48秒 p = Pool(64)
128 23.08秒 p = Pool(128)

第一篇依序下载平均用时50秒,理论上4个进程应该是12.5秒,而测试的结果是13.86秒,原因是,创建进程需要时间,比如使用128个进程测试,你会发现前面要等待很长一段时间才会有进程真正开始下载。另外,当进程数大于CPU核心数时,必然会发生进程间切换(其实就算只有4个进程也会有进程切换,因为系统开机后就运行了很多进程,也就是说你开4个进程来下载,同一时刻并不是4个下载的进程都在4核CPU上运行),开销非常大,所以你会发现并不是启动越多进程效率越高

一般服务器资源是有限的,操作系统在稳定运行的前提下,可以同时处理的进程数在数十个到数百个规模。如果进程数量规模更大,系统运行将不稳定,而且可用内存资源往往也会不足。

1.2 Pool.map()

Pool.map()类似于内置的map()函数,它将可迭代的序列映射到调用的函数上,注意: 调用的函数只能接受一个参数

def download_many_1():
    '''多进程,按进程数 并行 下载所有图片
    使用multiprocessing.Pool.map(download_one, images)
    注意Pool.map()限制了download_one()只能接受一个参数,所以images是字典构成的列表
    '''
    down_path = setup_down_path()
    links = get_links()

    images = []
    for linkno, link in enumerate(links, 1):
        image = {
            'path': down_path,
            'linkno': linkno,
            'link': link
        }
        images.append(image)

    with Pool(4) as p:
        p.map(download_one, images)  # 将images序列依次映射给download_one()函数

    logger.info('Waiting for all subprocesses done...')
    # p.close()  # 使用with语句和Pool.map()后,会自动调用Pool.close()和Pool.join()
    # p.join()
    logger.info('All subprocesses done.')

    return len(links)

1.3 Pool.startmap()

Python-3.3添加了Pool.startmap()方法,它可以将元组组成的序列依次映射给调用的函数上,自动解包元组给调用函数的多个参数

先在common.py模块中增加一个接受多个参数的download_one_1(path, linkno, link)函数:

def download_one_1(path, linkno, link):
    '''下载一张图片
    :param path: 图片的保存目录
    :param linkno: 图片的序号
    :param link: 图片的URL
    '''
    logger.info('Downloading No.{} [{}]'.format(linkno, link))
    t0 = time.time()

    resp = requests.get(link)
    filename = os.path.split(link)[1]
    with open(os.path.join(path, filename), 'wb') as f:
        f.write(resp.content)

    t1 = time.time()
    logger.info('Task No.{} [{}] runs {} seconds.'.format(linkno, link, t1 - t0))

然后在processpool.py模块中:

from common import download_one_1


def download_many_2():
    '''多进程,按进程数 并行 下载所有图片
    使用multiprocessing.Pool.starmap(download_one_1, images),它是Python-3.3添加的
    可以给download_one_1()函数传元组组成的序列,会自动解包元组给函数的多个参数
    '''
    down_path = setup_down_path()
    links = get_links()

    images = []
    for linkno, link in enumerate(links, 1):
        images.append((down_path, linkno, link))

    with Pool(4) as p:
        p.starmap(download_one_1, images)  # 链接带序号

    logger.info('Waiting for all subprocesses done...')
    # p.close()
    # p.join()
    logger.info('All subprocesses done.')

    return len(links)

由于下载每张图片时的保存目录都相同,可以使用functools.partial()固定住这个参数:

def download_many_3():
    '''多进程,按进程数 并行 下载所有图片
    使用multiprocessing.Pool.starmap(download_one_1, images),它是Python-3.3添加的
    可以给download_one_1()函数传元组组成的序列,会自动解包元组给函数的多个参数
    由于下载每张图片时的保存目录都相同,可以使用functools.partial()固定住这个参数
    '''
    down_path = setup_down_path()
    links = get_links()

    # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数
    download_one_1_partial = partial(download_one_1, down_path)

    images = []
    for linkno, link in enumerate(links, 1):
        images.append((linkno, link))  # 每个元组将不包含保存的目录

    with Pool(4) as p:
        p.starmap(download_one_1_partial, images)  # 链接带序号

    logger.info('Waiting for all subprocesses done...')
    # p.close()
    # p.join()
    logger.info('All subprocesses done.')

    return len(links)

2. 使用ProcessPoolExecutor

从Python-3.2开始提供了concurrent.futures模块,它为Python多进程和多线程提供了统一格式的接口,多进程使用concurrent.futures.ProcessPoolExecutor,多线程使用concurrent.futures.ThreadPoolExecutor,让多任务变得更简单。当然,如果你想灵活设置多任务,还是可以使用multiprocessing(多进程模块)和threading(多线程模块)

2.1Executor.map()

(1) 每个进程调用的函数接受一个参数

from concurrent import futures


def download_many_4():
    '''多进程,按进程数 并行 下载所有图片
    使用concurrent.futures.ProcessPoolExecutor()
    Executor.map()使用Future而不是返回Future,它返回迭代器,
    迭代器的__next__()方法调用各个Future的result()方法,因此我们得到的是各个Future的结果,而非Future本身

    注意Executor.map()限制了download_one()只能接受一个参数,所以images是字典构成的列表
    '''
    down_path = setup_down_path()
    links = get_links()

    images = []
    for linkno, link in enumerate(links, 1):
        image = {
            'path': down_path,
            'linkno': linkno,
            'link': link
        }
        images.append(image)

    # with语句将调用executor.__exit__()方法,而这个方法会调用executor.shutdown(wait=True)方法,它会在所有进程都执行完毕前阻塞主进程
    with futures.ProcessPoolExecutor(max_workers=16) as executor:  # 不指定max_workers时,进程池中进程个数默认为os.cpu_count()
        # executor.map()效果类似于内置函数map(),但download_one()函数会在多个进程中并行调用
        # 它的返回值res是一个迭代器<itertools.chain object>,我们后续可以迭代获取各个被调用函数的返回值
        res = executor.map(download_one, images)  # 传一个序列

    return len(list(res))  # 如果有进程抛出异常,异常会在这里抛出,类似于迭代器中隐式调用next()的效果

(2) 每个进程调用的函数接受多个参数

from functools import partial
from concurrent import futures


def download_many_5():
    '''多进程,按进程数 并行 下载所有图片
    使用concurrent.futures.ProcessPoolExecutor()
    Executor.map()中的调用函数如果要接受多个参数,可以给Executor.map()传多个序列
    参考:https://yuanjiang.space/threadpoolexecutor-map-method-with-multiple-parameters
    '''
    down_path = setup_down_path()
    links = get_links()

    # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数
    download_one_1_partial = partial(download_one_1, down_path)

    # 创建包含所有linkno的序列
    linknos = [i for i in range(len(links))]

    with futures.ProcessPoolExecutor(max_workers=16) as executor:
        res = executor.map(download_one_1_partial, linknos, links)  # 给Executor.map()传多个序列

    return len(list(res))

2.2 Executor.submit()和concurrent.futures.as_completed()

为了演示Executor.map()内部是怎么工作的:

def download_many_6():
    '''多进程,按进程数 并行 下载所有图片
    使用concurrent.futures.ProcessPoolExecutor()
    不使用Executor.map(),而使用Executor.submit()和concurrent.futures.as_completed()
    Executor.submit()方法会返回Future,而Executor.map()是使用Future
    '''
    down_path = setup_down_path()
    links = get_links()

    # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数
    download_one_1_partial = partial(download_one_1, down_path)

    with futures.ProcessPoolExecutor(max_workers=16) as executor:
        to_do = []
        # 创建并且排定Future
        for linkno, link in enumerate(links, 1):  # 链接带序号
            future = executor.submit(download_one_1_partial, linkno, link)
            to_do.append(future)
            logger.debug('Scheduled for No.{} {}: {}'.format(linkno, link, future))

        results = []
        # 获取Future的结果,futures.as_completed(to_do)的参数是Future列表,返回迭代器,
        # 只有当有Future运行结束后,才产出future
        for future in futures.as_completed(to_do):  # future变量表示已完成的Future对象,所以后续future.result()绝不会阻塞
            res = future.result()
            results.append(res)
            logger.debug('{} result: {!r}'.format(future, res))

    return len(results)

前面介绍过,像网络I/O密集型最适合使用多线程,所以下一篇将介绍多线程下载

代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎star

未经允许不得转载: LIFE & SHARE - 王颜公子 » Python3爬虫系列04 (实验) - 多进程并发下载

分享

作者

作者头像

Madman

如果博文内容有误或其它任何问题,欢迎留言评论,我会尽快回复; 或者通过QQ、微信等联系我

0 条评论

暂时还没有评论.

发表评论前请先登录