Python3爬虫系列05 (实验) - 多线程并发下载

  • 原创
  • Madman
  • /
  • 2018-10-05 16:42
  • /
  • 0
  • 241 次阅读

spider 05-min.png

Synopsis: I/O密集型最适合使用多线程,当然包括网络I/O。我们要下载多张图片,每次去下载一张图片,就是发起一次HTTP请求(使用TCP协议),客户端首先通过socket.socket()创建一个套接字,然后调用connect()方法经过三次握手与服务端建立TCP连接,这个过程是阻塞的。建立连接后,客户端将请求(要访问图片资源)发送给服务端,然后服务端返回响应,客户端用recv()方法每次接收一定数量的字节,客户端在每个响应报文(一张图片有多个数据包)到达操作系统内核时,是阻塞的。网络I/O对于CPU来说是无比漫长的,如果是依序下载,CPU就要一直阻塞到第1张图片的字节全部下载完成后,才能下载第2张,这些等待的时间(对CPU来说,比它处理数据的时间多出无数倍)就白白浪费了。为了合理利用CPU资源,可以使用多线程,每个线程去下载一张图片,当下载第1张图片的任务阻塞时,CPU切换到第2个线程,它开始下载第2张图片,依次类推,当第1张图片有响应报文到达时,等其它线程阻塞后,CPU又会切换到下载第1张图片的那个线程

代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎star

1. threading和Queue

threading模块可以实现多线程,Queue模块创建线程级安全的队列,各线程从队列中取任务并执行

import time
from queue import Queue
from threading import Thread
from common import setup_down_path, get_links, download_one_1
from logger import logger


class ThreadWorker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            down_path, linkno, link = self.queue.get()
            download_one_1(down_path, linkno, link)
            self.queue.task_done()


def download_many():
    '''多线程,按线程数 并发(非并行) 下载所有图片'''
    down_path = setup_down_path()
    links = get_links()

    # 创建队列
    queue = Queue()

    # 创建多个线程
    for i in range(4):
        worker = ThreadWorker(queue)
        worker.daemon = True  # 如果工作线程在等待更多的任务时阻塞了,主线程也可以正常退出
        worker.start()  # 启动线程

    # 往队列中投放任务
    for linkno, link in enumerate(links, 1):  # 链接带序号
        logger.info('Queueing No.{} {}'.format(linkno, link))
        queue.put((down_path, linkno, link))

    logger.info('Waiting for all subthread done...')
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logger.info('All subthread done.')

    return len(links)


if __name__ == '__main__':
    t0 = time.time()
    count = download_many()
    msg = '{} flags downloaded in {} seconds.'
    logger.info(msg.format(count, time.time() - t0))

多线程的测试结果:

线程数 用时 备注
4 12.12秒 for i in range(4)
8 6.15秒 for i in range(8)
16 3.23秒 for i in range(16)
32 1.88秒 for i in range(32)
64 1.58秒 for i in range(64)
128 1.60秒 for i in range(128)

依序下载是50秒,多进程最好的结果是5.90秒,而多线程只需要1.58秒,比依序下载效率提升了30多倍!

现在我们来分析一下,为什么多线程会提升效率?

多进程时,进程间切换的开销非常大,而多线程也会有切换,但这种系统开销变得很小。从运行时间上看,多线程似乎已经解决了切换开销大的问题。而且可支持的任务数量规模,也变成了数百个到数千个。

有人可能会问了,我们默认使用的CPython解释器本身就不是线程安全的,因此有GIL(Global Interpreter Lock,全局解释器锁)的限制(这是 CPython 解释器的局限,与 Python 语言本身无关。 Jython 和 IronPython 没有这种限制。不过,目前最快的Python解释器 PyPy 也有GIL),一次只允许使用一个线程执行 Python 字节码,因此,一个 Python 进程通常不能同时使用多个 CPU 核心。

问题1: 多线程脚本和依序下载脚本都是使用一个CPU核心(一个进程),只不过多线程脚本创建了多个线程,但是Python 线程受GIL的限制,任何时候都只允许运行一个线程,那么为什么多线程脚本会比依序下载脚本快30倍?

答案是: Python标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程在等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行另一个线程。 因此,尽管有GIL,Python 线程还是能在 I/O 密集型应用中发挥作用。 (备注:time.sleep()函数也会释放 GIL,因此在多线程中它并不会阻塞其它线程)

问题2: 处理CPU密集型任务时,使用ProcessPoolExecutor模块能绕开GIL,利用所有可用的 CPU 核心。但是在I/O密集型中,多进程为什么没有多线程效果好?

