diff --git a/.gitignore b/.gitignore index cf653b3..e593c84 100644 --- a/.gitignore +++ b/.gitignore @@ -17,13 +17,8 @@ build/ venv/ env/ -# Runtime outputs (keep result-template examples) -data/result-*/ -!data/result-template1/ -!data/result-template2/ - -# Runtime cache and config -data/cache/ +# Runtime outputs and cache +data/ config.json .superpowers/ diff --git a/citationclaw/app/main.py b/citationclaw/app/main.py index a19e869..665f212 100644 --- a/citationclaw/app/main.py +++ b/citationclaw/app/main.py @@ -530,6 +530,78 @@ async def cancel_task(): return {"status": "success", "message": "任务取消中..."} +@app.post("/api/task/resume") +async def resume_task(): + """恢复被取消的任务""" + if task_executor.is_running: + return JSONResponse(status_code=400, + content={"status": "error", "message": "任务正在运行中"}) + if not task_executor._resume_state: + return JSONResponse(status_code=400, + content={"status": "error", "message": "没有可恢复的任务"}) + + resume = task_executor._resume_state + config = config_manager.get() + task_executor.current_task = asyncio.create_task( + task_executor.execute_for_titles( + paper_groups=resume["paper_groups"], + config=config, + output_prefix=resume["output_prefix"], + _resume=resume, + ) + ) + return { + "status": "success", + "message": f"已恢复,已完成的阶段将自动跳过" + } + + +class ResumeFolderRequest(BaseModel): + folder: str # e.g. "result-20260317_155742" + + +@app.post("/api/task/resume-from-folder") +async def resume_from_folder(request: ResumeFolderRequest): + """从结果文件夹恢复任务""" + import json as _json + + if task_executor.is_running: + return JSONResponse(status_code=400, + content={"status": "error", "message": "任务正在运行中"}) + + result_dir = Path("data") / request.folder + if not result_dir.is_dir(): + return JSONResponse(status_code=400, + content={"status": "error", "message": f"文件夹不存在: {request.folder}"}) + + meta_file = result_dir / "_task_meta.json" + if not meta_file.exists(): + return JSONResponse(status_code=400, + content={"status": "error", "message": "该文件夹缺少任务元数据(_task_meta.json),无法恢复"}) + + try: + meta = _json.loads(meta_file.read_text(encoding="utf-8")) + except Exception as e: + return JSONResponse(status_code=400, + content={"status": "error", "message": f"读取元数据失败: {e}"}) + + config = config_manager.get() + resume = { + "paper_groups": meta["paper_groups"], + "output_prefix": meta["output_prefix"], + "result_dir": str(result_dir), + } + task_executor.current_task = asyncio.create_task( + task_executor.execute_for_titles( + paper_groups=resume["paper_groups"], + config=config, + output_prefix=resume["output_prefix"], + _resume=resume, + ) + ) + return {"status": "success", "message": f"已从 {request.folder} 恢复任务,已完成的阶段将自动跳过"} + + @app.post("/api/task/year-traverse-respond") async def year_traverse_respond(request: YearTraverseResponse): """接收用户对年份遍历提示的响应""" @@ -633,6 +705,7 @@ async def list_result_folders(): "file_count": len(files), "modified": max((f.stat().st_mtime for f in files), default=sub.stat().st_mtime), "size": sum(f.stat().st_size for f in files), + "has_meta": (sub / "_task_meta.json").exists(), }) # 旧版扁平目录 diff --git a/citationclaw/app/task_executor.py b/citationclaw/app/task_executor.py index 4d9bf8b..8f0cf9f 100644 --- a/citationclaw/app/task_executor.py +++ b/citationclaw/app/task_executor.py @@ -31,6 +31,9 @@ def __init__(self, log_manager: LogManager): self._year_traverse_prompted: bool = False # 本次运行已提示过,不再重复 self.skills_runtime = SkillsRuntime() + # 取消后可恢复的上下文 + self._resume_state: Optional[dict] = None + async def _run_skill(self, skill_name: str, config: AppConfig, **kwargs): """Execute one pipeline skill with shared runtime context.""" return await self.skills_runtime.run( @@ -133,6 +136,8 @@ async def execute_full_pipeline( self.log_manager.success(f"JSON文件: {json_file}") self.log_manager.success("=" * 50) + except asyncio.CancelledError: + self.log_manager.warning("任务已取消") except Exception as e: self.log_manager.error(f"任务执行错误: {str(e)}") import traceback @@ -140,6 +145,7 @@ async def execute_full_pipeline( raise finally: self.is_running = False + self.should_cancel = False async def execute_stage1_scraping( self, @@ -208,6 +214,8 @@ async def execute_stage1_scraping( } }) + except asyncio.CancelledError: + self.log_manager.warning("任务已取消") except Exception as e: self.log_manager.error(f"阶段1执行错误: {str(e)}") import traceback @@ -215,6 +223,7 @@ async def execute_stage1_scraping( raise finally: self.is_running = False + self.should_cancel = False async def import_history(self, file_path: Path, config: AppConfig) -> dict: """ @@ -337,6 +346,8 @@ async def execute_stage2_and_3(self): # 清空阶段1结果 self.stage1_result = None + except asyncio.CancelledError: + self.log_manager.warning("任务已取消") except Exception as e: self.log_manager.error(f"阶段2/3执行错误: {str(e)}") import traceback @@ -344,12 +355,14 @@ async def execute_stage2_and_3(self): raise finally: self.is_running = False + self.should_cancel = False async def execute_for_titles( self, paper_groups: List[dict], config: AppConfig, output_prefix: str, + _resume: Optional[dict] = None, ): """ 全自动多论文流水线: @@ -363,6 +376,7 @@ async def execute_for_titles( self.is_running = True self.should_cancel = False + self._resume_state = None self._year_traverse_event = None self._year_traverse_choice = False self._year_traverse_prompted = False @@ -372,10 +386,25 @@ async def execute_for_titles( cost_tracker = CostTracker() try: - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - result_dir = Path(f"data/result-{timestamp}") - result_dir.mkdir(parents=True, exist_ok=True) - self.log_manager.info(f"📁 结果目录: {result_dir}") + # 恢复模式:复用之前的结果目录,靠文件存在性自动跳过已完成阶段 + if _resume: + result_dir = Path(_resume["result_dir"]) + self.log_manager.info(f"🔄 恢复任务,结果目录: {result_dir}(已完成的阶段将自动跳过)") + else: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + result_dir = Path(f"data/result-{timestamp}") + result_dir.mkdir(parents=True, exist_ok=True) + self.log_manager.info(f"📁 结果目录: {result_dir}") + author_info_files = [] + citing_files = [] + + # 保存任务元数据(用于从结果文件夹恢复) + meta_file = result_dir / "_task_meta.json" + if not meta_file.exists(): + meta_file.write_text(_json.dumps({ + "paper_groups": paper_groups, + "output_prefix": output_prefix, + }, ensure_ascii=False, indent=2), encoding="utf-8") # 运行前快照 LLM 额度 self.log_manager.info(f"📊 [DEBUG] api_access_token={repr(config.api_access_token[:8] + '...' if config.api_access_token else '')}, api_user_id={repr(config.api_user_id)}") @@ -401,8 +430,6 @@ async def execute_for_titles( alias_to_canonical[alias] = canonical all_search_titles.append((alias, canonical)) - author_info_files = [] # 收集每篇/每别名的 Phase 2 输出 - citing_files = [] # 收集每篇/每别名的 Phase 1 输出 total = len(all_search_titles) # 目标论文作者缓存(每个 canonical 只查询一次 LLM,存完整文本) @@ -464,97 +491,106 @@ async def execute_for_titles( continue paper_slug = f"paper{i+1}" + citing_file = result_dir / f"{paper_slug}_citing.jsonl" + author_file = result_dir / f"{paper_slug}_authors.jsonl" - # —— Phase 1:爬取引用列表 —— - self.log_manager.info("▶ Phase 1: 爬取引用列表") + # —— Phase 1:爬取引用列表(已有输出文件则跳过)—— + if citing_file.exists() and citing_file.stat().st_size > 0: + self.log_manager.info(f"⏩ Phase 1 已完成,跳过: {citing_file.name}") + else: + self.log_manager.info("▶ Phase 1: 爬取引用列表") + + # —— 预检测引用数,超1000时询问是否开启年份遍历 —— + if not config.enable_year_traverse and not self._year_traverse_prompted: + probe_data = await self._run_skill( + "phase1_citation_fetch", + config, + url=url, + probe_only=True, + cost_tracker=cost_tracker, + ) + citation_count = int(probe_data.get("citation_count", 0)) + if citation_count > 1000: + self._year_traverse_prompted = True + self._year_traverse_event = asyncio.Event() + self.log_manager.broadcast_event("year_traverse_prompt", { + "title": title, + "citation_count": citation_count, + }) + self.log_manager.warning( + f"⚠️ 论文「{title}」检测到 {citation_count} 篇引用(超过 Google Scholar 1000 条限制)" + ) + self.log_manager.warning( + "⏸ 已暂停,等待用户选择是否启用按年份遍历(最多等待 60 秒)..." + ) + try: + await asyncio.wait_for(self._year_traverse_event.wait(), timeout=60.0) + if self._year_traverse_choice: + config = config.model_copy(update={"enable_year_traverse": True}) + self.log_manager.info("✅ 已启用按年份遍历,将逐年抓取完整数据") + else: + self.log_manager.info("▶ 已跳过,继续普通模式(可能只抓取前 1000 条)") + except asyncio.TimeoutError: + self.log_manager.warning("⏰ 等待超时(60s),以普通模式继续") + finally: + self._year_traverse_event = None - # —— 预检测引用数,超1000时询问是否开启年份遍历 —— - if not config.enable_year_traverse and not self._year_traverse_prompted: - probe_data = await self._run_skill( + await self._run_skill( "phase1_citation_fetch", config, url=url, - probe_only=True, + output_file=citing_file, + start_page=0, + sleep_seconds=config.sleep_between_pages, + enable_year_traverse=config.enable_year_traverse, cost_tracker=cost_tracker, ) - citation_count = int(probe_data.get("citation_count", 0)) - if citation_count > 1000: - self._year_traverse_prompted = True - self._year_traverse_event = asyncio.Event() - self.log_manager.broadcast_event("year_traverse_prompt", { - "title": title, - "citation_count": citation_count, - }) - self.log_manager.warning( - f"⚠️ 论文「{title}」检测到 {citation_count} 篇引用(超过 Google Scholar 1000 条限制)" - ) - self.log_manager.warning( - "⏸ 已暂停,等待用户选择是否启用按年份遍历(最多等待 60 秒)..." - ) - try: - await asyncio.wait_for(self._year_traverse_event.wait(), timeout=60.0) - if self._year_traverse_choice: - config = config.model_copy(update={"enable_year_traverse": True}) - self.log_manager.info("✅ 已启用按年份遍历,将逐年抓取完整数据") - else: - self.log_manager.info("▶ 已跳过,继续普通模式(可能只抓取前 1000 条)") - except asyncio.TimeoutError: - self.log_manager.warning("⏰ 等待超时(60s),以普通模式继续") - finally: - self._year_traverse_event = None + if self.should_cancel: + break - citing_file = result_dir / f"{paper_slug}_citing.jsonl" - await self._run_skill( - "phase1_citation_fetch", - config, - url=url, - output_file=citing_file, - start_page=0, - sleep_seconds=config.sleep_between_pages, - enable_year_traverse=config.enable_year_traverse, - cost_tracker=cost_tracker, - ) - if self.should_cancel: - break citing_files.append((citing_file, canonical)) if not config.skip_author_search: - # —— 获取目标论文作者(用于自引检测,每个 canonical 只查一次)—— - if canonical not in target_authors_cache: - need_self_filter = ( - config.enable_renowned_scholar_filter or config.enable_citing_description + # —— Phase 2:搜索作者信息(已有输出文件则跳过)—— + if author_file.exists() and author_file.stat().st_size > 0: + self.log_manager.info(f"⏩ Phase 2 已完成,跳过: {author_file.name}") + author_info_files.append(author_file) + else: + # —— 获取目标论文作者(用于自引检测,每个 canonical 只查一次)—— + if canonical not in target_authors_cache: + need_self_filter = ( + config.enable_renowned_scholar_filter or config.enable_citing_description + ) + if not config.test_mode and need_self_filter: + self.log_manager.info("🔍 自引检测:正在获取目标论文作者...") + target_authors_cache[canonical] = await self._fetch_target_authors(canonical, config) + else: + target_authors_cache[canonical] = "" + target_authors = target_authors_cache[canonical] + + self.log_manager.info("▶ Phase 2: 搜索作者信息") + await self._run_skill( + "phase2_author_intel", + config, + input_file=citing_file, + output_file=author_file, + sleep_seconds=config.sleep_between_authors, + parallel_workers=config.parallel_author_search, + citing_paper=canonical, # 始终用正式标题写入 Citing_Paper + target_paper_authors=target_authors, + author_cache=author_cache, + quota_event=self.quota_exceeded_event, ) - if not config.test_mode and need_self_filter: - self.log_manager.info("🔍 自引检测:正在获取目标论文作者...") - target_authors_cache[canonical] = await self._fetch_target_authors(canonical, config) - else: - target_authors_cache[canonical] = "" - target_authors = target_authors_cache[canonical] - - # —— Phase 2:搜索作者信息(以 canonical 为 Citing_Paper 值)—— - self.log_manager.info("▶ Phase 2: 搜索作者信息") - author_file = result_dir / f"{paper_slug}_authors.jsonl" - await self._run_skill( - "phase2_author_intel", - config, - input_file=citing_file, - output_file=author_file, - sleep_seconds=config.sleep_between_authors, - parallel_workers=config.parallel_author_search, - citing_paper=canonical, # 始终用正式标题写入 Citing_Paper - target_paper_authors=target_authors, - author_cache=author_cache, - quota_event=self.quota_exceeded_event, - ) - if self.should_cancel: - break - if self.quota_exceeded_event.is_set(): - self._handle_quota_exceeded() - return - author_info_files.append(author_file) + if self.should_cancel: + break + if self.quota_exceeded_event.is_set(): + self._handle_quota_exceeded() + return + author_info_files.append(author_file) else: self.log_manager.info("⏭ 跳过 Phase 2(skip_author_search=True)") + # Guard: 检查 Phase 1 是否爬取到任何引用文献 _total_citing = 0 for _cf, _ in citing_files: @@ -878,12 +914,38 @@ def _fwd(p: Path) -> str: "cost_summary": cost_summary, }}) + except asyncio.CancelledError: + self._save_resume_state(paper_groups, output_prefix, result_dir, total) except Exception as e: - self.log_manager.error(f"任务错误: {e}") - import traceback; self.log_manager.error(traceback.format_exc()) - raise + if self.should_cancel: + self._save_resume_state(paper_groups, output_prefix, result_dir, total) + else: + self.log_manager.error(f"任务错误: {e}") + import traceback; self.log_manager.error(traceback.format_exc()) + raise finally: self.is_running = False + self.should_cancel = False + + def _save_resume_state(self, paper_groups, output_prefix, result_dir, total): + """取消后保存恢复状态(恢复时靠文件存在性自动跳过已完成阶段)""" + # 统计已完成的论文数(Phase 1 输出文件已存在) + done = sum( + 1 for i in range(total) + if (result_dir / f"paper{i+1}_citing.jsonl").exists() + ) + self._resume_state = { + "paper_groups": paper_groups, + "output_prefix": output_prefix, + "result_dir": str(result_dir), + } + self.log_manager.warning( + f"⏸ 任务已取消({done}/{total} 篇已完成),可点击「恢复」继续" + ) + self.log_manager.broadcast_event("task_cancelled_resumable", { + "done": done, + "total": total, + }) def _handle_quota_exceeded(self): """Called when any phase signals that API quota is exhausted.""" @@ -972,6 +1034,12 @@ def cancel(self): if self.is_running: self.should_cancel = True self.log_manager.warning("正在取消任务...") + # 强制取消底层 asyncio Task,防止阻塞在 await 上导致 is_running 永远为 True + if self.current_task and not self.current_task.done(): + self.current_task.cancel() + else: + # 安全兜底:如果 is_running 已经是 False 但状态残留,重置 + self.should_cancel = False def get_status(self) -> dict: """ @@ -983,5 +1051,6 @@ def get_status(self) -> dict: return { "is_running": self.is_running, "should_cancel": self.should_cancel, - "has_stage1_result": self.stage1_result is not None + "has_stage1_result": self.stage1_result is not None, + "can_resume": self._resume_state is not None } diff --git a/citationclaw/static/js/main.js b/citationclaw/static/js/main.js index 5f5c286..06ce051 100644 --- a/citationclaw/static/js/main.js +++ b/citationclaw/static/js/main.js @@ -385,6 +385,9 @@ async function resultsShowFolders() { ${folder.file_count} 个文件  ·  ${sizeMB} MB  ·  ${date} + ${folder.has_meta ? `` : ''} @@ -393,6 +396,36 @@ async function resultsShowFolders() { const el = e.currentTarget; resultsOpenFolder(el.dataset.folder, el.dataset.display); }); + const resumeFolderBtn = item.querySelector('[data-resume-folder]'); + if (resumeFolderBtn) { + resumeFolderBtn.addEventListener('click', async (e) => { + e.stopPropagation(); + const folderName = e.currentTarget.dataset.resumeFolder; + if (!confirm(`确定要从 "${folderName}" 恢复任务吗?已完成的阶段将自动跳过。`)) return; + try { + await saveIndexConfig(); + const r = await fetch('/api/task/resume-from-folder', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ folder: folderName }) + }); + const data = await r.json(); + if (data.status === 'success') { + // 切换到 Home 面板显示运行状态 + document.querySelector('[data-spa-panel="home"]').click(); + const runBtn = document.getElementById('idx-run-btn'); + runBtn.disabled = true; + runBtn.innerHTML = '  运行中...'; + document.getElementById('idx-cancel-btn').style.display = 'inline-flex'; + document.getElementById('idx-resume-btn').style.display = 'none'; + } else { + alert('恢复失败: ' + data.message); + } + } catch (err) { + alert('恢复失败:' + err.message); + } + }); + } item.querySelector('[data-delete]').addEventListener('click', async (e) => { e.stopPropagation(); const name = e.currentTarget.dataset.delete; @@ -550,6 +583,38 @@ function initIndexPage() { }; let currentPhase = '处理中...'; + // Search Model: select + 自定义输入联动 + const _modelSelect = document.getElementById('idx-openai-model-select'); + const _modelCustom = document.getElementById('idx-openai-model-custom'); + const _modelHidden = document.getElementById('idx-openai-model'); + + function _syncModelHidden() { + if (_modelSelect.value === '__custom__') { + _modelCustom.style.display = ''; + _modelHidden.value = _modelCustom.value; + } else { + _modelCustom.style.display = 'none'; + _modelHidden.value = _modelSelect.value; + } + } + function _setModelFromValue(val) { + const opt = [..._modelSelect.options].find(o => o.value === val && o.value !== '__custom__'); + if (opt) { + _modelSelect.value = val; + _modelCustom.style.display = 'none'; + } else { + _modelSelect.value = '__custom__'; + _modelCustom.style.display = ''; + _modelCustom.value = val; + } + _modelHidden.value = val; + } + _modelSelect.addEventListener('change', () => { + if (_modelSelect.value === '__custom__') _modelCustom.focus(); + _syncModelHidden(); + }); + _modelCustom.addEventListener('input', _syncModelHidden); + // 加载配置并填充 Home 面板表单 (async () => { try { @@ -560,7 +625,7 @@ function initIndexPage() { el('idx-openai-key').value = cfg.openai_api_key || ''; _syncApiKeyType(el('idx-openai-key')); el('idx-openai-url').value = cfg.openai_base_url || ''; - el('idx-openai-model').value = cfg.openai_model || ''; + _setModelFromValue(cfg.openai_model || 'gemini-3-flash-preview-search'); el('idx-output-prefix').value = cfg.default_output_prefix || 'paper'; el('idx-renowned-scholar').checked = cfg.enable_renowned_scholar_filter !== false; el('idx-author-verify').checked = cfg.enable_author_verification || false; @@ -695,6 +760,10 @@ function initIndexPage() { }; }); + ws.on('task_cancelled_resumable', data => { + document.getElementById('idx-resume-btn').style.display = 'inline-flex'; + }); + ws.on('quota_exceeded', data => { const msgEl = document.getElementById('quota-exceeded-message'); if (msgEl && data.message) { @@ -748,6 +817,7 @@ function initIndexPage() { runBtn.innerHTML = '  运行中...'; document.getElementById('idx-cancel-btn').style.display = 'inline-flex'; + document.getElementById('idx-resume-btn').style.display = 'none'; document.getElementById('idx-progress-section').style.display = 'block'; document.getElementById('idx-log-section').style.display = 'block'; document.getElementById('idx-results-section').style.display = 'none'; @@ -807,6 +877,30 @@ function initIndexPage() { GlobalProgress.hide(); }); + // 恢复按钮 + document.getElementById('idx-resume-btn').addEventListener('click', async () => { + // 先保存当前 UI 配置(用户可能改了模型等参数) + await saveIndexConfig(); + const resumeBtn = document.getElementById('idx-resume-btn'); + resumeBtn.style.display = 'none'; + runBtn.disabled = true; + runBtn.innerHTML = '  运行中...'; + document.getElementById('idx-cancel-btn').style.display = 'inline-flex'; + startRunTimer(); + try { + const resp = await fetch('/api/task/resume', { method: 'POST' }); + const data = await resp.json(); + if (data.status !== 'success') { + alert('恢复失败: ' + data.message); + resetRunBtn(); + } + } catch (e) { + console.error('恢复失败:', e); + alert('恢复失败,请检查控制台'); + resetRunBtn(); + } + }); + // 清空日志 document.getElementById('idx-clear-log-btn').addEventListener('click', () => { document.getElementById('idx-log-container').innerHTML = diff --git a/citationclaw/templates/index.html b/citationclaw/templates/index.html index 9aa32c0..c436e81 100644 --- a/citationclaw/templates/index.html +++ b/citationclaw/templates/index.html @@ -156,13 +156,16 @@

论文被引画像分析

- + + +
@@ -210,6 +213,9 @@

论文被引画像分析

+