一个函数实现 Python 超大文件的流式下载与断点续传

在做项目的时候有了超大文件连续高速下载这个需求,然后上网查了不少资料。最后总结并在项目中不断迭代,实现了这个函数。

函数接口非常简单,性能却出人意料的好,因此这个函数无论是用在项目里,还是平时写写脚本下点大东西都非常好用。

这个函数主要的亮点是在于信号量 signal 的使用。requests 库的 timeout 参数实际上是针对请求阶段的超时进行处理不能通过设置 timeout 限制整个请求下载过程的时间 。而我们在下载大文件时,往往有这样的需求:当一个大文件下载时间过长(例如超过 10 分钟),我们更倾向于放弃此次下载,转而重新开始一次新的下载。因此,我们需要一个能够限制整个下载过程的时间的方法。

为此,我采用了 Python 的 signal 模块,通过设置信号量 signal.alarm() 来限制整个下载过程的时间。当下载时间超过设定值时,会抛出 TimeoutException 异常,从而中断下载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
def _timeout(signum, frame):
""" Raise an TimeoutException.

This is intended for use as a signal handler.
The signum and frame arguments passed to this are ignored.

"""
# # Raise TimeoutException with system default timeout message
# raise TimeoutException()

# 可以像上面一样抛出异常,也可以在此自定义放弃下载后的行为
pass

def getUrlContent(url, file_path, max_try=10, download_max_time_sec=600):
'''
下载文件,支持断点续传和流式下载

Parameters:
url - 下载地址

file_path - 文件存储路径

max_try - 最大重试次数

download_max_time_sec - 下载最大时间,超过此时间则放弃下载

Returns:
int - 1为正常退出,0为超过最大重试次数
'''
# 重试计数
repeat_time = 0
frist_repeat_time = 6
# 第一次请求是为了得到文件总大小
total_size = 0

while frist_repeat_time > 0 and total_size == 0:
try:
with requests.get(url, stream=True) as r1:
if not r1:
logger.error(f'Get file length error: {url}')
return 0

total_size = int(r1.headers['Content-Length'])

if total_size == 0:
raise Exception('File length is 0')

except Exception as e:
logger.error(f'Request file length error: {e}, retrying remaining {frist_repeat_time} times')
frist_repeat_time -= 1
gevent.sleep(5)

logger.info(f'开始下载文件 {url} ,文件大小: {total_size} bytes')

temp_size = 0
# 如果文件已经存在,我此处的策略是删除后重新下载
# 可以根据实际需求进行修改
if os.path.exists(file_path):
subprocess.Popen(['rm', '-rf', file_path])

# 通过信号量控制整个下载的用时(而非请求超时)
signal.signal(signal.SIGALRM, _timeout)
signal.alarm(download_max_time_sec) # 限制最多下载 download_max_time_sec 秒
# 开始下载
try:
while repeat_time < max_try:
if repeat_time > 0:
temp_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
# 文件大小一致,跳出循环s
if temp_size >= total_size:
return 1
repeat_time += 1
logger.info("第[%d]次下载文件 %s ,已经下载数据大小 (bytes): [%d],应下载数据大小 (bytes): [%d]" %
(repeat_time, url, temp_size, total_size))
# 重新请求网址,加入新的请求头的
# 核心部分,这个是请求下载时,从本地文件已经下载过的后面下载
headers = {"Range": f"bytes={temp_size}-{total_size}"}

try:
with requests.get(url, stream=True, headers=headers, timeout=(12, 60)) as r:
# "ab"表示追加形式写入文件
with open(file_path, "ab") as f:
if repeat_time != 1:
f.seek(temp_size)
for chunk in r.iter_content(chunk_size=1024 * 64):
if chunk:
temp_size += len(chunk)
f.write(chunk)
f.flush()
except requests.exceptions.Timeout as e:
logger.warn('Timeout')
except requests.exceptions.ChunkedEncodingError as e:
logger.warn('Chunked_encoding_error')
except requests.exceptions.ConnectionError as e:
logger.warn('Connection_error')
except Exception as e:
logger.error("流式传输失败,未知错误:%s" % e)
except TimeoutException as e:
logger.error(f"下载时间超过设定值 {download_max_time_sec} 秒,下载失败: {e}")
except Exception as e:
logger.error(f"下载失败,未知错误: {e}")
finally:
# 取消信号量
signal.alarm(0)
return 0