Python3 Crawler Series 09 (Experiment) - Concurrent Downloads with asyncio + aiohttp

  • Madman
  • 2018-10-09 17:15


Synopsis: aiohttp is an asynchronous network I/O library that speaks HTTP; our crawler uses its client side, together with asyncio. In testing, the single-threaded asynchronous version actually outperforms the multithreaded one, since it pays no thread-creation overhead and no cost for context switches between threads. Note, however, that if you download large files such as videos, writing the network data to local disk is itself a blocking operation and will block the event loop. asyncio has no asynchronous disk I/O, so we use aiofiles, which under the hood simply runs the writes in a thread pool. Later crawler projects will routinely download hundreds of thousands of files, so to keep the console clean we use progressbar2 to show a progress bar while detailed messages go to a log file.

The code is available at https://github.com/wangy8961/python3-concurrency ; stars are welcome.

1. aiohttp

Asynchronous HTTP Client/Server for asyncio and Python

The previous post introduced asyncio, which provides socket-based asynchronous I/O for TCP and UDP, but does not implement application-layer protocols such as HTTP; aiohttp exists to fill that gap.

1.1 Installation

# pip install aiohttp

1.2 Basic usage

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://www.madmalls.com')
        print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

2. asyncio + aiohttp

The aiohttp Client Quickstart documentation asks that an HTTP client first create an aiohttp.ClientSession() asynchronous context manager, and that you do not create a separate session for every request

  • async with aiohttp.ClientSession() as session: manages creation and teardown of the session automatically
  • async with session.get('http://httpbin.org/') as response: manages opening and closing of the HTTP connection automatically
  • await response.read(): fetches the response body asynchronously; read() returns bytes, while text() returns the HTML document as a string

# Requires Python 3.7+
import os
import time
import asyncio
import aiohttp
from logger import logger

basepath = os.path.abspath(os.path.dirname(__file__))  # root directory of this module's file

def setup_down_path():
    '''Set up the directory where downloaded images are saved; all images go into one directory'''
    down_path = os.path.join(basepath, 'downloads')
    if not os.path.isdir(down_path):
        os.mkdir(down_path)
        logger.info('Create download path {}'.format(down_path))
    return down_path

def get_links():
    '''Collect the download links for all images'''
    with open(os.path.join(basepath, 'flags.txt')) as f:  # each line of this file holds one image filename
        return ['http://192.168.40.121/flags/' + flag.strip() for flag in f.readlines()]

async def download_one(session, image):
    logger.info('Downloading No.{} [{}]'.format(image['linkno'], image['link']))
    t0 = time.time()

    async with session.get(image['link']) as response:
        image_content = await response.read()  # Binary Response Content: access the response body as bytes, for non-text requests

    filename = os.path.split(image['link'])[1]
    with open(os.path.join(image['path'], filename), 'wb') as f:
        f.write(image_content)

    t1 = time.time()
    logger.info('Task No.{} [{}] runs {:.2f} seconds.'.format(image['linkno'], image['link'], t1 - t0))

async def download_many():
    down_path = setup_down_path()
    links = get_links()

    tasks = []  # list holding all the tasks
    async with aiohttp.ClientSession() as session:  # aiohttp recommends one session for the whole application; do not create a session per request
        for linkno, link in enumerate(links, 1):
            image = {
                'path': down_path,
                'linkno': linkno,  # image index, so the log shows which image is being downloaded
                'link': link
            }
            task = asyncio.create_task(download_one(session, image))  # asyncio.create_task() is new in Python 3.7; on older versions use asyncio.ensure_future()
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        return len(results)

if __name__ == '__main__':
    t0 = time.time()
    count = asyncio.run(download_many())
    msg = '{} flags downloaded in {:.2f} seconds.'
    logger.info(msg.format(count, time.time() - t0))

The test run averages 0.89 seconds, even faster than the multithreaded version!

3. Using uvloop

http://www.madmalls.com/blog/post/asyncio-howto-in-python3/#23 shows that replacing asyncio's default event loop policy with uvloop improves performance even further:

3.1 Installation

# pip install uvloop

3.2 Using uvloop.EventLoopPolicy()

Add these two lines:

import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

4. aiofiles

In the example in section 2, f.write(image_content) is local disk I/O, a blocking operation: after the image data arrives from the network, writing it to disk briefly blocks the whole thread. If you were fetching several large HD video files, that blocking would last much longer. asyncio only supports asynchronous network I/O, so for this we need aiofiles.

