# Python 的下载流量工具代码
import os
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import math
import threading
from queue import Queue
class TurboDownloader:
    """Repeatedly download a URL using ranged, multi-threaded requests.

    Each iteration fetches the file once: in parallel byte-range chunks
    when the server reports a Content-Length, or via a streaming
    single-threaded fallback otherwise. Older output files are pruned so
    that only the newest ``keep_files`` remain in ``output_dir``.
    """

    def __init__(self, url, max_workers=8, output_dir="downloads", keep_files=3, max_retries=3):
        """
        Initialize the downloader.

        :param url: URL of the file to download
        :param max_workers: maximum worker threads (4-8 recommended; capped at 16)
        :param output_dir: directory where downloaded files are saved
        :param keep_files: number of most recent files to keep on disk
        :param max_retries: maximum retries per chunk before giving up
        """
        self.url = url.strip()                    # drop stray whitespace around the URL
        self.max_workers = min(max_workers, 16)   # cap thread count to avoid resource exhaustion
        self.output_dir = output_dir
        self.keep_files = keep_files
        self.max_retries = max_retries
        self.stop_flag = False
        self.download_count = 0    # successful full downloads
        self.total_downloaded = 0  # bytes downloaded across all iterations
        self.lock = threading.Lock()  # guards the three counters above
        self.error_count = 0       # failed download iterations
        os.makedirs(self.output_dir, exist_ok=True)
        # Derive a filename from the last URL path segment, ignoring any query string.
        self.filename = self.url.split('/')[-1].split('?')[0] or "downloaded_video.mp4"

    def download_chunk(self, start, end, chunk_id, retry_count=0):
        """
        Download one byte range of the file.

        :param start: first byte offset (inclusive)
        :param end: last byte offset (inclusive)
        :param chunk_id: chunk index, used only for error reporting
        :param retry_count: retries already consumed (kept for API compatibility)
        :return: the chunk's bytes, or None when all retries are exhausted
        """
        headers = {
            'User-Agent': 'Mozilla/5.0',
            'Range': f'bytes={start}-{end}',
        }
        attempt = retry_count
        # Iterative retry loop (the original recursed, which wastes stack frames).
        while True:
            try:
                # `with` guarantees the connection is released even on error.
                with requests.get(self.url, headers=headers, timeout=15) as response:
                    response.raise_for_status()
                    return response.content
            except requests.exceptions.RequestException as e:
                if attempt < self.max_retries:
                    attempt += 1
                    time.sleep(1)  # brief back-off before retrying
                    continue
                print(f"分块 {chunk_id} 下载失败: {e}")
                return None

    def check_url_valid(self):
        """Return True if a HEAD request succeeds and does not report 404."""
        try:
            response = requests.head(self.url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10)
            if response.status_code == 404:
                print(f"错误: 文件不存在(404) - {self.url}")
                return False
            return True
        except Exception as e:
            print(f"URL检查失败: {e}")
            return False

    def turbo_download(self, iteration):
        """Download the file once, splitting it into parallel range requests.

        Falls back to :meth:`stream_download` when the server does not
        report a usable Content-Length.
        """
        if not self.check_url_valid():
            return
        try:
            head_resp = requests.head(self.url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10)
            file_size = int(head_resp.headers.get('content-length', 0))
            if file_size == 0:
                print("无法获取文件大小,使用优化单线程下载")
                return self.stream_download(iteration)
            # Chunk size scales with worker count, clamped to [1 MB, 10 MB].
            chunk_size = min(max(file_size // self.max_workers, 1024 * 1024), 1024 * 1024 * 10)
            chunks = math.ceil(file_size / chunk_size)
            root, ext = os.path.splitext(self.filename)
            output_path = os.path.join(self.output_dir, f"{root}_{iteration}{ext}")
            written = 0  # bytes actually written, for accurate accounting
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                offsets = {}  # future -> byte offset where its chunk belongs
                for i in range(chunks):
                    start = i * chunk_size
                    end = start + chunk_size - 1 if i < chunks - 1 else file_size - 1
                    offsets[executor.submit(self.download_chunk, start, end, i)] = start
                # BUG FIX: the original wrote chunks sequentially in *completion*
                # order, corrupting the file; seek to each chunk's offset instead.
                with open(output_path, 'wb') as f:
                    f.truncate(file_size)  # pre-size so out-of-order seeks are valid
                    for future in as_completed(offsets):
                        chunk_data = future.result()
                        if chunk_data:
                            f.seek(offsets[future])
                            f.write(chunk_data)
                            written += len(chunk_data)
            with self.lock:  # protect shared counters across threads
                self.download_count += 1
                self.total_downloaded += written
            print(f"第 {iteration} 次下载完成: {output_path}")
            self.cleanup_old_files()
        except Exception as e:
            with self.lock:
                self.error_count += 1
            print(f"第 {iteration} 次下载异常: {e}")

    def stream_download(self, iteration):
        """Single-threaded streaming fallback used when the size is unknown."""
        try:
            root, ext = os.path.splitext(self.filename)
            output_path = os.path.join(self.output_dir, f"{root}_{iteration}{ext}")
            written = 0  # count bytes actually received, not the header value
            with requests.get(self.url, stream=True, headers={'User-Agent': 'Mozilla/5.0'}, timeout=30) as response:
                response.raise_for_status()
                with open(output_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):  # 1 MB blocks
                        if chunk:
                            f.write(chunk)
                            written += len(chunk)
            with self.lock:
                self.download_count += 1
                self.total_downloaded += written
            print(f"第 {iteration} 次流式下载完成: {output_path}")
            self.cleanup_old_files()
        except Exception as e:
            with self.lock:
                self.error_count += 1
            print(f"第 {iteration} 次流式下载失败: {e}")

    def cleanup_old_files(self):
        """Delete older downloads, keeping only the newest ``keep_files``."""
        try:
            # BUG FIX: the original passed the whole splitext() *tuple* to
            # startswith, which also matched files starting with the extension.
            base_name, ext = os.path.splitext(self.filename)
            files = [f for f in os.listdir(self.output_dir)
                     if f.startswith(base_name) and f.endswith(ext)]
            if len(files) > self.keep_files:
                # Oldest first, so the slice below selects files to delete.
                files.sort(key=lambda x: os.path.getmtime(os.path.join(self.output_dir, x)))
                for old_file in files[:-self.keep_files]:
                    os.remove(os.path.join(self.output_dir, old_file))
                    print(f"已清理旧文件: {old_file}")
        except Exception as e:
            print(f"清理旧文件时出错: {e}")

    def start(self):
        """Run the download loop until Ctrl+C, then print summary statistics."""
        print(f"启动极速循环下载: {self.url}")
        print(f"线程数: {self.max_workers}")
        print(f"保存目录: {self.output_dir}")
        print(f"保留文件数: {self.keep_files}")
        print("按 Ctrl+C 停止下载")
        iteration = 1
        try:
            while not self.stop_flag:
                start_time = time.time()
                self.turbo_download(iteration)
                elapsed = time.time() - start_time
                # Pace iterations (>= 0.1 s apart) to avoid hammering the server.
                wait_time = max(1.0 - elapsed, 0.1)
                time.sleep(wait_time)
                iteration += 1
        except KeyboardInterrupt:
            self.stop_flag = True
            print("\n下载已停止")
        print("\n下载统计:")
        print(f"总下载次数: {self.download_count}")
        print(f"失败次数: {self.error_count}")
        if self.total_downloaded > 0:
            print(f"总下载数据量: {self.total_downloaded / (1024 * 1024):.2f} MB")
if __name__ == "__main__":
    # Target video URL for the download loop.
    target = "https://obj.muyoung.com/video/Zero-Two-Night-Rays-4K.mp4"

    # Build the downloader with the recommended settings:
    # 4-8 worker threads, keeping only the newest 3 files on disk.
    downloader = TurboDownloader(
        url=target,
        max_workers=8,
        output_dir="turbo_downloads",
        keep_files=3,
    )
    downloader.start()

URL 验证与错误处理:
添加了 check_url_valid() 方法检查 URL 有效性,并对 404 错误进行专门处理;
增加了错误计数器统计失败次数。
线程安全改进:
使用 threading.Lock() 保护共享变量;限制最大线程数,防止资源耗尽。
下载可靠性增强:
实现分块下载重试机制
动态调整分块大小优化下载速度
添加流式下载作为备用方案
资源管理优化:
改进旧文件清理逻辑
添加下载间隔控制防止服务器限制
统计与监控:
增加下载统计信息
显示失败次数和总下载量
使用建议
如果持续遇到 404 错误,请检查:
URL 是否正确
资源是否仍然可用
是否有访问权限限制
性能调优建议:
根据网络状况调整 max_workers(4-16 之间);对于大文件可适当增加 chunk_size;
根据服务器响应调整下载间隔。
错误排查:
检查网络连接
验证目标服务器可用性
检查防火墙或代理设置
Python 的下刷流量工具代码
https://uniomo.com/archives/wei-ming-ming-wen-zhang-xdXsMp3b