Skip to content

Commit b63e1bb

Browse files
committed
Refactor workflow runs list to use MCP tool calls
1 parent 2666061 commit b63e1bb

File tree

1 file changed

+172
-86
lines changed
  • src/mcp_agent/cli/cloud/commands/workflows/runs

1 file changed

+172
-86
lines changed

src/mcp_agent/cli/cloud/commands/workflows/runs/main.py

Lines changed: 172 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,90 @@
77
import yaml
88
from rich.table import Table
99

10+
from mcp_agent.app import MCPApp
1011
from mcp_agent.cli.core.utils import run_async
11-
from mcp_agent.cli.mcp_app.api_client import WorkflowExecutionStatus
12+
from mcp_agent.cli.exceptions import CLIError
13+
from mcp_agent.config import MCPServerSettings, Settings, LoggerSettings
14+
from mcp_agent.mcp.gen_client import gen_client
1215
from ...utils import (
1316
setup_authenticated_client,
1417
validate_output_format,
15-
handle_server_api_errors,
1618
resolve_server,
1719
)
1820
from mcp_agent.cli.utils.ux import console, print_info
1921

2022

21-
@handle_server_api_errors
23+
async def _list_workflow_runs_async(
24+
server_id_or_url: str, limit: Optional[int], status: Optional[str], format: str
25+
) -> None:
26+
"""List workflow runs using MCP tool calls to a deployed server."""
27+
if server_id_or_url.startswith(("http://", "https://")):
28+
server_url = server_id_or_url
29+
else:
30+
client = setup_authenticated_client()
31+
server = resolve_server(client, server_id_or_url)
32+
33+
if hasattr(server, "appServerInfo") and server.appServerInfo:
34+
server_url = server.appServerInfo.serverUrl
35+
else:
36+
raise CLIError(
37+
f"Server '{server_id_or_url}' is not deployed or has no server URL"
38+
)
39+
40+
if not server_url:
41+
raise CLIError(f"No server URL found for server '{server_id_or_url}'")
42+
43+
quiet_settings = Settings(logger=LoggerSettings(level="error"))
44+
app = MCPApp(name="workflows_cli", settings=quiet_settings)
45+
46+
try:
47+
async with app.run() as workflow_app:
48+
context = workflow_app.context
49+
50+
sse_url = (
51+
f"{server_url}/sse" if not server_url.endswith("/sse") else server_url
52+
)
53+
context.server_registry.registry["workflow_server"] = MCPServerSettings(
54+
name="workflow_server",
55+
description=f"Deployed MCP server {server_url}",
56+
url=sse_url,
57+
transport="sse",
58+
)
59+
60+
async with gen_client(
61+
"workflow_server", server_registry=context.server_registry
62+
) as client:
63+
result = await client.call_tool("workflows-runs-list", {})
64+
65+
workflows_data = result.content[0].text if result.content else []
66+
if isinstance(workflows_data, str):
67+
workflows_data = json.loads(workflows_data)
68+
69+
if not workflows_data:
70+
workflows_data = []
71+
72+
# Apply filtering
73+
workflows = workflows_data
74+
if status:
75+
status_filter = _get_status_filter(status)
76+
workflows = [w for w in workflows if _matches_status(w, status_filter)]
77+
78+
if limit:
79+
workflows = workflows[:limit]
80+
81+
if format == "json":
82+
_print_workflows_json(workflows)
83+
elif format == "yaml":
84+
_print_workflows_yaml(workflows)
85+
else:
86+
_print_workflows_text(workflows, status, server_id_or_url)
87+
88+
except Exception as e:
89+
raise CLIError(
90+
f"Error listing workflow runs for server {server_id_or_url}: {str(e)}"
91+
) from e
92+
93+
2294
def list_workflow_runs(
2395
server_id_or_url: str = typer.Argument(
2496
..., help="Server ID, app config ID, or server URL to list workflow runs for"
@@ -46,62 +118,38 @@ def list_workflow_runs(
46118
mcp-agent cloud workflows runs apcnf_xyz789 --limit 10 --format json
47119
"""
48120
validate_output_format(format)
49-
client = setup_authenticated_client()
50-
51-
if server_id_or_url.startswith(("http://", "https://")):
52-
resolved_server = resolve_server(client, server_id_or_url)
121+
run_async(_list_workflow_runs_async(server_id_or_url, limit, status, format))
53122

54-
if hasattr(resolved_server, "appId"):
55-
app_id_or_config_id = resolved_server.appId
56-
elif hasattr(resolved_server, "appConfigurationId"):
57-
app_id_or_config_id = resolved_server.appConfigurationId
58-
else:
59-
raise ValueError(
60-
f"Could not extract app ID or config ID from server: {server_id_or_url}"
61-
)
62-
else:
63-
app_id_or_config_id = server_id_or_url
64-
65-
max_results = limit or 100
66-
67-
status_filter = None
68-
if status:
69-
status_map = {
70-
"running": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING,
71-
"failed": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED,
72-
"timed_out": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
73-
"timeout": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, # alias
74-
"canceled": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED,
75-
"cancelled": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, # alias
76-
"terminated": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED,
77-
"completed": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED,
78-
"continued": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
79-
"continued_as_new": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
80-
}
81-
status_filter = status_map.get(status.lower())
82-
if not status_filter:
83-
valid_statuses = "running|failed|timed_out|timeout|canceled|cancelled|terminated|completed|continued|continued_as_new"
84-
raise typer.BadParameter(
85-
f"Invalid status '{status}'. Valid options: {valid_statuses}"
86-
)
87123

88-
async def list_workflows_async():
89-
return await client.list_workflows(
90-
app_id_or_config_id=app_id_or_config_id, max_results=max_results
124+
def _get_status_filter(status: str) -> str:
125+
"""Convert status string to normalized status."""
126+
status_map = {
127+
"running": "running",
128+
"failed": "failed",
129+
"timed_out": "timed_out",
130+
"timeout": "timed_out", # alias
131+
"canceled": "canceled",
132+
"cancelled": "canceled", # alias
133+
"terminated": "terminated",
134+
"completed": "completed",
135+
"continued": "continued",
136+
"continued_as_new": "continued",
137+
}
138+
normalized_status = status_map.get(status.lower())
139+
if not normalized_status:
140+
valid_statuses = "running|failed|timed_out|timeout|canceled|cancelled|terminated|completed|continued|continued_as_new"
141+
raise typer.BadParameter(
142+
f"Invalid status '{status}'. Valid options: {valid_statuses}"
91143
)
144+
return normalized_status
92145

93-
response = run_async(list_workflows_async())
94-
workflows = response.workflows or []
95-
96-
if status_filter:
97-
workflows = [w for w in workflows if w.execution_status == status_filter]
98146

99-
if format == "json":
100-
_print_workflows_json(workflows)
101-
elif format == "yaml":
102-
_print_workflows_yaml(workflows)
103-
else:
104-
_print_workflows_text(workflows, status, server_id_or_url)
147+
def _matches_status(workflow: dict, status_filter: str) -> bool:
148+
"""Check if workflow matches the status filter."""
149+
workflow_status = workflow.get("execution_status", "")
150+
if isinstance(workflow_status, str):
151+
return status_filter.lower() in workflow_status.lower()
152+
return False
105153

106154

107155
def _print_workflows_text(workflows, status_filter, server_id_or_url):
@@ -127,21 +175,48 @@ def _print_workflows_text(workflows, status_filter, server_id_or_url):
127175
table.add_column("Principal", style="dim", width=15)
128176

129177
for workflow in workflows:
130-
status_display = _get_status_display(workflow.execution_status)
131-
created_display = (
132-
workflow.created_at.strftime("%Y-%m-%d %H:%M:%S")
133-
if workflow.created_at
134-
else "N/A"
135-
)
136-
run_id_display = _truncate_string(workflow.run_id or "N/A", 15)
178+
# Handle both dict and object formats
179+
if isinstance(workflow, dict):
180+
workflow_id = workflow.get("workflow_id", "N/A")
181+
name = workflow.get("name", "N/A")
182+
execution_status = workflow.get("execution_status", "N/A")
183+
run_id = workflow.get("run_id", "N/A")
184+
created_at = workflow.get("created_at", "N/A")
185+
principal_id = workflow.get("principal_id", "N/A")
186+
else:
187+
workflow_id = getattr(workflow, "workflow_id", "N/A")
188+
name = getattr(workflow, "name", "N/A")
189+
execution_status = getattr(workflow, "execution_status", "N/A")
190+
run_id = getattr(workflow, "run_id", "N/A")
191+
created_at = getattr(workflow, "created_at", "N/A")
192+
principal_id = getattr(workflow, "principal_id", "N/A")
193+
194+
status_display = _get_status_display(execution_status)
195+
196+
# Handle created_at formatting
197+
if created_at and created_at != "N/A":
198+
if hasattr(created_at, "strftime"):
199+
created_display = created_at.strftime("%Y-%m-%d %H:%M:%S")
200+
else:
201+
# Try to parse ISO string
202+
try:
203+
from datetime import datetime
204+
dt = datetime.fromisoformat(str(created_at).replace("Z", "+00:00"))
205+
created_display = dt.strftime("%Y-%m-%d %H:%M:%S")
206+
except (ValueError, TypeError):
207+
created_display = str(created_at)
208+
else:
209+
created_display = "N/A"
210+
211+
run_id_display = _truncate_string(str(run_id) if run_id else "N/A", 15)
137212

138213
table.add_row(
139-
_truncate_string(workflow.workflow_id, 20),
140-
_truncate_string(workflow.name, 20),
214+
_truncate_string(str(workflow_id) if workflow_id else "N/A", 20),
215+
_truncate_string(str(name) if name else "N/A", 20),
141216
status_display,
142217
run_id_display,
143218
created_display,
144-
_truncate_string(workflow.principal_id, 15),
219+
_truncate_string(str(principal_id) if principal_id else "N/A", 15),
145220
)
146221

147222
console.print(table)
@@ -163,16 +238,19 @@ def _print_workflows_yaml(workflows):
163238

164239

165240
def _workflow_to_dict(workflow):
166-
"""Convert WorkflowInfo to dictionary."""
241+
"""Convert workflow dict to standardized dictionary format."""
242+
# If it's already a dict, just return it
243+
if isinstance(workflow, dict):
244+
return workflow
245+
246+
# Otherwise convert from object attributes
167247
return {
168-
"workflow_id": workflow.workflow_id,
169-
"run_id": workflow.run_id,
170-
"name": workflow.name,
171-
"created_at": workflow.created_at.isoformat() if workflow.created_at else None,
172-
"principal_id": workflow.principal_id,
173-
"execution_status": workflow.execution_status.value
174-
if workflow.execution_status
175-
else None,
248+
"workflow_id": getattr(workflow, "workflow_id", None),
249+
"run_id": getattr(workflow, "run_id", None),
250+
"name": getattr(workflow, "name", None),
251+
"created_at": getattr(workflow, "created_at", None).isoformat() if getattr(workflow, "created_at", None) else None,
252+
"principal_id": getattr(workflow, "principal_id", None),
253+
"execution_status": getattr(workflow, "execution_status", None).value if getattr(workflow, "execution_status", None) else None,
176254
}
177255

178256

@@ -184,18 +262,26 @@ def _truncate_string(text: str, max_length: int) -> str:
184262

185263

186264
def _get_status_display(status):
187-
"""Convert WorkflowExecutionStatus to display string with emoji."""
265+
"""Convert status to display string with emoji."""
188266
if not status:
189267
return "❓ Unknown"
190-
191-
status_map = {
192-
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING: "[green]🟢 Running[/green]",
193-
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED: "[blue]✅ Completed[/blue]",
194-
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED: "[red]❌ Failed[/red]",
195-
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED: "[yellow]🟡 Canceled[/yellow]",
196-
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED: "[red]🔴 Terminated[/red]",
197-
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT: "[orange]⏰ Timed Out[/orange]",
198-
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW: "[purple]🔄 Continued[/purple]",
199-
}
200-
201-
return status_map.get(status, "❓ Unknown")
268+
269+
# Convert to string for comparison
270+
status_str = str(status).lower()
271+
272+
if "running" in status_str:
273+
return "[green]🟢 Running[/green]"
274+
elif "completed" in status_str:
275+
return "[blue]✅ Completed[/blue]"
276+
elif "failed" in status_str or "error" in status_str:
277+
return "[red]❌ Failed[/red]"
278+
elif "cancel" in status_str:
279+
return "[yellow]🟡 Canceled[/yellow]"
280+
elif "terminat" in status_str:
281+
return "[red]🔴 Terminated[/red]"
282+
elif "timeout" in status_str or "timed_out" in status_str:
283+
return "[orange]⏰ Timed Out[/orange]"
284+
elif "continued" in status_str:
285+
return "[purple]🔄 Continued[/purple]"
286+
else:
287+
return f"❓ {status}"

0 commit comments

Comments
 (0)