一个函数实现 Python 超大文件的流式下载与断点续传 在做项目的时候有了超大文件连续高速下载这个需求,然后上网查了不少资料。最后总结并在项目中不断迭代,实现了这个函数。
函数接口非常简单,性能却出人意料的好,因此这个函数无论是用在项目里,还是平时写写脚本下点大东西都非常好用。
这个函数主要的亮点是在于信号量 signal
的使用。requests
库的 timeout
参数实际上是针对请求阶段的超时进行处理 ,不能通过设置 timeout
限制整个请求下载过程的时间 。而我们在下载大文件时,往往有这样的需求:当一个大文件下载时间过长(例如超过 10 分钟),我们更倾向于放弃此次下载,转而重新开始一次新的下载。因此,我们需要一个能够限制整个下载过程的时间的方法。
为此,我采用了 Python 的 signal
模块,通过设置信号量 signal.alarm()
来限制整个下载过程的时间。当下载时间超过设定值时,会抛出 TimeoutException
异常,从而中断下载。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 def _timeout (signum, frame ): """ Raise an TimeoutException. This is intended for use as a signal handler. The signum and frame arguments passed to this are ignored. """ pass def getUrlContent (url, file_path, max_try=10 , download_max_time_sec=600 ): ''' 下载文件,支持断点续传和流式下载 Parameters: url - 下载地址 file_path - 文件存储路径 max_try - 最大重试次数 download_max_time_sec - 下载最大时间,超过此时间则放弃下载 Returns: int - 1为正常退出,0为超过最大重试次数 ''' repeat_time = 0 frist_repeat_time = 6 total_size = 0 while frist_repeat_time > 0 and total_size == 0 : try : with requests.get(url, stream=True ) as r1: if not r1: logger.error(f'Get file length error: {url} ' ) return 0 total_size = int (r1.headers['Content-Length' ]) if total_size == 0 : raise Exception('File length is 0' ) except Exception as e: logger.error(f'Request file length error: {e} , retrying remaining {frist_repeat_time} times' ) frist_repeat_time -= 1 gevent.sleep(5 ) logger.info(f'开始下载文件 {url} ,文件大小: {total_size} bytes' ) temp_size = 0 if os.path.exists(file_path): subprocess.Popen(['rm' , '-rf' , file_path]) signal.signal(signal.SIGALRM, _timeout) signal.alarm(download_max_time_sec) try : while repeat_time < max_try: if repeat_time > 0 : temp_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0 if temp_size >= total_size: return 1 repeat_time += 1 logger.info("第[%d]次下载文件 %s ,已经下载数据大小 (bytes): [%d],应下载数据大小 (bytes): [%d]" % (repeat_time, url, temp_size, total_size)) headers = {"Range" : f"bytes={temp_size} -{total_size} " } try : with requests.get(url, stream=True , headers=headers, timeout=(12 , 60 )) as r: with open (file_path, "ab" ) as f: if repeat_time != 1 : f.seek(temp_size) for chunk in r.iter_content(chunk_size=1024 * 64 ): if chunk: temp_size += len (chunk) f.write(chunk) f.flush() except requests.exceptions.Timeout as e: logger.warn('Timeout' ) except requests.exceptions.ChunkedEncodingError as e: logger.warn('Chunked_encoding_error' ) except requests.exceptions.ConnectionError as e: logger.warn('Connection_error' ) except Exception as e: logger.error("流式传输失败,未知错误:%s" % e) except TimeoutException as e: logger.error(f"下载时间超过设定值 {download_max_time_sec} 秒,下载失败: {e} " ) except Exception as e: logger.error(f"下载失败,未知错误: {e} " ) finally : signal.alarm(0 ) return 0