Python 3 爬虫|第12章:并发下载大文件 支持断点续传
Synopsis: 本文介绍如何流式下载大文件,并实现断点续传功能。根据顺序下载大文件的字节和乱序下载大文件的各个分段,分别进行了多线程或协程并发,还实现了多个大文件并发下载
1. stream 流式下载大文件
1.1 stream=True 和 iter_content()
https://2.python-requests.org/en/master/user/quickstart/#raw-response-content
我们将继续使用 Python 3 爬虫|第3章:同步阻塞下载 所搭建的测试环境,在 Nginx
默认的网站根目录 /usr/share/nginx/html
下存在如下文件:
# pwd /usr/share/nginx/html # ls -hl total 5.6G -rw-r--r-- 1 root root 494 Aug 13 23:04 50x.html -rw-r--r-- 1 root root 4.1G Aug 20 15:08 CentOS-7-x86_64-DVD-1611.iso -rw-r--r-- 1 root root 680M Aug 20 15:03 CentOS-7-x86_64-Minimal-1611.iso -rw-r--r-- 1 root root 612 Aug 13 23:04 index.html -rw-r--r-- 1 root root 17M Aug 20 09:20 Python-3.7.4.tar.xz -rw-r--r-- 1 root root 834M Aug 23 13:45 ubuntu-18.04.2-live-server-amd64.iso
假设我们的客户端只有 2 GB 内存,现在要下载 4.1 GB 的 ISO 文件,如果使用下面的代码:
import requests with requests.get(url) as r: r.raise_for_status() with open(filename, 'wb') as fp: fp.write(r.content)
根据 Python 3 爬虫|第1章:I/O Models 阻塞/非阻塞 同步/异步 的说明,客户端发起 HTTP GET 请求后,需要等待 ISO 完整的 4.1 GB 大小的内容全部到达 内核空间
并被内核拷贝到 用户空间
后,用户的下载进程才能解除阻塞状态,并继续将文件内容保存到本地文件中。由于客户端的内存不足以缓存此 ISO 的全部内容,所以在下载的过程中,用户的下载进程会因为 OOM(out of memory)
被内核 killed:
如果设置 requests.request(method, url, **kwargs)
中的 stream
参数为 True
的话,客户端不会立即下载文件的内容,但客户端与服务端会持续建立 HTTP 连接。另外,我们再通过 requests.Response.iter_content(chunk_size=1, decode_unicode=False) 指定每次下载的数据块的大小,就可以实现 分块下载
,即每次有 chunk 大小的数据块到达客户端的内核空间,然后被复制到用户空间后,下载的进程就会解除阻塞状态,并将此 chunk 大小的内容保存到本地文件中了:
import requests with requests.get(url, stream=True) as r: r.raise_for_status() with open(filename, 'wb') as fp: for chunk in r.iter_content(chunk_size=512): if chunk: fp.write(chunk)
1.2 tqdm 显示下载的进度
前面的文章中我用的是 Progress Bar 2,比如要下载 1000 个文件,当前已下载 90 个,可以显示整体进度条。但是,我们如果要下载单个大文件,想显示下载速度的话,建议使用 tqdm
from tqdm import tqdm with tqdm(total=file_size, unit='B', unit_scale=True, unit_divisor=1024, ascii=True, desc=filename) as bar: # 打印下载时的进度条,实时显示下载速度 with requests.get(url, stream=True) as r: r.raise_for_status() with open(filename, 'wb') as fp: for chunk in r.iter_content(chunk_size=512): if chunk: fp.write(chunk) bar.update(len(chunk)) # 实时更新已完成的数据量
说明:
total=file_size
: 整个文件的大小,单位是 bytesunit='B'
: 默认是unit='it'
按 bit 来计算的,所以我们需要改成按 Byte 来计算unit_scale=True
: 会自动扩展单位,如果是 False,速度显示为 80625485.97B/s,如果是 True,速度显示为 62.6MB/sunit_divisor=1024
: 由于默认是按 1000 来除,所以计算出来的文件大小和速度不对,应该按 1024 来除ascii=True
: 进度条默认使用 unicode 字符▉
,Windows 系统默认使用cp936
编码,不会在同一行动态显示进度条,而是分多行显示,而 Linux 和 Mac 系统使用utf-8
编码,所以一切正常。如果指定ascii=True
则会用123456789#
来填充进度条,此时大家都能正常地在同一行显示了desc=filename
: 在进度条前面显示当前下载的文件名
1.3 click 添加命令行选项与参数
Click 是一个 Python 包,只需要很少的代码就能够快速创建漂亮的命令行选项与参数
@click.command() @click.option('--dest_filename', type=click.Path(), help="Name of the local destination file with extension") @click.option('--multipart_chunksize', default=8*1024*1024, help="Size of chunk, unit is bytes") @click.argument('url', type=click.Path()) def crawl(dest_filename, multipart_chunksize, url): pass
我们指定一个可选的命令行选项 --dest_filename
,和必填的命令行参数 url
,可以使用 --help
查看使用说明:
# python 1-spider.py --help Usage: 1-spider.py [OPTIONS] URL Options: --dest_filename PATH Name of the local destination file with extension --multipart_chunksize INTEGER Size of chunk, unit is bytes --help Show this message and exit.
1.4 测试
我们准备一个 Python 3 的虚拟环境:
激活虚拟环境并安装一些相关的包:
创建自定义的日志模块 logger.py
:
import os import time import logging ### # 1. 创建logger实例,如果参数为空则返回 root logger ### logger = logging.getLogger('spider') # 设置总日志级别, 也可以给不同的handler设置不同的日志级别 logger.setLevel(logging.DEBUG) ### # 2. 创建Handler, 输出日志到控制台和文件 ### # 控制台日志和日志文件使用同一个Formatter formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s') # 日志文件FileHandler basedir = os.path.abspath(os.path.dirname(__file__)) log_dest = os.path.join(basedir, 'logs') # 日志文件所在目录 if not os.path.isdir(log_dest): os.mkdir(log_dest) filename = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time())) + '.log' # 日志文件名,以当前时间命名 file_handler = logging.FileHandler(os.path.join(log_dest, filename), encoding='utf-8') # 创建日志文件handler file_handler.setFormatter(formatter) # 设置Formatter # file_handler.setLevel(logging.INFO) # 单独设置日志文件的日志级别,注释掉则使用总日志级别 # 控制台日志StreamHandler stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter) stream_handler.setLevel(logging.INFO) ### # 3. 将handler添加到logger中 ### logger.addHandler(file_handler) logger.addHandler(stream_handler)
创建 custom_request.py
文件,可以捕获 requests.request()
方法的异常,比如连接超时、被拒绝等:
import requests from logger import logger def custom_request(method, url, info='common url', *args, **kwargs): '''捕获 requests.request() 方法的异常,比如连接超时、被拒绝等 如果请求成功,则返回响应体;如果请求失败,则返回 None,所以在调用 custom_request() 函数时需要先判断返回值 ''' s = requests.session() s.keep_alive = False try: resp = requests.request(method, url, *args, **kwargs) resp.raise_for_status() except requests.exceptions.HTTPError as errh: # In the event of the rare invalid HTTP response, Requests will raise an HTTPError exception (e.g. 401 Unauthorized) logger.error('Unsuccessfully get {} [{}], HTTP Error: {}'.format(info, url, errh)) pass except requests.exceptions.ConnectionError as errc: # In the event of a network problem (e.g. DNS failure, refused connection, etc) logger.error('Unsuccessfully get {} [{}], Connecting Error: {}'.format(info, url, errc)) pass except requests.exceptions.Timeout as errt: # If a request times out, a Timeout exception is raised. Maybe set up for a retry, or continue in a retry loop logger.error('Unsuccessfully get {} [{}], Timeout Error: {}'.format(info, url, errt)) pass except requests.exceptions.TooManyRedirects as errr: # If a request exceeds the configured number of maximum redirections, a TooManyRedirects exception is raised. Tell the user their URL was bad and try a different one logger.error('Unsuccessfully get {} [{}], Redirect Error: {}'.format(info, url, errr)) pass except requests.exceptions.RequestException as err: # catastrophic error. bail. logger.error('Unsuccessfully get {} [{}], Else Error: {}'.format(info, url, err)) pass except Exception as err: logger.error('Unsuccessfully get {} [{}], Exception: {}'.format(info, url, err.__class__)) pass else: return resp
然后,我们再新建 1-spider.py
:
import click import os import time from tqdm import tqdm from custom_request import custom_request from logger import logger @click.command() @click.option('--dest_filename', type=click.Path(), help="Name of the local destination file with extension") @click.option('--multipart_chunksize', default=8*1024*1024, help="Size of chunk, unit is bytes") @click.argument('url', type=click.Path()) def crawl(dest_filename, multipart_chunksize, url): t0 = time.time() # 如果没有指定本地保存时的文件名,则默认使用 URL 中最后一部分作为文件名 official_filename = dest_filename if dest_filename else url.split('/')[-1] # 正式文件名 temp_filename = official_filename + '.swp' # 没下载完成时,临时文件名 # 获取文件的大小 r = custom_request('HEAD', url, info='header message') if not r: # 请求失败时,r 为 None logger.error('Failed to get header message on URL [{}]'.format(url)) return file_size = int(r.headers['Content-Length']) logger.info('File size: {} bytes'.format(file_size)) # 如果正式文件存在 if os.path.exists(official_filename): if os.path.getsize(official_filename) == file_size: # 且大小与待下载的目标文件大小一致时 logger.warning('The file [{}] has already been downloaded'.format(official_filename)) return else: # 大小不一致时,提醒用户要保存的文件名已存在,需要手动处理,不能随便覆盖 logger.warning('The filename [{}] has already exist, but it does not match the remote file'.format(official_filename)) return # 分块下载,即使文件非常大,也不会撑爆内存 with tqdm(total=file_size, unit='B', unit_scale=True, unit_divisor=1024, desc=official_filename) as bar: # 打印下载时的进度条,并动态显示下载速度 r = custom_request('GET', url, info='all content', stream=True) if not r: # 请求失败时,r 为 None logger.error('Failed to get all content on URL [{}]'.format(url)) return with open(temp_filename, 'wb') as fp: for chunk in r.iter_content(chunk_size=multipart_chunksize): if chunk: fp.write(chunk) bar.update(len(chunk)) # 整个文件内容被成功下载后,将临时文件名修改回正式文件名 if os.path.getsize(temp_filename) == file_size: # 以防网络故障 os.rename(temp_filename, official_filename) logger.info('{} downloaded'.format(official_filename)) logger.info('Cost {:.2f} seconds'.
9 条评论
评论者的用户名
评论时间TianYing001
2019-09-16T02:52:35Z124555
TianYing001 TianYing001
2019-09-16T02:52:47Z21515
Jerry_Chueng
2019-10-26T14:59:55Zwell
Jerry_Chueng Jerry_Chueng
2019-10-26T15:00:34Z你好啊
Jerry_Chueng Jerry_Chueng
2019-10-26T15:01:19Z大家好
Jerry_Chueng Jerry_Chueng
2019-10-26T15:01:34Z才是真的好
NEGAN
2020-08-12T05:14:27ZNEGAN
2020-08-12T05:14:55Zsfx0211
2022-01-14T04:55:55Z请问怎么支付