import os
import time
import asyncio
import uvloop
import aiohttp
import aiofiles
from logger import logger

basepath = os.path.abspath(os.path.dirname(__file__))  # root directory of this module's file

def setup_down_path():
    '''Set up the directory where downloaded images are saved; all images go into one directory'''
    down_path = os.path.join(basepath, 'downloads')
    if not os.path.isdir(down_path):
        os.mkdir(down_path)
        logger.info('Create download path {}'.format(down_path))
    return down_path

async def get_links():
    '''Collect the download links for all images'''
    async with aiofiles.open(os.path.join(basepath, 'flags.txt')) as f:  # each line of this file holds one image filename
        flags = await f.readlines()
        return ['http://192.168.40.121/flags/' + flag.strip() for flag in flags]

async def download_one(session, image):
    logger.info('Downloading No.{} [{}]'.format(image['linkno'], image['link']))
    t0 = time.time()

    async with session.get(image['link']) as response:
        image_content = await response.read()  # Binary Response Content: access the response body as bytes, for non-text requests

    filename = os.path.split(image['link'])[1]
    async with aiofiles.open(os.path.join(image['path'], filename), 'wb') as f:
        await f.write(image_content)

    t1 = time.time()
    logger.info('Task No.{} [{}] runs {:.2f} seconds.'.format(image['linkno'], image['link'], t1 - t0))

async def download_many():
    down_path = setup_down_path()
    links = await get_links()

    tasks = []  # list holding all the tasks
    async with aiohttp.ClientSession() as session:  # aiohttp recommends one session for the whole application; do not create a session per request
        for linkno, link in enumerate(links, 1):
            image = {
                'path': down_path,
                'linkno': linkno,  # image index, so the log shows which image is being downloaded
                'link': link
            }
            task = asyncio.create_task(download_one(session, image))  # asyncio.create_task() is new in Python 3.7; on older versions use asyncio.ensure_future()
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        return len(results)

if __name__ == '__main__':
    t0 = time.time()
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    count = asyncio.run(download_many())
    msg = '{} flags downloaded in {:.2f} seconds.'
    logger.info(msg.format(count, time.time() - t0))

5. Showing a progress bar with progressbar2

progressbar2 can display the download progress

# pip install progressbar2

To show a progress bar we need the total number of downloads up front, and the bar must be updated in real time as each download finishes, so we use asyncio.as_completed():

import asyncio
import logging
import os
import sys
import time
import aiohttp
import aiofiles
import progressbar


# root directory of this module's file
basepath = os.path.abspath(os.path.dirname(__file__))

# Logging setup
logger = logging.getLogger('spider')  # create the logger instance
logger.setLevel(logging.CRITICAL)  # keep the console clean: only summary messages and the progress bar
formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')  # console and file logs share one Formatter
log_path = os.path.join(basepath, 'logs')  # directory holding the log files
if not os.path.isdir(log_path):
    os.mkdir(log_path)
filename = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time())) + '.log'  # log filename, based on the current time
file_handler = logging.FileHandler(os.path.join(log_path, filename), encoding='utf-8')  # handler for the log file
file_handler.setFormatter(formatter)  # attach the Formatter
file_handler.setLevel(logging.DEBUG)  # set the file log level separately; comment this out to use the logger's overall level
stream_handler = logging.StreamHandler()  # StreamHandler for console logging
stream_handler.setFormatter(formatter)
logger.addHandler(file_handler)  # attach the handlers to the logger
logger.addHandler(stream_handler)


def setup_down_path():
    '''Set up the directory where downloaded images are saved; all images go into one directory'''
    down_path = os.path.join(basepath, 'downloads')
    if not os.path.isdir(down_path):
        os.mkdir(down_path)
        logger.critical('Create download path {}'.format(down_path))
    return down_path


async def get_links():
    '''Collect the download links for all images'''
    async with aiofiles.open(os.path.join(basepath, 'flags.txt')) as f:  # each line of this file holds one image filename
        flags = await f.readlines()
        return ['http://192.168.40.121/flags/' + flag.strip() for flag in flags]