答案是: 你参考我前一篇多进程文章也说明了,1. 创建进程比创建线程开销大 2. 当进程数大于CPU核心数时,必然出现进程间切换,而进程切换比线程切换开锁也大。

多进程脚本最少也要5.90秒,而多线程只需要1.58秒,主要原因可能是,我的电脑用的是四核 CPU,因此限制只能有 4 个并发下载(其它12个等着进程切换才能使用CPU),而使用线程池的版本有64个工作的线程。

2. ThreadPoolExecutor

2.1 Executor.map()

(1) 每个线程调用的函数接受一个参数

from concurrent import futures


def download_many_1():
    '''多线程,按线程数 并发(非并行) 下载所有图片
    使用concurrent.futures.ThreadPoolExecutor()
    Executor.map()使用Future而不是返回Future,它返回迭代器,
    迭代器的__next__()方法调用各个Future的result()方法,因此我们得到的是各个Future的结果,而非Future本身

    注意Executor.map()限制了download_one()只能接受一个参数,所以images是字典构成的列表
    '''
    down_path = setup_down_path()
    links = get_links()

    images = []
    for linkno, link in enumerate(links, 1):
        image = {
            'path': down_path,
            'linkno': linkno,
            'link': link
        }
        images.append(image)

    workers = min(64, len(links))  # 保证线程池中的线程不会多于总的下载任务数
    # with语句将调用executor.__exit__()方法,而这个方法会调用executor.shutdown(wait=True)方法,它会在所有进程都执行完毕前阻塞主进程
    with futures.ThreadPoolExecutor(workers) as executor:
        # executor.map()效果类似于内置函数map(),但download_one()函数会在多个线程中并发调用
        # 它的返回值res是一个迭代器<itertools.chain object>,我们后续可以迭代获取各个被调用函数的返回值
        res = executor.map(download_one, images)  # 传一个序列

    return len(list(res))  # 如果有进程抛出异常,异常会在这里抛出,类似于迭代器中隐式调用next()的效果

(2) 每个线程调用的函数接受多个参数

from functools import partial
from concurrent import futures


def download_many_2():
    '''多线程,按线程数 并发(非并行) 下载所有图片
    使用concurrent.futures.ThreadPoolExecutor()
    Executor.map()中的调用函数如果要接受多个参数,可以给Executor.map()传多个序列
    参考:https://yuanjiang.space/threadpoolexecutor-map-method-with-multiple-parameters
    '''
    down_path = setup_down_path()
    links = get_links()

    # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数
    download_one_1_partial = partial(download_one_1, down_path)

    # 创建包含所有linkno的序列
    linknos = [i for i in range(len(links))]

    workers = min(64, len(links))  # 保证线程池中的线程不会多于总的下载任务数
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one_1_partial, linknos, links)  # 给Executor.map()传多个序列

    return len(list(res))

2.2 Executor.submit()和concurrent.futures.as_completed()

为了演示Executor.map()内部是怎么工作的:

def download_many_3():
    '''多线程,按线程数 并发(非并行) 下载所有图片
    使用concurrent.futures.ThreadPoolExecutor()
    不使用Executor.map(),而使用Executor.submit()和concurrent.futures.as_completed()
    Executor.submit()方法会返回Future,而Executor.map()是使用Future
    '''
    down_path = setup_down_path()
    links = get_links()

    # 固定住保存的路径,不用每次调用下载图片函数时都传同样的down_path参数
    download_one_1_partial = partial(download_one_1, down_path)

    workers = min(64, len(links))  # 保证线程池中的线程不会多于总的下载任务数
    with futures.ThreadPoolExecutor(workers) as executor:
        to_do = []
        # 创建并排定Future
        for linkno, link in enumerate(links, 1):  # 链接带序号
            future = executor.submit(download_one_1_partial, linkno, link)
            to_do.append(future)
            logger.debug('Scheduled for No.{} {}: {}'.format(linkno, link, future))

        results = []
        # 获取Future的结果,futures.as_completed(to_do)的参数是Future列表,返回迭代器,
        # 只有当有Future运行结束后,才产出future
        for future in futures.as_completed(to_do):  # future变量表示已完成的Future对象,所以后续future.result()绝不会阻塞
            res = future.result()
            results.append(res)
            logger.debug('{} result: {!r}'.format(future, res))

    return len(results)

代码已上传到 https://github.com/wangy8961/python3-concurrency ,欢迎star

未经允许不得转载: LIFE & SHARE - 王颜公子 » Python3爬虫系列05 (实验) - 多线程并发下载

分享

作者

作者头像

Madman

如果博文内容有误或其它任何问题,欢迎留言评论,我会尽快回复; 或者通过QQ、微信等联系我

0 条评论

暂时还没有评论.

发表评论前请先登录