Python 3 爬虫|第12章:并发下载大文件 支持断点续传

  • 原创
  • Madman
  • /
  • /
  • 6
  • 1581 次阅读

Python 3 爬虫-min.png

Synopsis: 本文介绍如何流式下载大文件,并实现断点续传功能。根据顺序下载大文件的字节和乱序下载大文件的各个分段,分别进行了多线程或协程并发,还实现了多个大文件并发下载

1. stream 流式下载大文件

1.1 stream=True 和 iter_content()

https://2.python-requests.org/en/master/user/quickstart/#raw-response-content

我们将继续使用 Python 3 爬虫|第3章:同步阻塞下载 所搭建的测试环境,在 Nginx 默认的网站根目录 /usr/share/nginx/html 下存在如下文件:

# pwd
/usr/share/nginx/html

# ls -hl
total 5.6G
-rw-r--r-- 1 root root  494 Aug 13 23:04 50x.html
-rw-r--r-- 1 root root 4.1G Aug 20 15:08 CentOS-7-x86_64-DVD-1611.iso
-rw-r--r-- 1 root root 680M Aug 20 15:03 CentOS-7-x86_64-Minimal-1611.iso
-rw-r--r-- 1 root root  612 Aug 13 23:04 index.html
-rw-r--r-- 1 root root  17M Aug 20 09:20 Python-3.7.4.tar.xz
-rw-r--r-- 1 root root 834M Aug 23 13:45 ubuntu-18.04.2-live-server-amd64.iso

假设我们的客户端只有 2 GB 内存,现在要下载 4.1 GB 的 ISO 文件,如果使用下面的代码:

import requests

with requests.get(url) as r:
    r.raise_for_status()
    with open(filename, 'wb') as fp:
        fp.write(r.content)

根据 Python 3 爬虫|第1章:I/O Models 阻塞/非阻塞 同步/异步 的说明,客户端发起 HTTP GET 请求后,需要等待 ISO 完整的 4.1 GB 大小的内容全部到达 内核空间 并被内核拷贝到 用户空间 后,用户的下载进程才能解除阻塞状态,并继续将文件内容保存到本地文件中。由于客户端的内存不足以缓存此 ISO 的全部内容,所以在下载的过程中,用户的下载进程会因为 OOM(out of memory) 被内核 killed

# python download-large-files.py http://192.168.40.121/CentOS-7-x86_64-DVD-1611.iso
Killed

如果设置 requests.request(method, url, **kwargs) 中的 stream 参数为 True 的话,客户端不会立即下载文件的内容,但客户端与服务端会持续建立 HTTP 连接。另外,我们再通过 requests.Response.iter_content(chunk_size=1, decode_unicode=False) 指定每次下载的数据块的大小,就可以实现 分块下载,即每次有 chunk 大小的数据块到达客户端的内核空间,然后被复制到用户空间后,下载的进程就会解除阻塞状态,并将此 chunk 大小的内容保存到本地文件中了:

import requests

with requests.get(url, stream=True) as r:
    r.raise_for_status()
    with open(filename, 'wb') as fp:
        for chunk in r.iter_content(chunk_size=512):
            if chunk:
                fp.write(chunk)

1.2 tqdm 显示下载的进度

前面的文章中我用的是 Progress Bar 2,比如要下载 1000 个文件,当前已下载 90 个,可以显示整体进度条。但是,我们如果要下载单个大文件,想显示下载速度的话,建议使用 tqdm

from tqdm import tqdm

with tqdm(total=file_size, unit='B', unit_scale=True, unit_divisor=1024, ascii=True, desc=filename) as bar:  # 打印下载时的进度条,实时显示下载速度
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(filename, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=512):
                if chunk:
                    fp.write(chunk)
                    bar.update(len(chunk))  # 实时更新已完成的数据量

