diff --git a/.gitignore b/.gitignore
index 0f34269d..879e31ce 100644
--- a/.gitignore
+++ b/.gitignore
@@ -64,3 +64,7 @@
 run_indexer_with_filtering.py
 # Cline files
 memory-bank/
+/deepcode_lab
+/logs
+mcp_agent.config.yaml
+mcp_agent.secrets.yaml
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 00000000..89c570ba
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,74 @@
+# Changelog
+
+All notable changes to DeepCode will be documented in this file.
+
+## [1.0.6-jm] - 2025-10-19
+
+### Added
+- **Dynamic Model Limit Detection**: New `utils/model_limits.py` module that automatically detects and adapts to any LLM model's token limits and pricing
+- **Loop Detection System**: `utils/loop_detector.py` prevents infinite loops by detecting repeated tool calls, timeouts, and progress stalls
+- **Progress Tracking**: 8-phase progress tracking (5% → 100%) with file-level progress indicators in both the UI and the terminal
+- **Abort Mechanism**: "Stop Processing" button in the UI with a global abort flag for clean process termination
+- **Cache Cleanup Scripts**: `start_clean.bat` and `start_clean.ps1` to clear the Python cache before starting
+- **Enhanced Error Display**: Real-time error messages in both the UI and the terminal, with timestamps
+- **File Progress Tracking**: Shows files completed/total with the estimated time remaining
+
+### Fixed
+- **Critical: False Error Detection**: Fixed overly aggressive error detection that was marking successful operations as failures, causing premature aborts and empty file generation
+- **Critical: Empty File Generation**: Files now contain actual code instead of being empty (2-byte files)
+- **Unique Folder Naming**: Each project run now creates a `paper_{timestamp}` folder instead of reusing `pdf_output`
+- **PDF Save Location**: PDFs now save to `deepcode_lab/papers/` instead of the system temp directory
+- **Duplicate Folder Prevention**: Added session-state caching to prevent duplicate folder creation on UI reruns
+- **Token Limit Compliance**: Fixed `max_tokens` to respect model limits dynamically (e.g., gpt-4o-mini's 16,384-token limit)
+- **Empty Plan Detection**: The system now fails early with clear error messages when the initial plan is empty or invalid
+- **Process Hanging**: Fixed infinite loops and hanging on errors - the process now exits cleanly
+- **Token Cost Tracking**: Restored accurate token usage and cost display (was showing $0.0000)
+- **PDF to Markdown Conversion**: Fixed automatic conversion and file-location handling
+- **Document Segmentation**: Properly uses the configured 50K-character threshold from `mcp_agent.config.yaml`
+- **Error Propagation**: The abort mechanism now properly stops the process after 10 consecutive real errors
+
+### Changed
+- **Model-Aware Token Management**: Token limits now adapt automatically to the configured model instead of using hardcoded values
+- **Cost Calculation**: Dynamic pricing based on actual model rates (OpenAI, Anthropic)
+- **Retry Logic**: Token limits for retries now respect the model maximum (87.5% → 95% → 98% of max)
+- **Segmentation Workflow**: Better integration with the code implementation phase
+- **Error Handling**: Enhanced error propagation - errors are no longer reported as "success"
+- **UI Display**: Shows the project folder name after PDF conversion for better visibility
+- **Terminal Logging**: Added timestamps to all progress messages
+
+### Technical Improvements
+- Added the document-segmentation server to the code implementation workflow for better token management
+- Improved error handling in the agent orchestration engine, with proper cleanup
+- Enhanced subprocess handling on Windows (hide console windows, prevent hanging)
+- Better LibreOffice detection on Windows using direct path checking
+- Fixed input data format consistency (JSON with a `paper_path` key)
+- Added comprehensive logging throughout the pipeline
+- Improved resource cleanup on errors and process termination
+
+### Documentation
+- Translated Chinese comments to English in core workflow files
+- Added inline documentation for the new utility modules
+- Created startup scripts with clear usage instructions
+
+### Breaking Changes
+- None - all changes are backward compatible
+
+### Known Issues
+- The terminal may show a trailing "Calling Tool..." line after completion (a cosmetic display artifact - the process completes successfully)
+- Some Chinese comments remain in non-critical files (cli, tools) - translation in progress
+- Optional `tiktoken` package warning (doesn't affect functionality)
+
+### Success Metrics
+- ✅ Complete end-to-end workflow: DOCX upload → PDF conversion → Markdown → Segmentation → Planning → Code generation
+- ✅ Files generated with actual code content (15+ files with proper implementations)
+- ✅ A single folder per project run (no duplicates)
+- ✅ Dynamic token management working across different models
+- ✅ Accurate cost tracking per model
+- ✅ Clean process termination with proper error handling
+
+---
+
+## [1.0.5] - Previous Release
+
+See previous releases for earlier changes.
+
diff --git a/__init__.py b/__init__.py
index 680cae06..0192dc82 100644
--- a/__init__.py
+++ b/__init__.py
@@ -5,9 +5,10 @@
 ⚡ Transform research papers into working code automatically
 """
 
-__version__ = "1.0.5"
-__author__ = "DeepCode Team"
+__version__ = "1.0.6-jm"
+__author__ = "DeepCode Team, Jany Martelli"
 __url__ = "https://github.com/HKUDS/DeepCode"
+__repo__ = "https://github.com/Jany-M/DeepCode/"
 
 # Import main components for easy access
 from utils import FileProcessor, DialogueLogger
diff --git a/deepcode.py b/deepcode.py
index 9a300c54..4df70b33 100755
--- a/deepcode.py
+++ b/deepcode.py
@@ -50,6 +50,7 @@ def check_dependencies():
     try:
         import subprocess
         import platform
+        import os
 
         subprocess_kwargs = {
             "capture_output": True,
@@ -58,25 +59,56 @@
         }
 
         if platform.system() == "Windows":
-            subprocess_kwargs["creationflags"] = 0x08000000  # Hide console window
+            subprocess_kwargs["creationflags"] = 0x08000000
+            # Also configure startupinfo to hide the console window
+            startupinfo = subprocess.STARTUPINFO()
+            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+            startupinfo.wShowWindow = subprocess.SW_HIDE
+            subprocess_kwargs["startupinfo"] = startupinfo
 
         # Try different LibreOffice commands
         libreoffice_found = False
-        for cmd in ["libreoffice", "soffice"]:
-            try:
-                result = subprocess.run([cmd, "--version"], **subprocess_kwargs)
-                if result.returncode == 0:
-                    print(
-                        "✅ LibreOffice is installed (for Office document conversion)"
-                    )
+
+        # On Windows, check the standard installation paths first.
+        # Only test for file existence, to avoid window-flashing and hanging issues.
+        if platform.system() == "Windows":
+            possible_paths = [
+                r"C:\Program Files\LibreOffice\program\soffice.exe",
+                r"C:\Program Files (x86)\LibreOffice\program\soffice.exe",
+            ]
+
+            # Also check the PROGRAMFILES environment variables
+            program_files = os.environ.get("PROGRAMFILES")
+            program_files_x86 = os.environ.get("PROGRAMFILES(X86)")
+
+            if program_files:
+                possible_paths.append(os.path.join(program_files, "LibreOffice", "program", "soffice.exe"))
+            if program_files_x86:
+                possible_paths.append(os.path.join(program_files_x86, "LibreOffice", "program", "soffice.exe"))
+
+            for path in possible_paths:
+                if os.path.exists(path):
+                    print("✅ LibreOffice is installed (for Office document conversion)")
                    libreoffice_found = True
                    break
-            except (
-                subprocess.CalledProcessError,
-                FileNotFoundError,
-                subprocess.TimeoutExpired,
-            ):
-                continue
+
+        # On non-Windows systems, fall back to the standard commands
+        if not libreoffice_found and platform.system() != "Windows":
+            for cmd in ["libreoffice", "soffice"]:
+                try:
+                    result = subprocess.run([cmd, "--version"], **subprocess_kwargs)
+                    if result.returncode == 0:
+                        print(
+                            "✅ LibreOffice is installed (for Office document conversion)"
+                        )
+                        libreoffice_found = True
+                        break
+                except (
+                    subprocess.CalledProcessError,
+                    FileNotFoundError,
+                    subprocess.TimeoutExpired,
+                ):
+                    continue
 
         if not libreoffice_found:
             missing_system_deps.append("LibreOffice")
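The same window-suppression recipe recurs throughout this patch (`deepcode.py`, `tools/pdf_converter.py`). A minimal stand-alone sketch of the pattern, not part of the patch itself:

```python
# Sketch of the Windows console-hiding pattern used in this patch.
# creationflags=0x08000000 is subprocess.CREATE_NO_WINDOW; STARTUPINFO with
# SW_HIDE is a second layer for tools that try to show a window anyway.
import platform
import subprocess

def run_hidden(cmd, timeout=10):
    """Run a command without flashing a console window on Windows."""
    kwargs = {"capture_output": True, "text": True, "timeout": timeout}
    if platform.system() == "Windows":
        kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW  # == 0x08000000
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
        kwargs["startupinfo"] = startupinfo
    return subprocess.run(cmd, **kwargs)

# Example: run_hidden(["soffice", "--version"])
```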
diff --git a/requirements.txt b/requirements.txt
index 6388a68b..a1164dff 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,9 +5,11 @@
 asyncio-mqtt
 docling
 mcp-agent
 mcp-server-git
 nest_asyncio
 pathlib2
 PyPDF2>=2.0.0
 reportlab>=3.5.0
 streamlit
+openai
+aiohttp
\ No newline at end of file
diff --git a/tools/pdf_converter.py b/tools/pdf_converter.py
index c9d01e8d..ba012c30 100644
--- a/tools/pdf_converter.py
+++ b/tools/pdf_converter.py
@@ -18,8 +18,9 @@
 import tempfile
 import shutil
 import platform
+import os
 from pathlib import Path
-from typing import Union, Optional, Dict, Any
+from typing import Union, Optional, Dict, Any, List
 
 
 class PDFConverter:
@@ -40,6 +41,39 @@
     def __init__(self) -> None:
         """Initialize the PDF converter."""
         pass
 
+    @staticmethod
+    def find_libreoffice_windows() -> Optional[str]:
+        """
+        Find a LibreOffice installation on Windows.
+
+        Returns:
+            Path to soffice.exe if found, None otherwise
+        """
+        if platform.system() != "Windows":
+            return None
+
+        # Common LibreOffice installation paths on Windows
+        possible_paths = [
+            r"C:\Program Files\LibreOffice\program\soffice.exe",
+            r"C:\Program Files (x86)\LibreOffice\program\soffice.exe",
+        ]
+
+        # Also check the PROGRAMFILES environment variables
+        program_files = os.environ.get("PROGRAMFILES")
+        program_files_x86 = os.environ.get("PROGRAMFILES(X86)")
+
+        if program_files:
+            possible_paths.append(os.path.join(program_files, "LibreOffice", "program", "soffice.exe"))
+        if program_files_x86:
+            possible_paths.append(os.path.join(program_files_x86, "LibreOffice", "program", "soffice.exe"))
+
+        # Check each path
+        for path in possible_paths:
+            if os.path.exists(path):
+                return path
+
+        return None
+
     @staticmethod
     def convert_office_to_pdf(
         doc_path: Union[str, Path], output_dir: Optional[str] = None
@@ -67,7 +101,15 @@
         if output_dir:
             base_output_dir = Path(output_dir)
         else:
-            base_output_dir = doc_path.parent / "pdf_output"
+            # Generate a unique folder name with a timestamp to avoid conflicts
+            import time
+            timestamp = int(time.time())
+            folder_name = f"paper_{timestamp}"
+
+            # Save to the workspace instead of the temp directory
+            workspace_base = Path(os.getcwd()) / "deepcode_lab" / "papers"
+            workspace_base.mkdir(parents=True, exist_ok=True)
+            base_output_dir = workspace_base / folder_name
 
         base_output_dir.mkdir(parents=True, exist_ok=True)
@@ -86,26 +128,41 @@
         # Hide console window on Windows
         if platform.system() == "Windows":
-            subprocess_kwargs["creationflags"] = (
-                0x08000000  # subprocess.CREATE_NO_WINDOW
-            )
-
-        try:
-            result = subprocess.run(
-                ["libreoffice", "--version"], **subprocess_kwargs
-            )
-            libreoffice_available = True
-            working_libreoffice_cmd = "libreoffice"
-            logging.info(f"LibreOffice detected: {result.stdout.strip()}")  # type: ignore
-        except (
-            subprocess.CalledProcessError,
-            FileNotFoundError,
-            subprocess.TimeoutExpired,
-        ):
-            pass
-
-        # Try alternative commands for LibreOffice
-        if not libreoffice_available:
+            # Use CREATE_NO_WINDOW to prevent a console window from appearing
+            subprocess_kwargs["creationflags"] = 0x08000000
+            # Also configure startupinfo to hide the window
+            startupinfo = subprocess.STARTUPINFO()
+            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+            startupinfo.wShowWindow = subprocess.SW_HIDE
+            subprocess_kwargs["startupinfo"] = startupinfo
+
+        # On Windows, look for LibreOffice in the standard installation paths first.
+        # Don't run a --version check on Windows, as it can cause window/hanging issues.
+        if platform.system() == "Windows":
+            windows_path = PDFConverter.find_libreoffice_windows()
+            if windows_path:
+                libreoffice_available = True
+                working_libreoffice_cmd = windows_path
+                logging.info(f"LibreOffice detected at {windows_path}")
+
+        # On non-Windows systems, try the standard commands
+        if not libreoffice_available and platform.system() != "Windows":
+            try:
+                result = subprocess.run(
+                    ["libreoffice", "--version"], **subprocess_kwargs
+                )
+                libreoffice_available = True
+                working_libreoffice_cmd = "libreoffice"
+                logging.info(f"LibreOffice detected: {result.stdout.strip()}")  # type: ignore
+            except (
+                subprocess.CalledProcessError,
+                FileNotFoundError,
+                subprocess.TimeoutExpired,
+            ):
+                pass
+
+        # Try alternative commands for LibreOffice (non-Windows)
+        if not libreoffice_available and platform.system() != "Windows":
             for cmd in ["soffice", "libreoffice"]:
                 try:
                     result = subprocess.run([cmd, "--version"], **subprocess_kwargs)
@@ -142,7 +199,13 @@
         # Use the working LibreOffice command first, then try alternatives if it fails
         commands_to_try = [working_libreoffice_cmd]
-        if working_libreoffice_cmd == "libreoffice":
+
+        # Add alternative commands based on what was found
+        if platform.system() == "Windows" and working_libreoffice_cmd:
+            # If we're using the full Windows path, also try the standard commands
+            if "Program Files" in working_libreoffice_cmd:
+                commands_to_try.extend(["soffice", "libreoffice"])
+        elif working_libreoffice_cmd == "libreoffice":
             commands_to_try.append("soffice")
         else:
             commands_to_try.append("libreoffice")
@@ -173,9 +236,12 @@
                 # Hide console window on Windows
                 if platform.system() == "Windows":
-                    convert_subprocess_kwargs["creationflags"] = (
-                        0x08000000  # subprocess.CREATE_NO_WINDOW
-                    )
+                    convert_subprocess_kwargs["creationflags"] = 0x08000000
+                    # Also configure startupinfo to hide the window
+                    startupinfo = subprocess.STARTUPINFO()
+                    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+                    startupinfo.wShowWindow = subprocess.SW_HIDE
+                    convert_subprocess_kwargs["startupinfo"] = startupinfo
 
                 result = subprocess.run(
                     convert_cmd, **convert_subprocess_kwargs
@@ -227,6 +293,10 @@
                 # Copy PDF to final output directory
                 final_pdf_path = base_output_dir / f"{name_without_suff}.pdf"
                 shutil.copy2(pdf_path, final_pdf_path)
+
+                print(f"✅ PDF saved to: {final_pdf_path}")
+                print(f"   File size: {final_pdf_path.stat().st_size} bytes")
+                print(f"   Parent folder: {base_output_dir}")
 
                 return final_pdf_path
@@ -281,7 +351,15 @@
         if output_dir:
             base_output_dir = Path(output_dir)
         else:
-            base_output_dir = text_path.parent / "pdf_output"
+            # Generate a unique folder name with a timestamp to avoid conflicts
+            import time
+            timestamp = int(time.time())
+            folder_name = f"paper_{timestamp}"
+
+            # Save to the workspace instead of the temp directory
+            workspace_base = Path(os.getcwd()) / "deepcode_lab" / "papers"
+            workspace_base.mkdir(parents=True, exist_ok=True)
+            base_output_dir = workspace_base / folder_name
 
         base_output_dir.mkdir(parents=True, exist_ok=True)
         pdf_path = base_output_dir / f"{text_path.stem}.pdf"
@@ -435,6 +513,10 @@
                     f"PDF conversion failed for {text_path.name} - generated PDF is empty or corrupted."
                 )
 
+            print(f"✅ PDF saved to: {pdf_path}")
+            print(f"   File size: {pdf_path.stat().st_size} bytes")
+            print(f"   Parent folder: {base_output_dir}")
+
             return pdf_path
 
         except Exception as e:
@@ -532,27 +614,34 @@
         }
 
         # Check LibreOffice
-        try:
-            subprocess_kwargs: Dict[str, Any] = {
-                "capture_output": True,
-                "text": True,
-                "check": True,
-                "encoding": "utf-8",
-                "errors": "ignore",
-            }
-
-            if platform.system() == "Windows":
-                subprocess_kwargs["creationflags"] = (
-                    0x08000000  # subprocess.CREATE_NO_WINDOW
-                )
-
-            subprocess.run(["libreoffice", "--version"], **subprocess_kwargs)
-            results["libreoffice"] = True
-        except (subprocess.CalledProcessError, FileNotFoundError):
-            try:
-                subprocess.run(["soffice", "--version"], **subprocess_kwargs)
+        # On Windows, just check whether the executable exists (don't run it, to avoid window issues)
+        if platform.system() == "Windows":
+            windows_path = PDFConverter.find_libreoffice_windows()
+            if windows_path:
                 results["libreoffice"] = True
-            except (subprocess.CalledProcessError, FileNotFoundError):
+        else:
+            # On non-Windows systems, try running the version command
+            try:
+                subprocess_kwargs: Dict[str, Any] = {
+                    "capture_output": True,
+                    "text": True,
+                    "check": True,
+                    "timeout": 5,
+                    "encoding": "utf-8",
+                    "errors": "ignore",
+                }
+
+                try:
+                    subprocess.run(["libreoffice", "--version"], **subprocess_kwargs)
+                    results["libreoffice"] = True
+                except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired):
+                    try:
+                        subprocess.run(["soffice", "--version"], **subprocess_kwargs)
+                        results["libreoffice"] = True
+                    except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired):
+                        pass
+            except Exception:
+                # If any unexpected error occurs during the LibreOffice check, silently pass
                 pass
 
         # Check ReportLab
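A sketch (not part of the patch) of the output convention the converter now follows: when no `output_dir` is given, every run gets its own `deepcode_lab/papers/paper_{timestamp}` folder under the current working directory. The document name below is hypothetical.

```python
# Each conversion run writes into its own timestamped workspace folder,
# which is what prevents different runs from clobbering each other.
import time
from pathlib import Path
from typing import Optional

def make_run_folder(base: Optional[Path] = None) -> Path:
    base = base or Path.cwd() / "deepcode_lab" / "papers"
    run_dir = base / f"paper_{int(time.time())}"
    run_dir.mkdir(parents=True, exist_ok=True)
    return run_dir

print(make_run_folder() / "my_paper.pdf")  # e.g. .../papers/paper_1729340000/my_paper.pdf
```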
diff --git a/ui/components.py b/ui/components.py
index 933f419b..c3bfab0c 100644
--- a/ui/components.py
+++ b/ui/components.py
@@ -342,6 +342,13 @@ def file_input_component(task_counter: int) -> Optional[str]:
     Returns:
         PDF file path or None
     """
+    # Check if we already have a converted file in session state for this task
+    cache_key = f"converted_file_{task_counter}"
+    if cache_key in st.session_state and st.session_state[cache_key]:
+        cached_result = st.session_state[cache_key]
+        st.info(f"📁 Using previously uploaded file: {cached_result.get('folder', 'Unknown')}")
+        return cached_result.get('json_result')
+
     uploaded_file = st.file_uploader(
         "Upload research paper file",
         type=[
@@ -395,9 +402,57 @@
         # Check if file is already PDF
         if file_ext == "pdf":
             st.info("📑 File is already in PDF format, no conversion needed.")
-            return original_file_path
-
-        # Convert to PDF
+            # Return a JSON structure with paper_path for consistency
+            import json
+            json_result = json.dumps({"paper_path": original_file_path})
+
+            # Cache the result
+            cache_key = f"converted_file_{task_counter}"
+            st.session_state[cache_key] = {
+                "json_result": json_result,
+                "folder": os.path.basename(os.path.dirname(original_file_path)),
+                "pdf_path": original_file_path
+            }
+
+            return json_result
+
+        # Check if a PDF already exists next to the original file
+        original_dir = os.path.dirname(original_file_path)
+        base_name = os.path.splitext(os.path.basename(original_file_path))[0]
+        potential_pdf = os.path.join(original_dir, f"{base_name}.pdf")
+
+        if os.path.exists(potential_pdf):
+            st.info(f"📑 Found existing PDF: {os.path.basename(potential_pdf)} - using it instead of converting")
+            pdf_path = potential_pdf
+
+            # Clean up the uploaded temp file
+            try:
+                os.unlink(original_file_path)
+            except Exception:
+                pass
+
+            # Display PDF info
+            pdf_size = Path(pdf_path).stat().st_size
+            st.success("✅ Using existing PDF file!")
+            st.info(f"📑 **PDF File:** {Path(pdf_path).name} ({format_file_size(pdf_size)})")
+
+            # Return a JSON structure with paper_path
+            import json
+            json_result = json.dumps({"paper_path": str(pdf_path)})
+
+            # Cache the result
+            cache_key = f"converted_file_{task_counter}"
+            pdf_dir = os.path.dirname(pdf_path)
+            folder_name = os.path.basename(pdf_dir)
+            st.session_state[cache_key] = {
+                "json_result": json_result,
+                "folder": folder_name,
+                "pdf_path": str(pdf_path)
+            }
+
+            return json_result
+
+        # Convert to PDF if no existing PDF was found
         with st.spinner(f"🔄 Converting {file_ext.upper()} to PDF..."):
             try:
                 converter = PDFConverter()
@@ -435,8 +490,9 @@
                         pass
                     return None
 
-                # Perform conversion
-                pdf_path = converter.convert_to_pdf(original_file_path)
+                # Perform conversion - save to a temp location first; the pipeline will organize it properly.
+                # Use None for output_dir to let the converter create the timestamped folder.
+                pdf_path = converter.convert_to_pdf(original_file_path, output_dir=None)
 
                 # Clean up original file
                 try:
@@ -447,11 +503,28 @@
                 # Display conversion result
                 pdf_size = Path(pdf_path).stat().st_size
                 st.success("✅ Successfully converted to PDF!")
+
+                # Show the organized folder location, not just the temp filename
+                pdf_dir = os.path.dirname(pdf_path)
+                folder_name = os.path.basename(pdf_dir)
                 st.info(
                     f"📑 **PDF File:** {Path(pdf_path).name} ({format_file_size(pdf_size)})"
                 )
-
-                return str(pdf_path)
+                st.info(f"📁 **Saved to project folder:** `{folder_name}`")
+
+                # Return a JSON structure with paper_path for consistency
+                import json
+                json_result = json.dumps({"paper_path": str(pdf_path)})
+
+                # Cache the result to prevent re-conversion on UI rerun
+                cache_key = f"converted_file_{task_counter}"
+                st.session_state[cache_key] = {
+                    "json_result": json_result,
+                    "folder": folder_name,
+                    "pdf_path": str(pdf_path)
+                }
+
+                return json_result
 
             except Exception as e:
                 st.error(f"❌ PDF conversion failed: {str(e)}")
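The duplicate-folder fix above relies on Streamlit's session state: the script reruns top to bottom on every interaction, so anything expensive has to be cached per task. A minimal sketch of the same pattern, with a stand-in for the conversion step:

```python
# Streamlit reruns the whole script on each interaction, so expensive work
# (here, a pretend "conversion") is keyed by task id and done only once.
import streamlit as st

def get_converted(task_counter: int) -> str:
    cache_key = f"converted_file_{task_counter}"
    if cache_key not in st.session_state:
        # The one-time expensive work (e.g. PDF conversion) would go here
        st.session_state[cache_key] = f"result-for-task-{task_counter}"
    return st.session_state[cache_key]
```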
diff --git a/ui/handlers.py b/ui/handlers.py
index b109003a..857d5c4f 100644
--- a/ui/handlers.py
+++ b/ui/handlers.py
@@ -17,6 +17,25 @@
 import nest_asyncio
 import concurrent.futures
 
+# Global abort flag
+_abort_requested = False
+
+def set_abort_requested(value: bool = True):
+    """Set the global abort flag"""
+    global _abort_requested
+    _abort_requested = value
+    if value:
+        print("🛑 Abort requested by user")
+
+def is_abort_requested() -> bool:
+    """Check if an abort has been requested"""
+    return _abort_requested
+
+def reset_abort_flag():
+    """Reset the abort flag"""
+    global _abort_requested
+    _abort_requested = False
+
 # Import necessary modules
 from mcp_agent.app import MCPApp
 from workflows.agent_orchestration_engine import (
@@ -118,6 +137,10 @@ async def process_input_async(
         else:
             progress_callback(5, "🚀 Initializing AI research engine...")
 
+    # Check for abort before starting
+    if is_abort_requested():
+        return {"status": "aborted", "message": "Process aborted by user"}
+
     # Choose pipeline based on input type
     if input_type == "chat":
         # Use chat-based planning pipeline for user requirements
@@ -397,6 +420,14 @@ def handle_processing_workflow(
     progress_bar, status_text, step_indicators, workflow_steps = (
         enhanced_progress_display_component(enable_indexing, chat_mode)
     )
+
+    # Add a Stop button
+    col1, col2 = st.columns([3, 1])
+    with col2:
+        if st.button("🛑 Stop Processing", type="secondary", use_container_width=True):
+            set_abort_requested(True)
+            st.warning("🛑 Stop requested. The process will terminate after the current operation.")
+            return {"status": "aborted", "message": "Process stopped by user"}
 
     # Step mapping: map progress percentages to step indices - adjust based on mode and indexing toggle
     if chat_mode:
@@ -436,12 +467,26 @@
     current_step = 0
 
     # Define enhanced progress callback function
-    def update_progress(progress: int, message: str):
+    def update_progress(progress: int, message: str, error: str = None):
         nonlocal current_step
+
+        # Check for an abort request
+        if is_abort_requested():
+            st.error("🛑 Process aborted by user")
+            return
 
         # Update progress bar
         progress_bar.progress(progress)
-        status_text.markdown(f"**{message}**")
+
+        # Display the error if present
+        if error:
+            st.error(f"❌ Error: {error}")
+            print(f"❌ Error: {error}")
+
+        # Update status with a timestamp
+        timestamp = datetime.now().strftime("%H:%M:%S")
+        status_text.markdown(f"**[{timestamp}]** {message}")
+        print(f"[{timestamp}] {message}")
 
         # Determine current step
         new_step = step_mapping.get(progress, current_step)
@@ -466,6 +511,10 @@
     # Start async processing with progress callback
     with st.spinner("🔄 Processing workflow stages..."):
+        # Check for abort before starting
+        if is_abort_requested():
+            return {"status": "aborted", "message": "Process aborted by user"}
+
         try:
             # First try using simple async processing method
             result = run_async_task_simple(
@@ -474,7 +523,11 @@
                 )
             )
         except Exception as e:
-            st.warning(f"Primary async method failed: {e}")
+            error_msg = f"Primary async method failed: {e}"
+            st.warning(error_msg)
+            print(f"⚠️ {error_msg}")
+            update_progress(0, "Retrying with fallback method...", error_msg)
+
             # Fallback method: use original thread pool method
             try:
                 result = run_async_task(
@@ -483,7 +536,10 @@
                 )
             except Exception as backup_error:
-                st.error(f"Both async methods failed. Error: {backup_error}")
+                error_msg = f"Both async methods failed. Error: {backup_error}"
+                st.error(error_msg)
+                print(f"❌ {error_msg}")
+                update_progress(0, "Processing failed", error_msg)
                 return {
                     "status": "error",
                     "error": str(backup_error),
@@ -827,6 +883,9 @@ def handle_start_processing_button(input_source: str, input_type: str):
     # Clean up system resources
     cleanup_resources()
+
+    # Reset the abort flag
+    reset_abort_flag()
 
     # Rerun to display results or errors
     st.rerun()
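The abort mechanism is just a module-level flag that the UI sets and the workers poll between operations. A stand-alone sketch of the pattern (it works here because the Streamlit callbacks and the worker share one process; a multi-process setup would need a different signal):

```python
# Cooperative-cancellation sketch: the UI thread flips the flag, and the
# long-running job checks it at safe points instead of being killed mid-write.
_abort_requested = False

def set_abort_requested(value: bool = True) -> None:
    global _abort_requested
    _abort_requested = value

def is_abort_requested() -> bool:
    return _abort_requested

def long_running_job(steps: int = 100) -> str:
    for i in range(steps):
        if is_abort_requested():
            return f"aborted at step {i}"  # clean early exit
        # ... one unit of work per iteration ...
    return "completed"
```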
diff --git a/utils/file_processor.py b/utils/file_processor.py
index 99632861..469936b6 100644
--- a/utils/file_processor.py
+++ b/utils/file_processor.py
@@ -43,7 +43,7 @@ def extract_file_path(file_info: Union[str, Dict]) -> Optional[str]:
             try:
                 info_dict = json.loads(file_info)
             except json.JSONDecodeError:
-                # 尝试从文本中提取JSON
+                # Try to extract JSON from the text
                 info_dict = FileProcessor.extract_json_from_text(file_info)
                 if not info_dict:
                     # If not JSON and doesn't look like a file path, raise error
@@ -191,9 +191,21 @@ async def read_file_content(file_path: str) -> str:
         with open(file_path, "rb") as f:
             header = f.read(8)
             if header.startswith(b"%PDF"):
-                raise IOError(
-                    f"File {file_path} is a PDF file, not a text file. Please convert it to markdown format or use PDF processing tools."
-                )
+                # Try to convert the PDF to markdown automatically
+                try:
+                    from tools.pdf_downloader import SimplePdfConverter
+                    converter = SimplePdfConverter()
+                    conversion_result = converter.convert_pdf_to_markdown(file_path)
+
+                    if conversion_result["success"]:
+                        # Use the converted markdown file instead
+                        file_path = conversion_result["output_file"]
+                    else:
+                        raise IOError(f"PDF conversion failed: {conversion_result['error']}")
+                except Exception as conv_error:
+                    raise IOError(
+                        f"File {file_path} is a PDF file, not a text file. PDF conversion failed: {str(conv_error)}"
+                    )
 
         # Read file content
         # Note: Using async with would be better for large files
@@ -278,7 +290,7 @@ async def process_file_input(
         Dict: The structured content with sections and standardized text
     """
     try:
-        # 首先尝试从字符串中提取markdown文件路径
+        # First, try to extract a markdown file path from the string
         if isinstance(file_input, str):
             import re
@@ -298,7 +310,7 @@
             else:
                 # Extract the relative part and combine with base_dir
                 paper_name = os.path.basename(paper_dir)
-                # 保持原始目录名不变,不做任何替换
+                # Keep the original directory name unchanged, without any replacements
                 paper_dir = os.path.join(base_dir, "papers", paper_name)
 
             # Ensure the directory exists
@@ -310,12 +322,12 @@
         # Get the actual file path
         file_path = None
         if isinstance(file_input, str):
-            # 尝试解析为JSON(处理下载结果)
+            # Try to parse as JSON (handles download results)
             try:
                 parsed_json = json.loads(file_input)
                 if isinstance(parsed_json, dict) and "paper_path" in parsed_json:
                     file_path = parsed_json.get("paper_path")
-                    # 如果文件不存在,尝试查找markdown文件
+                    # If the file doesn't exist, try to find a markdown file
                     if file_path and not os.path.exists(file_path):
                         paper_dir = os.path.dirname(file_path)
                         if os.path.isdir(paper_dir):
@@ -327,11 +339,11 @@
                 else:
                     raise ValueError("Invalid JSON format: missing paper_path")
             except json.JSONDecodeError:
-                # 尝试从文本中提取JSON(处理包含额外文本的下载结果)
+                # Try to extract JSON from the text (handles download results that contain extra text)
                 extracted_json = cls.extract_json_from_text(file_input)
                 if extracted_json and "paper_path" in extracted_json:
                     file_path = extracted_json.get("paper_path")
-                    # 如果文件不存在,尝试查找markdown文件
+                    # If the file doesn't exist, try to find a markdown file
                     if file_path and not os.path.exists(file_path):
                         paper_dir = os.path.dirname(file_path)
                         if os.path.isdir(paper_dir):
@@ -341,7 +353,7 @@
                             f"No markdown file found in directory: {paper_dir}"
                         )
                 else:
-                    # 不是JSON,按文件路径处理
+                    # Not JSON, handle as a file path
                    # Check if it's a file path (existing or not)
                     if file_input.endswith(
                         (".md", ".pdf", ".txt", ".docx", ".doc", ".html", ".htm")
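The auto-conversion path above hinges on magic-byte sniffing: every PDF begins with the bytes `%PDF`, so a PDF handed over under a `.md` name can still be recognized and routed to the converter before any text read is attempted. A minimal sketch:

```python
# Detect a PDF by its magic bytes rather than by its file extension.
def is_actually_pdf(path: str) -> bool:
    with open(path, "rb") as f:
        return f.read(8).startswith(b"%PDF")

# e.g. if is_actually_pdf(md_path): hand the file to the PDF->markdown converter first
```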
diff --git a/utils/loop_detector.py b/utils/loop_detector.py
new file mode 100644
index 00000000..a54d065f
--- /dev/null
+++ b/utils/loop_detector.py
@@ -0,0 +1,198 @@
+"""
+Loop Detection and Timeout Safeguards for the Code Implementation Workflow
+
+This module provides tools to detect infinite loops, timeouts, and progress stalls
+in the code implementation process, to prevent hanging processes.
+"""
+
+import time
+from typing import List, Dict, Any, Optional
+from datetime import datetime, timedelta
+
+
+class LoopDetector:
+    """
+    Detects infinite loops, timeouts, and progress stalls in workflow execution.
+
+    Features:
+    - Track tool-call history to detect repeated patterns
+    - Monitor time per file/operation
+    - Detect progress stalls
+    - Force a stop after consecutive errors
+    """
+
+    def __init__(self, max_repeats: int = 5, timeout_seconds: int = 300,
+                 stall_threshold: int = 180, max_errors: int = 10):
+        """
+        Initialize the loop detector.
+
+        Args:
+            max_repeats: Maximum consecutive calls to the same tool before flagging
+            timeout_seconds: Maximum time per file/operation (default: 5 minutes)
+            stall_threshold: Maximum time without progress (default: 3 minutes)
+            max_errors: Maximum consecutive errors before a forced stop
+        """
+        self.max_repeats = max_repeats
+        self.timeout_seconds = timeout_seconds
+        self.stall_threshold = stall_threshold
+        self.max_errors = max_errors
+
+        # Tracking state
+        self.tool_history: List[str] = []
+        self.start_time = time.time()
+        self.last_progress_time = time.time()
+        self.consecutive_errors = 0
+        self.current_file = None
+        self.file_start_time = None
+
+    def start_file(self, filename: str):
+        """Start tracking a new file."""
+        self.current_file = filename
+        self.file_start_time = time.time()
+        self.last_progress_time = time.time()
+        print(f"📁 Starting file: {filename}")
+
+    def check_tool_call(self, tool_name: str) -> Dict[str, Any]:
+        """
+        Check whether a tool call indicates a loop or a timeout.
+
+        Args:
+            tool_name: Name of the tool being called
+
+        Returns:
+            Dict with status and warnings
+        """
+        current_time = time.time()
+        self.tool_history.append(tool_name)
+
+        # Keep only recent history (last 10 calls)
+        if len(self.tool_history) > 10:
+            self.tool_history = self.tool_history[-10:]
+
+        # Check for repeated tool calls
+        if len(self.tool_history) >= self.max_repeats:
+            recent_tools = self.tool_history[-self.max_repeats:]
+            if len(set(recent_tools)) == 1:  # All the same tool
+                return {
+                    "status": "loop_detected",
+                    "message": f"⚠️ Loop detected: {tool_name} called {self.max_repeats} times consecutively",
+                    "should_stop": True
+                }
+
+        # Check for a file timeout
+        if self.file_start_time and (current_time - self.file_start_time) > self.timeout_seconds:
+            return {
+                "status": "timeout",
+                "message": f"⏰ Timeout: File {self.current_file} processing exceeded {self.timeout_seconds}s",
+                "should_stop": True
+            }
+
+        # Check for a progress stall
+        if (current_time - self.last_progress_time) > self.stall_threshold:
+            return {
+                "status": "stall",
+                "message": f"🐌 Progress stall: No progress for {self.stall_threshold}s",
+                "should_stop": True
+            }
+
+        # Check for consecutive errors
+        if self.consecutive_errors >= self.max_errors:
+            return {
+                "status": "max_errors",
+                "message": f"❌ Too many errors: {self.consecutive_errors} consecutive errors",
+                "should_stop": True
+            }
+
+        return {
+            "status": "ok",
+            "message": "Processing normally",
+            "should_stop": False
+        }
+
+    def record_progress(self):
+        """Record that progress has been made."""
+        self.last_progress_time = time.time()
+        self.consecutive_errors = 0  # Reset the error counter on progress
+
+    def record_error(self, error_message: str):
+        """Record that an error occurred."""
+        self.consecutive_errors += 1
+        print(f"❌ Error #{self.consecutive_errors}: {error_message}")
+
+    def record_success(self):
+        """Record a successful operation."""
+        self.consecutive_errors = 0
+        self.record_progress()
+
+    def get_status_summary(self) -> Dict[str, Any]:
+        """Get the current status summary."""
+        current_time = time.time()
+        file_elapsed = (current_time - self.file_start_time) if self.file_start_time else 0
+        total_elapsed = current_time - self.start_time
+
+        return {
+            "current_file": self.current_file,
+            "file_elapsed_seconds": file_elapsed,
+            "total_elapsed_seconds": total_elapsed,
+            "consecutive_errors": self.consecutive_errors,
+            "recent_tools": self.tool_history[-5:],  # Last 5 tools
+            "time_since_last_progress": current_time - self.last_progress_time
+        }
+
+    def should_abort(self) -> bool:
+        """Check whether the process should be aborted."""
+        status = self.check_tool_call("")  # Probe the current state
+        self.tool_history.pop()  # Remove the probe so the tool history is unchanged
+        return status["should_stop"]
+
+    def get_abort_reason(self) -> Optional[str]:
+        """Get the reason for the abort, if the process should abort."""
+        if self.should_abort():
+            status = self.check_tool_call("")
+            self.tool_history.pop()
+            return status["message"]
+        return None
+
+
+class ProgressTracker:
+    """
+    Track progress through implementation phases and files.
+    """
+
+    def __init__(self, total_files: int = 0):
+        self.total_files = total_files
+        self.completed_files = 0
+        self.current_phase = "Initializing"
+        self.phase_progress = 0
+        self.start_time = time.time()
+
+    def set_phase(self, phase_name: str, progress_percent: int):
+        """Set the current phase and progress percentage."""
+        self.current_phase = phase_name
+        self.phase_progress = progress_percent
+        print(f"📊 Progress: {progress_percent}% - {phase_name}")
+
+    def complete_file(self, filename: str):
+        """Record the completion of a file."""
+        self.completed_files += 1
+        print(f"✅ Completed file {self.completed_files}/{self.total_files}: {filename}")
+
+    def get_progress_info(self) -> Dict[str, Any]:
+        """Get current progress information."""
+        elapsed = time.time() - self.start_time
+
+        # Estimate the remaining time
+        if self.completed_files > 0 and self.total_files > 0:
+            avg_time_per_file = elapsed / self.completed_files
+            remaining_files = self.total_files - self.completed_files
+            estimated_remaining = avg_time_per_file * remaining_files
+        else:
+            estimated_remaining = 0
+
+        return {
+            "phase": self.current_phase,
+            "phase_progress": self.phase_progress,
+            "files_completed": self.completed_files,
+            "total_files": self.total_files,
+            "file_progress": (self.completed_files / self.total_files * 100) if self.total_files > 0 else 0,
+            "elapsed_seconds": elapsed,
+            "estimated_remaining_seconds": estimated_remaining
+        }
+""" + +from typing import Dict, Tuple, Optional +import yaml + + +# Model capability database +# Format: {model_name_pattern: {max_completion_tokens, max_context_tokens, cost_per_1m_input, cost_per_1m_output}} +MODEL_LIMITS = { + # OpenAI Models + "gpt-4o-mini": { + "max_completion_tokens": 16384, + "max_context_tokens": 128000, + "input_cost_per_1m": 0.15, + "output_cost_per_1m": 0.60, + "provider": "openai" + }, + "gpt-4o": { + "max_completion_tokens": 16384, + "max_context_tokens": 128000, + "input_cost_per_1m": 2.50, + "output_cost_per_1m": 10.00, + "provider": "openai" + }, + "gpt-4-turbo": { + "max_completion_tokens": 4096, + "max_context_tokens": 128000, + "input_cost_per_1m": 10.00, + "output_cost_per_1m": 30.00, + "provider": "openai" + }, + "gpt-4": { + "max_completion_tokens": 8192, + "max_context_tokens": 8192, + "input_cost_per_1m": 30.00, + "output_cost_per_1m": 60.00, + "provider": "openai" + }, + "gpt-3.5-turbo": { + "max_completion_tokens": 4096, + "max_context_tokens": 16385, + "input_cost_per_1m": 0.50, + "output_cost_per_1m": 1.50, + "provider": "openai" + }, + "o1-mini": { + "max_completion_tokens": 65536, + "max_context_tokens": 128000, + "input_cost_per_1m": 3.00, + "output_cost_per_1m": 12.00, + "provider": "openai" + }, + "o1": { + "max_completion_tokens": 100000, + "max_context_tokens": 200000, + "input_cost_per_1m": 15.00, + "output_cost_per_1m": 60.00, + "provider": "openai" + }, + # Anthropic Models + "claude-3-5-sonnet": { + "max_completion_tokens": 8192, + "max_context_tokens": 200000, + "input_cost_per_1m": 3.00, + "output_cost_per_1m": 15.00, + "provider": "anthropic" + }, + "claude-3-opus": { + "max_completion_tokens": 4096, + "max_context_tokens": 200000, + "input_cost_per_1m": 15.00, + "output_cost_per_1m": 75.00, + "provider": "anthropic" + }, + "claude-3-sonnet": { + "max_completion_tokens": 4096, + "max_context_tokens": 200000, + "input_cost_per_1m": 3.00, + "output_cost_per_1m": 15.00, + "provider": "anthropic" + }, + "claude-3-haiku": { + "max_completion_tokens": 4096, + "max_context_tokens": 200000, + "input_cost_per_1m": 0.25, + "output_cost_per_1m": 1.25, + "provider": "anthropic" + }, +} + + +def get_model_from_config(config_path: str = "mcp_agent.config.yaml") -> Optional[str]: + """ + Get the default model from configuration file. + + Args: + config_path: Path to the configuration file + + Returns: + Model name or None if not found + """ + try: + with open(config_path, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + + # Check OpenAI config first + if "openai" in config and "default_model" in config["openai"]: + return config["openai"]["default_model"] + + # Check Anthropic config + if "anthropic" in config and "default_model" in config["anthropic"]: + return config["anthropic"]["default_model"] + + return None + except Exception as e: + print(f"⚠️ Warning: Could not read model from config: {e}") + return None + + +def get_model_limits(model_name: Optional[str] = None, config_path: str = "mcp_agent.config.yaml") -> Dict: + """ + Get the limits and capabilities for a specific model. 
+ + Args: + model_name: Name of the model (if None, reads from config) + config_path: Path to the configuration file + + Returns: + Dictionary with model limits and capabilities + """ + # Get model name from config if not provided + if not model_name: + model_name = get_model_from_config(config_path) + + if not model_name: + print("⚠️ Warning: Could not determine model, using safe defaults") + return { + "max_completion_tokens": 4096, + "max_context_tokens": 8192, + "input_cost_per_1m": 1.00, + "output_cost_per_1m": 3.00, + "provider": "unknown" + } + + # Find matching model in database + for pattern, limits in MODEL_LIMITS.items(): + if pattern.lower() in model_name.lower(): + print(f"📊 Detected model: {model_name} → {pattern}") + print(f" Max completion tokens: {limits['max_completion_tokens']}") + print(f" Max context tokens: {limits['max_context_tokens']}") + return limits.copy() + + # Model not in database - use conservative defaults + print(f"⚠️ Warning: Model '{model_name}' not in database, using conservative defaults") + return { + "max_completion_tokens": 4096, + "max_context_tokens": 8192, + "input_cost_per_1m": 1.00, + "output_cost_per_1m": 3.00, + "provider": "unknown" + } + + +def get_safe_max_tokens( + model_name: Optional[str] = None, + config_path: str = "mcp_agent.config.yaml", + safety_margin: float = 0.9 +) -> int: + """ + Get a safe max_tokens value for the model with a safety margin. + + Args: + model_name: Name of the model (if None, reads from config) + config_path: Path to the configuration file + safety_margin: Percentage of max to use (0.9 = 90% of max) + + Returns: + Safe max_tokens value + """ + limits = get_model_limits(model_name, config_path) + safe_tokens = int(limits["max_completion_tokens"] * safety_margin) + print(f"🔧 Safe max_tokens for {model_name or 'current model'}: {safe_tokens} ({safety_margin*100:.0f}% of {limits['max_completion_tokens']})") + return safe_tokens + + +def calculate_token_cost( + input_tokens: int, + output_tokens: int, + model_name: Optional[str] = None, + config_path: str = "mcp_agent.config.yaml" +) -> float: + """ + Calculate the cost for a given number of tokens. + + Args: + input_tokens: Number of input/prompt tokens + output_tokens: Number of output/completion tokens + model_name: Name of the model (if None, reads from config) + config_path: Path to the configuration file + + Returns: + Total cost in dollars + """ + limits = get_model_limits(model_name, config_path) + + input_cost = (input_tokens / 1_000_000) * limits["input_cost_per_1m"] + output_cost = (output_tokens / 1_000_000) * limits["output_cost_per_1m"] + total_cost = input_cost + output_cost + + return total_cost + + +def get_retry_token_limits( + base_tokens: int, + retry_count: int, + model_name: Optional[str] = None, + config_path: str = "mcp_agent.config.yaml" +) -> int: + """ + Get adjusted token limits for retries, respecting model maximum. + + Args: + base_tokens: Base token limit + retry_count: Current retry attempt (0, 1, 2, ...) 
+ model_name: Name of the model (if None, reads from config) + config_path: Path to the configuration file + + Returns: + Adjusted token limit for retry + """ + limits = get_model_limits(model_name, config_path) + max_allowed = limits["max_completion_tokens"] + + # Increase tokens with each retry, but cap at model maximum + if retry_count == 0: + # First retry: 87.5% of max + new_tokens = int(max_allowed * 0.875) + elif retry_count == 1: + # Second retry: 95% of max + new_tokens = int(max_allowed * 0.95) + else: + # Third+ retry: Use max with small safety margin + new_tokens = int(max_allowed * 0.98) + + # Ensure we don't exceed the model's hard limit + new_tokens = min(new_tokens, max_allowed) + + print(f"🔧 Retry {retry_count + 1}: Adjusting tokens from {base_tokens} → {new_tokens} (max: {max_allowed})") + + return new_tokens + + +def get_provider_from_model(model_name: Optional[str] = None, config_path: str = "mcp_agent.config.yaml") -> str: + """ + Determine the provider (openai/anthropic) for a given model. + + Args: + model_name: Name of the model (if None, reads from config) + config_path: Path to the configuration file + + Returns: + Provider name: "openai", "anthropic", or "unknown" + """ + limits = get_model_limits(model_name, config_path) + return limits.get("provider", "unknown") + diff --git a/workflows/agent_orchestration_engine.py b/workflows/agent_orchestration_engine.py index e9703505..79b1f2d7 100644 --- a/workflows/agent_orchestration_engine.py +++ b/workflows/agent_orchestration_engine.py @@ -64,16 +64,16 @@ def _assess_output_completeness(text: str) -> float: """ - 精准评估YAML格式实现计划的完整性 + Accurately assess the completeness of YAML-formatted implementation plans. - 基于CODE_PLANNING_PROMPT_TRADITIONAL的实际要求: - 1. 检查5个必需的YAML sections是否都存在 - 2. 验证YAML结构的完整性(开始和结束标记) - 3. 检查最后一行是否被截断 - 4. 验证最小合理长度 + Based on the actual requirements of CODE_PLANNING_PROMPT_TRADITIONAL: + 1. Check if all 5 required YAML sections are present + 2. Verify YAML structure integrity (start and end markers) + 3. Check if the last line is truncated + 4. Verify minimum reasonable length Returns: - float: 完整性分数 (0.0-1.0),越高表示越完整 + float: Completeness score (0.0-1.0), higher indicates more complete """ if not text or len(text.strip()) < 500: return 0.0 @@ -81,8 +81,8 @@ def _assess_output_completeness(text: str) -> float: score = 0.0 text_lower = text.lower() - # 1. 检查5个必需的YAML sections (权重: 0.5 - 最重要) - # 这是prompt明确要求的5个sections + # 1. Check for 5 required YAML sections (weight: 0.5 - most important) + # These are the 5 sections explicitly required by the prompt required_sections = [ "file_structure:", "implementation_components:", @@ -97,7 +97,7 @@ def _assess_output_completeness(text: str) -> float: print(f" 📋 Required sections: {sections_found}/{len(required_sections)}") - # 2. 检查YAML结构完整性 (权重: 0.2) + # 2. Check YAML structure integrity (weight: 0.2) has_yaml_start = any( marker in text for marker in ["```yaml", "complete_reproduction_plan:", "paper_info:"] @@ -112,25 +112,25 @@ def _assess_output_completeness(text: str) -> float: elif has_yaml_start: score += 0.1 - # 3. 检查最后一行完整性 (权重: 0.15) + # 3. 
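A usage sketch for the new module, assuming `gpt-4o-mini` is the configured default model (the numbers follow directly from the table above):

```python
# 90% of gpt-4o-mini's 16,384-token completion limit, and the dollar cost of a
# 120K-input / 8K-output call at $0.15 / $0.60 per million tokens.
from utils.model_limits import get_safe_max_tokens, calculate_token_cost

max_tokens = get_safe_max_tokens("gpt-4o-mini")  # 14745 (90% of 16384)
cost = calculate_token_cost(
    input_tokens=120_000, output_tokens=8_000, model_name="gpt-4o-mini"
)
print(f"max_tokens={max_tokens}, cost=${cost:.4f}")  # cost = $0.0228
```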
Check last line integrity (weight: 0.15) lines = text.strip().split("\n") if lines: last_line = lines[-1].strip() - # YAML的最后一行通常是缩进的内容行或结束标记 + # YAML's last line is usually an indented content line or end marker if ( last_line.endswith(("```", ".", ":", "]", "}")) - or last_line.startswith(("-", "*", " ")) # YAML列表项或缩进内容 + or last_line.startswith(("-", "*", " ")) # YAML list items or indented content or ( len(last_line) < 100 and not last_line.endswith(",") - ) # 短行且不是被截断的 + ) # Short line and not truncated ): score += 0.15 else: - # 长行且没有合适的结尾,很可能被截断 + # Long line without proper ending, likely truncated print(f" ⚠️ Last line suspicious: '{last_line[-50:]}'") - # 4. 检查合理的最小长度 (权重: 0.15) - # 一个完整的5-section计划应该至少8000字符 + # 4. Check reasonable minimum length (weight: 0.15) + # A complete 5-section plan should be at least 8000 characters length = len(text) if length >= 10000: score += 0.15 @@ -146,40 +146,34 @@ def _assess_output_completeness(text: str) -> float: def _adjust_params_for_retry(params: RequestParams, retry_count: int) -> RequestParams: """ - 激进的token增长策略以确保完整输出 - - 策略说明: - - 第1次重试:大幅增加到40000 tokens(确保有足够空间输出完整YAML) - - 第2次重试:进一步增加到60000 tokens(处理极端情况) - - 降低temperature提高稳定性和可预测性 - - 为什么需要这么多tokens? - - ParallelLLM的fan_out agents会生成长篇分析结果(各5000+ tokens) - - fan_in agent接收这些结果作为输入context - - 需要输出包含5个详细sections的完整YAML(10000+ tokens) - - 因此需要为OUTPUT预留充足的token空间 + Dynamic token adjustment strategy that respects model limits. + + Strategy: + - Automatically detects current model's token limits + - Progressively increases tokens with each retry + - Never exceeds the model's maximum + - Decreases temperature for more consistent output + + Why dynamic adjustment is needed: + - Different models have different token limits (gpt-4o-mini: 16K, o1: 100K, etc.) 
+ - Hardcoding limits breaks when switching models + - Need to maximize output space while respecting limits """ - # 激进的token增长策略 - if retry_count == 0: - # 第一次重试:直接跳到40K,确保有足够输出空间 - new_max_tokens = 40000 - elif retry_count == 1: - # 第二次重试:进一步增加到60K - new_max_tokens = 60000 - else: - # 第三次及以上:使用最大限制 - new_max_tokens = 80000 + from utils.model_limits import get_retry_token_limits + + # Get dynamically adjusted token limit based on current model and retry count + new_max_tokens = get_retry_token_limits(params.maxTokens, retry_count) - # 随着重试次数增加,降低temperature以获得更一致、更可预测的输出 + # Decrease temperature with each retry to get more consistent and predictable output new_temperature = max(params.temperature - (retry_count * 0.15), 0.05) print(f"🔧 Adjusting parameters for retry {retry_count + 1}:") print(f" Token limit: {params.maxTokens} → {new_max_tokens}") print(f" Temperature: {params.temperature:.2f} → {new_temperature:.2f}") - print(" 💡 Strategy: Ensure sufficient output space for complete 5-section YAML") + print(" 💡 Strategy: Dynamically adjusted for current model") return RequestParams( - maxTokens=new_max_tokens, # 注意:使用 camelCase + maxTokens=new_max_tokens, # Note: Using camelCase temperature=new_temperature, ) @@ -352,7 +346,7 @@ async def run_research_analyzer(prompt_text: str, logger) -> str: # Set higher token output for research analysis analysis_params = RequestParams( - maxTokens=6144, # 使用 camelCase + maxTokens=6144, # Using camelCase temperature=0.3, ) @@ -444,7 +438,7 @@ async def run_resource_processor(analysis_result: str, logger) -> str: # Set higher token output for resource processing processor_params = RequestParams( - maxTokens=4096, # 使用 camelCase + maxTokens=4096, # Using camelCase temperature=0.2, ) @@ -505,21 +499,25 @@ async def run_code_analyzer( ) # Advanced token management system with dynamic scaling - # 关键优化:ParallelLLM需要为输出预留充足空间 - # fan_in agent会接收fan_out agents的完整输出作为context,然后需要生成完整YAML + # Key optimization: ParallelLLM needs to reserve sufficient space for output + # fan_in agent receives complete output from fan_out agents as context, then needs to generate complete YAML + # Dynamically determine max_tokens based on the configured model + from utils.model_limits import get_safe_max_tokens + + # Get safe token limit for current model (90% of max to leave safety margin) + max_tokens_limit = get_safe_max_tokens(safety_margin=0.9) + if use_segmentation: - # 分段模式:输入已优化,但仍需大量输出空间 - max_tokens_limit = 30000 # 充足的输出空间确保5个sections完整生成 - temperature = 0.2 # 稍微降低temperature以提高一致性 - print("🧠 Using SEGMENTED mode: max_tokens=30000 for complete YAML output") + # Segmented mode: Input is optimized, but still needs large output space + temperature = 0.2 # Slightly lower temperature for better consistency + print(f"🧠 Using SEGMENTED mode: max_tokens={max_tokens_limit} for complete YAML output") else: - # 传统模式:需要更多输出空间应对长篇分析结果 - max_tokens_limit = 30000 # 足够的空间确保完整输出 + # Traditional mode: Needs more output space for lengthy analysis results temperature = 0.3 - print("🧠 Using TRADITIONAL mode: max_tokens=30000 for complete YAML output") + print(f"🧠 Using TRADITIONAL mode: max_tokens={max_tokens_limit} for complete YAML output") enhanced_params = RequestParams( - maxTokens=max_tokens_limit, # 注意:使用 camelCase 而不是 snake_case + maxTokens=max_tokens_limit, # Note: Using camelCase instead of snake_case temperature=temperature, ) @@ -534,7 +532,7 @@ async def run_code_analyzer( The goal is to create a reproduction plan detailed enough for independent implementation.""" - # 
智能输出完整性检查和重试机制 + # Intelligent output completeness check and retry mechanism max_retries = 3 retry_count = 0 @@ -547,11 +545,11 @@ async def run_code_analyzer( message=message, request_params=enhanced_params ) - # 检查输出完整性的高级指标 + # Advanced metrics for checking output completeness completeness_score = _assess_output_completeness(result) print(f"📊 Output completeness score: {completeness_score:.2f}/1.0") - if completeness_score >= 0.8: # 输出被认为是完整的 + if completeness_score >= 0.8: # Output is considered complete print( f"✅ Code analysis completed successfully (length: {len(result)} chars)" ) @@ -560,7 +558,7 @@ async def run_code_analyzer( print( f"⚠️ Output appears truncated (score: {completeness_score:.2f}), retrying with enhanced parameters..." ) - # 动态调整参数进行重试 + # Dynamically adjust parameters for retry enhanced_params = _adjust_params_for_retry(enhanced_params, retry_count) retry_count += 1 @@ -570,7 +568,7 @@ async def run_code_analyzer( if retry_count >= max_retries: raise - # 如果所有重试都失败,返回最后一次的结果 + # If all retries fail, return the last result print(f"⚠️ Returning potentially incomplete result after {max_retries} attempts") return result @@ -601,7 +599,7 @@ async def github_repo_download(search_result: str, paper_dir: str, logger) -> st # Set higher token output for GitHub download github_params = RequestParams( - maxTokens=4096, # 使用 camelCase + maxTokens=4096, # Using camelCase temperature=0.1, ) @@ -698,7 +696,37 @@ async def orchestrate_research_analysis_agent( progress_callback( 25, "📥 Processing downloads and preparing document structure..." ) - download_result = await run_resource_processor(analysis_result, logger) + + print(f"📋 Analysis result preview: {analysis_result[:200] if len(analysis_result) > 200 else analysis_result}") + + # Check if file is already in organized location - if so, skip resource processing + # This prevents creating duplicate paper_{timestamp} folders + try: + import json + import re + analysis_dict = json.loads(analysis_result) + if "file_path" in analysis_dict or "paper_path" in analysis_dict: + file_path = analysis_dict.get("file_path") or analysis_dict.get("paper_path") + if file_path and os.path.exists(file_path): + file_dir = os.path.dirname(file_path) + folder_name = os.path.basename(file_dir) + + # If already in a paper_{timestamp} folder, skip resource processing + if re.match(r"paper_\d+$", folder_name): + print(f"✅ File already in organized workspace folder: {folder_name}") + print(f" Skipping resource processing to avoid duplicate folders") + download_result = analysis_result # Use existing location + else: + download_result = await run_resource_processor(analysis_result, logger) + else: + download_result = await run_resource_processor(analysis_result, logger) + else: + download_result = await run_resource_processor(analysis_result, logger) + except: + # If not JSON or any error, do normal processing + download_result = await run_resource_processor(analysis_result, logger) + + print(f"📥 Download result preview: {download_result[:200] if len(download_result) > 200 else download_result}") return analysis_result, download_result @@ -834,9 +862,26 @@ async def orchestrate_document_preprocessing_agent( with open(md_path, "rb") as f: header = f.read(8) if header.startswith(b"%PDF"): - raise IOError( - f"File {md_path} is a PDF file, not a text file. Please convert it to markdown format or use PDF processing tools." 
- ) + # If we find a PDF file where we expected markdown, try to convert it + print(f"⚠️ Found PDF file instead of markdown: {md_path}") + print("🔄 Attempting to convert PDF to markdown...") + + # Try to convert the PDF to markdown + try: + from tools.pdf_downloader import SimplePdfConverter + converter = SimplePdfConverter() + conversion_result = converter.convert_pdf_to_markdown(md_path) + + if conversion_result["success"]: + print(f"✅ PDF converted to markdown: {conversion_result['output_file']}") + # Use the converted markdown file instead + md_path = conversion_result["output_file"] + else: + raise IOError(f"PDF conversion failed: {conversion_result['error']}") + except Exception as conv_error: + raise IOError( + f"File {md_path} is a PDF file, not a text file. PDF conversion failed: {str(conv_error)}" + ) with open(md_path, "r", encoding="utf-8") as f: document_content = f.read() @@ -854,6 +899,7 @@ async def orchestrate_document_preprocessing_agent( # Step 3: Determine if segmentation should be used should_segment, reason = should_use_document_segmentation(document_content) + print(f"📊 Segmentation decision: {should_segment}") print(f" Reason: {reason}") @@ -952,12 +998,43 @@ async def orchestrate_code_planning_agent( use_segmentation = dir_info.get("use_segmentation", True) print(f"📊 Planning mode: {'Segmented' if use_segmentation else 'Traditional'}") + # First, verify there's a markdown file to analyze + import glob + md_files = glob.glob(os.path.join(dir_info["paper_dir"], "*.md")) + md_files = [f for f in md_files if not f.endswith("implement_code_summary.md")] # Exclude summary + + if not md_files: + error_msg = f"❌ No markdown file found in {dir_info['paper_dir']}. PDF conversion may have failed." + print(error_msg) + print(f" Paper directory: {dir_info['paper_dir']}") + print(f" Directory exists: {os.path.exists(dir_info['paper_dir'])}") + if os.path.exists(dir_info['paper_dir']): + all_files = os.listdir(dir_info['paper_dir']) + print(f" Available files ({len(all_files)}): {all_files}") + + # Check for PDF files that might need conversion + pdf_files = [f for f in all_files if f.endswith('.pdf')] + if pdf_files: + print(f" Found PDF files that weren't converted: {pdf_files}") + else: + print(f" ⚠️ Directory doesn't exist!") + raise ValueError(error_msg) + + print(f"📄 Found markdown file for analysis: {os.path.basename(md_files[0])}") + initial_plan_result = await run_code_analyzer( dir_info["paper_dir"], logger, use_segmentation=use_segmentation ) + + # Check if plan is empty or invalid + if not initial_plan_result or len(initial_plan_result.strip()) < 100: + error_msg = f"❌ Code planning failed: Generated plan is empty or too short ({len(initial_plan_result)} chars)" + print(error_msg) + raise ValueError(error_msg) + with open(initial_plan_path, "w", encoding="utf-8") as f: f.write(initial_plan_result) - print(f"Initial plan saved to {initial_plan_path}") + print(f"✅ Initial plan saved to {initial_plan_path} ({len(initial_plan_result)} chars)") async def automate_repository_acquisition_agent( @@ -1206,6 +1283,10 @@ async def synthesize_code_implementation_agent( print(f"Using initial plan from {dir_info['initial_plan_path']}") # Run code implementation workflow with pure code mode + # Pass segmentation information to help with token management + use_segmentation = dir_info.get("use_segmentation", False) + print(f"🔧 Code implementation using segmentation: {use_segmentation}") + implementation_result = await code_workflow.run_workflow( 
plan_file_path=dir_info["initial_plan_path"], target_directory=dir_info["paper_dir"], @@ -1300,7 +1381,7 @@ async def run_chat_planning_agent(user_input: str, logger) -> str: # Set higher token output for comprehensive planning planning_params = RequestParams( - maxTokens=8192, # 使用 camelCase - Higher token limit for detailed plans + maxTokens=8192, # Using camelCase - Higher token limit for detailed plans temperature=0.2, # Lower temperature for more structured output ) @@ -1382,11 +1463,12 @@ async def execute_multi_agent_research_pipeline( str: The comprehensive pipeline execution result with status and outcomes """ try: - # Phase 0: Workspace Setup + # Phase 0: Workspace Setup (5%) if progress_callback: progress_callback(5, "🔄 Setting up workspace for file processing...") print("🚀 Initializing intelligent multi-agent research orchestration system") + print("📊 Progress: 5% - Workspace Setup") # Setup local workspace directory workspace_dir = os.path.join(os.getcwd(), "deepcode_lab") @@ -1402,11 +1484,59 @@ async def execute_multi_agent_research_pipeline( else: print("⚡ Optimized mode - advanced intelligence analysis disabled") - # Phase 1: Input Processing and Validation + # Phase 1: Input Processing and Validation (10%) + if progress_callback: + progress_callback(10, "📄 Processing and validating input source...") + print("📊 Progress: 10% - Input Processing") + input_source = await _process_input_source(input_source, logger) - # Phase 2: Research Analysis and Resource Processing (if needed) - if isinstance(input_source, str) and ( + # Phase 2: Research Analysis and Resource Processing (25%) + if progress_callback: + progress_callback(25, "🔍 Analyzing research content and downloading resources...") + print("📊 Progress: 25% - Research Analysis") + + # Check if input_source is already a JSON with paper_path in a paper_{timestamp} folder + skip_processing = False + if isinstance(input_source, str): + try: + import json + import re + input_dict = json.loads(input_source) + if "paper_path" in input_dict: + paper_path = input_dict["paper_path"] + paper_dir = os.path.dirname(paper_path) + # Check if already in a paper_{timestamp} folder + if re.match(r"paper_\d+$", os.path.basename(paper_dir)): + print(f"✅ File already in organized folder: {paper_dir}") + print(f" Skipping research analysis phase (file already processed)") + + # Convert PDF to markdown if not already done + if paper_path.endswith('.pdf'): + print(f"🔄 Converting PDF to markdown...") + try: + from tools.pdf_downloader import SimplePdfConverter + converter = SimplePdfConverter() + conversion_result = converter.convert_pdf_to_markdown(paper_path) + if conversion_result["success"]: + print(f"✅ PDF converted to markdown: {conversion_result['output_file']}") + # Update paper_path to point to markdown file + input_dict["paper_path"] = conversion_result["output_file"] + download_result = json.dumps(input_dict) + else: + print(f"⚠️ PDF conversion failed: {conversion_result.get('error')}") + download_result = input_source + except Exception as e: + print(f"⚠️ PDF conversion error: {e}") + download_result = input_source + else: + download_result = input_source + + skip_processing = True + except: + pass # Not JSON, continue normal processing + + if not skip_processing and isinstance(input_source, str) and ( input_source.endswith((".pdf", ".docx", ".txt", ".html", ".md")) or input_source.startswith(("http", "file://")) ): @@ -1416,21 +1546,25 @@ async def execute_multi_agent_research_pipeline( ) = await orchestrate_research_analysis_agent( 
diff --git a/workflows/code_implementation_workflow.py b/workflows/code_implementation_workflow.py
index c18ddd95..aae99b9f 100644
--- a/workflows/code_implementation_workflow.py
+++ b/workflows/code_implementation_workflow.py
@@ -18,6 +18,7 @@
 import sys
 import time
 import yaml
+from utils.loop_detector import LoopDetector, ProgressTracker
 from pathlib import Path
 from typing import Dict, Any, Optional, List
@@ -59,6 +60,8 @@ def __init__(self, config_path: str = "mcp_agent.secrets.yaml"):
         self.enable_read_tools = (
             True  # Default value, will be overridden by run_workflow parameter
         )
+        self.loop_detector = LoopDetector()
+        self.progress_tracker = ProgressTracker()

     def _load_api_config(self) -> Dict[str, Any]:
         """Load API configuration from YAML file"""
@@ -299,6 +302,10 @@ async def _pure_code_implementation_loop(
             self.mcp_agent, self.logger, self.enable_read_tools
         )
         memory_agent = ConciseMemoryAgent(plan_content, self.logger, target_directory)
+
+        # Initialize progress tracker for file-level progress
+        self.progress_tracker = ProgressTracker()
+        self.progress_tracker.set_phase("Code Implementation", 85)

         # Log read tools configuration
         read_tools_status = "ENABLED" if self.enable_read_tools else "DISABLED"
@@ -324,6 +331,22 @@
             if elapsed_time > max_time:
                 self.logger.warning(f"Time limit reached: {elapsed_time:.2f}s")
                 break
+
+            # Check for loops and timeouts
+            if self.loop_detector.should_abort():
+                abort_reason = self.loop_detector.get_abort_reason()
+                self.logger.error(f"🛑 Process aborted: {abort_reason}")
+                # Return error immediately instead of continuing to final report
+                return f"❌ Process aborted due to: {abort_reason}\n\nThe code implementation was stopped because the system detected an issue that prevented progress. Please check the logs for more details."
+
+            # Update file-level progress
+            files_implemented = code_agent.get_files_implemented_count()
+            if files_implemented > 0:
+                self.progress_tracker.total_files = max(self.progress_tracker.total_files, files_implemented + 5)  # Estimate total
+                progress_info = self.progress_tracker.get_progress_info()
+                print(f"📁 Files: {progress_info['files_completed']}/{progress_info['total_files']} ({progress_info['file_progress']:.1f}%)")
+                if progress_info['estimated_remaining_seconds'] > 0:
+                    print(f"⏱️ Estimated remaining: {progress_info['estimated_remaining_seconds']:.0f}s")

             # # Test simplified memory approach if we have files implemented
             # if iteration == 5 and code_agent.get_files_implemented_count() > 0:
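`utils/loop_detector.py` is imported here but not included in this diff. Reconstructed from its call sites (`should_abort`, `get_abort_reason`, `check_tool_call`, `record_success`, `record_error`), `LoopDetector` plausibly has the following shape; the window size, error threshold, and timeout below are illustrative assumptions, not the module's actual values:

```python
# Illustrative sketch of utils/loop_detector.LoopDetector, reconstructed from
# the call sites in this diff. Method names match actual usage; thresholds and
# internals are assumptions.
import time
from collections import deque


class LoopDetector:
    def __init__(self, window_size=8, max_consecutive_errors=10, timeout_seconds=1800):
        self.recent_calls = deque(maxlen=window_size)  # sliding window of tool names
        self.max_consecutive_errors = max_consecutive_errors
        self.consecutive_errors = 0
        self.start_time = time.time()
        self.timeout_seconds = timeout_seconds
        self._abort_reason = None

    def check_tool_call(self, tool_name: str) -> dict:
        """Record a tool call; flag a loop when the window is one repeated tool."""
        self.recent_calls.append(tool_name)
        if (len(self.recent_calls) == self.recent_calls.maxlen
                and len(set(self.recent_calls)) == 1):
            self._abort_reason = f"Repeated tool call loop: {tool_name}"
            return {"should_stop": True, "message": self._abort_reason}
        return {"should_stop": False, "message": ""}

    def record_success(self):
        self.consecutive_errors = 0  # any real success resets the error streak

    def record_error(self, detail: str):
        self.consecutive_errors += 1
        if self.consecutive_errors >= self.max_consecutive_errors:
            self._abort_reason = f"{self.consecutive_errors} consecutive errors: {detail}"

    def should_abort(self) -> bool:
        if time.time() - self.start_time > self.timeout_seconds:
            self._abort_reason = "Timeout exceeded"
        return self._abort_reason is not None

    def get_abort_reason(self) -> str:
        return self._abort_reason or "Unknown"
```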
@@ -351,12 +374,36 @@
             # Handle tool calls
             if response.get("tool_calls"):
+                # Check for loops before executing tools
+                for tool_call in response["tool_calls"]:
+                    loop_status = self.loop_detector.check_tool_call(tool_call["name"])
+                    if loop_status["should_stop"]:
+                        self.logger.error(f"🛑 Tool execution aborted: {loop_status['message']}")
+                        return f"Process aborted: {loop_status['message']}"
+
                 tool_results = await code_agent.execute_tool_calls(
                     response["tool_calls"]
                 )

                 # Record essential tool results in concise memory agent
                 for tool_call, tool_result in zip(response["tool_calls"], tool_results):
+                    # Check if tool actually failed
+                    # Only count as error if isError flag is True
+                    is_error = tool_result.get("isError", False)
+
+                    if not is_error:
+                        # Tool succeeded
+                        self.loop_detector.record_success()
+
+                        # Track file completion
+                        if tool_call["name"] == "write_file":
+                            filename = tool_call["input"].get("file_path", "unknown")
+                            self.progress_tracker.complete_file(filename)
+                            print(f"✅ File completed: {filename}")
+                    else:
+                        # Tool actually failed
+                        self.loop_detector.record_error(f"Tool {tool_call['name']} failed: {tool_result.get('result', '')[:100]}")
+
                     memory_agent.record_tool_result(
                         tool_name=tool_call["name"],
                         tool_input=tool_call["input"],
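`ProgressTracker`, imported from the same module, is also absent from the diff. Here is a sketch consistent with the usage above (`set_phase`, `complete_file`, a writable `total_files`, and a `get_progress_info()` dict exposing the four keys the loop reads); the ETA heuristic is purely an assumption:

```python
# Illustrative sketch of utils/loop_detector.ProgressTracker, matching the
# attribute and method usage in this diff; the ETA heuristic is assumed.
import time


class ProgressTracker:
    def __init__(self):
        self.phase_name = ""
        self.phase_percent = 0
        self.total_files = 0          # mutated directly by the caller above
        self.completed_files = []
        self.start_time = time.time()

    def set_phase(self, name: str, percent: int):
        self.phase_name = name
        self.phase_percent = percent

    def complete_file(self, filename: str):
        self.completed_files.append(filename)

    def get_progress_info(self) -> dict:
        done = len(self.completed_files)
        total = max(self.total_files, done, 1)
        elapsed = time.time() - self.start_time
        per_file = elapsed / done if done else 0.0  # naive ETA: average pace so far
        return {
            "files_completed": done,
            "total_files": total,
            "file_progress": 100.0 * done / total,
            "estimated_remaining_seconds": per_file * (total - done),
        }
```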
@@ -451,8 +498,8 @@ async def _initialize_mcp_agent(self, code_directory: str):
         try:
             self.mcp_agent = Agent(
                 name="CodeImplementationAgent",
-                instruction="You are a code implementation assistant, using MCP tools to implement paper code replication.",
-                server_names=["code-implementation", "code-reference-indexer"],
+                instruction="You are a code implementation assistant, using MCP tools to implement paper code replication. For large documents, use document-segmentation tools to read content in smaller chunks to avoid token limits.",
+                server_names=["code-implementation", "code-reference-indexer", "document-segmentation"],
             )

             await self.mcp_agent.__aenter__()
@@ -581,7 +628,7 @@ async def _call_llm_with_tools(
     async def _call_anthropic_with_tools(
         self, client, system_message, messages, tools, max_tokens
     ):
-        """Call Anthropic API"""
+        """Call Anthropic API with token limit management"""
         validated_messages = self._validate_messages(messages)
         if not validated_messages:
             validated_messages = [
@@ -612,7 +659,34 @@
                     {"id": block.id, "name": block.name, "input": block.input}
                 )

-        return {"content": content, "tool_calls": tool_calls}
+        # Extract token usage and calculate cost
+        token_usage = {}
+        cost = 0.0
+
+        if hasattr(response, 'usage') and response.usage:
+            token_usage = {
+                "input_tokens": response.usage.input_tokens,
+                "output_tokens": response.usage.output_tokens,
+                "total_tokens": response.usage.input_tokens + response.usage.output_tokens
+            }
+
+            # Use dynamic cost calculation based on current model
+            from utils.model_limits import calculate_token_cost
+            cost = calculate_token_cost(
+                response.usage.input_tokens,
+                response.usage.output_tokens,
+                model_name=self.default_models.get("anthropic")
+            )
+
+            print(f"💰 Tokens: {token_usage['total_tokens']} (${cost:.4f})")
+            self.logger.info(f"Token usage: {token_usage['input_tokens']} input + {token_usage['output_tokens']} output = {token_usage['total_tokens']} total (${cost:.4f})")
+
+        return {
+            "content": content,
+            "tool_calls": tool_calls,
+            "token_usage": token_usage,
+            "cost": cost
+        }

     def _repair_truncated_json(self, json_str: str, tool_name: str = "") -> dict:
         """
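`utils/model_limits.calculate_token_cost` is likewise outside this excerpt. The sketch below matches the call shape used above; the per-million-token rates are illustrative examples only, and the real module presumably carries its own pricing table:

```python
# Sketch of utils/model_limits.calculate_token_cost, consistent with the calls
# above. The pricing table is illustrative -- actual per-token rates vary by
# model and change over time.
_PRICING_PER_MTOK = {
    # model name: (input $/1M tokens, output $/1M tokens) -- example values
    "gpt-4o-mini": (0.15, 0.60),
    "claude-3-5-sonnet-20241022": (3.00, 15.00),
}

_DEFAULT_RATE = (1.00, 3.00)  # conservative fallback for unknown models


def calculate_token_cost(input_tokens, output_tokens, model_name=None):
    """Return the estimated USD cost of one request."""
    in_rate, out_rate = _PRICING_PER_MTOK.get(model_name, _DEFAULT_RATE)
    return (input_tokens * in_rate + output_tokens * out_rate) / 1_000_000
```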
@@ -789,7 +863,34 @@ async def _call_openai_with_tools(
                     print("   ⚠️ Skipping unrepairable tool call")
                     continue

-        return {"content": content, "tool_calls": tool_calls}
+        # Extract token usage and calculate cost
+        token_usage = {}
+        cost = 0.0
+
+        if hasattr(response, 'usage') and response.usage:
+            token_usage = {
+                "prompt_tokens": response.usage.prompt_tokens,
+                "completion_tokens": response.usage.completion_tokens,
+                "total_tokens": response.usage.total_tokens
+            }
+
+            # Use dynamic cost calculation based on current model
+            from utils.model_limits import calculate_token_cost
+            cost = calculate_token_cost(
+                response.usage.prompt_tokens,
+                response.usage.completion_tokens,
+                model_name=self.default_models.get("openai")
+            )
+
+            print(f"💰 Tokens: {token_usage['total_tokens']} (${cost:.4f})")
+            self.logger.info(f"Token usage: {token_usage['prompt_tokens']} prompt + {token_usage['completion_tokens']} completion = {token_usage['total_tokens']} total (${cost:.4f})")
+
+        return {
+            "content": content,
+            "tool_calls": tool_calls,
+            "token_usage": token_usage,
+            "cost": cost
+        }

 # ==================== 5. Tools and Utility Methods (Utility Layer) ====================
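Because both wrappers now return `token_usage` and `cost` alongside `content` and `tool_calls`, a caller can total a session's spend without touching the API clients. A hypothetical caller-side helper, not part of this diff:

```python
# Hypothetical helper that totals the per-call "token_usage" and "cost" fields
# now returned by the _call_*_with_tools methods.
def summarize_usage(responses: list) -> dict:
    """Aggregate token counts and cost across a list of LLM response dicts."""
    total_tokens = sum(r.get("token_usage", {}).get("total_tokens", 0) for r in responses)
    total_cost = sum(r.get("cost", 0.0) for r in responses)
    return {"total_tokens": total_tokens, "total_cost": total_cost}


# Usage:
responses = [
    {"token_usage": {"total_tokens": 1200}, "cost": 0.0031},
    {"token_usage": {"total_tokens": 900}, "cost": 0.0022},
]
print(summarize_usage(responses))  # totals: 2100 tokens, ~$0.0053
```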
diff --git a/workflows/code_implementation_workflow_index.py b/workflows/code_implementation_workflow_index.py
index f7d0aa4b..ec56d107 100644
--- a/workflows/code_implementation_workflow_index.py
+++ b/workflows/code_implementation_workflow_index.py
@@ -18,6 +18,7 @@
 import sys
 import time
 import yaml
+from utils.loop_detector import LoopDetector, ProgressTracker
 from pathlib import Path
 from typing import Dict, Any, Optional, List
@@ -60,6 +61,8 @@ def __init__(self, config_path: str = "mcp_agent.secrets.yaml"):
         self.enable_read_tools = (
             True  # Default value, will be overridden by run_workflow parameter
         )
+        self.loop_detector = LoopDetector()
+        self.progress_tracker = ProgressTracker()

     def _load_api_config(self) -> Dict[str, Any]:
         """Load API configuration from YAML file"""
@@ -452,8 +455,8 @@ async def _initialize_mcp_agent(self, code_directory: str):
         try:
             self.mcp_agent = Agent(
                 name="CodeImplementationAgent",
-                instruction="You are a code implementation assistant, using MCP tools to implement paper code replication.",
-                server_names=["code-implementation", "code-reference-indexer"],
+                instruction="You are a code implementation assistant, using MCP tools to implement paper code replication. For large documents, use document-segmentation tools to read content in smaller chunks to avoid token limits.",
+                server_names=["code-implementation", "code-reference-indexer", "document-segmentation"],
             )

             await self.mcp_agent.__aenter__()
@@ -582,7 +585,7 @@ async def _call_llm_with_tools(
     async def _call_anthropic_with_tools(
         self, client, system_message, messages, tools, max_tokens
     ):
-        """Call Anthropic API"""
+        """Call Anthropic API with token limit management"""
         validated_messages = self._validate_messages(messages)
         if not validated_messages:
             validated_messages = [
@@ -613,7 +616,34 @@
                     {"id": block.id, "name": block.name, "input": block.input}
                 )

-        return {"content": content, "tool_calls": tool_calls}
+        # Extract token usage and calculate cost
+        token_usage = {}
+        cost = 0.0
+
+        if hasattr(response, 'usage') and response.usage:
+            token_usage = {
+                "input_tokens": response.usage.input_tokens,
+                "output_tokens": response.usage.output_tokens,
+                "total_tokens": response.usage.input_tokens + response.usage.output_tokens
+            }
+
+            # Use dynamic cost calculation based on current model
+            from utils.model_limits import calculate_token_cost
+            cost = calculate_token_cost(
+                response.usage.input_tokens,
+                response.usage.output_tokens,
+                model_name=self.default_models.get("anthropic")
+            )
+
+            print(f"💰 Tokens: {token_usage['total_tokens']} (${cost:.4f})")
+            self.logger.info(f"Token usage: {token_usage['input_tokens']} input + {token_usage['output_tokens']} output = {token_usage['total_tokens']} total (${cost:.4f})")
+
+        return {
+            "content": content,
+            "tool_calls": tool_calls,
+            "token_usage": token_usage,
+            "cost": cost
+        }

     def _repair_truncated_json(self, json_str: str, tool_name: str = "") -> dict:
         """
@@ -790,7 +820,34 @@ async def _call_openai_with_tools(
                     print("   ⚠️ Skipping unrepairable tool call")
                     continue

-        return {"content": content, "tool_calls": tool_calls}
+        # Extract token usage and calculate cost
+        token_usage = {}
+        cost = 0.0
+
+        if hasattr(response, 'usage') and response.usage:
+            token_usage = {
+                "prompt_tokens": response.usage.prompt_tokens,
+                "completion_tokens": response.usage.completion_tokens,
+                "total_tokens": response.usage.total_tokens
+            }
+
+            # Use dynamic cost calculation based on current model
+            from utils.model_limits import calculate_token_cost
+            cost = calculate_token_cost(
+                response.usage.prompt_tokens,
+                response.usage.completion_tokens,
+                model_name=self.default_models.get("openai")
+            )
+
+            print(f"💰 Tokens: {token_usage['total_tokens']} (${cost:.4f})")
+            self.logger.info(f"Token usage: {token_usage['prompt_tokens']} prompt + {token_usage['completion_tokens']} completion = {token_usage['total_tokens']} total (${cost:.4f})")
+
+        return {
+            "content": content,
+            "tool_calls": tool_calls,
+            "token_usage": token_usage,
+            "cost": cost
+        }

 # ==================== 5. Tools and Utility Methods (Utility Layer) ====================