Skip to content

Commit 0c60af7

Browse files
committed
Adding all changes for temporal to autokill tasks on restart
1 parent 1bf72ef commit 0c60af7

File tree

4 files changed

+348
-9
lines changed

4 files changed

+348
-9
lines changed

src/agentex/lib/cli/commands/agents.py

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
build_agent,
1212
run_agent,
1313
)
14+
from agentex.lib.cli.handlers.cleanup_handlers import cleanup_agent_workflows
1415
from agentex.lib.cli.handlers.deploy_handlers import (
1516
DeploymentError,
1617
HelmError,
@@ -71,6 +72,35 @@ def delete(
7172
logger.info(f"Agent deleted: {agent_name}")
7273

7374

75+
@agents.command()
76+
def cleanup_workflows(
77+
agent_name: str = typer.Argument(..., help="Name of the agent to cleanup workflows for"),
78+
force: bool = typer.Option(False, help="Force cleanup using direct Temporal termination (bypasses development check)"),
79+
):
80+
"""
81+
Clean up all running workflows for an agent.
82+
83+
By default, uses graceful cancellation via agent RPC.
84+
With --force, directly terminates workflows via Temporal client.
85+
This is a convenience command that does the same thing as 'agentex tasks cleanup'.
86+
"""
87+
try:
88+
console.print(f"[blue]Cleaning up workflows for agent '{agent_name}'...[/blue]")
89+
90+
cleanup_agent_workflows(
91+
agent_name=agent_name,
92+
force=force,
93+
development_only=True
94+
)
95+
96+
console.print(f"[green]✓ Workflow cleanup completed for agent '{agent_name}'[/green]")
97+
98+
except Exception as e:
99+
console.print(f"[red]Cleanup failed: {str(e)}[/red]")
100+
logger.exception("Agent workflow cleanup failed")
101+
raise typer.Exit(1) from e
102+
103+
74104
@agents.command()
75105
def build(
76106
manifest: str = typer.Option(..., help="Path to the manifest you want to use"),
@@ -101,23 +131,33 @@ def build(
101131
"""
102132
typer.echo(f"Building agent image from manifest: {manifest}")
103133

134+
# Validate required parameters for building
135+
if push and not registry:
136+
typer.echo("Error: --registry is required when --push is enabled", err=True)
137+
raise typer.Exit(1)
138+
139+
# Only proceed with build if we have a registry (for now, to match existing behavior)
140+
if not registry:
141+
typer.echo("No registry provided, skipping image build")
142+
return
143+
104144
platform_list = platforms.split(",") if platforms else []
105145

106146
try:
107147
image_url = build_agent(
108148
manifest_path=manifest,
109-
registry_url=registry,
110-
repository_name=repository_name,
149+
registry_url=registry, # Now guaranteed to be non-None
150+
repository_name=repository_name or "default-repo", # Provide default
111151
platforms=platform_list,
112152
push=push,
113-
secret=secret,
114-
tag=tag,
115-
build_args=build_arg,
153+
secret=secret or "", # Provide default empty string
154+
tag=tag or "latest", # Provide default
155+
build_args=build_arg or [], # Provide default empty list
116156
)
117157
if image_url:
118158
typer.echo(f"Successfully built image: {image_url}")
119159
else:
120-
typer.echo("No registry provided, image was not built")
160+
typer.echo("Image build completed but no URL returned")
121161
except Exception as e:
122162
typer.echo(f"Error building agent image: {str(e)}", err=True)
123163
logger.exception("Error building agent image")
@@ -127,11 +167,35 @@ def build(
127167
@agents.command()
128168
def run(
129169
manifest: str = typer.Option(..., help="Path to the manifest you want to use"),
170+
cleanup_on_start: bool = typer.Option(
171+
False,
172+
help="Clean up existing workflows for this agent before starting"
173+
),
130174
):
131175
"""
132176
Run an agent locally from the given manifest.
133177
"""
134178
typer.echo(f"Running agent from manifest: {manifest}")
179+
180+
# Optionally cleanup existing workflows before starting
181+
if cleanup_on_start:
182+
try:
183+
# Parse manifest to get agent name
184+
manifest_obj = AgentManifest.from_yaml(file_path=manifest)
185+
agent_name = manifest_obj.agent.name
186+
187+
console.print(f"[yellow]Cleaning up existing workflows for agent '{agent_name}'...[/yellow]")
188+
cleanup_agent_workflows(
189+
agent_name=agent_name,
190+
force=False,
191+
development_only=True
192+
)
193+
console.print("[green]✓ Pre-run cleanup completed[/green]")
194+
195+
except Exception as e:
196+
console.print(f"[yellow]⚠ Pre-run cleanup failed: {str(e)}[/yellow]")
197+
logger.warning(f"Pre-run cleanup failed: {e}")
198+
135199
try:
136200
run_agent(manifest_path=manifest)
137201
except Exception as e:

src/agentex/lib/cli/commands/tasks.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import typer
22
from rich import print_json
3+
from rich.console import Console
34

45
from agentex import Agentex
6+
from agentex.lib.cli.handlers.cleanup_handlers import cleanup_agent_workflows
57
from agentex.lib.utils.logging import make_logger
68

79
logger = make_logger(__name__)
10+
console = Console()
811

912
tasks = typer.Typer()
1013

@@ -33,6 +36,47 @@ def list():
3336
print_json(data=[task.to_dict() for task in tasks])
3437

3538

39+
@tasks.command()
40+
def list_running(
41+
agent_name: str = typer.Option(..., help="Name of the agent to list running tasks for"),
42+
):
43+
"""
44+
List all currently running tasks for a specific agent.
45+
"""
46+
client = Agentex()
47+
all_tasks = client.tasks.list()
48+
running_tasks = [task for task in all_tasks if hasattr(task, 'status') and task.status == "RUNNING"]
49+
50+
if not running_tasks:
51+
console.print(f"[yellow]No running tasks found for agent '{agent_name}'[/yellow]")
52+
return
53+
54+
console.print(f"[green]Found {len(running_tasks)} running task(s) for agent '{agent_name}':[/green]")
55+
56+
# Convert to dict with proper datetime serialization
57+
serializable_tasks = []
58+
for task in running_tasks:
59+
try:
60+
# Use model_dump with mode='json' for proper datetime handling
61+
if hasattr(task, 'model_dump'):
62+
serializable_tasks.append(task.model_dump(mode='json'))
63+
else:
64+
# Fallback for non-Pydantic objects
65+
serializable_tasks.append({
66+
"id": getattr(task, 'id', 'unknown'),
67+
"status": getattr(task, 'status', 'unknown')
68+
})
69+
except Exception as e:
70+
logger.warning(f"Failed to serialize task: {e}")
71+
# Minimal fallback
72+
serializable_tasks.append({
73+
"id": getattr(task, 'id', 'unknown'),
74+
"status": getattr(task, 'status', 'unknown')
75+
})
76+
77+
print_json(data=serializable_tasks)
78+
79+
3680
@tasks.command()
3781
def delete(
3882
task_id: str = typer.Argument(..., help="ID of the task to delete"),
@@ -44,3 +88,31 @@ def delete(
4488
client = Agentex()
4589
client.tasks.delete(task_id=task_id)
4690
logger.info(f"Task deleted: {task_id}")
91+
92+
93+
@tasks.command()
94+
def cleanup(
95+
agent_name: str = typer.Option(..., help="Name of the agent to cleanup tasks for"),
96+
force: bool = typer.Option(False, help="Force cleanup using direct Temporal termination (bypasses development check)"),
97+
):
98+
"""
99+
Clean up all running tasks/workflows for an agent.
100+
101+
By default, uses graceful cancellation via agent RPC.
102+
With --force, directly terminates workflows via Temporal client.
103+
"""
104+
try:
105+
console.print(f"[blue]Starting cleanup for agent '{agent_name}'...[/blue]")
106+
107+
cleanup_agent_workflows(
108+
agent_name=agent_name,
109+
force=force,
110+
development_only=True
111+
)
112+
113+
console.print(f"[green]✓ Cleanup completed for agent '{agent_name}'[/green]")
114+
115+
except Exception as e:
116+
console.print(f"[red]Cleanup failed: {str(e)}[/red]")
117+
logger.exception("Task cleanup failed")
118+
raise typer.Exit(1) from e

0 commit comments

Comments
 (0)