From 0e640932c34af78020211274a8e516374f2cdfc5 Mon Sep 17 00:00:00 2001 From: Yuxiang Wu Date: Mon, 12 May 2025 22:18:04 +0100 Subject: [PATCH 1/7] feat: Implement CLI liveness tracking and termination reporting Introduces mechanisms for the CLI to signal its liveness to the backend and report its termination reason, ensuring more robust state management. - Adds a background heartbeat thread (`HeartbeatSender`) that periodically sends PUT requests to `/api/sessions/{session_id}/heartbeat` every 30 seconds during an active run via the new `send_heartbeat` API function. Heartbeat failures are logged but non-fatal. - Implements signal handlers for SIGINT and SIGTERM to gracefully stop the heartbeat thread and report termination due to user interruption via a new `report_termination` API function and endpoint (`/terminate`). - Wraps the main optimization loop in a `try...except...finally` block: - Catches runtime errors during the optimization loop. - Ensures the heartbeat thread is stopped on exit (normal completion, error, or signal). - Calls `report_termination` in the `finally` block to inform the backend of the final status (completed or error), unless already reported by the signal handler. - Improves error handling for existing API calls (`start_optimization_session`, `evaluate_feedback_then_suggest_next_solution`) to be more specific and prevent unexpected exits. --- weco/api.py | 91 +++++++++++++++++++++++- weco/cli.py | 196 ++++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 257 insertions(+), 30 deletions(-) diff --git a/weco/api.py b/weco/api.py index 99646ce..9371244 100644 --- a/weco/api.py +++ b/weco/api.py @@ -1,14 +1,20 @@ -from typing import Dict, Any +from typing import Dict, Any, Optional import rich import requests from weco import __pkg_version__, __base_url__ import sys +from rich.console import Console def handle_api_error(e: requests.exceptions.HTTPError, console: rich.console.Console) -> None: """Extract and display error messages from API responses in a structured format.""" - console.print(f"[bold red]{e.response.json()['detail']}[/]") - sys.exit(1) + try: + detail = e.response.json()['detail'] + except (ValueError, KeyError): # Handle cases where response is not JSON or detail key is missing + detail = f"HTTP {e.response.status_code} Error: {e.response.text}" + console.print(f"[bold red]{detail}[/]") + # Avoid exiting here, let the caller decide if the error is fatal + # sys.exit(1) def start_optimization_session( @@ -28,6 +34,7 @@ def start_optimization_session( ) -> Dict[str, Any]: """Start the optimization session.""" with console.status("[bold green]Starting Optimization..."): + try: response = requests.post( f"{__base_url__}/sessions", # Path is relative to base_url json={ @@ -47,6 +54,12 @@ def start_optimization_session( ) response.raise_for_status() return response.json() + except requests.exceptions.HTTPError as e: + handle_api_error(e, console) + sys.exit(1) # Exit if starting session fails + except requests.exceptions.RequestException as e: + console.print(f"[bold red]Network Error starting session: {e}[/]") + sys.exit(1) def evaluate_feedback_then_suggest_next_solution( @@ -58,6 +71,7 @@ def evaluate_feedback_then_suggest_next_solution( timeout: int = 800, ) -> Dict[str, Any]: """Evaluate the feedback and suggest the next solution.""" + try: response = requests.post( f"{__base_url__}/sessions/{session_id}/suggest", # Path is relative to base_url json={ @@ -70,12 +84,20 @@ def evaluate_feedback_then_suggest_next_solution( ) response.raise_for_status() return response.json() + except requests.exceptions.HTTPError as e: + # Allow caller to handle suggest errors, maybe retry or terminate + handle_api_error(e, Console()) # Use default console if none passed + raise # Re-raise the exception + except requests.exceptions.RequestException as e: + print(f"[bold red]Network Error during suggest: {e}[/]") # Use print as console might not be available + raise # Re-raise the exception def get_optimization_session_status( session_id: str, include_history: bool = False, auth_headers: dict = {}, timeout: int = 800 ) -> Dict[str, Any]: """Get the current status of the optimization session.""" + try: response = requests.get( f"{__base_url__}/sessions/{session_id}", # Path is relative to base_url params={"include_history": include_history}, @@ -84,3 +106,66 @@ def get_optimization_session_status( ) response.raise_for_status() return response.json() + except requests.exceptions.HTTPError as e: + handle_api_error(e, Console()) # Use default console + raise # Re-raise + except requests.exceptions.RequestException as e: + print(f"[bold red]Network Error getting status: {e}[/]") + raise # Re-raise + + +def send_heartbeat( + session_id: str, + auth_headers: dict = {}, + timeout: int = 10 # Shorter timeout for non-critical heartbeat +) -> bool: + """Send a heartbeat signal to the backend.""" + try: + response = requests.put( + f"{__base_url__}/sessions/{session_id}/heartbeat", + headers=auth_headers, + timeout=timeout, + ) + response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx) + return True + except requests.exceptions.HTTPError as e: + # Log non-critical errors like 409 Conflict (session not running) + if e.response.status_code == 409: + print(f"[yellow]Heartbeat ignored: Session {session_id} is not running.[/yellow]", file=sys.stderr) + else: + print(f"[yellow]Heartbeat failed for session {session_id}: HTTP {e.response.status_code}[/yellow]", file=sys.stderr) + # Don't exit, just report failure + return False + except requests.exceptions.RequestException as e: + # Network errors are also non-fatal for heartbeats + print(f"[yellow]Heartbeat network error for session {session_id}: {e}[/yellow]", file=sys.stderr) + return False + + +def report_termination( + session_id: str, + status_update: str, + reason: str, + details: Optional[str] = None, + auth_headers: dict = {}, + timeout: int = 30 # Reasonably longer timeout for important termination message +) -> bool: + """Report the termination reason to the backend.""" + try: + response = requests.post( + f"{__base_url__}/sessions/{session_id}/terminate", + json={ + "status_update": status_update, + "termination_reason": reason, + "termination_details": details, + }, + headers=auth_headers, + timeout=timeout, + ) + response.raise_for_status() + print(f"[dim]Termination reported to backend (Status: {status_update}, Reason: {reason}).[/dim]", file=sys.stderr) + return True + except requests.exceptions.RequestException as e: + # Log failure, but don't prevent CLI exit + print(f"[bold yellow]Warning: Failed to report termination to backend for session {session_id}: {e}[/bold yellow]", file=sys.stderr) + return False diff --git a/weco/cli.py b/weco/cli.py index 56d81f8..a8ff630 100644 --- a/weco/cli.py +++ b/weco/cli.py @@ -5,6 +5,9 @@ import time import requests import webbrowser +import threading +import signal +import traceback from rich.console import Console from rich.live import Live from rich.panel import Panel @@ -15,6 +18,8 @@ evaluate_feedback_then_suggest_next_solution, get_optimization_session_status, handle_api_error, + send_heartbeat, + report_termination, ) from . import __base_url__ @@ -42,6 +47,54 @@ install(show_locals=True) console = Console() +# --- Global variable for heartbeat thread --- +heartbeat_thread = None +stop_heartbeat_event = threading.Event() +current_session_id_for_heartbeat = None +current_auth_headers_for_heartbeat = {} + +# --- Heartbeat Sender Class --- +class HeartbeatSender(threading.Thread): + def __init__(self, session_id: str, auth_headers: dict, stop_event: threading.Event, interval: int = 30): + super().__init__(daemon=True) # Daemon thread exits when main thread exits + self.session_id = session_id + self.auth_headers = auth_headers + self.interval = interval + self.stop_event = stop_event + + def run(self): + print("[dim]Heartbeat thread started.[/dim]", file=sys.stderr) + while not self.stop_event.is_set(): + if not send_heartbeat(self.session_id, self.auth_headers): + # Log failure, but continue trying (backend timeout is the fallback) + pass # Error is printed within send_heartbeat + # Wait for the interval OR until the stop event is set + self.stop_event.wait(self.interval) + print("[dim]Heartbeat thread stopped.[/dim]", file=sys.stderr) + +# --- Signal Handling --- +def signal_handler(signum, frame): + signal_name = signal.Signals(signum).name + console.print(f"\n[bold yellow]Termination signal ({signal_name}) received. Shutting down...[/]") + + # Stop heartbeat thread + stop_heartbeat_event.set() + if heartbeat_thread and heartbeat_thread.is_alive(): + heartbeat_thread.join(timeout=2) # Give it a moment to stop + + # Report termination (best effort) + if current_session_id_for_heartbeat: + report_termination( + session_id=current_session_id_for_heartbeat, + status_update="terminated", + reason=f"user_terminated_{signal_name.lower()}", + details=f"Process terminated by signal {signal_name} ({signum}).", + auth_headers=current_auth_headers_for_heartbeat + ) + + # Exit gracefully + sys.exit(0) + def perform_login(console: Console): """Handles the device login flow.""" @@ -161,6 +214,10 @@ def perform_login(console: Console): def main() -> None: """Main function for the Weco CLI.""" + # Setup signal handlers + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + # --- Argument Parsing --- parser = argparse.ArgumentParser( description="[bold cyan]Weco CLI[/]", formatter_class=argparse.RawDescriptionHelpFormatter @@ -212,6 +269,10 @@ def main() -> None: # --- Handle Run Command --- elif args.command == "run": + global heartbeat_thread, current_session_id_for_heartbeat, current_auth_headers_for_heartbeat # Allow modification of globals + + session_id = None # Initialize session_id + optimization_completed_normally = False # Flag for finally block # --- Check Authentication --- weco_api_key = load_weco_api_key() llm_api_keys = read_api_keys_from_env() # Read keys from client environment @@ -243,10 +304,9 @@ def main() -> None: # --- Prepare API Call Arguments --- auth_headers = {} - if weco_api_key: auth_headers["Authorization"] = f"Bearer {weco_api_key}" - # Backend will decide whether to use client keys based on auth status + current_auth_headers_for_heartbeat = auth_headers # Store for signal handler # --- Main Run Logic --- try: @@ -294,16 +354,22 @@ def main() -> None: evaluator_config=evaluator_config, search_policy_config=search_policy_config, additional_instructions=additional_instructions, - api_keys=llm_api_keys, # Pass client LLM keys - auth_headers=auth_headers, # Pass Weco key if logged in + api_keys=llm_api_keys, + auth_headers=auth_headers, timeout=timeout, ) + session_id = session_response["session_id"] + current_session_id_for_heartbeat = session_id # Store for signal handler/finally + + # --- Start Heartbeat Thread --- + stop_heartbeat_event.clear() # Ensure event is clear before starting + heartbeat_thread = HeartbeatSender(session_id, auth_headers, stop_heartbeat_event) + heartbeat_thread.start() # --- Live Update Loop --- refresh_rate = 4 with Live(layout, refresh_per_second=refresh_rate, screen=True) as live: # Define the runs directory (.runs/) - session_id = session_response["session_id"] runs_dir = pathlib.Path(args.log_dir) / session_id runs_dir.mkdir(parents=True, exist_ok=True) @@ -364,6 +430,9 @@ def main() -> None: transition_delay=0.1, ) + # Send initial heartbeat immediately after starting + send_heartbeat(session_id, auth_headers) + # Run evaluation on the initial solution term_out = run_evaluation(eval_command=args.eval_command) @@ -384,14 +453,22 @@ def main() -> None: ) # Send feedback and get next suggestion - eval_and_next_solution_response = evaluate_feedback_then_suggest_next_solution( - session_id=session_id, - execution_output=term_out, - additional_instructions=current_additional_instructions, # Pass current instructions - api_keys=llm_api_keys, # Pass client LLM keys - auth_headers=auth_headers, # Pass Weco key if logged in - timeout=timeout, - ) + try: + eval_and_next_solution_response = evaluate_feedback_then_suggest_next_solution( + session_id=session_id, + execution_output=term_out, + additional_instructions=current_additional_instructions, # Pass current instructions + api_keys=llm_api_keys, # Pass client LLM keys + auth_headers=auth_headers, # Pass Weco key if logged in + timeout=timeout, + ) + except (requests.exceptions.HTTPError, requests.exceptions.RequestException) as suggest_error: + # If suggest fails, we consider it a CLI error and break the loop + console.print(f"\n[bold red]Error communicating with API during step {step}. Stopping optimization.[/]") + # Error details should have been printed by handle_api_error or the exception handler in evaluate_feedback_then_suggest_next_solution + # Prepare to report error in finally block + raise RuntimeError(f"API error during suggest at step {step}") from suggest_error + # Save next solution (.runs//step_.) write_to_path( fp=runs_dir / f"step_{step}{source_fp.suffix}", content=eval_and_next_solution_response["code"] @@ -483,15 +560,19 @@ def main() -> None: additional_instructions=args.additional_instructions ) - # Ensure we pass evaluation results for the last step's generated solution - eval_and_next_solution_response = evaluate_feedback_then_suggest_next_solution( - session_id=session_id, - execution_output=term_out, - additional_instructions=current_additional_instructions, - api_keys=llm_api_keys, - timeout=timeout, - auth_headers=auth_headers, - ) + # Final evaluation report + try: + eval_and_next_solution_response = evaluate_feedback_then_suggest_next_solution( + session_id=session_id, + execution_output=term_out, + additional_instructions=current_additional_instructions, + api_keys=llm_api_keys, + timeout=timeout, + auth_headers=auth_headers, + ) + except (requests.exceptions.HTTPError, requests.exceptions.RequestException) as suggest_error: + console.print("\n[bold red]Error communicating with API during final evaluation report. Results might be incomplete.[/]") + raise RuntimeError("API error during final suggest call") from suggest_error # Update the progress bar summary_panel.set_step(step=steps) @@ -563,14 +644,75 @@ def main() -> None: if not args.preserve_source: write_to_path(fp=source_fp, content=best_solution_content) + # Mark as completed normally for the finally block + optimization_completed_normally = True + console.print(end_optimization_layout) except Exception as e: + # Catch errors during the main optimization loop or setup try: - error_message = e.response.json()["detail"] + error_message = e.response.json()["detail"] # Try to get API error detail except Exception: - error_message = str(e) - console.print(Panel(f"[bold red]Error: {error_message}", title="[bold red]Error", border_style="red")) - # Print traceback for debugging + error_message = str(e) # Otherwise, use the exception string + console.print(Panel(f"[bold red]Error: {error_message}", title="[bold red]Optimization Error", border_style="red")) + # Print traceback for debugging if needed (can be noisy) # console.print_exception(show_locals=False) - sys.exit(1) + + # Ensure optimization_completed_normally is False + optimization_completed_normally = False + + # Prepare details for termination report + error_details = traceback.format_exc() + + # Exit code will be handled by finally block or sys.exit below + exit_code = 1 # Indicate error + # No sys.exit here, let finally block run + + finally: + # This block runs whether the try block completed normally or raised an exception + + # Stop heartbeat thread + stop_heartbeat_event.set() + if heartbeat_thread and heartbeat_thread.is_alive(): + print("[dim]Waiting for heartbeat thread to finish...[/dim]", file=sys.stderr) + heartbeat_thread.join(timeout=2) # Give it a moment to stop + + # Report final status if a session was started + if session_id: + final_status = "unknown" + final_reason = "unknown_termination" + final_details = None + + if optimization_completed_normally: + final_status = "completed" + final_reason = "completed_successfully" + else: + # If an exception was caught and we have details + if 'error_details' in locals(): + final_status = "error" + final_reason = "error_cli_internal" + final_details = error_details + # else: # Should have been handled by signal handler if terminated by user + # Keep default 'unknown' if we somehow end up here without error/completion/signal + + # Avoid reporting if terminated by signal handler (already reported) + # Check a flag or rely on status not being 'unknown' + if final_status != "unknown": + report_termination( + session_id=session_id, + status_update=final_status, + reason=final_reason, + details=final_details, + auth_headers=auth_headers + ) + + # Ensure proper exit code if an error occurred + if not optimization_completed_normally and 'exit_code' in locals() and exit_code != 0: + sys.exit(exit_code) + elif not optimization_completed_normally: + # Generic error exit if no specific code was set but try block failed + sys.exit(1) + else: + # Normal exit + sys.exit(0) From 6ad7b802c62137d20c0ab7b3b4696d70e3fd380f Mon Sep 17 00:00:00 2001 From: Yuxiang Wu Date: Tue, 13 May 2025 00:30:51 +0100 Subject: [PATCH 2/7] bugfix: try except indention error fixed --- weco/api.py | 76 ++++++++++++++++++++++++++--------------------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/weco/api.py b/weco/api.py index 9371244..aee0214 100644 --- a/weco/api.py +++ b/weco/api.py @@ -35,25 +35,25 @@ def start_optimization_session( """Start the optimization session.""" with console.status("[bold green]Starting Optimization..."): try: - response = requests.post( - f"{__base_url__}/sessions", # Path is relative to base_url - json={ - "source_code": source_code, - "additional_instructions": additional_instructions, - "objective": {"evaluation_command": evaluation_command, "metric_name": metric_name, "maximize": maximize}, - "optimizer": { - "steps": steps, - "code_generator": code_generator_config, - "evaluator": evaluator_config, - "search_policy": search_policy_config, + response = requests.post( + f"{__base_url__}/sessions", # Path is relative to base_url + json={ + "source_code": source_code, + "additional_instructions": additional_instructions, + "objective": {"evaluation_command": evaluation_command, "metric_name": metric_name, "maximize": maximize}, + "optimizer": { + "steps": steps, + "code_generator": code_generator_config, + "evaluator": evaluator_config, + "search_policy": search_policy_config, + }, + "metadata": {"client_name": "cli", "client_version": __pkg_version__, **api_keys}, }, - "metadata": {"client_name": "cli", "client_version": __pkg_version__, **api_keys}, - }, - headers=auth_headers, # Add headers - timeout=timeout, - ) - response.raise_for_status() - return response.json() + headers=auth_headers, # Add headers + timeout=timeout, + ) + response.raise_for_status() + return response.json() except requests.exceptions.HTTPError as e: handle_api_error(e, console) sys.exit(1) # Exit if starting session fails @@ -72,18 +72,18 @@ def evaluate_feedback_then_suggest_next_solution( ) -> Dict[str, Any]: """Evaluate the feedback and suggest the next solution.""" try: - response = requests.post( - f"{__base_url__}/sessions/{session_id}/suggest", # Path is relative to base_url - json={ - "execution_output": execution_output, - "additional_instructions": additional_instructions, - "metadata": {**api_keys}, - }, - headers=auth_headers, # Add headers - timeout=timeout, - ) - response.raise_for_status() - return response.json() + response = requests.post( + f"{__base_url__}/sessions/{session_id}/suggest", # Path is relative to base_url + json={ + "execution_output": execution_output, + "additional_instructions": additional_instructions, + "metadata": {**api_keys}, + }, + headers=auth_headers, # Add headers + timeout=timeout, + ) + response.raise_for_status() + return response.json() except requests.exceptions.HTTPError as e: # Allow caller to handle suggest errors, maybe retry or terminate handle_api_error(e, Console()) # Use default console if none passed @@ -98,14 +98,14 @@ def get_optimization_session_status( ) -> Dict[str, Any]: """Get the current status of the optimization session.""" try: - response = requests.get( - f"{__base_url__}/sessions/{session_id}", # Path is relative to base_url - params={"include_history": include_history}, - headers=auth_headers, - timeout=timeout, - ) - response.raise_for_status() - return response.json() + response = requests.get( + f"{__base_url__}/sessions/{session_id}", # Path is relative to base_url + params={"include_history": include_history}, + headers=auth_headers, + timeout=timeout, + ) + response.raise_for_status() + return response.json() except requests.exceptions.HTTPError as e: handle_api_error(e, Console()) # Use default console raise # Re-raise From 03dee29a84f07181ede1820eed3d4ca471c68257 Mon Sep 17 00:00:00 2001 From: "Yuxiang (Jimmy) Wu" Date: Tue, 13 May 2025 16:31:51 +0100 Subject: [PATCH 3/7] Update weco/cli.py using the rich Console's print method instead of print with sys.stderr to maintain consistent output styling across the CLI. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- weco/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weco/cli.py b/weco/cli.py index 2dfd0f7..534fc54 100644 --- a/weco/cli.py +++ b/weco/cli.py @@ -63,14 +63,14 @@ def __init__(self, session_id: str, auth_headers: dict, stop_event: threading.Ev self.stop_event = stop_event def run(self): - print("[dim]Heartbeat thread started.[/dim]", file=sys.stderr) + console.print("[dim]Heartbeat thread started.[/dim]", stderr=True) while not self.stop_event.is_set(): if not send_heartbeat(self.session_id, self.auth_headers): # Log failure, but continue trying (backend timeout is the fallback) pass # Error is printed within send_heartbeat # Wait for the interval OR until the stop event is set self.stop_event.wait(self.interval) - print("[dim]Heartbeat thread stopped.[/dim]", file=sys.stderr) + console.print("[dim]Heartbeat thread stopped.[/dim]", stderr=True) # --- Signal Handling --- def signal_handler(signum, frame): From a418e6e8fabdbfd6b4a2b1934fd4488053195de2 Mon Sep 17 00:00:00 2001 From: Yuxiang Wu Date: Wed, 14 May 2025 22:27:16 +0100 Subject: [PATCH 4/7] Fix: Ensure HeartbeatSender thread robustness and prevent silent termination The HeartbeatSender thread was intermittently dying silently, leading to the backend not receiving heartbeats and prematurely marking the CLI session as errored. This commit addresses the issue by: 1. Replacing Rich `console.print` calls within the `HeartbeatSender.run()` method with standard Python `print(..., file=sys.stderr)`. This mitigates potential intermittent exceptions when using Rich `Console` methods from a background thread, which was identified as the likely primary cause of the silent thread crashes. 2. Wrapping the main loop within `HeartbeatSender.run()` with a `try...except Exception...finally` block. This ensures that any other unexpected exceptions are caught, logged with a traceback to `stderr`, and allow the thread to terminate gracefully rather than silently. 3. Ensuring a log message is printed to `stderr` when the thread starts and when it is stopping (in the `finally` block). These changes make the heartbeat mechanism more resilient and improve debuggability by ensuring that thread failures are logged. --- weco/api.py | 51 ++++++++++++------------- weco/cli.py | 102 ++++++++++++++++++++++++++++++------------------- weco/panels.py | 2 +- 3 files changed, 87 insertions(+), 68 deletions(-) diff --git a/weco/api.py b/weco/api.py index aee0214..e0cfa49 100644 --- a/weco/api.py +++ b/weco/api.py @@ -9,9 +9,9 @@ def handle_api_error(e: requests.exceptions.HTTPError, console: rich.console.Console) -> None: """Extract and display error messages from API responses in a structured format.""" try: - detail = e.response.json()['detail'] - except (ValueError, KeyError): # Handle cases where response is not JSON or detail key is missing - detail = f"HTTP {e.response.status_code} Error: {e.response.text}" + detail = e.response.json()["detail"] + except (ValueError, KeyError): # Handle cases where response is not JSON or detail key is missing + detail = f"HTTP {e.response.status_code} Error: {e.response.text}" console.print(f"[bold red]{detail}[/]") # Avoid exiting here, let the caller decide if the error is fatal # sys.exit(1) @@ -56,7 +56,7 @@ def start_optimization_session( return response.json() except requests.exceptions.HTTPError as e: handle_api_error(e, console) - sys.exit(1) # Exit if starting session fails + sys.exit(1) # Exit if starting session fails except requests.exceptions.RequestException as e: console.print(f"[bold red]Network Error starting session: {e}[/]") sys.exit(1) @@ -86,11 +86,11 @@ def evaluate_feedback_then_suggest_next_solution( return response.json() except requests.exceptions.HTTPError as e: # Allow caller to handle suggest errors, maybe retry or terminate - handle_api_error(e, Console()) # Use default console if none passed - raise # Re-raise the exception + handle_api_error(e, Console()) # Use default console if none passed + raise # Re-raise the exception except requests.exceptions.RequestException as e: - print(f"[bold red]Network Error during suggest: {e}[/]") # Use print as console might not be available - raise # Re-raise the exception + print(f"[bold red]Network Error during suggest: {e}[/]") # Use print as console might not be available + raise # Re-raise the exception def get_optimization_session_status( @@ -107,33 +107,31 @@ def get_optimization_session_status( response.raise_for_status() return response.json() except requests.exceptions.HTTPError as e: - handle_api_error(e, Console()) # Use default console - raise # Re-raise + handle_api_error(e, Console()) # Use default console + raise # Re-raise except requests.exceptions.RequestException as e: print(f"[bold red]Network Error getting status: {e}[/]") - raise # Re-raise + raise # Re-raise def send_heartbeat( session_id: str, auth_headers: dict = {}, - timeout: int = 10 # Shorter timeout for non-critical heartbeat + timeout: int = 10, # Shorter timeout for non-critical heartbeat ) -> bool: """Send a heartbeat signal to the backend.""" try: - response = requests.put( - f"{__base_url__}/sessions/{session_id}/heartbeat", - headers=auth_headers, - timeout=timeout, - ) - response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx) + response = requests.put(f"{__base_url__}/sessions/{session_id}/heartbeat", headers=auth_headers, timeout=timeout) + response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx) return True except requests.exceptions.HTTPError as e: # Log non-critical errors like 409 Conflict (session not running) if e.response.status_code == 409: - print(f"[yellow]Heartbeat ignored: Session {session_id} is not running.[/yellow]", file=sys.stderr) + print(f"[yellow]Heartbeat ignored: Session {session_id} is not running.[/yellow]", file=sys.stderr) else: - print(f"[yellow]Heartbeat failed for session {session_id}: HTTP {e.response.status_code}[/yellow]", file=sys.stderr) + print( + f"[yellow]Heartbeat failed for session {session_id}: HTTP {e.response.status_code}[/yellow]", file=sys.stderr + ) # Don't exit, just report failure return False except requests.exceptions.RequestException as e: @@ -148,17 +146,13 @@ def report_termination( reason: str, details: Optional[str] = None, auth_headers: dict = {}, - timeout: int = 30 # Reasonably longer timeout for important termination message + timeout: int = 30, # Reasonably longer timeout for important termination message ) -> bool: """Report the termination reason to the backend.""" try: response = requests.post( f"{__base_url__}/sessions/{session_id}/terminate", - json={ - "status_update": status_update, - "termination_reason": reason, - "termination_details": details, - }, + json={"status_update": status_update, "termination_reason": reason, "termination_details": details}, headers=auth_headers, timeout=timeout, ) @@ -167,5 +161,8 @@ def report_termination( return True except requests.exceptions.RequestException as e: # Log failure, but don't prevent CLI exit - print(f"[bold yellow]Warning: Failed to report termination to backend for session {session_id}: {e}[/bold yellow]", file=sys.stderr) + print( + f"[bold yellow]Warning: Failed to report termination to backend for session {session_id}: {e}[/bold yellow]", + file=sys.stderr, + ) return False diff --git a/weco/cli.py b/weco/cli.py index 534fc54..1890cc0 100644 --- a/weco/cli.py +++ b/weco/cli.py @@ -53,24 +53,42 @@ current_session_id_for_heartbeat = None current_auth_headers_for_heartbeat = {} + # --- Heartbeat Sender Class --- class HeartbeatSender(threading.Thread): def __init__(self, session_id: str, auth_headers: dict, stop_event: threading.Event, interval: int = 30): - super().__init__(daemon=True) # Daemon thread exits when main thread exits + super().__init__(daemon=True) # Daemon thread exits when main thread exits self.session_id = session_id self.auth_headers = auth_headers self.interval = interval self.stop_event = stop_event def run(self): - console.print("[dim]Heartbeat thread started.[/dim]", stderr=True) - while not self.stop_event.is_set(): - if not send_heartbeat(self.session_id, self.auth_headers): - # Log failure, but continue trying (backend timeout is the fallback) - pass # Error is printed within send_heartbeat - # Wait for the interval OR until the stop event is set - self.stop_event.wait(self.interval) - console.print("[dim]Heartbeat thread stopped.[/dim]", stderr=True) + # Using standard print for thread lifecycle messages for robustness + print(f"[HeartbeatSender] Thread for session {self.session_id} started.", file=sys.stderr) + try: + while not self.stop_event.is_set(): + if not send_heartbeat(self.session_id, self.auth_headers): + # send_heartbeat itself prints errors to stderr if it returns False + # No explicit HeartbeatSender log needed here unless more detail is desired for a False return + pass # Continue trying as per original logic + + if self.stop_event.is_set(): # Check before waiting for responsiveness + break + + self.stop_event.wait(self.interval) # Wait for interval or stop signal + + except Exception as e: + # Catch any unexpected error in the loop to prevent silent thread death + print( + f"[ERROR HeartbeatSender] Unhandled exception in run loop for session {self.session_id}: {e}", file=sys.stderr + ) + traceback.print_exc(file=sys.stderr) + # The loop will break due to the exception, and thread will terminate via finally. + finally: + # Ensure this message is printed when the thread is actually stopping + print(f"[HeartbeatSender] Thread for session {self.session_id} stopping.", file=sys.stderr) + # --- Signal Handling --- def signal_handler(signum, frame): @@ -80,7 +98,7 @@ def signal_handler(signum, frame): # Stop heartbeat thread stop_heartbeat_event.set() if heartbeat_thread and heartbeat_thread.is_alive(): - heartbeat_thread.join(timeout=2) # Give it a moment to stop + heartbeat_thread.join(timeout=2) # Give it a moment to stop # Report termination (best effort) if current_session_id_for_heartbeat: @@ -89,7 +107,7 @@ def signal_handler(signum, frame): status_update="terminated", reason=f"user_terminated_{signal_name.lower()}", details=f"Process terminated by signal {signal_name} ({signum}).", - auth_headers=current_auth_headers_for_heartbeat + auth_headers=current_auth_headers_for_heartbeat, ) # Exit gracefully @@ -264,10 +282,10 @@ def main() -> None: # --- Handle Run Command --- elif args.command == "run": - global heartbeat_thread, current_session_id_for_heartbeat, current_auth_headers_for_heartbeat # Allow modification of globals + global heartbeat_thread, current_session_id_for_heartbeat, current_auth_headers_for_heartbeat # Allow modification of globals - session_id = None # Initialize session_id - optimization_completed_normally = False # Flag for finally block + session_id = None # Initialize session_id + optimization_completed_normally = False # Flag for finally block # --- Check Authentication --- weco_api_key = load_weco_api_key() llm_api_keys = read_api_keys_from_env() # Read keys from client environment @@ -301,7 +319,7 @@ def main() -> None: auth_headers = {} if weco_api_key: auth_headers["Authorization"] = f"Bearer {weco_api_key}" - current_auth_headers_for_heartbeat = auth_headers # Store for signal handler + current_auth_headers_for_heartbeat = auth_headers # Store for signal handler # --- Main Run Logic --- try: @@ -354,10 +372,10 @@ def main() -> None: timeout=timeout, ) session_id = session_response["session_id"] - current_session_id_for_heartbeat = session_id # Store for signal handler/finally + current_session_id_for_heartbeat = session_id # Store for signal handler/finally # --- Start Heartbeat Thread --- - stop_heartbeat_event.clear() # Ensure event is clear before starting + stop_heartbeat_event.clear() # Ensure event is clear before starting heartbeat_thread = HeartbeatSender(session_id, auth_headers, stop_heartbeat_event) heartbeat_thread.start() @@ -457,11 +475,13 @@ def main() -> None: timeout=timeout, ) except (requests.exceptions.HTTPError, requests.exceptions.RequestException) as suggest_error: - # If suggest fails, we consider it a CLI error and break the loop - console.print(f"\n[bold red]Error communicating with API during step {step}. Stopping optimization.[/]") - # Error details should have been printed by handle_api_error or the exception handler in evaluate_feedback_then_suggest_next_solution - # Prepare to report error in finally block - raise RuntimeError(f"API error during suggest at step {step}") from suggest_error + # If suggest fails, we consider it a CLI error and break the loop + console.print( + f"\n[bold red]Error communicating with API during step {step}. Stopping optimization.[/]" + ) + # Error details should have been printed by handle_api_error or the exception handler in evaluate_feedback_then_suggest_next_solution + # Prepare to report error in finally block + raise RuntimeError(f"API error during suggest at step {step}") from suggest_error # Save next solution (.runs//step_.) write_to_path( @@ -564,8 +584,10 @@ def main() -> None: auth_headers=auth_headers, ) except (requests.exceptions.HTTPError, requests.exceptions.RequestException) as suggest_error: - console.print("\n[bold red]Error communicating with API during final evaluation report. Results might be incomplete.[/]") - raise RuntimeError("API error during final suggest call") from suggest_error + console.print( + "\n[bold red]Error communicating with API during final evaluation report. Results might be incomplete.[/]" + ) + raise RuntimeError("API error during final suggest call") from suggest_error # Update the progress bar summary_panel.set_step(step=steps) @@ -644,9 +666,9 @@ def main() -> None: except Exception as e: # Catch errors during the main optimization loop or setup try: - error_message = e.response.json()["detail"] # Try to get API error detail + error_message = e.response.json()["detail"] # Try to get API error detail except Exception: - error_message = str(e) # Otherwise, use the exception string + error_message = str(e) # Otherwise, use the exception string console.print(Panel(f"[bold red]Error: {error_message}", title="[bold red]Optimization Error", border_style="red")) # Print traceback for debugging if needed (can be noisy) # console.print_exception(show_locals=False) @@ -658,7 +680,7 @@ def main() -> None: error_details = traceback.format_exc() # Exit code will be handled by finally block or sys.exit below - exit_code = 1 # Indicate error + exit_code = 1 # Indicate error # No sys.exit here, let finally block run finally: @@ -668,7 +690,7 @@ def main() -> None: stop_heartbeat_event.set() if heartbeat_thread and heartbeat_thread.is_alive(): print("[dim]Waiting for heartbeat thread to finish...[/dim]", file=sys.stderr) - heartbeat_thread.join(timeout=2) # Give it a moment to stop + heartbeat_thread.join(timeout=2) # Give it a moment to stop # Report final status if a session was started if session_id: @@ -681,12 +703,12 @@ def main() -> None: final_reason = "completed_successfully" else: # If an exception was caught and we have details - if 'error_details' in locals(): - final_status = "error" - final_reason = "error_cli_internal" - final_details = error_details + if "error_details" in locals(): + final_status = "error" + final_reason = "error_cli_internal" + final_details = error_details # else: # Should have been handled by signal handler if terminated by user - # Keep default 'unknown' if we somehow end up here without error/completion/signal + # Keep default 'unknown' if we somehow end up here without error/completion/signal # Avoid reporting if terminated by signal handler (already reported) # Check a flag or rely on status not being 'unknown' @@ -696,15 +718,15 @@ def main() -> None: status_update=final_status, reason=final_reason, details=final_details, - auth_headers=auth_headers + auth_headers=auth_headers, ) # Ensure proper exit code if an error occurred - if not optimization_completed_normally and 'exit_code' in locals() and exit_code != 0: - sys.exit(exit_code) + if not optimization_completed_normally and "exit_code" in locals() and exit_code != 0: + sys.exit(exit_code) elif not optimization_completed_normally: - # Generic error exit if no specific code was set but try block failed - sys.exit(1) + # Generic error exit if no specific code was set but try block failed + sys.exit(1) else: - # Normal exit - sys.exit(0) + # Normal exit + sys.exit(0) diff --git a/weco/panels.py b/weco/panels.py index f086aee..7d50d34 100644 --- a/weco/panels.py +++ b/weco/panels.py @@ -253,7 +253,7 @@ def get_display(self, is_done: bool) -> Panel: # Make sure the metric tree is built before calling build_rich_tree return Panel( self._build_rich_tree(), - title="[bold]🔎 Exploring Solutions..." if not is_done else "[bold]🔎 Optimization Complete!", + title=("[bold]🔎 Exploring Solutions..." if not is_done else "[bold]🔎 Optimization Complete!"), border_style="green", expand=True, padding=(0, 1), From adfae56b635d87e9568b63ba0bb90b9ae06f88af Mon Sep 17 00:00:00 2001 From: Yuxiang Wu Date: Thu, 15 May 2025 00:49:28 +0100 Subject: [PATCH 5/7] Refactor: Reduce verbosity of heartbeat thread and termination report logging --- weco/api.py | 18 ++++++------------ weco/cli.py | 6 ------ 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/weco/api.py b/weco/api.py index e0cfa49..5d709d3 100644 --- a/weco/api.py +++ b/weco/api.py @@ -89,7 +89,7 @@ def evaluate_feedback_then_suggest_next_solution( handle_api_error(e, Console()) # Use default console if none passed raise # Re-raise the exception except requests.exceptions.RequestException as e: - print(f"[bold red]Network Error during suggest: {e}[/]") # Use print as console might not be available + print(f"Network Error during suggest: {e}") # Use print as console might not be available raise # Re-raise the exception @@ -110,7 +110,7 @@ def get_optimization_session_status( handle_api_error(e, Console()) # Use default console raise # Re-raise except requests.exceptions.RequestException as e: - print(f"[bold red]Network Error getting status: {e}[/]") + print(f"Network Error getting status: {e}") raise # Re-raise @@ -127,16 +127,14 @@ def send_heartbeat( except requests.exceptions.HTTPError as e: # Log non-critical errors like 409 Conflict (session not running) if e.response.status_code == 409: - print(f"[yellow]Heartbeat ignored: Session {session_id} is not running.[/yellow]", file=sys.stderr) + print(f"Heartbeat ignored: Session {session_id} is not running.", file=sys.stderr) else: - print( - f"[yellow]Heartbeat failed for session {session_id}: HTTP {e.response.status_code}[/yellow]", file=sys.stderr - ) + print(f"Heartbeat failed for session {session_id}: HTTP {e.response.status_code}", file=sys.stderr) # Don't exit, just report failure return False except requests.exceptions.RequestException as e: # Network errors are also non-fatal for heartbeats - print(f"[yellow]Heartbeat network error for session {session_id}: {e}[/yellow]", file=sys.stderr) + print(f"Heartbeat network error for session {session_id}: {e}", file=sys.stderr) return False @@ -157,12 +155,8 @@ def report_termination( timeout=timeout, ) response.raise_for_status() - print(f"[dim]Termination reported to backend (Status: {status_update}, Reason: {reason}).[/dim]", file=sys.stderr) return True except requests.exceptions.RequestException as e: # Log failure, but don't prevent CLI exit - print( - f"[bold yellow]Warning: Failed to report termination to backend for session {session_id}: {e}[/bold yellow]", - file=sys.stderr, - ) + print(f"Warning: Failed to report termination to backend for session {session_id}: {e}", file=sys.stderr) return False diff --git a/weco/cli.py b/weco/cli.py index 1890cc0..6cea9d6 100644 --- a/weco/cli.py +++ b/weco/cli.py @@ -64,8 +64,6 @@ def __init__(self, session_id: str, auth_headers: dict, stop_event: threading.Ev self.stop_event = stop_event def run(self): - # Using standard print for thread lifecycle messages for robustness - print(f"[HeartbeatSender] Thread for session {self.session_id} started.", file=sys.stderr) try: while not self.stop_event.is_set(): if not send_heartbeat(self.session_id, self.auth_headers): @@ -85,9 +83,6 @@ def run(self): ) traceback.print_exc(file=sys.stderr) # The loop will break due to the exception, and thread will terminate via finally. - finally: - # Ensure this message is printed when the thread is actually stopping - print(f"[HeartbeatSender] Thread for session {self.session_id} stopping.", file=sys.stderr) # --- Signal Handling --- @@ -689,7 +684,6 @@ def main() -> None: # Stop heartbeat thread stop_heartbeat_event.set() if heartbeat_thread and heartbeat_thread.is_alive(): - print("[dim]Waiting for heartbeat thread to finish...[/dim]", file=sys.stderr) heartbeat_thread.join(timeout=2) # Give it a moment to stop # Report final status if a session was started From e6dd7159e32d00bd2eb8e9fd2b0266c59d54fb0d Mon Sep 17 00:00:00 2001 From: Yuxiang Wu Date: Fri, 16 May 2025 21:58:18 +0100 Subject: [PATCH 6/7] Refactor: Centralize package version in pyproject.toml Dynamically load package version in weco/__init__.py from pyproject.toml using importlib.metadata. This removes the need to maintain the version number in multiple locations, making version bumps simpler and less error-prone. --- weco/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/weco/__init__.py b/weco/__init__.py index fc06bd0..a5595f0 100644 --- a/weco/__init__.py +++ b/weco/__init__.py @@ -1,7 +1,8 @@ import os +import importlib.metadata # DO NOT EDIT -__pkg_version__ = "0.2.17" +__pkg_version__ = importlib.metadata.version("weco") __api_version__ = "v1" __base_url__ = f"https://api.weco.ai/{__api_version__}" From 87ab675df8d620cf82eba0f2915f9a6ee7db29c2 Mon Sep 17 00:00:00 2001 From: Yuxiang Wu Date: Fri, 16 May 2025 22:07:06 +0100 Subject: [PATCH 7/7] Update pyproject.toml to version 0.2.18. Refactor heartbeat logic in cli.py to streamline error handling by removing redundant try-except blocks, enhancing code clarity and maintainability. --- pyproject.toml | 10 ++++------ weco/cli.py | 51 ++++++++++++++++++-------------------------------- 2 files changed, 22 insertions(+), 39 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a6d5c80..eaf764b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,20 +5,18 @@ build-backend = "setuptools.build_meta" [project] name = "weco" -authors = [ - {name = "Weco AI Team", email = "contact@weco.ai"}, -] +authors = [{ name = "Weco AI Team", email = "contact@weco.ai" }] description = "Documentation for `weco`, a CLI for using Weco AI's code optimizer." readme = "README.md" -version = "0.2.17" -license = {text = "MIT"} +version = "0.2.18" +license = { text = "MIT" } requires-python = ">=3.8" dependencies = ["requests", "rich"] keywords = ["AI", "Code Optimization", "Code Generation"] classifiers = [ "Programming Language :: Python :: 3", "Operating System :: OS Independent", - "License :: OSI Approved :: MIT License" + "License :: OSI Approved :: MIT License", ] [project.scripts] diff --git a/weco/cli.py b/weco/cli.py index 6cea9d6..e5ac583 100644 --- a/weco/cli.py +++ b/weco/cli.py @@ -437,8 +437,8 @@ def main() -> None: transition_delay=0.1, ) - # Send initial heartbeat immediately after starting - send_heartbeat(session_id, auth_headers) + # # Send initial heartbeat immediately after starting + # send_heartbeat(session_id, auth_headers) # Run evaluation on the initial solution term_out = run_evaluation(eval_command=args.eval_command) @@ -460,23 +460,14 @@ def main() -> None: ) # Send feedback and get next suggestion - try: - eval_and_next_solution_response = evaluate_feedback_then_suggest_next_solution( - session_id=session_id, - execution_output=term_out, - additional_instructions=current_additional_instructions, # Pass current instructions - api_keys=llm_api_keys, # Pass client LLM keys - auth_headers=auth_headers, # Pass Weco key if logged in - timeout=timeout, - ) - except (requests.exceptions.HTTPError, requests.exceptions.RequestException) as suggest_error: - # If suggest fails, we consider it a CLI error and break the loop - console.print( - f"\n[bold red]Error communicating with API during step {step}. Stopping optimization.[/]" - ) - # Error details should have been printed by handle_api_error or the exception handler in evaluate_feedback_then_suggest_next_solution - # Prepare to report error in finally block - raise RuntimeError(f"API error during suggest at step {step}") from suggest_error + eval_and_next_solution_response = evaluate_feedback_then_suggest_next_solution( + session_id=session_id, + execution_output=term_out, + additional_instructions=current_additional_instructions, # Pass current instructions + api_keys=llm_api_keys, # Pass client LLM keys + auth_headers=auth_headers, # Pass Weco key if logged in + timeout=timeout, + ) # Save next solution (.runs//step_.) write_to_path( @@ -569,20 +560,14 @@ def main() -> None: ) # Final evaluation report - try: - eval_and_next_solution_response = evaluate_feedback_then_suggest_next_solution( - session_id=session_id, - execution_output=term_out, - additional_instructions=current_additional_instructions, - api_keys=llm_api_keys, - timeout=timeout, - auth_headers=auth_headers, - ) - except (requests.exceptions.HTTPError, requests.exceptions.RequestException) as suggest_error: - console.print( - "\n[bold red]Error communicating with API during final evaluation report. Results might be incomplete.[/]" - ) - raise RuntimeError("API error during final suggest call") from suggest_error + eval_and_next_solution_response = evaluate_feedback_then_suggest_next_solution( + session_id=session_id, + execution_output=term_out, + additional_instructions=current_additional_instructions, + api_keys=llm_api_keys, + timeout=timeout, + auth_headers=auth_headers, + ) # Update the progress bar summary_panel.set_step(step=steps)