diff --git a/examples/agent/README.md b/examples/agent/README.md
index a1cadf4..69a9ab8 100644
--- a/examples/agent/README.md
+++ b/examples/agent/README.md
@@ -57,6 +57,12 @@ fractale agent --plan ./plans/run-lammps.yaml
 # or try using with the cache
 fractale agent --plan ./plans/run-lammps.yaml --use-cache
+
+# Save metadata
+fractale agent --plan ./plans/run-lammps.yaml --results ./results
+
+# Save metadata and include incremental results
+fractale agent --plan ./plans/run-lammps.yaml --results ./results --incremental
 ```
 
 We haven't hit the case yet where the manager needs to take over - that needs further development, along with being goal oriented (e.g., parsing a log and getting an output).
@@ -66,11 +72,17 @@ We haven't hit the case yet where the manager needs to take over - that needs fu
 #### To do items
 
 - Figure out optimization agent (with some goal)
+- Right now when we restart, we do so with a fresh slate (no log memory) - should there be one?
+- We likely want some way to quantify the amount of change between prompts, and the difficulty of the task.
+- When we return to the manager, the last response (which might say why it is returning) should likely inform step selection. And not just step selection, but also the updated prompt for the step that is missing something.
  - Right now we rely on random sampling of the space to avoid whatever the issue might be.
 
 #### Research Questions
 
 **And experiment ideas**
 
+- Why does it make the same mistakes? E.g., always forgetting ca-certificates. Did it learn from data where that was OK to do, so errors result from inconsistencies between the way things used to work and the way they work now?
+- Insight: if I don't know how to run an app, it's unlikely the LLM can do it, because I can't give any guidance (and it guesses).
 - How do we define stability?
 - What are the increments of change (e.g., "adding a library")? We should be able to keep track of times for each stage and what changed, and an analyzer LLM can look at result and understand (categorize) most salient contributions to change.
 - We also can time the time it takes to do subsequent changes, when relevant. For example, if we are building, we should be able to use cached layers (and the build times speed up) if the LLM is changing content later in the Dockerfile.
diff --git a/fractale/agent/base.py b/fractale/agent/base.py
index 22ccecd..1ec97dd 100644
--- a/fractale/agent/base.py
+++ b/fractale/agent/base.py
@@ -1,6 +1,8 @@
-import json
+import copy
 import os
+import re
 import sys
+import time
 
 import google.generativeai as genai
 
@@ -8,6 +10,7 @@
 import fractale.agent.logger as logger
 import fractale.utils as utils
 from fractale.agent.context import get_context
+from fractale.agent.decorators import save_result, timed
 
 
 class Agent:
@@ -22,28 +25,37 @@ class Agent:
     """
 
     # name and description should be on the class
+    state_variables = ["result", "error_message"]
 
-    def __init__(self, use_cache=False):
+    def __init__(
+        self, use_cache=False, results_dir=None, save_incremental=False, max_attempts=None
+    ):
+        self.attempts = 0
+        self.max_attempts = max_attempts
 
-        # Max attempts defaults to unlimited
-        # We start counting at 1 for the user to see.
-        # Eat your heart out, Matlab.
-        self.attempts = 1
-        self.max_attempts = None
+        # For now, assume these are for the manager.
+ # They get added to other agents via the step creation + # We can optionally save incremental result objects + self.results_dir = results_dir or os.getcwd() + self.save_incremental = save_incremental # The user can save if desired - caching the context to skip steps that already run. self.setup_cache(use_cache) + # This supports saving custom logs and step (attempt) metadata + self.init_metadata() + # Custom initialization functions self.init() + def init_metadata(self): + self.metadata = {"times": {}, "assets": {}, "ask_gemini": [], "retries": 0, "failures": []} + + @save_result def run(self, context): """ Run the agent - a wrapper around internal function _run that prepares it. """ - # Init attempts. Each agent has an internal counter for total attempts - self.attempts = self.attempts or 1 - # Load cached context. This is assumed to override user provided args # If we have a saved context, we assume we want to use it, return early cached_context = self.load_cache() @@ -57,7 +69,8 @@ def run(self, context): context = get_context(context) # Run, wrapping with a load and save of cache - context = self._run(context) + # This will return here when the internal loop is done + context = self.run_step(context) self.save_cache(context) return context @@ -70,6 +83,32 @@ def print_result(self, result): """ pass + def reset_context(self, context): + """ + Remove output and any stateful variables. This is assuming we + are starting again. + """ + for key in self.state_variables: + if key in context: + del context[key] + + # Since we will try again, let's move current metadata into a subsection + metadata = copy.deepcopy(self.metadata) + + # We don't want this to recurse forever + failures = metadata.get("failures") or [] + if "failures" in metadata: + del metadata["failures"] + failures.append(metadata) + + # Reset metadata, save retries + self.init_metadata() + self.metadata["failures"] = failures + self.metadata["retries"] = metadata["retries"] + + # We don't need a return here, but let's be explicit + return context + def setup_cache(self, use_cache=False): """ Setup (or load) a cache. @@ -123,10 +162,7 @@ def reached_max_attempts(self): # Unset (None) or 1. if not self.max_attempts: return False - return self.attempts >= self.max_attempts - - def set_max_attempts(self, max_attempts): - self.max_attempts = max_attempts + return self.attempts > self.max_attempts def add_shared_arguments(self, agent): """ @@ -190,29 +226,25 @@ def get_code_block(self, content, code_type): """ Parse a code block from the response """ + pattern = f"```(?:{code_type})?\n(.*?)```" + match = re.search(pattern, content, re.DOTALL) + if match: + return match.group(1).strip() if content.startswith(f"```{code_type}"): content = content[len(f"```{code_type}") :] if content.startswith("```"): content = content[len("```") :] if content.endswith("```"): content = content[: -len("```")] - return content + return content.strip() - def _run(self, context): + def run_step(self, context): """ Run the agent. This expects to be called with a loaded context. """ assert context raise NotImplementedError(f"The {self.name} agent is missing internal 'run' function") - def get_initial_prompt(self, context): - """ - Get the initial prompt (with details) to provide context to the manager. - - If we don't do this, the manager can provide a bad instruction for how to fix the error. 
- """ - return self.get_prompt(context) - def get_prompt(self, context): """ This function should take the same context as run and return the parsed prompt that @@ -235,15 +267,21 @@ def init(self): except KeyError: sys.exit("ERROR: GEMINI_API_KEY environment variable not set.") + # We don't add timed here because we do it custom def ask_gemini(self, prompt, with_history=True): """ Ask gemini adds a wrapper with some error handling. """ try: + start = time.perf_counter() if with_history: response = self.chat.send_message(prompt) else: response = self.model.generate_content(prompt) + end = time.perf_counter() + + if self.save_incremental: + self.save_gemini_metadata(end - start, response, with_history) # This line can fail. If it succeeds, return entire response return response.text.strip() @@ -251,3 +289,17 @@ def ask_gemini(self, prompt, with_history=True): except ValueError as e: print(f"[Error] The API response was blocked and contained no text: {str(e)}") return "GEMINI ERROR: The API returned an error (or stop) and we need to try again." + + def save_gemini_metadata(self, elapsed_time, response, with_history): + """ + Save gemini response metadata and elapsed time + """ + self.metadata["ask_gemini"].append( + { + "conversation_history": with_history, + "prompt_token_count": response.usage_metadata.prompt_token_count, + "candidates_token_count": response.usage_metadata.candidates_token_count, + "total_token_count": response.usage_metadata.total_token_count, + "time_seconds": elapsed_time, + } + ) diff --git a/fractale/agent/build/agent.py b/fractale/agent/build/agent.py index c165eca..8990432 100644 --- a/fractale/agent/build/agent.py +++ b/fractale/agent/build/agent.py @@ -1,5 +1,6 @@ from fractale.agent.base import GeminiAgent import fractale.agent.build.prompts as prompts +from fractale.agent.decorators import timed from fractale.agent.context import get_context from fractale.agent.errors import DebugAgent import fractale.agent.logger as logger @@ -18,10 +19,6 @@ import textwrap -# regular expression in case LLM does not follow my instructions! -dockerfile_pattern = r"```(?:dockerfile)?\n(.*?)```" - - class BuildAgent(GeminiAgent): """ Builder agent. @@ -33,6 +30,8 @@ class BuildAgent(GeminiAgent): name = "build" description = "builder agent" + state_variables = ["result", "dockerfile", "error_message"] + result_type = "dockerfile" def _add_arguments(self, subparser): """ @@ -55,6 +54,12 @@ def _add_arguments(self, subparser): "--environment", help="Environment description to build for (defaults to generic)", ) + build.add_argument( + "--load", + help="Load into kind on success.", + default=False, + action="store_true", + ) return build def get_prompt(self, context): @@ -85,7 +90,8 @@ def filter_output(self, output): regex = "(%s)" % "|".join(skips) return "\n".join([x for x in output.split("\n") if not re.search(regex, x)]) - def _run(self, context): + @timed + def run_step(self, context): """ Run the agent. 
@@ -113,6 +119,7 @@ def _run(self, context): if return_code == 0: self.print_result(context.result) logger.success(f"Build complete in {self.attempts} attempts") + self.load(context) else: # Filter out likely not needed lines (ubuntu install) output = self.filter_output(output) @@ -122,12 +129,14 @@ def _run(self, context): # Ask the debug agent to better instruct the error message # This becomes a more guided output context.error_message = output - agent = DebugAgent() + # This updates the error message to be the output - context = agent.run(context, requires=prompts.requires) + context = DebugAgent().run(context, requires=prompts.requires) + print("\n[bold cyan] Requesting Correction from Build Agent[/bold cyan]") # If we have reached the max attempts... - if self.reached_max_attempts(): + if self.reached_max_attempts() or context.get("return_to_manager") is True: + context.return_to_manager = False # If we are being managed, return the result if context.is_managed(): @@ -139,10 +148,9 @@ def _run(self, context): logger.exit(f"Max attempts {self.max_attempts} reached.", title="Agent Failure") self.attempts += 1 - print("\n[bold cyan] Requesting Correction from Build Agent[/bold cyan]") # Update the context with error message - return self.run(context) + return self.run_step(context) # Add generation line self.write_file(context, context.result) @@ -151,6 +159,25 @@ def _run(self, context): # unless we are being managed return context + @timed + def load(self, context): + """ + If specified, load into kind. + """ + if not context.get("load") is True: + return + + logger.info("Loading into kind...") + p = subprocess.run( + ["kind", "load", "docker-image", context.container], + capture_output=True, + text=True, + check=False, + ) + if p.returncode != 0: + output = p.stdout + p.stderr + logger.warning(f"Issue with kind load: {output}") + def print_result(self, dockerfile): """ Print Dockerfile with highlighted Syntax @@ -183,6 +210,7 @@ def generate_name(self, name): name = name + "c" return name.lower() + @timed def build(self, context): """ Build the Dockerfile! Yolo! @@ -219,6 +247,18 @@ def build(self, context): shutil.rmtree(build_dir, ignore_errors=True) return (p.returncode, p.stdout + p.stderr) + def save_dockerfile(self, dockerfile): + """ + Save logs to metadata + """ + if self.save_incremental: + if "dockerfile" not in self.metadata["assets"]: + self.metadata["assets"]["dockerfile"] = [] + self.metadata["assets"]["dockerfile"].append( + {"item": dockerfile, "attempt": self.attempts} + ) + + @timed def generate_dockerfile(self, context): """ Generates or refines a Dockerfile using the Gemini API. @@ -233,14 +273,14 @@ def generate_dockerfile(self, context): # Try to remove Dockerfile from code block try: - content = self.get_code_block(content, "dockerfile") - - # If we are getting commentary... 
- match = re.search(dockerfile_pattern, content, re.DOTALL) + # This can be provided as docker or dockerfile + pattern = "```(?:docker|dockerfile)?\n(.*?)```" + match = re.search(pattern, content, re.DOTALL) if match: dockerfile = match.group(1).strip() else: - dockerfile = content.strip() + dockerfile = self.get_code_block(content, "dockerfile") + self.save_dockerfile(dockerfile) # The result is saved as a build step # The dockerfile is the argument used internally diff --git a/fractale/agent/build/prompts.py b/fractale/agent/build/prompts.py index 20f46e3..a44e855 100644 --- a/fractale/agent/build/prompts.py +++ b/fractale/agent/build/prompts.py @@ -9,6 +9,7 @@ - Assume a default of CPU if GPU or CPU is not stated. - Do not do a multi-stage build, and do not COPY or ADD anything. - Try to place executables on the PATH so they do not need to be discovered. +- You are only scoped to edit a Dockerfile to build the image. """ common_instructions = ( diff --git a/fractale/agent/decorators.py b/fractale/agent/decorators.py new file mode 100644 index 0000000..3831f3c --- /dev/null +++ b/fractale/agent/decorators.py @@ -0,0 +1,52 @@ +import functools +import time + + +def save_result(func): + """ + Save the final result, with total elapsed time + """ + + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + # This is the original function + start = time.perf_counter() + context = func(self, *args, **kwargs) + # Get the result and pass to the callback! + end = time.perf_counter() + elapsed_time = end - start + final_result = context.get("result") + # Cut out early if missing result (e.g., failure) + if not self.save_incremental or not final_result: + return context + # This can be called more than once if nested + self.metadata["result"] = { + "item": final_result, + "total_seconds": elapsed_time, + "type": self.result_type, + } + return context + + return wrapper + + +def timed(func): + """ + Timed decorator that adds timed executions for different functions + """ + + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + + # This is the original function + start = time.perf_counter() + result = func(self, *args, **kwargs) + end = time.perf_counter() + elapsed_time = end - start + name = f"{func.__name__}_seconds" + if name not in self.metadata["times"]: + self.metadata["times"][name] = [] + self.metadata["times"][name].append(elapsed_time) + return result + + return wrapper diff --git a/fractale/agent/defaults.py b/fractale/agent/defaults.py index 752d8b9..214a5b1 100644 --- a/fractale/agent/defaults.py +++ b/fractale/agent/defaults.py @@ -3,13 +3,15 @@ # These are common / default args we don't need to give in any prompt. shared_args = { - "debug", - "quiet", - "config_dir", - "version", "command", - "plan", + "config_dir", + "debug", + "error_message", + "incremental", "outfile", + "plan", + "quiet", "result", - "error_message", + "results", + "version", } diff --git a/fractale/agent/errors/agent.py b/fractale/agent/errors/agent.py index 4852e34..96637fe 100644 --- a/fractale/agent/errors/agent.py +++ b/fractale/agent/errors/agent.py @@ -46,6 +46,25 @@ def run(self, context, requires=None): print("Received debugging advice from Gemini...") logger.custom(content, title="[green]Debug Advice[/green]", border_style="green") + # Do we allow interactive and have a request for it? 
+ if "INTERACTIVE SESSION" in content and context.get("allow_interactive") is True: + logger.custom( + "Error debugging agent has recommended interactive session", + title="[blue]Debug Advice[/blue]", + border_style="blue", + ) + context.interactive = True + + # Do we have instructions to return to the manager? + if "RETURN TO MANAGER" in content: + logger.custom( + "Error debugging agent has recommended return to manager", + title="[blue]Debug Advice[/blue]", + border_style="blue", + ) + content = content.replace("RETURN TO MANAGER", "") + context.return_to_manager = True + # The tweaked output as the advice for next step (instead of entire error output) context.error_message = content diff --git a/fractale/agent/errors/prompts.py b/fractale/agent/errors/prompts.py index ae65bf6..9570193 100644 --- a/fractale/agent/errors/prompts.py +++ b/fractale/agent/errors/prompts.py @@ -1,7 +1,14 @@ import fractale.agent.defaults as defaults debug_prompt = f"""You are a debugging agent and expert. We attempted the following piece of code and had problems. -Please identify the error and advise for how to fix the error.""" +Please identify the error and advise for how to fix the error. The agent you are returning to can only make scoped +changes, which we provide below. If you determine the issue cannot be resolved by changing one of these files, +we will need to return to another agent. In this case, please provide "RETURN TO MANAGER" anywhere in your response. +""" + +interactive_prompt = """ If there is information the step cannot know such as runtime parameters or data paths" +and the fix is not obvious (and you are guessing) please provide "INTERACTIVE SESSION" anywhere in your response +""" def get_debug_prompt(context, requires): @@ -13,6 +20,9 @@ def get_debug_prompt(context, requires): code_block = context.get("result", required=True) prompt = debug_prompt + if context.get("allow_interactive") is True: + prompt += interactive_prompt + # Requirements are specific constraints to give to the error agent if requires: prompt += requires + "\n" diff --git a/fractale/agent/kubernetes_job/agent.py b/fractale/agent/kubernetes_job/agent.py index 9e5743f..8dbcf11 100644 --- a/fractale/agent/kubernetes_job/agent.py +++ b/fractale/agent/kubernetes_job/agent.py @@ -13,24 +13,15 @@ from rich import print from rich.syntax import Syntax +import fractale.agent.kubernetes_job.objects as objects import fractale.agent.kubernetes_job.prompts as prompts import fractale.agent.logger as logger import fractale.utils as utils from fractale.agent.base import GeminiAgent from fractale.agent.context import get_context +from fractale.agent.decorators import timed from fractale.agent.errors import DebugAgent -yaml_pattern = r"```(?:yaml)?\n(.*?)```" - -# All the ways a container can go wrong... (upside down smiley face) -container_issues = [ - "ImagePullBackOff", - "ErrImagePull", - "ErrImageNeverPull", - "CrashLoopBackOff", - "CreateContainerConfigError", -] - class KubernetesJobAgent(GeminiAgent): """ @@ -39,9 +30,7 @@ class KubernetesJobAgent(GeminiAgent): name = "kubernetes-job" description = "Kubernetes Job agent" - - # Arbitrary max tries for class... - max_tries = 25 + result_type = "kubernetes-job-manifest" def _add_arguments(self, subparser): """ @@ -84,7 +73,8 @@ def get_prompt(self, context): prompt = prompts.get_generate_prompt(context) return prompt - def _run(self, context): + @timed + def run_step(self, context): """ Run the agent. 
""" @@ -92,7 +82,7 @@ def _run(self, context): context = self.add_build_context(context) # This will either generate fresh or rebuild erroneous Job - job_crd = self.generate_crd(context) + job_crd = self.generate_job_manifest(context) logger.custom(job_crd, title="[green]job.yaml[/green]", border_style="green") # Make and deploy it! Success is exit code 0. @@ -101,26 +91,34 @@ def _run(self, context): self.print_result(job_crd) logger.success(f"Deploy complete in {self.attempts} attempts") else: - logger.error(f"Build failed:\n{output[-1000:]}", title="Deploy Status") + logger.error(f"Deploy failed:\n{output[-1000:]}", title="Deploy Status") print("\n[bold cyan] Requesting Correction from Kubernetes Job Agent[/bold cyan]") - self.attempts += 1 # Ask the debug agent to better instruct the error message context.error_message = output - agent = DebugAgent() + # This updates the error message to be the output - context = agent.run(context, requires=prompts.requires) + context = DebugAgent().run(context, requires=prompts.requires) # Return early based on max attempts - # if self.return_on_failure(): - # context.return_code = -1 - # context.result = output - # return self.get_result(context) + if self.reached_max_attempts() or context.get("return_to_manager") is True: + context.return_to_manager = False + + # If we are being managed, return the result + if context.is_managed(): + context.return_code = -1 + context.result = context.error_message + return context + + # Otherwise this is a failure state + logger.exit(f"Max attempts {self.max_attempts} reached.", title="Agent Failure") + + self.attempts += 1 # Trigger again, provide initial context and error message # This is the internal loop running, no manager agent context.result = job_crd - return self.run(context) + return self.run_step(context) self.write_file(context, job_crd) return context @@ -146,174 +144,30 @@ def print_result(self, job_crd): highlighted_syntax, title="Final Kubernetes Job", border_style="green", expand=True ) - def get_diagnostics(self, job_name, namespace): + def get_diagnostics(self, job, pod): """ Helper to collect rich error data for a failed job. """ print("[yellow]Gathering diagnostics for failed job...[/yellow]") - - describe_job_cmd = ["kubectl", "describe", "job", job_name, "-n", namespace] - job_description = subprocess.run( - describe_job_cmd, capture_output=True, text=True, check=False - ).stdout - - describe_pods_cmd = [ - "kubectl", - "describe", - "pod", - "-l", - f"job-name={job_name}", - "-n", - namespace, - ] - pods_description = subprocess.run( - describe_pods_cmd, capture_output=True, text=True, check=False - ).stdout - - get_events_cmd = ["kubectl", "get", "events", "-n", namespace, "--sort-by=lastTimestamp"] - events = subprocess.run(get_events_cmd, capture_output=True, text=True, check=False).stdout - return prompts.meta_bundle % (job_description, pods_description, events) - - def wait_for_pod_complete(self, pod_name, namespace): - """ - Wait for a pod to be ready. - """ - for j in range(self.max_tries): - pod_status = self.pod_status(pod_name, namespace) - pod_phase = pod_status.get("phase") - - # Let's assume when we are running the pod is ready for logs. - # If not, we need to check container statuses too. - if pod_phase in ["Succeeded", "Failed"]: - return True - - print( - f"[dim]Pod '{pod_name}' has status '{pod_phase}'. Waiting... 
({j+1}/{self.max_tries})[/dim]" - ) - time.sleep(2) - - # If we get here, fail and timeout - print(f"[red]Pod '{pod_name}' never reached completed status, state is unknown[/red]") - return False - - def pod_status(self, pod_name, namespace): - """ - Get pod status (subset of info) - """ - return self.pod_info(pod_name, namespace).get("status", {}) - - def pod_info(self, pod_name, namespace): - """ - Helper function to get pod status - """ - # 25 x 5 seconds == 10 minutes - for _ in range(self.max_tries): - pod_proc = subprocess.run( - ["kubectl", "get", "pod", pod_name, "-n", namespace, "-o", "json"], - capture_output=True, - text=True, - check=False, - ) - if pod_proc.returncode != 0: - time.sleep(5) - continue - - return json.loads(pod_proc.stdout) - - def wait_for_pod_ready(self, pod_name, namespace): - """ - Wait for a pod to be ready. - """ - for _ in range(self.max_tries): - pod_status = self.pod_status(pod_name, namespace) - pod_phase = pod_status.get("phase") - - # Let's assume when we are running the pod is ready for logs. - # If not, we need to check container statuses too. - if pod_phase == "Running": - print(f"[green]Pod '{pod_name}' entered running phase.[/green]") - return True - - if pod_phase in ["Succeeded", "Failed"]: - print( - f"[yellow]Pod '{pod_name}' entered terminal phase '{pod_phase}' before logging could start.[/yellow]" - ) - return True - - # If we get here, not ready - sleep and try again. - print( - f"[dim]Pod '{pod_name}' has status '{pod_phase}'. Waiting... ({j+1}/{self.max_tries})[/dim]" - ) - time.sleep(25) - - # If we get here, fail and timeout - print(f"[red]Pod '{pod_name}' never reached running status, state is unknown[/red]") - return False - - def wait_for_job(self, job_name, namespace): - """ - Wait for a job to be active and fail or succeed. - """ - is_active, is_failed, is_succeeded = False, False, False - - # Poll for 10 minutes. This assumes a large container that needs to pull - for i in range(60): # 60 * 10s = 600s = 10 minutes - get_status_cmd = ["kubectl", "get", "job", job_name, "-n", namespace, "-o", "json"] - status_process = subprocess.run( - get_status_cmd, capture_output=True, text=True, check=False - ) - if status_process.returncode != 0: - time.sleep(10) - continue - - status = json.loads(status_process.stdout).get("status", {}) - if status.get("succeeded", 0) > 0: - print("[green]✅ Job succeeded before log streaming began.[/green]") - is_succeeded = True - break - - if status.get("failed", 0) > 0: - print("[red]❌ Job entered failed state.[/red]") - is_failed = True - break - - if status.get("active", 0) > 0: - print("[green]Job is active. Attaching to logs...[/green]") - is_active = True - break - - print(f"[dim]Still waiting... ({i+1}/30)[/dim]") - time.sleep(10) - return is_active, is_failed, is_succeeded - - def get_pod_name_for_job(self, job_name, namespace): - """ - Find the name of the pod created by a specific job. - """ - cmd = [ - "kubectl", - "get", - "pods", - "-n", - namespace, - "-l", - f"job-name={job_name}", - "-o", - "jsonpath={.items[0].metadata.name}", - ] - proc = subprocess.run(cmd, capture_output=True, text=True, check=False) - return proc.stdout.strip() or None - - def cleanup_job(self, job_name, namespace): - """ - Delete job so we can create again. 
- """ - subprocess.run( - ["kubectl", "delete", "job", job_name, "-n", namespace, "--ignore-not-found"], - capture_output=True, - check=False, - ) - + pod_status = pod.get_filtered_status() + job_status = job.get_filtered_status() + + # Use json.dumps because it's more compact (maybe fewer tokens) + pod_events = pod.get_events() + job_events = job.get_events() + events = sorted(job_events + pod_events, key=lambda e: e.get("lastTimestamp", "")) + job_description = json.dumps(job_status) + pods_description = json.dumps(pod_status) + events_description = json.dumps(events) + full_logs = job.get_logs() + + # Get job and pod events, add lgs if we have them. + diagnostics = prompts.meta_bundle % (job_description, pods_description, events_description) + if full_logs: + return diagnostics + full_logs + return diagnostics + + @timed def deploy(self, context): """ Deploy the Kubernetes Job. @@ -335,7 +189,7 @@ def deploy(self, context): job_name = job_data.get("metadata", {}).get("name") namespace = job_data.get("metadata", {}).get("namespace", "default") if not job_name: - return (1, f"Generated YAML is missing required '.metadata.name' field.") + return 1, "Generated YAML is missing required '.metadata.name' field." # If it doesn't follow instructions... containers = ( @@ -358,6 +212,17 @@ def deploy(self, context): deploy_dir = tempfile.mkdtemp() print(f"[dim]Created temporary deploy context: {deploy_dir}[/dim]") + # The debugger can decide to give the user an interactive session + # For interactive, we set the command to sleep infinity + if context.get("interactive") is True: + logger.info( + f"Starting Interative Debugging Session: job manifest at => {job_manifest_path}\nType 'exit' when done." + ) + command = job_data["spec"]["template"]["spec"]["containers"][0]["command"] + logger.custom(f" Initial command: {command}") + job_data["spec"]["template"]["spec"]["containers"][0]["command"] = ["sleep", "infinity"] + job_crd = yaml.dump(job_data) + # Write the manifest to a temporary directory job_manifest_path = os.path.join(deploy_dir, "job.yaml") utils.write_file(job_crd, job_manifest_path) @@ -377,17 +242,27 @@ def deploy(self, context): print("[green]✅ Manifest applied successfully.[/green]") + # For interactive, we set the command to sleep infinity + if context.get("interactive") is True: + context.interactive = False + import IPython + + IPython.embed() + # 2. We then need to wait until the job is running or fails print("[yellow]Waiting for Job to start... (Timeout: 5 minutes)[/yellow]") - pod_name = None + + # Create job objects (and eventually pod) + job = objects.KubernetesJob(job_name, namespace) + pod = None # This assumes a backoff / retry of 1, so we aren't doing recreation # If it fails once, it fails once and for all. - # 60 * 5s = 300s (5 minutes!) - for i in range(60): + # 30 * 5s = 150s (2.5 minutes!) + for i in range(30): # 1. Check the parent Job's status for a quick terminal state - job_status = self.get_job_status(job_name, namespace) + job_status = job.get_status() if job_status and job_status.get("succeeded", 0) > 0: # The job is done, try to get logs and report success print("[green]✅ Job has Succeeded.[/green]") @@ -396,20 +271,19 @@ def deploy(self, context): # Womp womp if job_status.get("failed", 0) > 0: logger.error("Job reports Failed.", title="Job Status") - diagnostics = self.get_diagnostics(job_name, namespace) - self.cleanup_job(job_name, namespace) + diagnostics = self.get_diagnostics(job, pod) + job.delete() return ( 1, f"Job entered failed state. 
This usually happens after repeated pod failures.\n\n{diagnostics}", ) # 2. If the job isn't terminal, find the pod. It may not exist yet. - if not pod_name: - pod_name = self.get_pod_name_for_job(job_name, namespace) + pod = pod or job.get_pod_name() # 3. If a pod exists, inspect it deeply for fatal errors or readiness. - if pod_name: - pod_info = self.pod_info(pod_name, namespace) + if pod: + pod_info = pod.get_info() if pod_info: pod_status = pod_info.get("status", {}) pod_phase = pod_status.get("phase") @@ -419,41 +293,34 @@ def deploy(self, context): if pod_phase == "Running": container_statuses = pod_status.get("containerStatuses", []) if all(cs.get("ready") for cs in container_statuses): - print(f"[green]✅ Pod '{pod_name}' is Ready.[/green]") + print(f"[green]✅ Pod '{pod.name}' is Ready.[/green]") break # If the pod succeeded already, we can also proceed... if pod_phase == "Succeeded": - print(f"[green]✅ Pod '{pod_name}' has Succeeded.[/green]") + print(f"[green]✅ Pod '{pod.name}' has Succeeded.[/green]") break # This is important because a pod can be active, but then go into a crashed state - container_statuses = pod_status.get("containerStatuses", []) - for cs in container_statuses: - if cs.get("state", {}).get("waiting"): - reason = cs["state"]["waiting"].get("reason") - if reason in container_issues: - message = cs["state"]["waiting"].get("message") - logger.error( - f"Pod has a fatal container status: {reason}", title="Pod Error" - ) - diagnostics = self.get_diagnostics(job_name, namespace) - self.cleanup_job(job_name, namespace) - return ( - 1, - f"Pod '{pod_name}' is stuck in a fatal state: {reason}\nMessage: {message}\n\n{diagnostics}", - ) + # We provide the status that coincides with our info query to be consistent + if reason := pod.has_failed_container(pod_status): + diagnostics = self.get_diagnostics(job, pod) + job.delete() + return ( + 1, + f"Pod '{pod.name}' is stuck in a fatal state: {reason}\n\n{diagnostics}", + ) print( - f"[dim]Job is active, Pod '{pod_name}' has status '{pod_phase}'. Waiting... ({i+1}/60)[/dim]" + f"[dim]Job is active, Pod '{pod.name}' has status '{pod_phase}'. Waiting... ({i+1}/60)[/dim]" ) # This means we saw the pod name, but didn't get pod info / it disappeared - let loop continue else: print( - f"[dim]Job is active, but Pod '{pod_name}' disappeared. Waiting for new pod... ({i+1}/60)[/dim]" + f"[dim]Job is active, but Pod '{pod.name}' disappeared. Waiting for new pod... ({i+1}/60)[/dim]" ) - pod_name = None + pod = None # No pod yet, keep waiting. else: @@ -463,7 +330,7 @@ def deploy(self, context): # This gets hit when the loop is done, so we probably have a timeout else: - diagnostics = self.get_diagnostics(job_name, namespace) + diagnostics = self.get_diagnostics(job, pod) return ( 1, f"Timeout: Job did not reach a stable running or completed state within the time limit.\n\n{diagnostics}", @@ -471,80 +338,59 @@ def deploy(self, context): # Let's try to stream logs! print("[green]🚀 Proceeding to stream logs...[/green]") - full_logs = self.get_job_logs(job_name, namespace) + full_logs = job.get_logs() + pod.wait_for_ready() - # Wait for pod to be ready, then we can get logs - self.wait_for_pod_ready(pod_name, namespace) + # Save logs regardless of success or not (so we see change) + self.save_log(full_logs) # The above command will wait for the job to complete, so this should be OK to do. 
is_active = True while is_active: - final_status = self.get_job_status(job_name, namespace) + final_status = job.get_status() is_active = final_status.get("active", 0) > 0 time.sleep(5) # But did it succeed? if final_status.get("succeeded", 0) > 0: - print(f"\n[green]✅ Job final status is Succeeded.[/green]") + print("\n[green]✅ Job final status is Succeeded.[/green]") else: print("\n[red]❌ Job final status is Failed.[/red]") - diagnostics = self.get_diagnostics(job_name, namespace) - self.cleanup_job(job_name, namespace) + diagnostics = self.get_diagnostics(job, pod) + job.delete() # We already have the logs, so we can pass them directly. - return 1, prompts.failure_message % (diagnostics, full_logs) + return 1, prompts.failure_message % diagnostics if cleanup and os.path.exists(deploy_dir): print(f"[dim]Cleaning up temporary deploy directory: {deploy_dir}[/dim]") - self.cleanup_job(job_name, namespace) + job.delete() shutil.rmtree(deploy_dir, ignore_errors=True) + + # Save full logs for the step return 0, full_logs - def get_job_logs(self, job_name, namespace): + def save_log(self, full_logs): """ - Get the logs of a pod. + Save logs to metadata """ - full_logs = "" - # We use the job selector to get logs, which is more robust if the pod was recreated. - log_cmd = ["kubectl", "logs", "-f", f"job/{job_name}", "-n", namespace] - with subprocess.Popen( - log_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1, - universal_newlines=True, - ) as log_process: - # We can add a timeout to the log streaming itself if needed - # For now, we wait for it to complete naturally. - full_logs = "".join(log_process.stdout) - return full_logs - - def get_job_status(self, job_name, namespace): - """ - Get job status - """ - final_status_proc = subprocess.run( - ["kubectl", "get", "job", job_name, "-n", namespace, "-o", "json"], - capture_output=True, - text=True, - check=False, - ) - return json.loads(final_status_proc.stdout).get("status", {}) + if self.save_incremental: + if "logs" not in self.metadata["assets"]: + self.metadata["assets"]["logs"] = [] + self.metadata["assets"]["logs"].append({"item": full_logs, "attempt": self.attempts}) - def get_job_status(self, job_name, namespace): + def save_job_manifest(self, job): """ - Get the job status, return None if not possible. + Save job manifest to metadata """ - job_info = subprocess.run( - ["kubectl", "get", "job", job_name, "-n", namespace, "-o", "json"], - capture_output=True, - text=True, - check=False, - ) - if job_info.returncode == 0: - return json.loads(job_info.stdout).get("status", {}) + if self.save_incremental: + if self.result_type not in self.metadata["assets"]: + self.metadata["assets"][self.result_type] = [] + self.metadata["assets"][self.result_type].append( + {"item": job, "attempt": self.attempts} + ) - def generate_crd(self, context): + @timed + def generate_job_manifest(self, context): """ Generates or refines an existing Job CRD using the Gemini API. """ @@ -557,15 +403,9 @@ def generate_crd(self, context): # Try to remove code (Dockerfile, manifest, etc.) from the block try: - content = self.get_code_block(content, "yaml") - - # If we are getting commentary... 
- match = re.search(yaml_pattern, content, re.DOTALL) - if match: - job_crd = match.group(1).strip() - else: - job_crd = content.strip() + job_crd = self.get_code_block(content, "yaml") context.result = job_crd + self.save_job_manifest(job_crd) return job_crd except Exception as e: diff --git a/fractale/agent/kubernetes_job/objects.py b/fractale/agent/kubernetes_job/objects.py new file mode 100644 index 0000000..764c9fa --- /dev/null +++ b/fractale/agent/kubernetes_job/objects.py @@ -0,0 +1,289 @@ +import json +import subprocess +import time + +from rich import print + +import fractale.agent.logger as logger + +# All the ways a container can go wrong... (upside down smiley face) +container_issues = [ + "ImagePullBackOff", + "ErrImagePull", + "ErrImageNeverPull", + "CrashLoopBackOff", + "CreateContainerConfigError", +] + + +class KubernetesAbstraction: + def __init__(self, name, namespace="default", max_tries=25): + self.name = name + self.namespace = namespace + self.max_tries = max_tries + + def get_events(self): + """ + If we get ALL events it can be over 200K tokens. Let's get a smaller set. + """ + obj = self.obj.capitalize() + selector = f"involvedObject.kind={obj},involvedObject.name={self.name}" + events_cmd = [ + "kubectl", + "get", + "events", + "-n", + self.namespace, + f"--field-selector={selector}", + "-o", + "json", + ] + events = subprocess.run(events_cmd, capture_output=True, text=True, check=False) + events = json.loads(events.stdout).get("items", []) + + # Sort events by time and format to be shorter (most important stuff) + events = [ + { + "time": e.get("lastTimestamp"), + "type": e.get("type"), + "reason": e.get("reason"), + "object": e.get("involvedObject", {}).get("name"), + "message": e.get("message"), + } + for e in events + ] + return sorted(events, key=lambda e: e.get("lastTimestamp", "")) + + def get_status(self): + """ + Get the status, return None if not possible. + """ + info = self.get_info() + if not info: + return + return info.get("status", {}) + + def get_info(self): + """ + Get the status, return None if not possible. + """ + info = subprocess.run( + ["kubectl", "get", self.obj, self.name, "-n", self.namespace, "-o", "json"], + capture_output=True, + text=True, + check=False, + ) + if info.returncode == 0: + return json.loads(info.stdout) + + def delete(self): + """ + Delete object so we can... create again? + """ + subprocess.run( + ["kubectl", "delete", self.obj, self.name, "-n", self.namespace, "--ignore-not-found"], + capture_output=True, + check=False, + ) + + +class KubernetesPod(KubernetesAbstraction): + """ + Wrapper to better expose different interactions. + """ + + obj = "pod" + + def get_filtered_status(self): + """ + Gets the most critical status fields from a Job's pod(s). + This is the most valuable source of debugging information. + """ + status = self.get_status() + if not status: + return + + # Extract container statuses, which contain exit codes, reasons, etc. + container_statuses = [] + for cs in status.get("containerStatuses", []): + container_statuses.append( + { + "name": cs.get("name"), + "ready": cs.get("ready"), + "restartCount": cs.get("restartCount"), + "state": cs.get("state"), + "lastState": cs.get("lastState"), + } + ) + + return { + "phase": status.get("phase"), + "reason": status.get("reason"), + "message": status.get("message"), + "containerStatuses": container_statuses, + } + + def has_failed_container(self, pod_status=None): + """ + Determine from container statusues if there is a failed container. 
+ We can pass in a status object in case it needs to sync with pod info. + We return a failure reason, if exists. + """ + pod_status = pod_status or self.get_status() + container_statuses = pod_status.get("containerStatuses", []) + for cs in container_statuses: + if cs.get("state", {}).get("waiting"): + reason = cs["state"]["waiting"].get("reason") + if reason in container_issues: + message = cs["state"]["waiting"].get("message") + logger.error(f"Pod has a fatal container status: {reason}", title="Pod Error") + return f"{reason}\nMessage: {message}" + + def wait_for_ready(self): + """ + Wait for a pod to be ready. + """ + for j in range(self.max_tries): + pod_status = self.get_status() + pod_phase = pod_status.get("phase") + + # Let's assume when we are running the pod is ready for logs. + # If not, we need to check container statuses too. + if pod_phase == "Running": + print(f"[green]Pod '{self.name}' entered running phase.[/green]") + return True + + if pod_phase in ["Succeeded", "Failed"]: + print( + f"[yellow]Pod '{self.name}' entered terminal phase '{pod_phase}' before logging could start.[/yellow]" + ) + return True + + # If we get here, not ready - sleep and try again. + print( + f"[dim]Pod '{self.name}' has status '{pod_phase}'. Waiting... ({j+1}/{self.max_tries})[/dim]" + ) + time.sleep(25) + + # If we get here, fail and timeout + print(f"[red]Pod '{self.name}' never reached running status, state is unknown[/red]") + return False + + +class KubernetesJob(KubernetesAbstraction): + """ + A wrapper around a job to provide an easier means to get + pod names and metadata. + """ + + obj = "job" + + def wait_for_status(self): + """ + Wait for a job to be active and fail or succeed. + """ + is_active, is_failed, is_succeeded = False, False, False + + # Poll for 10 minutes. This assumes a large container that needs to pull + for i in range(60): # 60 * 10s = 600s = 10 minutes + get_status_cmd = [ + "kubectl", + "get", + "job", + self.name, + "-n", + self.namespace, + "-o", + "json", + ] + status_process = subprocess.run( + get_status_cmd, capture_output=True, text=True, check=False + ) + if status_process.returncode != 0: + time.sleep(10) + continue + + status = json.loads(status_process.stdout).get("status", {}) + if status.get("succeeded", 0) > 0: + print("[green]✅ Job succeeded before log streaming began.[/green]") + is_succeeded = True + break + + if status.get("failed", 0) > 0: + print("[red]❌ Job entered failed state.[/red]") + is_failed = True + break + + if status.get("active", 0) > 0: + print("[green]Job is active. Attaching to logs...[/green]") + is_active = True + break + + print(f"[dim]Still waiting... ({i+1}/30)[/dim]") + time.sleep(10) + return is_active, is_failed, is_succeeded + + def get_logs(self): + """ + Get the logs of a pod. + """ + full_logs = "" + # We use the job selector to get logs, which is more robust if the pod was recreated. + log_cmd = ["kubectl", "logs", "-f", f"job/{self.name}", "-n", self.namespace] + with subprocess.Popen( + log_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + universal_newlines=True, + ) as log_process: + # We can add a timeout to the log streaming itself if needed + # For now, we wait for it to complete naturally. + full_logs = "".join(log_process.stdout) + return full_logs + + def get_filtered_status(self): + """ + Get a more filtered (streamlined) status to minimize tokens. + Jobs have information about their pods - succeeded, failed, etc. 
+ """ + status = self.get_status() + if not status: + return + return { + "succeeded": status.get("succeeded", 0), + "failed": status.get("failed", 0), + "active": status.get("active", 0), + "conditions": [ + { + "type": c.get("type"), + "status": c.get("status"), + "reason": c.get("reason"), + "message": c.get("message"), + } + for c in status.get("conditions", []) + ], + } + + def get_pod_name(self): + """ + Find the name of the pod created by a specific job. + """ + cmd = [ + "kubectl", + "get", + "pods", + "-n", + self.namespace, + "-l", + f"job-name={self.name}", + "-o", + "jsonpath={.items[0].metadata.name}", + ] + proc = subprocess.run(cmd, capture_output=True, text=True, check=False) + output = proc.stdout.strip() + + # Only return output + if output: + return KubernetesPod(output, self.namespace) diff --git a/fractale/agent/kubernetes_job/prompts.py b/fractale/agent/kubernetes_job/prompts.py index e4dc51a..19e1b0b 100644 --- a/fractale/agent/kubernetes_job/prompts.py +++ b/fractale/agent/kubernetes_job/prompts.py @@ -2,13 +2,16 @@ from fractale.agent.prompts import prompt_wrapper # Requirements are separate to give to error helper agent +# This should explicitly state what the agent is capable of doing. requires = """ - Please deploy to the default namespace. - Do not create or require abstractions beyond the Job (no ConfigMap or Volume or other types) - Do not create or require external data. Use example data provided with the app or follow instructions. - Do not add resources, custom entrypoint/args, affinity, init containers, nodeSelector, or securityContext unless explicitly told to. +- Do NOT add resource requests or limits. The pod should be able to use the full available resources and be Burstable. - Assume that needed software is on the PATH, and don't specify full paths to executables. - Set the backoff limit to 1, assuming if it does not work the first time, it will not. +- You are only scoped to edit the Job manifest for Kubernetes. """ common_instructions = ( @@ -36,7 +39,7 @@ def get_regenerate_prompt(context): generate_prompt = ( - f"""You are a Kubernetes job generator service expert. I need to create a YAML manifest for a Kubernetes Job in an environment for '%s' for the exact container named '%s'. + """You are a Kubernetes job generator service expert. I need to create a YAML manifest for a Kubernetes Job in an environment for '%s' for the exact container named '%s'. Please generate a robust, production-ready manifest. """ @@ -58,7 +61,7 @@ def add_no_pull(prompt, no_pull=False): return prompt -meta_bundle = f""" +meta_bundle = """ --- JOB DESCRIPTION --- %s @@ -70,7 +73,4 @@ def add_no_pull(prompt, no_pull=False): """ failure_message = """Job failed during execution. - ---- Diagnostics --- -%s %s""" diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py index 26f59b2..8ca4ec5 100644 --- a/fractale/agent/manager/agent.py +++ b/fractale/agent/manager/agent.py @@ -1,3 +1,4 @@ +import copy import json import os from datetime import datetime @@ -9,6 +10,7 @@ import fractale.utils as utils from fractale.agent.base import GeminiAgent from fractale.agent.context import Context +from fractale.agent.decorators import timed from fractale.agent.manager.plan import Plan from fractale.utils.timer import Timer @@ -22,55 +24,40 @@ class ManagerAgent(GeminiAgent): The manager can initialize other agents at the order it decides. 
""" - def get_recovery_step(self, context, failed_step, message): + def get_recovery_step(self, context, failed_step, plan): """ Uses Gemini to decide which agent to call to fix an error. - This is the intelligent error routing engine. """ - print("GET RECOVERY STEP") - import IPython - - IPython.embed() - # move to file - agent_descriptions = "\n".join( - [f"- {name}: {agent.description}" for name, agent in self.agents.items()] - ) - - prompt = f""" - You are an expert AI workflow troubleshooter. A step in a workflow has failed. Your job is to analyze the error and recommend a single, corrective step. - - Available Agents: - {agent_descriptions} - - Context: - - The overall goal is to run a build-and-deploy workflow. - - The step using agent '{failed_step['agent_name']}' failed while trying to: {failed_step['task_description']} - - Error Message: - ``` - {error_message} - ``` - - Instructions: - 1. Analyze the error message to determine the root cause (e.g., is it a Dockerfile syntax error, a Kubernetes resource issue, an image name typo, etc.?). - 2. Decide which agent is best suited to fix this specific error. - 3. Formulate a JSON object for the corrective step with two keys: "agent_name" and "task_description". - 4. The new "task_description" MUST be a clear instruction for the agent to correct the specific error. - - Provide only the single JSON object for the corrective step in your response. - """ - logger.warning("Consulting LLM for error recovery plan...", title="Error Triage") - - response = self.model.generate_content(prompt) + # Only go up to the step we are at... + descriptions = "" + for step in plan.agents: + descriptions += f"- {step.agent}: {step.description}" + if step.agent == failed_step.agent: + break - try: - text = response.text.strip().removeprefix("```json").removesuffix("```").strip() - return json.loads(text) - except (json.JSONDecodeError, AttributeError): - logger.custom( - f"[bold red]Error: Could not parse recovery step from LLM response.[/bold red]\nFull Response:\n{response.text}", - title="[red]Critical Error[/red]", - ) + prompt = prompts.recovery_prompt % (descriptions, failed_step.agent, context.error_message) + logger.warning("Consulting Manager Agent for error recovery plan...", title="Error Triage") + step = None + + while not step: + response = self.model.generate_content(prompt) + try: + step = json.loads(self.get_code_block(response.text, "json")) + if "agent_name" not in step: + raise ValueError("Response is missing 'agent_name'") + if step["agent_name"] not in plan.agent_names: + raise ValueError( + f"Response 'agent_name' {step['agent_name']} is not a known agent for this plan." + ) + except Exception as e: + step = None + prompt = prompts.recovery_error_prompt % ( + descriptions, + failed_step.agent, + context.error_message, + e, + ) + return step def save_results(self, tracker): """ @@ -78,11 +65,14 @@ def save_results(self, tracker): Just ploop into pwd for now, we can eventually take a path. """ + if not os.path.exists(self.results_dir): + os.makedirs(self.results_dir) now = datetime.now() timestamp = now.strftime("%Y-%m-%d_%H-%M-%S") - results_file = os.path.join(os.getcwd(), f"results-{timestamp}.json") + results_file = os.path.join(self.results_dir, f"results-{timestamp}.json") utils.write_json(tracker, results_file) + @timed def run(self, context): """ Executes a plan-driven workflow with intelligent error recovery. How it can work: @@ -91,14 +81,24 @@ def run(self, context): 2. Each agent can define initial inputs. 3. 
A context directory is handed between agents. Each agent will be given the complete context. """ + # The context is managed, meaning we return updated contexts + context["managed"] = True + + # Store raw context for restore + self._context = copy.deepcopy(context) + # Create a global context context = Context(context) - # The context is managed, meaning we return updated contexts - context.managed = True + # We shouldn't allow the manager to go forever + self.max_attempts = self.max_attempts or 10 # Load plan (required) and pass on setting to use cache to agents - plan = Plan(context.get("plan", required=True), use_cache=self.use_cache) + plan = Plan( + context.get("plan", required=True), + use_cache=self.use_cache, + save_incremental=self.save_incremental, + ) # The manager model works as the orchestrator of work. logger.custom( @@ -117,10 +117,33 @@ def run(self, context): # Raise for now so I can see the issue. except Exception as e: - raise e logger.error( f"Orchestration failed:\n{str(e)}", title="Orchestration Failed", expand=False ) + raise e + + def restore_context(self): + """ + Get a new, updated context. + """ + context = copy.deepcopy(self._context) + context.manager = True + return context + + def reset_context(self, context, plan, failed_step=None): + """ + reset context up to failed state. + + If no failed state provided, reset the entire thing. + """ + for step in plan.agents: + context = step.reset_context(context) + for key in ["result", "return_code"]: + if key in context: + del context[key] + if failed_step is not None and step.agent == failed_step.agent: + break + return context def run_tasks(self, context, plan): """ @@ -129,34 +152,21 @@ def run_tasks(self, context, plan): Each step in the plan can have a maximum number of attempts. """ # These are top level attempts. Each agent has its own counter - # that is allowed to go up to some limit. - - attempts = {} - # Keep track of sequence of agent running times and sequence, and times + # that is allowed to go up to some limit. The manager will + # attempt the entire thing some number of times. Note that + # I haven't tested this yet. tracker = [] timer = Timer() current_step_index = 0 - # Keep going until the plan is done, or max attempts reached for a step + # Keep going until the plan is done, or max attempts reached for the manager + # Each step has its own internal max attempts (just another agent) while current_step_index < len(plan): # Get the step - we already have validated the agent step = plan[current_step_index] - - # Keep track of attempts and check if we've gone over - if step.agent not in attempts: - attempts[step.agent] = 0 - - # Each time build runs, it has its own internal attempts counter. - # Important: this counter is external (different) for the entire step - # which has some number of internal attempts that reset each time - if step.reached_maximum_attempts(attempts[step.agent]): - print(f"[red]Agent '{step.agent}' has reached max attempts {step.attempts}.[/red]") - break - attempts[step.agent] += 1 - logger.custom( f"Executing step {current_step_index + 1}/{len(plan)} with agent [bold cyan]{step.agent}[/bold cyan]", - title=f"[blue]Orchestrator Attempt {attempts[step.agent]}[/blue]", + title=f"[blue]Orchestrator Attempt {self.attempts}[/blue]", ) # Execute the agent. 
@@ -167,7 +177,17 @@ def run_tasks(self, context, plan): # Keep track of running the agent and the time it took # Also keep result of each build step (we assume there is one) - tracker.append([step.agent, timer.elapsed_time, context.get("result")]) + # We will eventually want to also save the log. + tracker.append( + { + "agent": step.agent, + "total_seconds": timer.elapsed_time, + "result": context.get("result"), + # We start counting at 0 + "attempts": step.attempts + 1, + "metadata": step.logs(), + } + ) # If we are successful, we go to the next step. # Not setting a return code indicates success. @@ -178,43 +198,33 @@ def run_tasks(self, context, plan): # If we reach max attempts and no success, we need to intervene else: - print("STEP NOT SUCCESSFUL: TODO what to do here?") - # Try getting recovery step and ask manager to choose next action - import IPython - - IPython.embed() - - # This is the intiial (cleaned) prompt to give the manager context - instruction = step.get_initial_prompt(context) - - # Give the error message to the manager to triage - prompt = prompts.get_retry_prompt(instruction, context.result) - response = self.ask_gemini(prompt, with_history=False) - logger.error( - f"Step failed. Instruction to agent:\n{context.result}", - title=f"Manager Instruction: {step.agent}", - expand=False, - ) - context.reset() - context.error_message = response - continue + # Do we try again? If we reached max, break to cut out of loop + if self.reached_max_attempts(): + break + + # Otherwise, we want to get a recovery step and keep going + self.attempts += 1 - # TODO: we eventually want to allow the LLM to choose a step + # If we are at the first step, just reset and try again. + if current_step_index == 0: + context = self.reset_context(context, plan=plan) + continue + + # Allow the LLM to choose a step # At this point we need to get a recovery step, and include the entire context - # up to that point. - recovery_step = self.get_recovery_step(context, step, message) - if recovery_step: - logger.warning() - print( - f"Inserting recovery step from agent [bold cyan]{recovery_step['agent_name']}[/bold cyan].", - title="Recovery Plan", - ) - # TODO this shouldn't be an insert, but a find and replace and then - # updating of the index to supoprt that. - plan.insert(current_step_index, recovery_step) - else: - print("[red]Could not determine recovery step. Aborting workflow.[/red]") - break + recovery_step = self.get_recovery_step(context, step, plan) + print( + f"Attempting recovery step from agent [bold cyan]{recovery_step['agent_name']}[/bold cyan].", + ) + current_step_index = [ + i + for i, step in enumerate(plan.agents) + if step.agent == recovery_step["agent_name"] + ][0] + # Reset the context. This removes output and stateful variables UP TO the failed + # step so we don't give context that leads to another erroneous state + context = self.reset_context(context, plan, step) + continue # If successful, reset the context for the next step. # This resets return code and result only. 
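
A minimal sketch of how the new decorators in `fractale/agent/decorators.py` compose (the `ToyAgent` class is hypothetical and only assumes the attributes the decorators actually read: `metadata`, `save_incremental`, and `result_type`):

```python
import time

from fractale.agent.decorators import save_result, timed


class ToyAgent:
    # Hypothetical stand-in for a real Agent subclass
    result_type = "text"

    def __init__(self):
        self.save_incremental = True
        # The real Agent.init_metadata() also adds assets, ask_gemini, retries, failures
        self.metadata = {"times": {}}

    @timed
    def work(self):
        # Stand-in for a step such as build() or deploy()
        time.sleep(0.1)

    @save_result
    def run(self, context):
        self.work()
        context["result"] = "hello"
        return context


agent = ToyAgent()
agent.run({})
print(agent.metadata["times"])   # {'work_seconds': [~0.1]}
print(agent.metadata["result"])  # {'item': 'hello', 'total_seconds': ..., 'type': 'text'}
```

In the PR itself, `Agent.run` is wrapped with `save_result`, and step functions such as `build`, `deploy`, and `generate_dockerfile` are wrapped with `timed`, so their durations accumulate under `metadata["times"]` and surface in the manager's tracker via `step.logs()`.
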
diff --git a/fractale/agent/manager/plan.py b/fractale/agent/manager/plan.py index bc1bfca..7b628f8 100644 --- a/fractale/agent/manager/plan.py +++ b/fractale/agent/manager/plan.py @@ -1,11 +1,8 @@ -import copy - import jsonschema from jsonschema import validators from rich import print import fractale.utils as utils -from fractale.agent.context import get_context def set_defaults(validator, properties, instance, schema): @@ -56,11 +53,12 @@ class Plan: A plan for a manager includes one or more agents and a goal. """ - def __init__(self, plan_path, use_cache=False): + def __init__(self, plan_path, use_cache=False, save_incremental=False): self.plan_path = plan_path self.plan = utils.read_yaml(plan_path) self.agent_names = set() self.use_cache = use_cache + self.save_incremental = save_incremental self.validate() self.load() @@ -81,7 +79,12 @@ def load(self): raise ValueError(f"Agent {spec['agent']} is not known.") # Add the agent to the step - step = Step(spec, known_agents[agent_name], use_cache=self.use_cache) + step = Step( + spec, + known_agents[agent_name], + use_cache=self.use_cache, + save_incremental=self.save_incremental, + ) # The agents are retrieved via index self.agents.append(step) @@ -116,13 +119,26 @@ def __getitem__(self, key): class Step: - def __init__(self, step, agent, use_cache=False): + """ + A step is a simple abstraction to hold an agent and a plan. + + It could be unecessary, but I am keeping for now. + """ + + def __init__(self, step, agent, use_cache=False, save_incremental=False): self.step = step - self._agent = agent(use_cache=use_cache) # If the step context defines a max number of attempts, set it for the agent max_attempts = self.step["context"].get("max_attempts") - self._agent.set_max_attempts(max_attempts) + self._agent = agent( + use_cache=use_cache, save_incremental=save_incremental, max_attempts=max_attempts + ) + + def logs(self): + """ + Courtesy function to expose agent logs. + """ + return self._agent.metadata def update(self, context): """ @@ -137,12 +153,6 @@ def update(self, context): context[k] = v return context - def get_initial_prompt(self, context): - """ - More easily expose the get_initial_prompt function on the agent. - """ - return self._agent.get_initial_prompt(context) - def execute(self, context): """ Execute a plan step (associated with an agent) @@ -153,31 +163,33 @@ def execute(self, context): # This is the context returned return self._agent.run(context) + def mark_retry(self): + """ + A function to mark that the entire plan was retried. + """ + self._agent.metadata["retries"] += 1 + @property def agent(self): return self.get("agent") + @property + def attempts(self): + return self._agent.attempts + @property def context(self): return self.get("context") - @property - def attempts(self): - return self.get("attempts") + def reset_context( + self, + context, + ): + return self._agent.reset_context(context) @property def description(self): return self.get("description", f"This is a {self.agent} agent.") - def reached_maximum_attempts(self, attempts): - """ - Determine if we have reached maximum attempts for the step. 
-        """
-        if self.attempts is None:
-            return False
-        if self.attempts > attempts:
-            return True
-        return False
-
     def get(self, name, default=None):
         return self.step.get(name) or default
diff --git a/fractale/agent/manager/prompts.py b/fractale/agent/manager/prompts.py
index f797b90..712c8da 100644
--- a/fractale/agent/manager/prompts.py
+++ b/fractale/agent/manager/prompts.py
@@ -1,3 +1,17 @@
+recovery_prompt = """You are an expert AI workflow troubleshooter. A step in a workflow has failed and reached its maximum number of retries. This could mean we need to go back in the workflow and redo work. Your job is to analyze the error and recommend a single, corrective step. The steps you can choose from, each associated with an agent, are as follows:
+
+Available Agents:
+%s
+
+The above is in the correct order, and ends on the agent that ran last with the failure (%s). The error message of the last step is the following:
+
+%s
+
+Your job is to analyze the error message to determine the root cause, and decide which agent is best suited to fix this specific error. Formulate a JSON object for the corrective step with two keys: "agent_name" and "task_description". The new "task_description" MUST be a clear instruction for the agent to correct the specific error. Provide only the single JSON object for the corrective step in your response.
+"""
+
+recovery_error_prompt = recovery_prompt.strip() + " Your first attempt was not successful:\n%s"
+
 retry_prompt = """You are a manager of LLM agents. A step in your plan has failed. We are going to try again.
 Please prepare a prompt for the agent that includes instructions for a suggested fix.
 Here is the original prompt given to this agent. Please use this to be mindful of the instructions that you provide back,
diff --git a/fractale/cli/__init__.py b/fractale/cli/__init__.py
index 97ce84e..45058a5 100644
--- a/fractale/cli/__init__.py
+++ b/fractale/cli/__init__.py
@@ -97,6 +97,28 @@ def get_parser():
         action="store_true",
         default=False,
     )
+    agent.add_argument(
+        "--max-attempts",
+        help="Maximum attempts for a manager or individual agent",
+        default=None,
+        type=int,
+    )
+    agent.add_argument(
+        "--results",
+        help="Save to a custom results directory.",
+    )
+    agent.add_argument(
+        "--incremental",
+        help="Save incremental results for later inspection",
+        action="store_true",
+        default=False,
+    )
+    agent.add_argument(
+        "--allow-interactive",
+        help="Allow the agent to suggest an interactive terminal for you (the user) to debug.",
+        action="store_true",
+        default=False,
+    )
 
     # Add agent parsers
     parsers.register(agents)
diff --git a/fractale/cli/agent.py b/fractale/cli/agent.py
index 23de2be..8fbbf3f 100644
--- a/fractale/cli/agent.py
+++ b/fractale/cli/agent.py
@@ -18,8 +18,15 @@ def main(args, extra, **kwargs):
         sys.exit(f"{args.agent_name} is not a recognized agent.")
 
     # Get the agent and run!
-    # Save determines if we want to save state to an output directory
-    agent = agents[args.agent_name](use_cache=args.use_cache)
+    # - results determines if we want to save state to an output directory
+    # - save_incremental will add a metadata section
+    # - max_attempts is for the manager agent (defaults to 10)
+    agent = agents[args.agent_name](
+        use_cache=args.use_cache,
+        results_dir=args.results,
+        save_incremental=args.incremental,
+        max_attempts=args.max_attempts,
+    )
 
     # This is the context - we can remove variables not needed
     context = vars(args)
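
The new `KubernetesJob` and `KubernetesPod` wrappers above replace the ad-hoc `kubectl` helpers that previously lived on the agent. A hypothetical usage sketch (the job name `lammps` is made up, and it assumes `kubectl` is configured and the job's pod already exists):

```python
from fractale.agent.kubernetes_job.objects import KubernetesJob

job = KubernetesJob("lammps", namespace="default")
print(job.get_filtered_status())      # {'succeeded': ..., 'failed': ..., 'active': ..., 'conditions': [...]}

# get_pod_name() returns a KubernetesPod wrapper (or None if no pod exists yet)
pod = job.get_pod_name()
if pod:
    print(pod.get_filtered_status())  # phase, reason, message, containerStatuses
    print(pod.has_failed_container()) # reason string if stuck in e.g. ImagePullBackOff, else None

print(job.get_logs())                 # streams `kubectl logs -f job/lammps` to completion
job.delete()                          # kubectl delete job lammps --ignore-not-found
```

This mirrors what `deploy()` now does: it polls `job.get_status()`, resolves the pod with `job.get_pod_name()`, and on failure bundles the filtered statuses and events from `get_diagnostics()` into the error message handed to the debug agent.
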