Python 的下刷流量工具代码

import os
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import math
import threading
from queue import Queue

class TurboDownloader:
    def __init__(self, url, max_workers=8, output_dir="downloads", keep_files=3, max_retries=3):
        """
        初始化极速下载器
        :param url: 要下载的视频URL
        :param max_workers: 最大线程数(建议4-8个)
        :param output_dir: 下载文件保存目录
        :param keep_files: 保留的最新文件数量
        :param max_retries: 最大重试次数
        """
        self.url = url.strip()  # 移除URL首尾空格
        self.max_workers = min(max_workers, 16)  # 限制最大线程数
        self.output_dir = output_dir
        self.keep_files = keep_files
        self.max_retries = max_retries
        self.stop_flag = False
        self.download_count = 0
        self.total_downloaded = 0
        self.lock = threading.Lock()  # 线程锁[1](@ref)
        self.error_count = 0
        
        os.makedirs(self.output_dir, exist_ok=True)
        self.filename = self.url.split('/')[-1].split('?')[0] or "downloaded_video.mp4"
    
    def download_chunk(self, start, end, chunk_id, retry_count=0):
        """
        下载文件的一个分块
        :param start: 起始字节位置
        :param end: 结束字节位置
        :param chunk_id: 分块ID
        :param retry_count: 当前重试次数
        :return: 下载的字节数
        """
        headers = {
            'User-Agent': 'Mozilla/5.0',
            'Range': f'bytes={start}-{end}'
        }
        
        try:
            response = requests.get(self.url, headers=headers, stream=True, timeout=15)
            response.raise_for_status()
            return response.content
        except requests.exceptions.RequestException as e:
            if retry_count < self.max_retries:
                time.sleep(1)  # 等待1秒后重试
                return self.download_chunk(start, end, chunk_id, retry_count + 1)
            print(f"分块 {chunk_id} 下载失败: {e}")
            return None
    
    def check_url_valid(self):
        """检查URL有效性"""
        try:
            response = requests.head(self.url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10)
            if response.status_code == 404:
                print(f"错误: 文件不存在(404) - {self.url}")
                return False
            return True
        except Exception as e:
            print(f"URL检查失败: {e}")
            return False
    
    def turbo_download(self, iteration):
        """极速多线程下载实现"""
        if not self.check_url_valid():
            return
            
        try:
            # 获取文件信息
            head_resp = requests.head(self.url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10)
            file_size = int(head_resp.headers.get('content-length', 0))
            
            if file_size == 0:
                print("无法获取文件大小,使用优化单线程下载")
                return self.stream_download(iteration)
            
            # 动态计算分块大小(最小1MB,最大10MB)
            chunk_size = min(max(file_size // self.max_workers, 1024*1024), 1024*1024*10)
            chunks = math.ceil(file_size / chunk_size)
            
            output_path = os.path.join(self.output_dir, 
                                     f"{os.path.splitext(self.filename)[0]}_{iteration}{os.path.splitext(self.filename)[1]}")
            
            # 使用更高效的多线程下载方式
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = []
                for i in range(chunks):
                    start = i * chunk_size
                    end = start + chunk_size - 1 if i < chunks - 1 else file_size - 1
                    futures.append(executor.submit(self.download_chunk, start, end, i))
                
                with open(output_path, 'wb') as f:
                    for future in as_completed(futures):
                        chunk_data = future.result()
                        if chunk_data:
                            f.write(chunk_data)
            
            with self.lock:  # 加锁保护共享变量[1](@ref)
                self.download_count += 1
                self.total_downloaded += file_size
            print(f"第 {iteration} 次下载完成: {output_path}")
            
            # 清理旧文件
            self.cleanup_old_files()
            
        except Exception as e:
            with self.lock:
                self.error_count += 1
            print(f"第 {iteration} 次下载异常: {e}")
    
    def stream_download(self, iteration):
        """优化的流式单线程下载"""
        try:
            output_path = os.path.join(self.output_dir, 
                                     f"{os.path.splitext(self.filename)[0]}_{iteration}{os.path.splitext(self.filename)[1]}")
            
            with requests.get(self.url, stream=True, headers={'User-Agent': 'Mozilla/5.0'}, timeout=30) as response:
                response.raise_for_status()
                file_size = int(response.headers.get('content-length', 0))
                
                with open(output_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024*1024):  # 1MB块大小
                        if chunk:
                            f.write(chunk)
            
            with self.lock:
                self.download_count += 1
                if file_size > 0:
                    self.total_downloaded += file_size
            print(f"第 {iteration} 次流式下载完成: {output_path}")
            
            # 清理旧文件
            self.cleanup_old_files()
            
        except Exception as e:
            with self.lock:
                self.error_count += 1
            print(f"第 {iteration} 次流式下载失败: {e}")
    
    def cleanup_old_files(self):
        """自动清理旧文件,保持目录整洁"""
        try:
            base_name = os.path.splitext(self.filename)
            files = [f for f in os.listdir(self.output_dir) 
                    if f.startswith(base_name) and f.endswith(os.path.splitext(self.filename)[1])]
            
            if len(files) > self.keep_files:
                # 按修改时间排序
                files.sort(key=lambda x: os.path.getmtime(os.path.join(self.output_dir, x)))
                for old_file in files[:-self.keep_files]:
                    os.remove(os.path.join(self.output_dir, old_file))
                    print(f"已清理旧文件: {old_file}")
        except Exception as e:
            print(f"清理旧文件时出错: {e}")
    
    def start(self):
        """启动极速循环下载"""
        print(f"启动极速循环下载: {self.url}")
        print(f"线程数: {self.max_workers}")
        print(f"保存目录: {self.output_dir}")
        print(f"保留文件数: {self.keep_files}")
        print("按 Ctrl+C 停止下载")
        
        iteration = 1
        try:
            while not self.stop_flag:
                start_time = time.time()
                self.turbo_download(iteration)
                elapsed = time.time() - start_time
                
                # 动态调整间隔,避免过快请求导致服务器限制
                wait_time = max(1.0 - elapsed, 0.1)
                time.sleep(wait_time)
                
                iteration += 1
        except KeyboardInterrupt:
            self.stop_flag = True
            print("\n下载已停止")
        
        print("\n下载统计:")
        print(f"总下载次数: {self.download_count}")
        print(f"失败次数: {self.error_count}")
        if self.total_downloaded > 0:
            print(f"总下载数据量: {self.total_downloaded / (1024 * 1024):.2f} MB")

if __name__ == "__main__":
    # 要下载的视频URL
    video_url = "https://obj.muyoung.com/video/Zero-Two-Night-Rays-4K.mp4"
    
    # 创建并启动下载器
    downloader = TurboDownloader(
        url=video_url,
        max_workers=8,  # 推荐4-8个线程
        output_dir="turbo_downloads",
        keep_files=3    # 只保留最新的3个文件
    )
    downloader.start()

  1. URL 验证与错误处理

    • 添加了check_url_valid()方法检查 URL 有效性

    • 对 404 错误进行专门处理

    • 增加了错误计数器统计失败次数

  2. 线程安全改进

    • 使用threading.Lock()保护共享变量

    • 限制最大线程数防止资源耗尽

  3. 下载可靠性增强

    • 实现分块下载重试机制

    • 动态调整分块大小优化下载速度

    • 添加流式下载作为备用方案

  4. 资源管理优化

    • 改进旧文件清理逻辑

    • 添加下载间隔控制防止服务器限制

  5. 统计与监控

    • 增加下载统计信息

    • 显示失败次数和总下载量

使用建议

  1. 如果持续遇到 404 错误,请检查:

    • URL 是否正确

    • 资源是否仍然可用

    • 是否有访问权限限制

  2. 性能调优建议:

    • 根据网络状况调整max_workers(4-16 之间)

    • 对于大文件可适当增加chunk_size

    • 根据服务器响应调整下载间隔

  3. 错误排查:

    • 检查网络连接

    • 验证目标服务器可用性

    • 检查防火墙或代理设置


Python 的下刷流量工具代码
https://uniomo.com/archives/wei-ming-ming-wen-zhang-xdXsMp3b
作者
雨落秋垣
发布于
2025年09月28日
许可协议