说明:

  • total=file_size: 整个文件的大小,单位是 bytes
  • unit='B': 默认是 unit='it' 按 bit 来计算的,所以我们需要改成按 Byte 来计算
  • unit_scale=True: 会自动扩展单位,如果是 False,速度显示为 80625485.97B/s,如果是 True,速度显示为 62.6MB/s
  • unit_divisor=1024: 由于默认是按 1000 来除,所以计算出来的文件大小和速度不对,应该按 1024 来除
  • ascii=True: 进度条默认使用 unicode 字符 Windows 系统默认使用 cp936 编码,不会在同一行动态显示进度条,而是分多行显示,而 Linux 和 Mac 系统使用 utf-8 编码,所以一切正常。如果指定 ascii=True 则会用 123456789# 来填充进度条,此时大家都能正常地在同一行显示了
  • desc=filename: 在进度条前面显示当前下载的文件名

1.3 click 添加命令行选项与参数

Click 是一个 Python 包,只需要很少的代码就能够快速创建漂亮的命令行选项与参数

@click.command()
@click.option('--dest_filename', type=click.Path(), help="Name of the local destination file with extension")
@click.option('--multipart_chunksize', default=8*1024*1024, help="Size of chunk, unit is bytes")
@click.argument('url', type=click.Path())
def crawl(dest_filename, multipart_chunksize, url):
    pass

我们指定一个可选的命令行选项 --dest_filename,和必填的命令行参数 url,可以使用 --help 查看使用说明:

# python 1-spider.py --help
Usage: 1-spider.py [OPTIONS] URL

Options:
  --dest_filename PATH           Name of the local destination file with
                                 extension
  --multipart_chunksize INTEGER  Size of chunk, unit is bytes
  --help                         Show this message and exit.

1.4 测试

我们准备一个 Python 3 的虚拟环境:

# python -m venv venv

激活虚拟环境并安装一些相关的包:

# source venv/bin/activate
(venv) # pip install requests tqdm click

创建自定义的日志模块 logger.py

import os
import time
import logging


###
# 1. 创建logger实例,如果参数为空则返回 root logger
###

logger = logging.getLogger('spider')
# 设置总日志级别, 也可以给不同的handler设置不同的日志级别
logger.setLevel(logging.DEBUG)

###
# 2. 创建Handler, 输出日志到控制台和文件
###

# 控制台日志和日志文件使用同一个Formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')

# 日志文件FileHandler
basedir = os.path.abspath(os.path.dirname(__file__))
log_dest = os.path.join(basedir, 'logs')  # 日志文件所在目录
if not os.path.isdir(log_dest):
    os.mkdir(log_dest)
filename = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time())) + '.log'  # 日志文件名,以当前时间命名
file_handler = logging.FileHandler(os.path.join(log_dest, filename), encoding='utf-8')  # 创建日志文件handler
file_handler.setFormatter(formatter)  # 设置Formatter
# file_handler.setLevel(logging.INFO)  # 单独设置日志文件的日志级别,注释掉则使用总日志级别

# 控制台日志StreamHandler
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.INFO)

###
# 3. 将handler添加到logger中
###

logger.addHandler(file_handler)
logger.addHandler(stream_handler)

创建 custom_request.py 文件,可以捕获 requests.request() 方法的异常,比如连接超时、被拒绝等:

import requests
from logger import logger


def custom_request(method, url, info='common url', *args, **kwargs):
    '''捕获 requests.request() 方法的异常,比如连接超时、被拒绝等
    如果请求成功,则返回响应体;如果请求失败,则返回 None,所以在调用 custom_request() 函数时需要先判断返回值
    '''
    s = requests.session()
    s.keep_alive = False

    try:
        resp = requests.request(method, url, *args, **kwargs)
        resp.raise_for_status()
    except requests.exceptions.HTTPError as errh:
        # In the event of the rare invalid HTTP response, Requests will raise an HTTPError exception (e.g. 401 Unauthorized)
        logger.error('Unsuccessfully get {} [{}], HTTP Error: {}'.format(info, url, errh))
        pass
    except requests.exceptions.ConnectionError as errc:
        # In the event of a network problem (e.g. DNS failure, refused connection, etc)
        logger.error('Unsuccessfully get {} [{}], Connecting Error: {}'.format(info, url, errc))
        pass
    except requests.exceptions.Timeout as errt:
        # If a request times out, a Timeout exception is raised. Maybe set up for a retry, or continue in a retry loop
        logger.error('Unsuccessfully get {} [{}], Timeout Error: {}'.format(info, url, errt))
        pass
    except requests.exceptions.TooManyRedirects as errr:
        # If a request exceeds the configured number of maximum redirections, a TooManyRedirects exception is raised. Tell the user their URL was bad and try a different one
        logger.error('Unsuccessfully get {} [{}], Redirect Error: {}'.format(info, url, errr))
        pass
    except requests.exceptions.RequestException as err:
        # catastrophic error. bail.
        logger.error('Unsuccessfully get {} [{}], Else Error: {}'.format(info, url, err))
        pass
    except Exception as err:
        logger.error('Unsuccessfully get {} [{}], Exception: {}'.format(info, url, err.__class__))
        pass
    else:
        return resp