async def download_one(semaphore, session, image):
    logger.debug('Downloading No.{} [{}]'.format(image['linkno'], image['link']))
    t0 = time.time()

    try:
        async with semaphore:
            async with session.get(image['link']) as response:
                image_content = await response.read()  # Binary Response Content: access the response body as bytes, for non-text requests
    except Exception as e:
        logger.error('Exception {} raised on No.{} [{}]'.format(e.__class__, image['linkno'], image['link']))
        return False  # tell the caller that the request for this image URL failed

    filename = os.path.split(image['link'])[1]
    async with aiofiles.open(os.path.join(image['path'], filename), 'wb') as f:
        await f.write(image_content)

    t1 = time.time()
    logger.debug('Task No.{} [{}] runs {:.2f} seconds.'.format(image['linkno'], image['link'], t1 - t0))

    return True  # tell the caller that the request for this image URL succeeded


async def download_many():
    down_path = setup_down_path()
    links = await get_links()
    # limit the number of concurrent requests
    sem = asyncio.Semaphore(min(1000, len(links)))

    async with aiohttp.ClientSession() as session:  # aiohttp recommends one session for the whole application; do not create a session per request
        successful_images = 0  # number of images fetched successfully
        failed_images = 0  # number of images that failed

        if len(sys.argv) > 1 and sys.argv[1] == '-v':  # verbose output
            logger.setLevel(logging.DEBUG)

            tasks = []  # list holding all the tasks
            for linkno, link in enumerate(links, 1):
                image = {
                    'path': down_path,
                    'linkno': linkno,  # image index, so the log shows which image is being downloaded
                    'link': link
                }
                task = asyncio.create_task(download_one(sem, session, image))  # asyncio.create_task() is new in Python 3.7; on older versions use asyncio.ensure_future()
                tasks.append(task)
            results = await asyncio.gather(*tasks)

            for result in results:
                if result:
                    successful_images += 1
                else:
                    failed_images += 1
        else:  # show the progress bar
            to_do = []
            for linkno, link in enumerate(links, 1):
                image = {
                    'path': down_path,
                    'linkno': linkno,  # image index, so the log shows which image is being downloaded
                    'link': link
                }
                to_do.append(download_one(sem, session, image))

            to_do_iter = asyncio.as_completed(to_do)

            with progressbar.ProgressBar(max_value=len(to_do)) as bar:
                for i, future in enumerate(to_do_iter, 1):  # start at 1 so the bar reaches max_value
                    result = await future
                    if result:
                        successful_images += 1
                    else:
                        failed_images += 1
                    bar.update(i)

        logger.critical('Successful [{}] images, failed [{}] images'.format(successful_images, failed_images))


if __name__ == '__main__':
    t0 = time.time()
    if sys.platform != 'win32':
        import uvloop
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(download_many())
    loop.close()
    logger.critical('Total Cost {:.2f} seconds'.format(time.time() - t0))

The console shows only the progress bar, while the log messages for every download request all go to the log file. Run the script with no arguments to show just the progress bar; pass -v to print the detailed log messages to the console:

[root@CentOS python3-concurrency]# python3 asynchronous.py 
100% (194 of 194) |#############################################################################################################| Elapsed Time: 0:00:00 Time:  0:00:00
2018-08-25 15:52:00,383 - CRITICAL: Successful [194] images, failed [0] images
2018-08-25 15:52:00,388 - CRITICAL: Total Cost 0.92 seconds

[root@CentOS python3-concurrency]# python3 asynchronous.py -v
2018-08-25 15:52:39,778 - DEBUG: Downloading No.1 [http://192.168.40.121/flags/ad.gif]
2018-08-25 15:52:39,779 - DEBUG: Downloading No.2 [http://192.168.40.121/flags/ae.gif]
2018-08-25 15:52:39,779 - DEBUG: Downloading No.3 [http://192.168.40.121/flags/af.gif]
...
2018-08-25 15:52:40,171 - DEBUG: Task No.1 [http://192.168.40.121/flags/ad.gif] runs 0.39 seconds.
2018-08-25 15:52:40,189 - DEBUG: Task No.2 [http://192.168.40.121/flags/ae.gif] runs 0.41 seconds.
2018-08-25 15:52:40,190 - DEBUG: Task No.6 [http://192.168.40.121/flags/am.gif] runs 0.41 seconds.
...
2018-08-25 15:52:40,473 - CRITICAL: Successful [194] images, failed [0] images
2018-08-25 15:52:40,478 - CRITICAL: Total Cost 0.71 seconds



If anything in this post is wrong, or you have any other questions, feel free to leave a comment and I will reply as soon as possible; you can also reach me via QQ or WeChat.
