Termux之AutoTG.py


咱们利用 Python 的 Pyrogram 库作为接头人伪装成 TG 客户端(或 Bot),直接从 TG 服务器实时拉取视频流数据块(Chunk),拿到一块就立刻通过异步管道通过 PUT 请求直接灌入 Openlist 的上传接口 (/api/fs/put)。

一、安装环境

在 Termux 里安装所需的现代异步网络库:

pip install pyrogram tgcrypto httpx

二、Telegram 的 API_ID 和 API_HASH

手把手拿下 Telegram 的 API_ID 和 API_HASH

Telegram 官方要求任何第三方自动化脚本(无论是普通客户端还是 Bot)必须绑定一组开发者身份凭证。申请过程完全免费,但必须走官方开发者后台。

📝 极简申请步骤

1.准备高稳定全局网络:打开你的代理软件(务必开启全局路由模式,避开频繁抖动的免费 CDN 节点,确保访问顺畅)。

2.进入官方后台:浏览器直接打开开发者门户网址 👉 my.telegram.org

3.安全登录:

  • 在输入框填入你绑定的 Telegram 手机号(务必带上国际区号,例如中国大陆号码填 +86138xxxx)。

  • 点击 Next 后,千万别等短信,直接打开你的 Telegram 客户端,官方服务号(Service Notifications)会实时发给你一串长文本验证码。复制粘贴回网页完成登录。

4.进入开发配置:登录成功后,点击页面核心菜单里的 API development tools。

5.填写应用表单(首次进入需创建应用):

  • App title:随便起个名字(比如 AutoLeecherPro)。

  • Short name:必须全网唯一,且只能用纯英文字母和数字(比如 mytermuxleecher2026)。

  • URL:直接留空不管。

  • Platform:下拉选择 Desktop 或 Android 均可。

  • Description:留空,直接点击底部的 Create application 提交。

6.提取终极密钥:

  • 提交成功后,页面当场刷新,显示出你的 App api_id(一串纯数字)和 App api_hash(一串极长的混合字符)。

  • ⚠️ 铁律:直接把这两个值复制贴进咱们脚本的配置项里。绝对绝不泄露给任何人,这是你账号操控权的底层凭证。

三、autotg.py手动版

import os
import re
import time
import json
import asyncio
from datetime import datetime
from urllib.parse import quote
import httpx
from pyrogram import Client, filters, enums, idle
import logging

# =================================================================
# ⚙️ 核心网关、路径与凭证配置区域
# =================================================================
API_ID = 33349348             
API_HASH = "44bde7f01d2b6001589c28cea93716af"      

COMMAND_CENTER_CHAT = "@xxskyemby_bot"

OLIST_URL = "http://127.0.0.1:5244"
OLIST_TOKEN = "openlist-a87614da-32dd-4b80-9150-6447de823da8f33x53ymkrx0aPKG0HUcsFHmjFRYTKFhSADLRhoQLkXa7ogaiByhWRNEXCjpblp9" 

STEWARD_BASE_URL = "http://127.0.0.1:5000" 

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TG_ROUTE_DB = os.path.join(BASE_DIR, "tg_manual_routes.json")
LOCAL_TEMP_DIR = os.path.join(BASE_DIR, "tg_temp")
TG_LISTENER_DB = os.path.join(BASE_DIR, "tg_listener_config.json") 
TG_HISTORY_DB = os.path.join(BASE_DIR, "tg_download_history.json") 
os.makedirs(LOCAL_TEMP_DIR, exist_ok=True)

TMDB_API_KEY = "9c88e18e43543c8ff195c631aaa0d2fa" 
START_TIME = time.time()

# 把它放在 OLIST_TOKEN 等配置的下面
TG_SETTINGS_DB = os.path.join(BASE_DIR, "tg_settings.json")

def get_mount_root():
    if os.path.exists(TG_SETTINGS_DB):
        try:
            with open(TG_SETTINGS_DB, "r", encoding="utf-8") as f: 
                return json.load(f).get("mount_root", "/family/177_cas")
        except: pass
    return "/family/177_cas"
    
def set_mount_root(path):
    settings = {}
    if os.path.exists(TG_SETTINGS_DB):
        try:
            with open(TG_SETTINGS_DB, "r", encoding="utf-8") as f: 
                settings = json.load(f)
        except: pass
    settings["mount_root"] = path
    with open(TG_SETTINGS_DB, "w", encoding="utf-8") as f: 
        json.dump(settings, f, ensure_ascii=False, indent=2)

# =================================================================
# 🤫 日志与上报中枢配置
# =================================================================
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(message)s', datefmt='%H:%M:%S')
logging.getLogger("pyrogram").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger("TGEngine")

app = Client("tg_robust_leecher_v15", api_id=API_ID, api_hash=API_HASH)

async def notify_steward_log(msg, level="INFO"):
    logger.info(f"[{level}] {msg}")
    try:
        async with httpx.AsyncClient(timeout=2.0) as client:
            await client.post(f"{STEWARD_BASE_URL}/api/remote_log", json={"level": level, "msg": msg})
    except Exception: pass

def clean_orphan_temp_files(max_age_hours=24):
    if os.path.exists(LOCAL_TEMP_DIR):
        now = time.time()
        for f in os.listdir(LOCAL_TEMP_DIR):
            file_path = os.path.join(LOCAL_TEMP_DIR, f)
            try:
                if os.path.isfile(file_path) and (now - os.path.getmtime(file_path)) > (max_age_hours * 3600):
                    os.remove(file_path)
            except Exception: pass

def load_listener_config():
    if not os.path.exists(TG_LISTENER_DB):
        dummy = {"trusted_channels": {}}
        with open(TG_LISTENER_DB, "w", encoding="utf-8") as f: json.dump(dummy, f, ensure_ascii=False, indent=4)
        return dummy
    try:
        with open(TG_LISTENER_DB, "r", encoding="utf-8") as f: return json.load(f)
    except Exception: return {"trusted_channels": {}}

def load_history():
    if os.path.exists(TG_HISTORY_DB):
        try:
            with open(TG_HISTORY_DB, 'r', encoding='utf-8') as f: 
                return json.load(f) # 🔥 卸下包袱,开机直接秒读,不再进行任何循环和多余判断
        except Exception: pass
    return {}

def check_and_record_history(drama, file_season, ep, version=""):
    history = load_history()
    ver_tag = f".{version}" if version else ""
    
    # 🔥 这一层日常防御留着:防止以后某些奇葩剧名自带点号导致拼接出双点
    base_name = drama.rstrip(".")
    key = f"{base_name}.S{file_season:02d}E{ep:02d}{ver_tag}"
    
    if key in history: return True
    history[key] = int(time.time())
    try:
        with open(TG_HISTORY_DB, 'w', encoding='utf-8') as f: json.dump(history, f, ensure_ascii=False, indent=2)
    except Exception: pass
    return False

def load_tg_routes():
    if os.path.exists(TG_ROUTE_DB):
        try:
            with open(TG_ROUTE_DB, 'r', encoding='utf-8') as f: return json.load(f)
        except Exception: pass
    return {}