然后,我们再新建 1-spider.py

import click
import os
import time
from tqdm import tqdm
from custom_request import custom_request
from logger import logger


@click.command()
@click.option('--dest_filename', type=click.Path(), help="Name of the local destination file with extension")
@click.option('--multipart_chunksize', default=8*1024*1024, help="Size of chunk, unit is bytes")
@click.argument('url', type=click.Path())
def crawl(dest_filename, multipart_chunksize, url):
    t0 = time.time()

    # 如果没有指定本地保存时的文件名,则默认使用 URL 中最后一部分作为文件名
    official_filename = dest_filename if dest_filename else url.split('/')[-1]  # 正式文件名
    temp_filename = official_filename + '.swp'  # 没下载完成时,临时文件名

    # 获取文件的大小
    r = custom_request('HEAD', url, info='header message')
    if not r:  # 请求失败时,r 为 None
        logger.error('Failed to get header message on URL [{}]'.format(url))
        return
    file_size = int(r.headers['Content-Length'])
    logger.info('File size: {} bytes'.format(file_size))

    # 如果正式文件存在
    if os.path.exists(official_filename):
        if os.path.getsize(official_filename) == file_size:  # 且大小与待下载的目标文件大小一致时
            logger.warning('The file [{}] has already been downloaded'.format(official_filename))
            return
        else:  # 大小不一致时,提醒用户要保存的文件名已存在,需要手动处理,不能随便覆盖
            logger.warning('The filename [{}] has already exist, but it does not match the remote file'.format(official_filename))
            return

    # 分块下载,即使文件非常大,也不会撑爆内存
    with tqdm(total=file_size, unit='B', unit_scale=True, unit_divisor=1024, desc=official_filename) as bar:  # 打印下载时的进度条,并动态显示下载速度
        r = custom_request('GET', url, info='all content', stream=True)
        if not r:  # 请求失败时,r 为 None
            logger.error('Failed to get all content on URL [{}]'.format(url))
            return

        with open(temp_filename, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=multipart_chunksize):
                if chunk:
                    fp.write(chunk)
                    bar.update(len(chunk))

    # 整个文件内容被成功下载后,将临时文件名修改回正式文件名
    if os.path.getsize(temp_filename) == file_size:  # 以防网络故障
        os.rename(temp_filename, official_filename)
        logger.info('{} downloaded'.format(official_filename))
        logger.info('Cost {:.2f} seconds'.format(time.time() - t0))
    else:
        logger.error('Failed to download {}'.format(official_filename))


if __name__ == '__main__':
    crawl()

执行如下测试命令:

# python 1-spider.py --dest_filename='/tmp/Python-3.7.4.tar.xz' --multipart_chunksize=1024 http://192.168.40.121/Python-3.7.4.tar.xz

未经允许不得转载: LIFE & SHARE - 王颜公子 » Python 3 爬虫|第12章:并发下载大文件 支持断点续传

分享

作者

作者头像

Madman

如需 Linux / Python 相关问题付费解答,请按如下方式联系我

6 条评论

TianYing001
TianYing001

124555

TianYing001
TianYing001 TianYing001

21515

Jerry_Chueng
Jerry_Chueng

well

Jerry_Chueng
Jerry_Chueng Jerry_Chueng

你好啊

Jerry_Chueng
Jerry_Chueng Jerry_Chueng

大家好

Jerry_Chueng
Jerry_Chueng Jerry_Chueng

才是真的好

专题系列