Commit c0ec067

vsoch committed
wip: new results and optimize agent
Signed-off-by: vsoch <[email protected]>
1 parent 85ecd28 commit c0ec067

13 files changed: +442, −26 lines

fractale/agent/base.py

Lines changed: 3 additions & 1 deletion
@@ -49,7 +49,7 @@ def __init__(
         self.init()
 
     def init_metadata(self):
-        self.metadata = {"times": {}, "assets": {}, "ask_gemini": [], "retries": 0, "failures": []}
+        self.metadata = {"times": {}, "assets": {}, "retries": 0, "failures": []}
 
     @save_result
     def run(self, context):
@@ -294,6 +294,8 @@ def save_gemini_metadata(self, elapsed_time, response, with_history):
         """
         Save gemini response metadata and elapsed time
         """
+        if "ask_gemini" not in self.metadata:
+            self.metadata["ask_gemini"] = []
         self.metadata["ask_gemini"].append(
             {
                 "conversation_history": with_history,

fractale/agent/kubernetes/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -1,2 +1,3 @@
 from .job import KubernetesJobAgent
-assert KubernetesJobAgent
+
+assert KubernetesJobAgent

fractale/agent/kubernetes/base.py

Lines changed: 60 additions & 2 deletions
@@ -1,4 +1,9 @@
 import argparse
+import json
+import subprocess
+
+from rich import print
+from rich.panel import Panel
 from rich.syntax import Syntax
 
 import fractale.agent.logger as logger
@@ -45,7 +50,6 @@ def print_result(self, job_crd):
             highlighted_syntax, title="Final Kubernetes Job", border_style="green", expand=True
         )
 
-
     def save_log(self, full_logs):
         """
         Save logs to metadata
@@ -64,4 +68,58 @@ def save_job_manifest(self, job):
             self.metadata["assets"][self.result_type] = []
         self.metadata["assets"][self.result_type].append(
             {"item": job, "attempt": self.attempts}
-        )
+        )
+
+    def cluster_resources(self):
+        """
+        Get cluster resources - count of nodes and resources.
+
+        I was thinking of caching this, but clusters can change,
+        and it's easy (and inexpensive) enough to query that we repeat.
+        """
+        print("[yellow]Querying Kubernetes cluster for node resources...[/yellow]")
+        try:
+            # Execute the kubectl command
+            result = subprocess.run(
+                ["kubectl", "get", "nodes", "-o", "json"],
+                capture_output=True,
+                text=True,
+                check=True,
+                timeout=30,
+            )
+
+            # Parse the JSON output
+            nodes_data = json.loads(result.stdout)
+            nodes = nodes_data.get("items", [])
+
+            if not nodes:
+                print("[red]Error: No nodes found in the cluster.[/red]")
+                return None
+
+            # Keep a listing (with count) of node specs.
+            # The key is the cpu, memory, and arch; the value is the node count.
+            node_specs = {}
+            for node in nodes:
+                node_spec = (
+                    node["status"]["allocatable"]["cpu"],
+                    node["status"]["allocatable"]["memory"],
+                    node["status"]["nodeInfo"]["architecture"],
+                )
+                if node_spec not in node_specs:
+                    node_specs[node_spec] = 0
+                node_specs[node_spec] += 1
+
+            # Ensure we expand the resources
+            node_specs = [
+                {"cpu": x[0], "memory": x[1], "arch": x[2], "count": v}
+                for x, v in node_specs.items()
+            ]
+            cluster_info = {"total_nodes": len(nodes), "node_specs": node_specs}
+
+            print("[green]✅ Successfully retrieved cluster information.[/green]")
+            return cluster_info
+
+        except Exception as e:
+            print(
+                "[bold red]Error executing kubectl command. Do you have access to the cluster?[/bold red]"
+            )
+            # Not every exception carries stderr (e.g., JSONDecodeError), so guard.
+            print(f"Stderr: {getattr(e, 'stderr', e)}")

fractale/agent/kubernetes/job/agent.py

Lines changed: 72 additions & 16 deletions
@@ -1,7 +1,5 @@
-import argparse
 import json
 import os
-import re
 import shutil
 import subprocess
 import sys
@@ -11,17 +9,16 @@
 
 import yaml
 from rich import print
-from rich.syntax import Syntax
 
-import fractale.agent.kubernetes.objects as objects
-from fractale.agent.kubernetes.base import KubernetesAgent
 import fractale.agent.kubernetes.job.prompts as prompts
+import fractale.agent.kubernetes.objects as objects
 import fractale.agent.logger as logger
 import fractale.utils as utils
-from fractale.agent.base import GeminiAgent
 from fractale.agent.context import get_context
 from fractale.agent.decorators import timed
 from fractale.agent.errors import DebugAgent
+from fractale.agent.kubernetes.base import KubernetesAgent
+from fractale.agent.optimize import OptimizationAgent
 
 
 class KubernetesJobAgent(KubernetesAgent):
@@ -33,6 +30,13 @@ class KubernetesJobAgent(KubernetesAgent):
     description = "Kubernetes Job agent"
     result_type = "kubernetes-job-manifest"
 
+    def __init__(self, *args, **kwargs):
+        """
+        Add the optimization agent, even if we don't need it.
+        """
+        super().__init__(*args, **kwargs)
+        self.optimize_agent = OptimizationAgent()
+
     def get_prompt(self, context):
         """
         Get the prompt for the LLM. We expose this so the manager can take it
@@ -115,15 +119,19 @@ def get_diagnostics(self, job, pod):
         Helper to collect rich error data for a failed job.
         """
         print("[yellow]Gathering diagnostics for failed job...[/yellow]")
-        pod_status = pod.get_filtered_status()
+        pod_events = []
+        pods_description = ""
+        if pod is not None:
+            pod_status = pod.get_filtered_status()
+            pod_events = pod.get_events()
+            pods_description = json.dumps(pod_status)
+
        job_status = job.get_filtered_status()
 
         # Use json.dumps because it's more compact (maybe fewer tokens)
-        pod_events = pod.get_events()
         job_events = job.get_events()
         events = sorted(job_events + pod_events, key=lambda e: e.get("lastTimestamp", ""))
         job_description = json.dumps(job_status)
-        pods_description = json.dumps(pod_status)
         events_description = json.dumps(events)
         full_logs = job.get_logs()
@@ -139,7 +147,6 @@ def deploy(self, context):
         Deploy the Kubernetes Job.
         """
         job_crd = context.result
-        cleanup = context.get("cleanup", True)
 
         # Not sure if this can happen, assume it can
         if not job_crd:
@@ -189,6 +196,12 @@ def deploy(self, context):
         job_data["spec"]["template"]["spec"]["containers"][0]["command"] = ["sleep", "infinity"]
         job_crd = yaml.dump(job_data)
 
+        # Create job objects (and eventually pod),
+        # but ensure we delete any that might exist from before.
+        job = objects.KubernetesJob(job_name, namespace)
+        job.delete()
+        pod = None
+
         # Write the manifest to a temporary directory
         job_manifest_path = os.path.join(deploy_dir, "job.yaml")
         utils.write_file(job_crd, job_manifest_path)
@@ -218,10 +231,6 @@ def deploy(self, context):
         # 2. We then need to wait until the job is running or fails
         print("[yellow]Waiting for Job to start... (Timeout: 5 minutes)[/yellow]")
 
-        # Create job objects (and eventually pod)
-        job = objects.KubernetesJob(job_name, namespace)
-        pod = None
-
         # This assumes a backoff / retry of 1, so we aren't doing recreation.
         # If it fails once, it fails once and for all.
         # 30 * 5s = 150s (2.5 minutes!)
@@ -245,7 +254,12 @@ def deploy(self, context):
             )
 
             # 2. If the job isn't terminal, find the pod. It may not exist yet.
-            pod = pod or job.get_pod_name()
+            tries = 0
+            while not pod and tries < 10:
+                print("Waiting for pod...")
+                pod = job.get_pod()
+                time.sleep(5)
+                tries += 1
 
             # 3. If a pod exists, inspect it deeply for fatal errors or readiness.
             if pod:
@@ -320,21 +334,63 @@ def deploy(self, context):
         # But did it succeed?
         if final_status.get("succeeded", 0) > 0:
             print("\n[green]✅ Job final status is Succeeded.[/green]")
+
+            # If we want to optimize, we continue to run until instructed not to.
+            if context.get("optimize") is not None:
+
+                # TODO move into own function?
+                # We should provide the cluster resources to the agent
+                resources = self.cluster_resources()
+
+                # The agent calling the optimize agent decides what metadata to present.
+                # This is how this agent will work for cloud vs. bare metal.
+                context.requires = prompts.get_optimize_prompt(context, resources)
+                context = self.optimize_agent.run(context, full_logs)
+
+                # Go through the spec and update fields that match.
+                decision = context.optimize_result["decision"]
+                print(f"\n[green]✅ Optimization agent decided to {decision}.[/green]")
+                if decision == "RETRY":
+
+                    # Retry means recreating the job
+                    job.delete()
+                    context.result = self.update_job_crd(context.optimize_result, job_crd)
+                    print(context.result)
+                    return self.deploy(context)
+
+                # The agent has decided to return - no more optimization.
+                # TODO: we need to ensure regex can be passed from context (and input)
+                # Here we add the optimization agent metadata to this agent for saving.
+                self.optimize_agent.metadata["foms"] = self.optimize_agent.foms
+                self.metadata["assets"]["optimize"] = self.optimize_agent.metadata
+                return 0, full_logs
+
         else:
             print("\n[red]❌ Job final status is Failed.[/red]")
             diagnostics = self.get_diagnostics(job, pod)
             job.delete()
             # We already have the logs, so we can pass them directly.
             return 1, prompts.failure_message % diagnostics
 
-        if cleanup and os.path.exists(deploy_dir):
+        if context.get("cleanup") is True and os.path.exists(deploy_dir):
             print(f"[dim]Cleaning up temporary deploy directory: {deploy_dir}[/dim]")
             job.delete()
             shutil.rmtree(deploy_dir, ignore_errors=True)
 
         # Save full logs for the step
         return 0, full_logs
 
+    def update_job_crd(self, updates, job_crd):
+        """
+        Update the job crd with a set of controlled fields.
+        """
+        for key in ["decision", "reason"]:
+            if key in updates:
+                del updates[key]
+        # Note: update_prompt expects the updates first, then the manifest.
+        prompt = prompts.update_prompt % (json.dumps(updates), job_crd)
+        result = self.ask_gemini(prompt)
+        return self.get_code_block(result, "yaml")
+
     def save_job_manifest(self, job):
         """
         Save job manifest to metadata
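To make the optimization loop concrete: below is a hypothetical `optimize_result` that would trigger the RETRY branch above. The field names beyond `decision` and `reason` are illustrative; per `update_job_crd`, everything except those two keys is handed to the LLM as Job-shaped updates.

```python
# Hypothetical payload from the optimization agent (illustrative fields).
optimize_result = {
    "decision": "RETRY",
    "reason": "Pods used well under half of allocatable CPU; request a larger slice.",
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {"resources": {"requests": {"cpu": "7500m", "memory": "28Gi"}}}
                ]
            }
        }
    },
}
```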

fractale/agent/kubernetes/job/prompts.py

Lines changed: 42 additions & 0 deletions
@@ -1,3 +1,5 @@
+import json
+
 import fractale.agent.defaults as defaults
 from fractale.agent.prompts import prompt_wrapper
 
@@ -29,6 +31,46 @@
 %s
 """
 
+update_prompt = """You are a Kubernetes Job update agent. Your job is to take a spec of updates for a Job manifest and apply them.
+You are NOT allowed to make other changes to the manifest. Ignore the 'decision' field and, if you think it appropriate, add context from "reason" as comments.
+Here are the updates:
+
+%s
+
+And here is the Job manifest to apply them to:
+%s
+Return ONLY the YAML with no other text or commentary.
+"""
+
+
+def get_optimize_prompt(context, resources):
+    """
+    Get a description of cluster resources and optimization goals.
+    """
+    prompt = """
+Your task is to optimize the running of a Kubernetes Job: %s in %s. You are allowed to request anywhere in the range of available resources, including count and type. Here are the available resources:
+%s
+Here is the current job manifest:
+```yaml
+%s
+```
+Please return ONLY a json structure to be loaded that includes a limited set of fields (with keys organized the same as a Kubernetes Job, e.g., spec -> template -> spec).
+The result should be provided as json. The fields should map 1:1 into a pod spec serialized as json.
+Do not make requests that lead to Guaranteed pods. DO NOT CHANGE PROBLEM SIZE PARAMETERS OR COMMAND. You can change args. Remember that
+to get a full node's resources you often have to ask for slightly less than what is available.
+""" % (
+        context.optimize,
+        context.environment,
+        json.dumps(resources),
+        context.result,
+    )
+    dockerfile = context.get("dockerfile")
+    if dockerfile:
+        prompt += (
+            f" Here is the Dockerfile that helped to generate the application.\n {dockerfile}\n"
+        )
+    return prompt
+
 
 def get_regenerate_prompt(context):
     """

fractale/agent/kubernetes/objects.py

Lines changed: 1 addition & 1 deletion
@@ -266,7 +266,7 @@ def get_filtered_status(self):
             ],
         }
 
-    def get_pod_name(self):
+    def get_pod(self):
         """
         Find the name of the pod created by a specific job.
         """

fractale/agent/manager/agent.py

Lines changed: 4 additions & 5 deletions
@@ -59,18 +59,17 @@ def get_recovery_step(self, context, failed_step, plan):
         )
         return step
 
-    def save_results(self, tracker):
+    def save_results(self, tracker, plan):
         """
         Save results to file based on timestamp.
-
-        Just ploop into pwd for now, we can eventually take a path.
         """
         if not os.path.exists(self.results_dir):
             os.makedirs(self.results_dir)
         now = datetime.now()
         timestamp = now.strftime("%Y-%m-%d_%H-%M-%S")
         results_file = os.path.join(self.results_dir, f"results-{timestamp}.json")
-        utils.write_json(tracker, results_file)
+        result = {"steps": tracker, "manager": plan.plan}
+        utils.write_json(result, results_file)
 
     @timed
     def run(self, context):
@@ -113,7 +112,7 @@ def run(self, context):
                 f"Agentic tasks complete: [bold magenta]{len(tracker)} agent runs[/bold magenta]",
                 title="[green]Manager Status[/green]",
             )
-            self.save_results(tracker)
+            self.save_results(tracker, plan)
 
         # Raise for now so I can see the issue.
         except Exception as e:
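For orientation, a hypothetical shape of the saved results-<timestamp>.json, assuming `tracker` is a list of per-step records and `plan.plan` is the manager's step plan (both structures are illustrative, not confirmed by the commit):

```python
# Hypothetical contents of results-<timestamp>.json (illustrative fields).
result = {
    "steps": [
        {"agent": "kubernetes-job", "attempts": 2, "times": {"run": 41.3}},
    ],
    "manager": ["build", "deploy"],
}
```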

fractale/agent/optimize/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+from .agent import OptimizationAgent