def save_tg_routes(data):
    try:
        with open(TG_ROUTE_DB, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception: pass

GLOBAL_ROUTE_CACHE = {"folder_name": "", "category": "", "folder_season": 1, "file_season": 1, "year": "", "expire_time": 0, "manual_ep": None}
GLOBAL_ACTIVE_LOCKS = set() # 🔥 绝对防撞车全局锁

async def fetch_tmdb_year(cn_title):
    if not TMDB_API_KEY: return datetime.now().strftime("%Y")
    clean_q = re.sub(r'S\d+$|\s+\d+$', '', cn_title).strip()
    url = f"https://api.themoviedb.org/3/search/multi?api_key={TMDB_API_KEY}&language=zh-CN&query={quote(clean_q)}&page=1"
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            res = await client.get(url)
            results = res.json().get("results")
            if results and len(results[0].get("first_air_date") or results[0].get("release_date") or "") >= 4:
                return (results[0].get("first_air_date") or results[0].get("release_date"))[:4]
    except Exception: pass
    return datetime.now().strftime("%Y")

def extract_pure_episode(search_text, drama_anchor=None):
    text = search_text
    # 1. 斩断剧名干扰
    if drama_anchor and drama_anchor.lower() in text.lower():
        idx = text.lower().find(drama_anchor.lower())
        text = text[idx:]
        
    # 2. 标准化标记 (优先匹配 E16, EP16)
    m = re.search(r'(?i)E0*(\d+)', text)
    if m: return int(m.group(1))
    
    # 3. 匹配中文标记:第16集, 16集, 16更, 第16话 (带各种奇怪符号)
    m = re.search(r'第\d+[季部].*?第\s*(\d+)\s*[集话期更]', text)
    if m: return int(m.group(1))
    m = re.search(r'第\s*(\d+)\s*[集话期更]', text)
    if m: return int(m.group(1))
    m = re.search(r'(?<![第\d])\s*(\d+)\s*[集话期更]', text)
    if m: return int(m.group(1))
    
    # 4. 🔥 终极裸数字捕获 (彻底修复劫持与粘连 Bug)
    # 提前清洗掉 H264, H265, 720p, 1080p 等干扰项,防止把 265 当成 265集!
    clean_text = re.sub(r'(?i)(h264|h265|x264|x265|720p|1080p|2160p|4k|8k|web-dl|webrip)', '', text)
    # 用 (?<!\d) 和 (?!\d) 代替 \b,完美解决中文字符粘连问题 (如 "雨霖铃16")
    m_trail = re.search(r'(?<!\d)0*(\d{1,3})(?!\d)', clean_text)
    if m_trail and not (1900 < int(m_trail.group(1)) < 2100):
        return int(m_trail.group(1))
        
    # 5. 纯中文数字转换 (如:第十六集)
    m_cn = re.search(r'第\s*([一二三四五六七八九十零百]+)\s*[集话期更]', text)
    if m_cn:
        cn_str = m_cn.group(1)
        cn_map = {"一":1, "二":2, "三":3, "四":4, "五":5, "六":6, "七":7, "八":8, "九":9, "十":10}
        if cn_str in cn_map: return cn_map[cn_str]
        if cn_str.startswith("十") and len(cn_str) == 2: return 10 + cn_map.get(cn_str[1], 0)
        if len(cn_str) == 3 and cn_str[1] == "十": return cn_map.get(cn_str[0],0)*10 + cn_map.get(cn_str[2],0)
        if cn_str.endswith("十") and len(cn_str) == 2: return cn_map.get(cn_str[0],0)*10
        
    return None

async def bg_upload_retry_task(local_path, target_full, total_bytes, clean_base, file_season, ep_num, is_movie, standard_name, version_suffix=""):
    try:
        await asyncio.sleep(600)  
        while os.path.exists(local_path):
            try:
                put_url = f"{OLIST_URL}/api/fs/put"
                headers = {"Authorization": OLIST_TOKEN, "File-Path": quote(target_full), "Content-Length": str(total_bytes), "Content-Type": "application/octet-stream"}
                with open(local_path, "rb") as f_upload:
                    async def file_iter():
                        while True:
                            chunk = f_upload.read(1024 * 1024)
                            if not chunk: break
                            yield chunk
                    async with httpx.AsyncClient(timeout=httpx.Timeout(connect=10.0, read=300.0, write=None, pool=None), trust_env=False) as h: 
                        resp = await h.put(put_url, content=file_iter(), headers=headers)
                
                if resp.json().get("code") == 200:
                    try: os.remove(local_path)
                    except: pass
                    if not is_movie and ep_num is not None:
                        check_and_record_history(clean_base, file_season, ep_num, version_suffix)
                        ver_tag = f".{version_suffix}" if version_suffix else ""
                        await notify_steward_log(f"📝 [后台重推-补录] {clean_base}.S{file_season:02d}E{ep_num:02d}{ver_tag}")
                    break
            except Exception:
                pass
            await asyncio.sleep(600)
    finally:
        GLOBAL_ACTIVE_LOCKS.discard(standard_name)

async def process_media_transfer(client, message, status, override_info=None):
    media = message.video or message.document
    if not media: return
    raw_file = getattr(media, "file_name", f"TG_{message.id}.mp4")
    _, ext = os.path.splitext(raw_file)
    if not ext: ext = ".mp4"
    
    if override_info:
        # ... (这里是自动订阅的逻辑,不用动)
        if len(override_info) == 7:
            folder, cat, folder_season, file_season, year, ep_num, version_suffix = override_info
        else:
            folder, cat, folder_season, file_season, year, ep_num = override_info[:6]
            version_suffix = ""
    else:
        # 👇 这里是手动 /go 发车的逻辑
        folder, cat = GLOBAL_ROUTE_CACHE["folder_name"], GLOBAL_ROUTE_CACHE["category"]
        folder_season, file_season = GLOBAL_ROUTE_CACHE["folder_season"], GLOBAL_ROUTE_CACHE["file_season"]
        year = GLOBAL_ROUTE_CACHE.get("year", "")
        
        # 🔥 关键修复:从缓存中读取版本号,不再硬编码为空!
        version_suffix = GLOBAL_ROUTE_CACHE.get("version", "")
        
        if GLOBAL_ROUTE_CACHE["manual_ep"] is not None:
            ep_num = GLOBAL_ROUTE_CACHE["manual_ep"]
            GLOBAL_ROUTE_CACHE["manual_ep"] += 1
        else: 
            ep_num = extract_pure_episode(f"{message.caption or ''} {raw_file}")
            
    is_movie = "电影" in cat or cat in ["演唱会", "纪录片"]
    
    # 🔥 剥离引擎:从文件夹名中完美抽离出纯剧名
    clean_base = re.sub(r'\s*\(\d{4}\)', '', folder).strip()
    if version_suffix and clean_base.endswith(version_suffix):
        clean_base = clean_base[:-len(version_suffix)].strip()
    clean_base = clean_base.replace(" ", ".")
    
    # 生成版本号尾巴
    ver_tag = f".{version_suffix}" if version_suffix else ""
    
    # ✅ 替换为下面这样:
    current_mount = get_mount_root()  # 🔥 动态获取你刚才设置的目录
    
    if is_movie:
        standard_name = f"{clean_base}.{year}{ver_tag}{ext}" if year else f"{clean_base}{ver_tag}{ext}"
        target_dir = f"{current_mount}/{cat}/{folder}"
    else:
        ep_str = f"{ep_num:02d}" if ep_num is not None else "00"
        target_dir = f"{current_mount}/{cat}/{folder}/Season {folder_season}"
        standard_name = f"{clean_base}.S{file_season:02d}E{ep_str}.{year}{ver_tag}{ext}" if year else f"{clean_base}.S{file_season:02d}E{ep_str}{ver_tag}{ext}"
        
    if standard_name in GLOBAL_ACTIVE_LOCKS:
        await notify_steward_log(f"🛡️ [防撞锁拦截] `{standard_name}` 正在被施工处理,巡逻哨兵已静默撤退。")
        try:
            await status.edit_text(f"🛡️ **[防撞锁拦截生效]** `{standard_name}` 已经在下载了,拦截并销毁重复任务!")
            await asyncio.sleep(3)
            await status.delete()
        except: pass
        return
    
    GLOBAL_ACTIVE_LOCKS.add(standard_name)
    bg_task_spawned = False
    downloaded_bytes = 0

    try:
        local_path = os.path.join(LOCAL_TEMP_DIR, standard_name)
        target_full = f"{target_dir}/{standard_name}".replace("//", "/")
        total_bytes = media.file_size
        file_size_mb = total_bytes / (1024 * 1024) if total_bytes else 0
        
        # 📡 提取真实的来源频道名字
        source_name = message.forward_from_chat.title if message.forward_from_chat and message.forward_from_chat.title else (message.chat.title if message.chat and message.chat.title else "未知来源")
        
        await notify_steward_log(f"📥 [涡轮启动] 拉取: {standard_name} | 来源: {source_name} | 大小: {file_size_mb:.2f} MB")
        
        # 强制在 TG 聊天框里把来源和大小亮出来!
        try:
            await status.edit_text(f"🚀 **[极速拉取]** `{standard_name}`\n📡 来源频道: **{source_name}**\n⚖️ 机器测重: **{file_size_mb:.2f} MB**\n涡轮进度: **0%**")
        except: pass
        
        chunk_size = 1024 * 1024
        last_pct = 0
        retry_count = 0
        max_retries = 15  # 🔥 顺手调大一点,允许它多试几次
        last_stuck_chunks = -1
        stuck_loop_count = 0
        
        while True:
            # 🔥 加装致命刹车!重试超过限制,立刻打断施法,跳出死循环!
            if retry_count >= max_retries:
                break
                
            current_bytes = os.path.getsize(local_path) if os.path.exists(local_path) else 0
            
            if current_bytes > total_bytes:
                try: os.remove(local_path)
                except: pass
                current_bytes = 0
                
            if current_bytes == total_bytes:
                downloaded_bytes = current_bytes
                break
                
            completed_chunks = current_bytes // chunk_size
            secure_bytes = completed_chunks * chunk_size
            if secure_bytes > 0:
                try:
                    with open(local_path, "r+b") as f: f.truncate(secure_bytes)
                except: pass
                downloaded_bytes = secure_bytes
                open_mode = "ab"
            else: 
                downloaded_bytes = 0
                completed_chunks = 0
                open_mode = "wb"
                
            try:
                buffer_queue = asyncio.Queue(maxsize=50)
                writer_error = None
                async def disk_writer():
                    nonlocal writer_error
                    try:
                        with open(local_path, open_mode) as f:
                            while True:
                                chunk = await buffer_queue.get()
                                if chunk is None: break 
                                f.write(chunk); buffer_queue.task_done()
                    except Exception as e: writer_error = e

                writer_task = asyncio.create_task(disk_writer())
                
                async for chunk in client.stream_media(message, offset=completed_chunks):
                    if writer_error: raise writer_error 
                    await buffer_queue.put(chunk)
                    downloaded_bytes += len(chunk)
                    pct = int(downloaded_bytes * 100 / total_bytes)
                    if (pct // 10) > (last_pct // 10):
                        last_pct = pct - (pct % 10)
                        asyncio.create_task(status.edit_text(f"🚀 **[极速拉取]** `{standard_name}`\n📡 来源: **{source_name}** | ⚖️ 大小: **{file_size_mb:.2f} MB**\n涡轮进度: **{last_pct}%**"))
                        asyncio.create_task(notify_steward_log(f"🚀 [下载进度] {standard_name} ➔ {last_pct}%"))
                
                await buffer_queue.put(None)
                await writer_task
                
                if downloaded_bytes >= total_bytes: 
                    break
                else:
                    retry_count += 1
                    if completed_chunks == last_stuck_chunks:
                        stuck_loop_count += 1
                        if stuck_loop_count >= 10:
                            if os.path.exists(local_path): os.remove(local_path)
                            last_stuck_chunks = -1; stuck_loop_count = 0; continue
                        await status.edit_text(f"⚠️ **[节点堵塞]** 正在重新校准物理块...\n🔄 第 **{retry_count}** 次死磕底层数据...")
                    else: 
                        last_stuck_chunks = completed_chunks; stuck_loop_count = 0
                    await asyncio.sleep(3.0)
                    
            except Exception as e:
                if 'writer_task' in locals() and not writer_task.done(): writer_task.cancel()
                retry_count += 1
                await status.edit_text(f"⚠️ **[网络闪断]** 传输突发中断!\n🛠️ 第 **{retry_count}** 次断点重联死磕...")
                await asyncio.sleep(3.0)
                
        if downloaded_bytes < total_bytes: 
            if retry_count >= max_retries:
                try:
                    if os.path.exists(local_path): os.remove(local_path)
                except: pass
                await status.edit_text("❌ 下载尝试彻底耗尽,已自动销毁本地残片。")
            else:
                await status.edit_text("⚠️ 节点暂存断点,保留残片。")
            return
            
        await status.edit_text("✅ 本地落盘完成,正在上传云端...")
        
        put_url = f"{OLIST_URL}/api/fs/put"
        headers = {"Authorization": OLIST_TOKEN, "File-Path": quote(target_full), "Content-Length": str(total_bytes), "Content-Type": "application/octet-stream"}
        push_success, up_retries = False, 0
        f_upload = None
        
        while not push_success and up_retries < 5:
            try:
                last_up = 0
                f_upload = open(local_path, "rb")
                f_upload.seek(0)
                async def file_iter():
                    nonlocal last_up
                    sent = 0
                    while True:
                        chunk = f_upload.read(1024 * 1024)
                        if not chunk: break
                        sent += len(chunk)
                        pct = int(sent * 100 / total_bytes)
                        if (pct // 10) > (last_up // 10):
                            last_up = pct - (pct % 10)
                            asyncio.create_task(status.edit_text(f"🚀 **[直推天翼云]** `{standard_name}`\n📡 来源: **{source_name}** | ⚖️ 大小: **{file_size_mb:.2f} MB**\n云端进度: **{last_up}%**"))
                            asyncio.create_task(notify_steward_log(f"☁️ [上传进度] {standard_name} ➔ {last_up}%"))
                        yield chunk
                # 🔥 修复 1:把 read 延长到 900秒(15分钟)给天翼云合并大文件留足时间,把 write 加锁防止网络假死
                async with httpx.AsyncClient(timeout=httpx.Timeout(connect=10.0, read=900.0, write=60.0, pool=None), trust_env=False) as h: 
                    resp = await h.put(put_url, content=file_iter(), headers=headers)
                
                # 🔥 修复 2:逼出天翼云/Alist的真实拦截原因
                resp_json = resp.json()
                if resp_json.get("code") == 200: 
                    push_success = True
                else: 
                    err_msg = resp_json.get("message", "未知拦截")
                    raise Exception(f"云端拒绝落盘: {err_msg}")
            
            except Exception as e: 
                up_retries += 1
                err_detail = str(e) if str(e) else "网络响应死锁或彻底超时"
                # 🔥 修复 3:把真实死因打印在 TG 群里,死也要死个明白!
                await status.edit_text(f"⚠️ 云端传输受阻: {err_detail}\n🔄 正在进行第 {up_retries} 次重连...")
                await asyncio.sleep(up_retries*4)
            finally:
                if f_upload:
                    try: f_upload.close()
                    except: pass
                    f_upload = None
                
        if push_success:
            if not is_movie and ep_num is not None:
                check_and_record_history(clean_base, file_season, ep_num, version_suffix)
                ver_tag = f".{version_suffix}" if version_suffix else ""
                await notify_steward_log(f"📝 [历史已写入] {clean_base}.S{file_season:02d}E{ep_num:02d}{ver_tag}")
                
            await notify_steward_log(f"🎉 [终极入库成功] ➔ {target_full}")
            try: await status.edit_text(f"🎉 **终极入库成功** ➔ `{target_full}`")
            except Exception: await client.send_message("me", f"🎉 **[备用通道通知] 入库成功** ➔ `{standard_name}`")
            
            try:
                if os.path.exists(local_path): os.remove(local_path)
            except Exception: pass
        else:
            asyncio.create_task(bg_upload_retry_task(local_path, target_full, total_bytes, clean_base, file_season, ep_num, is_movie, standard_name, version_suffix))
            bg_task_spawned = True
            try: await status.edit_text("⚠️ 云端直推受阻,已强行托管至后台队列,每10分钟自动重试...")
            except: await client.send_message("me", f"⚠️ 云端直推受阻,已托管至后台重推: `{standard_name}`")
            
    finally:
        if not bg_task_spawned:
            GLOBAL_ACTIVE_LOCKS.discard(standard_name)

# =================================================================
# 🌟 全自动挖掘引擎 (加入了动态重量拦截)
# =================================================================
# 把首行定义改成这样,加一个 year=None
async def sweep_existing_history(client, chat_id, drama_key, category, folder_season, file_season, min_ep, min_mb=0, max_mb=999999, fetch_limit=200, year=None):
    try:
        chat_id_int = int(chat_id) if str(chat_id).lstrip('-').isdigit() else chat_id
        
        # 🔥 如果是新结构,解析出真实搜索词和版本;如果是老数据,drama_key 就是剧名
        config = load_listener_config()
        d_info = config.get("trusted_channels", {}).get(str(chat_id), {}).get("monitored_dramas", {}).get(drama_key, {})
        search_kw = d_info.get("search_kw", drama_key)
        version_suffix = d_info.get("version", "")
        
        if not year: year = await fetch_tmdb_year(search_kw)
            
        # 🔥 文件夹物理隔离:将夜 (2026) HDR
        folder_name = f"{search_kw} ({year})" if year else search_kw
        if version_suffix: 
            folder_name = f"{folder_name} {version_suffix}"  # 👈 这里同样只留空格
        
        # 账本判定名
        hist_base = f"{search_kw}_{version_suffix}" if version_suffix else search_kw
        
        async for old_msg in client.get_chat_history(chat_id_int, limit=fetch_limit):
            media = old_msg.video or old_msg.document
            if not media: continue
            
            text_to_scan = f"{old_msg.caption or ''} {getattr(media, 'file_name', '')}"
            
            # 🔥 新增:历史消息扫荡,同样执行上下文缝合术!
            if old_msg.reply_to_message:
                parent_text = old_msg.reply_to_message.caption or old_msg.reply_to_message.text or ""
                text_to_scan = f"{text_to_scan} {parent_text}"
            # 🔥 这里要用真实的剧名 search_kw 去扫描文字
            if search_kw.lower() in text_to_scan.lower():
                ep_num = extract_pure_episode(text_to_scan, drama_anchor=search_kw)
                if ep_num is None or ep_num < min_ep: continue
                
                file_size_mb = media.file_size / (1024 * 1024) if getattr(media, "file_size", 0) else 0
                if not (min_mb <= file_size_mb <= max_mb): continue
                
                # 🔥 使用独立版本账本进行核对
                ver_tag = f".{version_suffix}" if version_suffix else ""
                if f"{search_kw}.S{file_season:02d}E{ep_num:02d}{ver_tag}" in load_history(): 
                    continue
                    
                override_info = (folder_name, category, folder_season, file_season, year, ep_num, version_suffix)
                try: status = await client.send_message(COMMAND_CENTER_CHAT, f"🎯 **[哨兵发车]**\n📺 `{search_kw}` ({version_suffix or '默认'}) ➔ S{file_season:02d}E{ep_num:02d}")
                except Exception: status = await client.send_message("me", f"🎯 **[备用嗅探]**\n📺 `{search_kw}` ({version_suffix or '默认'}) ➔ S{file_season:02d}E{ep_num:02d}")
                await process_media_transfer(client, old_msg, status, override_info)
                await asyncio.sleep(5) 
    except Exception as e:
        pass

# =================================================================
# 🧠 智能安全巡航进程 (完美融合夜间休眠 + 周更日更排班过滤)
# =================================================================
async def smart_patrol_daemon(client):
    await asyncio.sleep(60) 
    while True:
        try:
            # 门禁 1:凌晨 0 点到早上 9 点强制进入深度睡眠,防封号
            current_hour = datetime.now().hour
            if 0 <= current_hour < 9:
                await notify_steward_log(f"💤 [夜间休眠] 当前 {current_hour} 点,凌晨0-9点不巡逻,打更人深度睡眠中...")
                await asyncio.sleep(3600) 
                continue
            
            # 白天每 15 分钟打更一次
            await asyncio.sleep(900)
            
            # 门禁 2:计算今天星期几
            weekday_map = {0: "周一", 1: "周二", 2: "周三", 3: "周四", 4: "周五", 5: "周六", 6: "周日"}
            today_cn = weekday_map[datetime.now().weekday()]
            
            await notify_steward_log(f"🔍 [定时巡航] 开启全盘清算... 今天是: {today_cn}")
            
            config = load_listener_config()
            channels = config.get("trusted_channels", {})
            if not channels:
                continue
                
            drama_count = 0
            for chat_id, info in channels.items():
                for drama_name, d_info in info.get("monitored_dramas", {}).items():
                    
                    # ⚖️ 核心过滤:如果设置了周更,且今天不是它的更新日,直接一脚踢开!
                    freq = d_info.get("frequency", "日更")
                    if freq != "日更" and freq != today_cn:
                        # 频率不对,今天不属于它,直接跳过,绝不去骚扰 TG 接口
                        continue
                    
                    folder_s = d_info.get("folder_season", 1)
                    file_s = d_info.get("file_season", folder_s)
                    min_ep = d_info.get("min_ep", 1)
                    cat = d_info.get("category", "未分类")
                    min_mb = d_info.get("min_mb", 0)
                    max_mb = d_info.get("max_mb", 999999)
                    
                    d_year = d_info.get("year")
                    asyncio.create_task(sweep_existing_history(client, chat_id, drama_name, cat, folder_s, file_s, min_ep, min_mb, max_mb, fetch_limit=10, year=d_year))
                    drama_count += 1
            
            if drama_count > 0:
                await notify_steward_log(f"✅ [巡航分流完毕] 今天符合更新条件的剧集共 {drama_count} 部,已派出哨兵翻找。")
            else:
                await notify_steward_log(f"☕ [巡航结束] 今天没有任何剧集达到更新排班,哨兵原地喝茶。")
                
        except Exception as e:
            await notify_steward_log(f"❌ [巡航突发异常] 详情: {str(e)}", level="ERROR")
            await asyncio.sleep(60)

# =================================================================
# 🧠 开机自动补漏清算
# =================================================================
async def startup_catchup_sweep(client):
    await asyncio.sleep(5)
    try:
        config = load_listener_config()
        await notify_steward_log("🚀 [系统启动] 正在进行开机全自动历史补漏扫荡...")
        for chat_id, info in config.get("trusted_channels", {}).items():
            for drama_name, d_info in info.get("monitored_dramas", {}).items():
                folder_s = d_info.get("folder_season", 1)
                file_s = d_info.get("file_season", folder_s)
                min_ep = d_info.get("min_ep", 1)
                cat = d_info.get("category", "未分类")
                # 🔥 读取重量锁
                min_mb = d_info.get("min_mb", 0)
                max_mb = d_info.get("max_mb", 999999)
                
                d_year = d_info.get("year")
                await sweep_existing_history(client, chat_id, drama_name, cat, folder_s, file_s, min_ep, min_mb, max_mb, fetch_limit=200, year=d_year)
                await asyncio.sleep(2)
        await notify_steward_log("✅ [开机扫荡完成] 历史遗留清点完毕。")
    except Exception:
        pass

# =================================================================
# 📡 远程指令总枢 (加入全新的订阅重力门)
# =================================================================
STANDARD_CATS = ["华语剧", "欧美剧", "日韩剧", "短剧", "华语电影", "欧美电影", "日韩电影", "演唱会", "国漫", "日漫", "综艺", "纪录片"]

@app.on_message(filters.command(["sub", "unsub", "list", "add", "del", "go", "history", "ping", "rm", "clean", "scan", "rmh", "setdir"]) & filters.user("me"))
async def manage_system_commands(client, message):
    command = message.command[0].lower()
    config = load_listener_config()
    
    if command == "ping":
        uptime_minutes = int((time.time() - START_TIME) / 60)
        await message.reply_text(f"🟢 **系统健康度 [优秀]**\n⏱️ 存活: `{uptime_minutes}` 分钟")
        return

    if command == "setdir":
        if len(message.command) < 2:
            return await message.reply_text(f"📁 当前上传目录: `{get_mount_root()}`\n⚠️ 语法:`/setdir [新目录路径]` (例如: `/setdir /family/188_cas`)")
        new_dir = message.command[1]
        set_mount_root(new_dir)
        return await message.reply_text(f"✅ 上传主目录已成功切换至: `{new_dir}`")

    if command == "clean":
        count = 0
        if os.path.exists(LOCAL_TEMP_DIR):
            for f in os.listdir(LOCAL_TEMP_DIR):
                try: os.remove(os.path.join(LOCAL_TEMP_DIR, f)); count += 1
                except: pass
        await message.reply_text(f"🧹 抹除 `{count}` 个遗留碎片!")
        return

    if command == "rm":
        if len(message.command) < 2: return await message.reply_text("⚠️ 语法:`/rm [航线剧名关键字]`")
        kw = " ".join(message.command[1:]).lower()
        routes = load_tg_routes()
        matched_keys = [k for k in routes.keys() if kw in k.lower()]
        if not matched_keys: return await message.reply_text(f"❌ 航线库里没找到包含 `{kw}` 的记录")
        for k in matched_keys: del routes[k]
        save_tg_routes(routes)
        return await message.reply_text(f"🗑️ 已彻底删除 {len(matched_keys)} 条自定义航线!")

    if command == "rmh":
        if len(message.command) < 2: return await message.reply_text("⚠️ `/rmh [剧名关键字]`")
        kw = " ".join(message.command[1:]).lower()
        history = load_history()
        matched_keys = [k for k in history.keys() if kw in k.lower()]
        if not matched_keys: return await message.reply_text(f"❌ 历史库里没找到包含 `{kw}` 的记录")
        for k in matched_keys: del history[k]
        with open(TG_HISTORY_DB, 'w', encoding='utf-8') as f: json.dump(history, f, ensure_ascii=False, indent=2)
        return await message.reply_text(f"🗑️ 已从历史账本中彻底抹除 {len(matched_keys)} 条记录!")

    if command == "history":
        history = load_history()
        if not history: return await message.reply_text("📭 历史账本目前是空的。")
        sorted_hist = sorted(history.items(), key=lambda x: x[1], reverse=True)[:15]
        msg_text = "📜 **[最近入库成功记录 (前15条)]**\n\n"
        for k, v in sorted_hist:
            time_str = datetime.fromtimestamp(v).strftime('%m-%d %H:%M')
            msg_text += f"🔹 `{k}`  *(于 {time_str})*\n"
        return await message.reply_text(msg_text)

    if command == "add":
        if len(message.command) < 3: return await message.reply_text("⚠️ `/add [频道ID] [别名]`")
        c_id, c_name = message.command[1], " ".join(message.command[2:])
        if c_id not in config["trusted_channels"]: config["trusted_channels"][c_id] = {"monitored_dramas": {}}
        config["trusted_channels"][c_id]["channel_name"] = c_name
        with open(TG_LISTENER_DB, "w", encoding="utf-8") as f: json.dump(config, f, ensure_ascii=False, indent=4)
        await message.reply_text(f"🏢 **频道挂载成功**: {c_name} (`{c_id}`)")
        return

    if command == "del":
        if len(message.command) < 2: return
        if message.command[1] in config["trusted_channels"]:
            del config["trusted_channels"][message.command[1]]
            with open(TG_LISTENER_DB, "w", encoding="utf-8") as f: json.dump(config, f, ensure_ascii=False, indent=4)
            await message.reply_text("🗑️ 频道已拔除。")
        return

    if command == "list":
        try:
            msg_text = "📡 **[雷达大盘状态]**\n\n"
            for chat_id, info in config.get("trusted_channels", {}).items():
                msg_text += f"🏢 **频道**: {info.get('channel_name', chat_id)}\n"
                dramas = info.get("monitored_dramas", {})
                if not dramas: 
                    msg_text += "  └ 📭 (空)\n"
                else:
                    for d_name, d_info in dramas.items(): 
                        # 🛡️ 强力兜底:用 .get() 获取,即使是旧数据没有这些字段,也会塞一个默认值,绝不报错!
                        f_season = d_info.get('file_season', 1)
                        try: f_season = int(f_season)
                        except: f_season = 1
                        
                        min_mb = d_info.get('min_mb', 0)
                        max_mb = d_info.get('max_mb', 999999)
                        freq = d_info.get('frequency', '日更')
                        
                        search_kw = d_info.get('search_kw', d_name)
                        ver = d_info.get('version', '')
                        
                        s_text = f"S{f_season:02d}"
                        size_txt = f" / {min_mb}-{max_mb if max_mb != 999999 else '上限'}MB" if (min_mb or max_mb != 999999) else ""
                        freq_txt = f" / ⏱️{freq}"
                        ver_txt = f" / 🏷️{ver}" if ver else ""
                        
                        line = f"  └ 🎬 `{search_kw}` ({s_text}{size_txt}{freq_txt}{ver_txt})\n"
                        
                        # 🛡️ 终极防御:TG单条消息有长度限制,超过 4000 字会发不出来装死,这里做分段发送
                        if len(msg_text) + len(line) > 4000:
                            await message.reply_text(msg_text)
                            msg_text = ""
                        msg_text += line
            
            if msg_text:
                await message.reply_text(msg_text)
                
        except Exception as e:
            # 万一真出了神仙 Bug,逼它把错误打印在 TG 里,死个明白
            await message.reply_text(f"❌ `/list` 读取失败,原因: {str(e)}")
        return

    if command == "scan":
        args = message.command[1:]
        target_kw = " ".join(args).lower() if args else None
        await message.reply_text("🔍 **[手动扫荡触发]** ➔ 正在全自动翻找目标漏网之鱼...")
        for chat_id, info in config.get("trusted_channels", {}).items():
            for drama_name, d_info in info.get("monitored_dramas", {}).items():
                if target_kw and target_kw not in drama_name.lower(): continue
                folder_s = d_info.get("folder_season", 1)
                file_s = d_info.get("file_season", folder_s)
                min_ep = d_info.get("min_ep", 1)
                cat = d_info.get("category", "未分类")
                # 🔥 读取重量锁
                min_mb = d_info.get("min_mb", 0)
                max_mb = d_info.get("max_mb", 999999)
                
                d_year = d_info.get("year")
                asyncio.create_task(sweep_existing_history(client, chat_id, drama_name, cat, folder_s, file_s, min_ep, min_mb, max_mb, fetch_limit=200, year=d_year))
        return

    if command == "sub":
        args = message.command[1:]
        if len(args) < 2: return await message.reply_text("⚠️ 语法:`/sub [剧名] [分类] [最小MB-最大MB] [频率:日更/周一~周日] [v=版本号] [可选:年份/季数/起步集]`")
            
        cat_idx = next((i for i, arg in enumerate(args) if arg in STANDARD_CATS), -1)
        if cat_idx == -1: return await message.reply_text("⚠️ 请提供有效分类。")
        category = args.pop(cat_idx)
        
        # 智能提取更新频率
        freq_list = ["日更", "周一", "周二", "周三", "周四", "周五", "周六", "周日"]
        cn_weekday_map = {"星期一": "周一", "星期二": "周二", "星期三": "周三", "星期四": "周四", "星期五": "周五", "星期六": "周六", "星期日": "周日"}
        frequency = "日更"
        for i, arg in enumerate(args):
            if arg in freq_list: frequency = args.pop(i); break
            elif arg in cn_weekday_map: frequency = cn_weekday_map[args.pop(i)]; break
        
        # 提取大小限制区间
        min_mb, max_mb = 0, 999999
        size_idx = next((i for i, arg in enumerate(args) if re.match(r'^\d+-\d+$', arg)), -1)
        if size_idx != -1:
            size_str = args.pop(size_idx)
            min_mb, max_mb = map(int, size_str.split('-'))
        
        # 🔥🔥🔥 痛点克星:如果你的指令是“回复”给某条转发视频的,自动抠出频道ID并建档!
        replied = message.reply_to_message
        if replied and replied.forward_from_chat:
            specific_channel = str(replied.forward_from_chat.id)
            channel_title = replied.forward_from_chat.title or specific_channel
            # 自动帮新频道在数据库开户!
            if specific_channel not in config.setdefault("trusted_channels", {}):
                config["trusted_channels"][specific_channel] = {"channel_name": channel_title, "monitored_dramas": {}}
        else:
            # 备用方案:如果你没用回复功能,就看看命令里有没有手写的 -100 ID
            specific_channel = next((args.pop(i) for i, arg in enumerate(args) if arg.startswith("-100")), None)
            
        target_pools = [specific_channel] if specific_channel else list(config.get("trusted_channels", {}).keys())
        
        if not target_pools:
            return await message.reply_text("⚠️ 你的大盘里还没有任何频道!请先转发一条频道的视频,并对它【回复】 /sub 指令来自动添加!")

        # 🔥 智能提取年份 (咱们上一轮挪上来的,保持在这别动)
        custom_year = next((args.pop(i) for i, arg in enumerate(args) if arg.isdigit() and len(arg) == 4 and (1900 < int(arg) < 2100)), None)

        # 🔥🔥🔥 智能提取版本标签 (从下面剪切挪上来的!) 
        version_suffix = ""
        for i, arg in enumerate(args):
            if arg.lower().startswith("v="):
                version_suffix = args.pop(i)[2:]
                break

        # ========================================================
        # 此时 args 里所有的杂质都被排干净了,只剩下 [剧名] 和 [季数] [集数]
        # 下面是你原本提取季和集的代码,老老实实放在这兜底
        # ========================================================
        file_season = int(args.pop(-1)[1:]) if args and re.match(r'^s\d+$', args[-1], re.IGNORECASE) else None
        min_ep = int(args.pop(-1)) if args and args[-1].isdigit() else 1
        folder_season = int(args.pop(-1)) if args and args[-1].isdigit() else 1
        if file_season is None: file_season = folder_season
        
        drama_name = " ".join(args) if args else "未知目标"
        
        # 🔥 创建数据库唯一键值,防止两个版本互相覆盖
        drama_key = f"{drama_name}_{version_suffix}" if version_suffix else drama_name
        
        if not custom_year:
            custom_year = await fetch_tmdb_year(drama_name)
            
        target_pools = [specific_channel] if specific_channel else list(config["trusted_channels"].keys())
        
        for chat_id in target_pools:
            if "monitored_dramas" not in config["trusted_channels"][chat_id]: config["trusted_channels"][chat_id]["monitored_dramas"] = {}
            # 这里存入的是 drama_key,不是 drama_name
            config["trusted_channels"][chat_id]["monitored_dramas"][drama_key] = {
                "search_kw": drama_name,  # 🔥 记录真实的搜索词
                "version": version_suffix, # 🔥 记录版本后缀
                "category": category, "folder_season": folder_season, 
                "file_season": file_season, "min_ep": min_ep,
                "min_mb": min_mb, "max_mb": max_mb,
                "frequency": frequency,
                "year": custom_year
            }
            
        with open(TG_LISTENER_DB, "w", encoding="utf-8") as f: json.dump(config, f, ensure_ascii=False, indent=4)
        await message.reply_text(f"✅ **订阅成功**: `{drama_name}`\n🏷️ **版本追踪**: `{version_suffix or '默认原版'}`\n⚖️ **画质锁定**: `{min_mb}-{max_mb} MB`\n⏱️ **频率**: `{frequency}`\n📅 **年份**: `{custom_year}`\n🚀 **启动扫荡!**")
        
        for chat_id in target_pools:
            asyncio.create_task(sweep_existing_history(client, chat_id, drama_key, category, folder_season, file_season, min_ep, min_mb, max_mb, fetch_limit=200, year=custom_year))
        return
    
    if command in ["unsub", "del"]:
        args = message.command[1:]
        if not args: 
            return await message.reply_text("⚠️ 语法:`/unsub [剧名] [可选: v=版本号] [可选:-100开头的频道ID]`")
        
        # 🎯 提取精准狙击的频道 ID
        specific_channel = next((arg for arg in args if arg.startswith("-100")), None)
        if specific_channel:
            args.remove(specific_channel)
            
        # 🎯 提取要取消的版本号 (例如 v=HDR)
        version_suffix = ""
        for i, arg in enumerate(args):
            if arg.lower().startswith("v="):
                version_suffix = args.pop(i)[2:]
                break
                
        drama_name = " ".join(args)
        if not drama_name:
            return await message.reply_text("⚠️ 请输入要取消的剧名!")
            
        # 拼接数据库里的真实 Key
        drama_key = f"{drama_name}_{version_suffix}" if version_suffix else drama_name
        
        config = load_listener_config()
        channels = config.get("trusted_channels", {})
        
        deleted_count = 0
        # 如果指定了频道,就只杀那个频道;没指定,就全频道通杀!
        target_pools = [specific_channel] if specific_channel else list(channels.keys())
        
        for chat_id in target_pools:
            if chat_id in channels and "monitored_dramas" in channels[chat_id]:
                # 精准匹配带版本的剧名
                if drama_key in channels[chat_id]["monitored_dramas"]:
                    del channels[chat_id]["monitored_dramas"][drama_key]
                    deleted_count += 1
                # 兼容老数据,如果没有版本号,也顺手试着删一下
                elif drama_name in channels[chat_id]["monitored_dramas"]:
                    del channels[chat_id]["monitored_dramas"][drama_name]
                    deleted_count += 1
                    
        if deleted_count > 0:
            with open(TG_LISTENER_DB, "w", encoding="utf-8") as f:
                json.dump(config, f, ensure_ascii=False, indent=4)
            await message.reply_text(f"✅ **精确打击成功**\n💣 目标: `{drama_key}`\n📉 共从 **{deleted_count}** 个频道中移除了该监听雷达!")
        else:
            await message.reply_text(f"⚠️ **未找到目标**: `{drama_key}`\n💡 请使用 `/list` 命令查看雷达大盘里的准确名称(包含版本)。")
        return

    if command == "go":
        args = message.command[1:]
        if not args: return await message.reply_text("⚠️ 语法:`/go [剧名关键字] [可选:分类/年份/v=版本/季数]`")
        routes = load_tg_routes()
        
        # 🔥 新增:智能提取版本标签 (例如 v=HDR)
        version_suffix = ""
        for i, arg in enumerate(args):
            if arg.lower().startswith("v="):
                version_suffix = args.pop(i)[2:]
                break

        # 🔥 新增:智能提取年份
        custom_year = next((args.pop(i) for i, arg in enumerate(args) if arg.isdigit() and len(arg) == 4 and (1900 < int(arg) < 2100)), None)

        cat_idx = next((i for i, arg in enumerate(args) if arg in STANDARD_CATS), -1)
        
        # 如果没带分类,说明是老航线唤醒
        if cat_idx == -1:
            search_kw = " ".join(args).lower()
            matched_item = None
            for k, v in routes.items():
                if search_kw in k.lower() or search_kw in v.get("folder_name", "").lower():
                    matched_item = v; break
            if matched_item:
                folder_s = matched_item.get("folder_season", 1)
                file_s = matched_item.get("file_season", folder_s)
                # 🔥 将历史记忆中的版本也压入全局缓存
                GLOBAL_ROUTE_CACHE.update({"folder_name": matched_item["folder_name"], "category": matched_item["category"], "folder_season": folder_s, "file_season": file_s, "year": matched_item.get("year", ""), "version": matched_item.get("version", ""), "expire_time": time.time() + 3600, "manual_ep": None})
                return await message.reply_text(f"✅ 命中记忆航线!已锁定目标: `{matched_item['folder_name']}`\n请直接转发发车!")
            return await message.reply_text("⚠️ 未匹配到历史,如果要开辟新线请提供标准分类。")
        
        # 下面是开辟新航线的逻辑
        cat = args.pop(cat_idx)
        
        # 1. 抓 S 标签 (例如 S2)
        file_season = int(args.pop(-1)[1:]) if args and re.match(r'^s\d+$', args[-1], re.IGNORECASE) else None
        
        # 🔥 2. 抓文件夹季数 (老样子,只抓最后一个数字)
        folder_season = int(args.pop(-1)) if args and args[-1].isdigit() else 1
        
        if file_season is None: file_season = folder_season 
        
        # 🔥 3. 剩下干干净净的纯剧名!
        pure_title = " ".join(args)
        year = custom_year if custom_year else await fetch_tmdb_year(pure_title)
        
        # 拼装物理主文件夹名
        folder_name = f"{pure_title} ({year})" if year else pure_title
        if version_suffix: 
            folder_name = f"{folder_name} {version_suffix}"
        
        # 存入数据库并激活全局缓存
        routes[folder_name] = {"folder_name": folder_name, "category": cat, "folder_season": folder_season, "file_season": file_season, "year": year, "version": version_suffix, "created_at": int(time.time())}
        save_tg_routes(routes)
        
        # 全局缓存 (manual_ep 保持 None,让机器自己去视频里抓)
        GLOBAL_ROUTE_CACHE.update({"folder_name": folder_name, "category": cat, "folder_season": folder_season, "file_season": file_season, "year": year, "version": version_suffix, "expire_time": time.time() + 3600, "manual_ep": None})
        
        await message.reply_text(f"✅ 新航线已打通:\n📁 目标锁定: `{folder_name}`\n请直接转发媒体发车!")
        return
    
    if command in ["help", "h"]:
        try:
            help_text = """🤖 **打更人媒体管家 - 终极参数指南** 🤖

🎬 **1. 自动订阅 (/sub)**
💬 **语法**: `/sub [剧名] [分类] [最小-最大MB] [频率] [年份] [v=版本] [文件夹季] [S文件季]`
👉 **示例**: `/sub 将夜 国漫 400-1500 周四 2026 v=HDR 1 S2`
💡 **秘籍**: 直接回复频道的视频发送此命令,自动免填频道ID!

🗑️ **2. 取消订阅 (/unsub)**
💬 **语法**: `/unsub [剧名] [v=版本] [-100频道ID]`
👉 **示例**: `/unsub 将夜 v=HDR` (精准取消HDR版)

🚀 **3. 手动发车 (/go)**
💬 **语法**: `/go [剧名] [分类] [年份] [v=版本] [文件夹季]`
👉 **步骤1**: `/go 绝命毒师 欧美剧 2018 v=4K 1` (锁死文件夹)
👉 **步骤2**: 发送 `#S2E8` (随意捏造文件名,不破坏文件夹)
👉 **步骤3**: 狂发视频,自动重命名入库!

📡 **4. 大盘与扫荡**
👉 `/list` : 查看各频道监听排班表
👉 `/scan` : 强行触发一次全网补漏扫荡

❌ **5. 删除记录**
👉 `/rmh` : 删除下载历史记录
👉 `/rm` : 删除历史航线!把不再需要的死档永久抹除"""
            
            await message.reply_text(help_text)
        except Exception as e:
            await message.reply_text(f"❌ 帮助菜单解析失败,原因: {str(e)}")
        return

# =================================================================
# 🎛️ 航线遥控器:手动霸权重塑文件名 (绝对不碰底层文件夹)
# =================================================================
@app.on_message(filters.text & ~filters.media & filters.user("me"))
async def override_episode(client, message):
    text = message.text.strip().upper()
    # 只有在 /go 航线开启且未过期时,遥控器才生效
    if text.startswith("#") and GLOBAL_ROUTE_CACHE.get("expire_time", 0) > time.time():
        m_se = re.search(r'^#S(\d+)E(\d+)', text)
        m_s = re.search(r'^#S(\d+)$', text)
        m_e = re.search(r'^#E(\d+)$', text)
        
        reply_msg = ""
        if m_se:
            GLOBAL_ROUTE_CACHE["file_season"] = int(m_se.group(1))
            # ❌ 砍掉了这里修改 folder_season 的代码,物理文件夹彻底死锁保护!
            GLOBAL_ROUTE_CACHE["manual_ep"] = int(m_se.group(2))
            reply_msg = f"🎛️ **文件名重塑**: 季数 ➔ S{int(m_se.group(1)):02d} | 集数 ➔ E{int(m_se.group(2)):02d}\n🔒 (物理文件夹维持原位不变)"
        elif m_s:
            GLOBAL_ROUTE_CACHE["file_season"] = int(m_s.group(1))
            # ❌ 砍掉了这里修改 folder_season 的代码!
            reply_msg = f"🎛️ **文件名重塑**: 季数 ➔ S{int(m_s.group(1)):02d}\n🔒 (物理文件夹维持原位不变)"
        elif m_e:
            GLOBAL_ROUTE_CACHE["manual_ep"] = int(m_e.group(1))
            reply_msg = f"🎛️ **文件名重塑**: 集数 ➔ E{int(m_e.group(1)):02d}"
            
        if reply_msg:
            await message.reply_text(reply_msg)

# =================================================================
# 🎯 被动网关双保险拦截 (穿透评论区结界版)
# =================================================================
@app.on_message(filters.video | filters.document)
@app.on_edited_message(filters.video | filters.document)
async def media_routing_gateway(client, message):
    config = load_listener_config()
    
    chat_id_to_check = str(message.chat.id) if message.chat else ""
    original_channel_id = ""
    parent_text = ""
    
    # 🔥 核心穿透术:如果是评论区消息,精准提取它的“主频道户口”和“主贴文案”!
    if message.reply_to_message:
        parent = message.reply_to_message
        parent_text = parent.caption or parent.text or ""
        
        # 兼容 TG 最新底层逻辑:评论区主贴的身份其实是 sender_chat!
        if parent.forward_from_chat:
            original_channel_id = str(parent.forward_from_chat.id)
        elif parent.sender_chat:
            original_channel_id = str(parent.sender_chat.id) # 👈 加了这行,瞬间打通!
            
        # 防御极端的“楼中楼”回复(回复了评论区的某个人)
        if parent.reply_to_message:
            parent_text += f" {parent.reply_to_message.caption or parent.reply_to_message.text or ''}"

    # 🕵️‍♂️ 身份双重核验:当前群聊ID 或 背后主频道ID,只要有一个在订阅列表里,就放行!
    matched_channel = None
    for k in config.get("trusted_channels", {}):
        if chat_id_to_check and (k in chat_id_to_check or chat_id_to_check in k):
            matched_channel = k
            break
        if original_channel_id and (k in original_channel_id or original_channel_id in k):
            matched_channel = k
            break

    # 🚪 第一道门:只处理订阅频道的自动拦截与质检
    if matched_channel:
        channel_info = config["trusted_channels"][matched_channel]
        media = message.video or message.document
        
        # 🔥 将当前无字视频,强行与主频道文案缝合!
        text_to_scan = f"{message.caption or ''} {getattr(media, 'file_name', '')} {parent_text}"

        # 注意这里从字典取出的是 drama_key
        for drama_key, route_info in channel_info.get("monitored_dramas", {}).items():
            # 🔥 优先使用真实的搜索词监听,兼容旧数据
            search_kw = route_info.get("search_kw", drama_key)
            version_suffix = route_info.get("version", "")
            
            if search_kw.lower() in text_to_scan.lower():
                ep_num = extract_pure_episode(text_to_scan, drama_anchor=search_kw)
                if ep_num is not None and ep_num < route_info.get("min_ep", 1): return 
                
                min_mb = route_info.get("min_mb", 0)
                max_mb = route_info.get("max_mb", 999999)
                file_size_mb = media.file_size / (1024 * 1024) if getattr(media, "file_size", 0) else 0
                source_name = message.forward_from_chat.title if message.forward_from_chat and message.forward_from_chat.title else (message.chat.title if message.chat and message.chat.title else "未知来源")
                
                if not (min_mb <= file_size_mb <= max_mb):
                    asyncio.create_task(notify_steward_log(f"🚫 [画质拦截] {search_kw} E{ep_num} ({version_suffix or '默认'}) | 大小: {file_size_mb:.1f}MB (要求: {min_mb}-{max_mb}MB)", level="WARNING"))
                    return 
                
                folder_season = route_info.get("folder_season", 1)
                file_season = route_info.get("file_season", folder_season)
                year = route_info.get("year")
                if not year: year = await fetch_tmdb_year(search_kw)
                
                # 🔥 文件夹物理隔离:将夜 (2026) HDR
                folder_name = f"{search_kw} ({year})" if year else search_kw
                if version_suffix: 
                    folder_name = f"{folder_name} {version_suffix}"  # 👈 这里!中间只留一个空格
                
                # 🔥 标准账本核对:剧名.S01E01.HQ
                ver_tag = f".{version_suffix}" if version_suffix else ""
                if ep_num is not None and f"{search_kw}.S{file_season:02d}E{ep_num:02d}{ver_tag}" in load_history(): 
                    return
                
                override_info = (folder_name, route_info["category"], folder_season, file_season, year, ep_num, version_suffix)
                try:
                    status = await client.send_message(COMMAND_CENTER_CHAT, f"🎯 **[实时发车]**\n📺 `{search_kw}` ({version_suffix or '默认'}) ➔ S{file_season:02d}E{ep_num:02d}")
                except Exception:
                    status = await client.send_message("me", f"🎯 **[备用发车]**\n📺 `{search_kw}` ({version_suffix or '默认'}) ➔ S{file_season:02d}E{ep_num:02d}")
                await process_media_transfer(client, message, status, override_info)
                return
        return

    # 🚪 第二道门:纯手动 /go 引流通道 
    # 🔥 修复重点:兼容 ChatType.BOT 和 ChatType.PRIVATE,杜绝转发给机器人时假死!
    if message.chat and message.chat.type in [enums.ChatType.PRIVATE, enums.ChatType.BOT]:
        if time.time() > GLOBAL_ROUTE_CACHE["expire_time"] or not GLOBAL_ROUTE_CACHE["folder_name"]:
            return
        status = await message.reply_text("⚡ 转发航线认证通过,正向引流拉取...")
        await process_media_transfer(client, message, status)

# =================================================================
# 🚀 引擎启动 
# =================================================================
if __name__ == "__main__":
    clean_orphan_temp_files(max_age_hours=24) 
    load_listener_config()
    
    async def start_system():
        await app.start()
        asyncio.create_task(startup_catchup_sweep(app))
        asyncio.create_task(smart_patrol_daemon(app))
        try:
            await app.send_message(COMMAND_CENTER_CHAT, "🤖 TG下载189上传引擎启动啦!")
            await notify_steward_log("✅ TG下载189上传引擎启动啦!")
        except Exception: pass
        await idle()
        await app.stop()
        
    app.run(start_system())

2.登陆帐号 在前台手动点火,直接用 Python 原生命令把代码跑起来,让它把输出打在你的屏幕上:

python3 ~/189py/autotg.py

回车之后,盯着屏幕,你会看到 Pyrogram 极度硬核的认证提示依次弹出:

Enter phone number or bot token: 👉 既然注释了 token,直接输入你的 Telegram 注册手机号,必须带国际区号!比如中国大陆就输入 +8613800138000,然后回车。

Is “+86 138 0013 8000” correct? (y/N): 👉 输入 y 回车确认。

Enter confirmation code: 👉 此时,你的 Telegram 官方客户端(手机或电脑版)会收到一条系统发来的 5 位数验证码。在 Termux 里敲进去并回车。

Enter password (hint: ***): 👉 (注意:只有你开了两步验证密码才会有这一步)。如果有,输入你的两步验证密码(注意:Linux 终端输入密码时屏幕上不会显示任何字符,这是正常的防偷窥保护,凭感觉盲打完直接回车即可)。

一旦认证成功,屏幕上会刷出一堆日志,最后显示: 🤖 工业级全栖搬运中枢 (V15 三维雷达全自动版) 满血上线! 并且你的目录里会自动生成一个极其珍贵的授权凭证文件:tg_robust_leecher_v15.session。

有了这个文件,脚本以后就彻底记住了你是谁,再也不用输密码了。

现在,按下键盘上的 Ctrl + C,强行把前台运行的脚本关掉。

3.指令说明:

📖 V15.10 核心遥控指令清单 📡 一、 雷达全自动阵列(适用于未来更新的剧集) /ping ➔ 💓 查看系统是否活着、存活时间、雷达正在盯防多少部剧。

/list ➔ 📋 查看当前所有雷达监控大名单。

/add [频道ID] [别名] ➔ 🏢 把一个发资源的群加入信任大名单。

(例:/add_channel -1001234567 顶级原盘群)

/del [频道ID] ➔ 🗑️ 把某个群从大名单里踢出去。

/sub [分类] [剧名] [季数] [起步集] ➔ 🎯 部署雷达!(分类和剧名谁在前都可以,闭眼发)。

完整语法:/sub [剧名关键字] [分类] [文件夹季数] [起步集数] [可选: 文件大小范围] [可选: 文件名季数] [可选: 剧名年份] [可选: v=版本号] [可选: 频道ID]

(例:/sub 国漫 凡人修仙传 1 15 ➔ 从第15集开始死盯凡人修仙传)

/unsub [剧名] ➔ ⛔ 剧追完了?取消雷达监控。

(例:/unsub 凡人修仙传)

🎫 二、 手工霸王发车(适用于补以前的旧视频) /history ➔ 📂 查看曾经发过车的历史航线记录(方便你想复用)。

/go [关键字] ➔ 🚀 秒开旧车! 模糊搜索历史记录,一秒复用。

(例:发 /go 卧底,自动匹配《宗门里除了我都是卧底》,直接发车)

/go [分类] [新剧名] ➔ 🆕 秒开新车! 历史里没有的剧,直接强行建档。

(例:/go 日漫 咒术回战)

#E[数字] ➔ 🔢 强行纠正/覆写下一集的集数。

完整语法:/go [剧名关键字] [分类] [文件夹季数] [可选: 文件名季数]

(例:视频名字太乱提取不出集数,你直接发 #E12,下一集强制按 12 集命名)

🧹 三、 清理与维护(本次为你紧急加装!) /rm [剧名关键字] ➔ 💥 删除历史航线! 把不再需要的死档从 /history 里永久抹除。

(例:/rm 卧底 ➔ 瞬间清理账本,保持后台极度干净) /rmh [剧名关键字] ➔ 删除下载历史记录 /clean 清除下载碎片

/scan 拉取订阅下载(+剧名可直接拉取单剧)

终极备忘录:全指令参数详解图鉴
你可以把这段保存在你的记事本里,随时查阅:

/sub - ➕ 自动订阅 (可直接回复视频抓取频道)

完整语法:/sub [剧名] [分类] [最小MB-最大MB] [频率:日更/周一~周日] [可选:年份] [可选:v=版本号] [可选:文件夹季数] [可选:S文件名季数]

实战举例:/sub 将夜 国漫 1500-3000 周四 2026 v=HDR 1 S2

快捷玩法:直接转发目标频道的视频,对它点击【回复】,然后输入上述参数(此时无需手填频道ID,机器自动抓取建档)。

/unsub - 🗑️ 取消订阅 (全网通杀或精准单杀)

完整语法:/unsub [剧名] [可选:v=版本号] [可选:-100开头的频道ID]

实战举例:/unsub 将夜 v=HDR (只取消所有频道的HDR版本)

实战举例:/unsub 将夜 -1001234567 (只踢掉某个发垃圾画质的频道)

/go - 🚀 手动发车 (开辟航线并锁死物理文件夹)

完整语法:/go [剧名关键字] [可选:分类] [可选:年份] [可选:v=版本号] [可选:文件夹季数] [可选:S文件名季数]

实战举例:/go 绝命毒师 欧美剧 2018 v=4K 1 (将后续文件死锁进 Season 1 文件夹)

配合遥控:发车后,发送 #S2E8 (只改变文件的刮削名字为第二季第八集,绝不改变刚才锁死的 Season 1 文件夹)。

/list - 📡 查看雷达大盘排班与状态

语法:直接发送 /list 即可,无参数。

/scan - 🔍 强行触发一次全网补漏扫荡

语法:直接发送 /scan 即可,无参数。

/help - 📖 查看随身说明书与指令语法

语法:直接发送 /help 或 /h 即可。

五、机器人指令菜单

1.@BotFather

2.在对话框里发送指令:/mybots

3.屏幕上会弹出你创建过的机器人列表,点击咱们正在用的这个机器人的名字

4.接着点击弹出面板上的 Edit Bot

5.然后点击 Edit Commands

6.把这套菜单直接喂给它

sub - ➕ 自动订阅 (可直接回复视频抓取频道)
unsub - 🗑️ 取消订阅 (全网通杀或精准单杀)
go - 🚀 手动发车 (开辟航线并锁死文件夹)
list - 📡 查看雷达大盘排班与状态
scan - 🔍 强行触发一次全网补漏扫荡
history - 📋 查看曾经发过车的历史航线记录
rm - ❌ 删除历史航线! 把不再需要的死档从history 里永久抹除
rmh - ❎ 删除下载历史记录
clean - ⭕️ 清除下载碎片
setdir - ❇️ 设置上传目录
ping - 💓 查看系统是否活着、存活时间、雷达正在盯防多少部剧
help - 📖 查看随身说明书与指令语法

也可以看看