Skip to content

Commit 7bc0e81

Browse files
committed
manage workflow state through MCP
1 parent e00d191 commit 7bc0e81

File tree

9 files changed

+145
-346
lines changed

9 files changed

+145
-346
lines changed
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
"""MCP Agent Cloud workflows commands."""
22

33
from .describe import describe_workflow
4-
from .suspend import suspend_workflow
54
from .resume import resume_workflow
65
from .cancel import cancel_workflow
76

87
__all__ = [
98
"describe_workflow",
10-
"suspend_workflow",
119
"resume_workflow",
1210
"cancel_workflow",
1311
]
Lines changed: 35 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,46 @@
11
"""Workflow cancel command implementation."""
22

3+
import json
34
from typing import Optional
45

56
import typer
67

7-
from mcp_agent.cli.auth import load_api_key_credentials
8-
from mcp_agent.cli.core.api_client import UnauthenticatedError
9-
from mcp_agent.cli.core.constants import DEFAULT_API_BASE_URL
8+
from mcp_agent.app import MCPApp
109
from mcp_agent.cli.core.utils import run_async
1110
from mcp_agent.cli.exceptions import CLIError
1211
from mcp_agent.cli.utils.ux import console
13-
from mcp_agent.cli.workflows.api_client import WorkflowAPIClient
12+
from mcp_agent.mcp.gen_client import gen_client
13+
14+
15+
async def _cancel_workflow_async(
16+
run_id: str,
17+
reason: Optional[str] = None
18+
) -> None:
19+
"""Cancel a workflow using MCP tool calls."""
20+
# Create a temporary MCP app to connect to temporal server
21+
app = MCPApp(name="workflows_cli")
22+
23+
try:
24+
async with app.run() as workflow_app:
25+
async with gen_client("temporal", server_registry=workflow_app.context.server_registry) as client:
26+
tool_params = {"run_id": run_id}
27+
28+
result = await client.call_tool("workflows-cancel", tool_params)
29+
30+
success = result.content[0].text if result.content else False
31+
if isinstance(success, str):
32+
success = success.lower() == 'true'
33+
34+
if success:
35+
console.print(f"[yellow]⚠[/yellow] Successfully cancelled workflow")
36+
console.print(f" Run ID: [cyan]{run_id}[/cyan]")
37+
if reason:
38+
console.print(f" Reason: [dim]{reason}[/dim]")
39+
else:
40+
raise CLIError(f"Failed to cancel workflow with run ID {run_id}")
41+
42+
except Exception as e:
43+
raise CLIError(f"Error cancelling workflow with run ID {run_id}: {str(e)}") from e
1444

1545

1646
def cancel_workflow(
@@ -26,49 +56,4 @@ def cancel_workflow(
2656
mcp-agent cloud workflows cancel run_abc123
2757
mcp-agent cloud workflows cancel run_abc123 --reason "User requested cancellation"
2858
"""
29-
effective_api_key = load_api_key_credentials()
30-
if not effective_api_key:
31-
raise CLIError(
32-
"Must be logged in to cancel workflow. Run 'mcp-agent login' or set MCP_API_KEY environment variable."
33-
)
34-
35-
client = WorkflowAPIClient(api_url=DEFAULT_API_BASE_URL, api_key=effective_api_key)
36-
37-
try:
38-
workflow_info = run_async(client.cancel_workflow(workflow_id=run_id, reason=reason))
39-
40-
console.print(f"[yellow]⚠[/yellow] Successfully cancelled workflow")
41-
console.print(f" Workflow ID: [cyan]{workflow_info.workflowId}[/cyan]")
42-
console.print(f" Run ID: [cyan]{workflow_info.runId or 'N/A'}[/cyan]")
43-
console.print(f" Status: [cyan]{_execution_status_text(workflow_info.executionStatus)}[/cyan]")
44-
45-
if reason:
46-
console.print(f" Reason: [dim]{reason}[/dim]")
47-
48-
except UnauthenticatedError as e:
49-
raise CLIError(
50-
"Authentication failed. Try running 'mcp-agent login'"
51-
) from e
52-
except Exception as e:
53-
raise CLIError(f"Error cancelling workflow with run ID {run_id}: {str(e)}") from e
54-
55-
56-
def _execution_status_text(status: str | None) -> str:
57-
"""Format the execution status text."""
58-
match status:
59-
case "WORKFLOW_EXECUTION_STATUS_RUNNING":
60-
return "🔄 Running"
61-
case "WORKFLOW_EXECUTION_STATUS_FAILED":
62-
return "❌ Failed"
63-
case "WORKFLOW_EXECUTION_STATUS_TIMED_OUT":
64-
return "⌛ Timed Out"
65-
case "WORKFLOW_EXECUTION_STATUS_CANCELED":
66-
return "🚫 Cancelled"
67-
case "WORKFLOW_EXECUTION_STATUS_TERMINATED":
68-
return "🛑 Terminated"
69-
case "WORKFLOW_EXECUTION_STATUS_COMPLETED":
70-
return "✅ Completed"
71-
case "WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW":
72-
return "🔁 Continued as New"
73-
case _:
74-
return "❓ Unknown"
59+
run_async(_cancel_workflow_async(run_id, reason))

src/mcp_agent/cli/cloud/commands/workflows/describe/main.py

Lines changed: 76 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,44 @@
77
import yaml
88
from rich.panel import Panel
99

10-
from mcp_agent.cli.auth import load_api_key_credentials
11-
from mcp_agent.cli.core.api_client import UnauthenticatedError
12-
from mcp_agent.cli.core.constants import DEFAULT_API_BASE_URL
10+
from mcp_agent.app import MCPApp
1311
from mcp_agent.cli.core.utils import run_async
1412
from mcp_agent.cli.exceptions import CLIError
1513
from mcp_agent.cli.utils.ux import console
16-
from mcp_agent.cli.workflows.api_client import WorkflowAPIClient, WorkflowInfo
14+
from mcp_agent.mcp.gen_client import gen_client
15+
16+
17+
async def _describe_workflow_async(
18+
run_id: str,
19+
format: str = "text"
20+
) -> None:
21+
"""Describe a workflow using MCP tool calls."""
22+
# Create a temporary MCP app to connect to temporal server
23+
app = MCPApp(name="workflows_cli")
24+
25+
try:
26+
async with app.run() as workflow_app:
27+
async with gen_client("temporal", server_registry=workflow_app.context.server_registry) as client:
28+
result = await client.call_tool("workflows-get_status", {
29+
"run_id": run_id
30+
})
31+
32+
workflow_status = result.content[0].text if result.content else {}
33+
if isinstance(workflow_status, str):
34+
workflow_status = json.loads(workflow_status)
35+
36+
if not workflow_status:
37+
raise CLIError(f"Workflow with run ID '{run_id}' not found.")
38+
39+
if format == "json":
40+
print(json.dumps(workflow_status, indent=2))
41+
elif format == "yaml":
42+
print(yaml.dump(workflow_status, default_flow_style=False))
43+
else: # text format
44+
print_workflow_status(workflow_status)
45+
46+
except Exception as e:
47+
raise CLIError(f"Error describing workflow with run ID {run_id}: {str(e)}") from e
1748

1849

1950
def describe_workflow(
@@ -25,86 +56,60 @@ def describe_workflow(
2556
Shows detailed information about a workflow execution including its current status,
2657
creation time, and other metadata.
2758
"""
28-
# Validate format
2959
if format not in ["text", "json", "yaml"]:
3060
console.print("[red]Error: --format must be 'text', 'json', or 'yaml'[/red]")
3161
raise typer.Exit(6)
3262

33-
effective_api_key = load_api_key_credentials()
34-
if not effective_api_key:
35-
raise CLIError(
36-
"Must be logged in to describe workflow. Run 'mcp-agent login' or set MCP_API_KEY environment variable."
37-
)
38-
39-
client = WorkflowAPIClient(api_url=DEFAULT_API_BASE_URL, api_key=effective_api_key)
40-
41-
try:
42-
workflow_info = run_async(client.get_workflow(run_id))
63+
run_async(_describe_workflow_async(run_id, format))
4364

44-
if not workflow_info:
45-
raise CLIError(f"Workflow with run ID '{run_id}' not found.")
4665

47-
if format == "json":
48-
print(json.dumps(_workflow_to_dict(workflow_info), indent=2))
49-
elif format == "yaml":
50-
print(yaml.dump(_workflow_to_dict(workflow_info), default_flow_style=False))
51-
else: # text format
52-
print_workflow_info(workflow_info)
53-
54-
except UnauthenticatedError as e:
55-
raise CLIError(
56-
"Authentication failed. Try running 'mcp-agent login'"
57-
) from e
58-
except Exception as e:
59-
raise CLIError(f"Error describing workflow with run ID {run_id}: {str(e)}") from e
60-
61-
62-
def print_workflow_info(workflow_info: WorkflowInfo) -> None:
63-
"""Print workflow information in text format."""
66+
def print_workflow_status(workflow_status: dict) -> None:
67+
"""Print workflow status information in text format."""
68+
name = workflow_status.get("name", "N/A")
69+
workflow_id = workflow_status.get("workflow_id", workflow_status.get("workflowId", "N/A"))
70+
run_id = workflow_status.get("run_id", workflow_status.get("runId", "N/A"))
71+
status = workflow_status.get("status", "N/A")
72+
73+
created_at = workflow_status.get("created_at", workflow_status.get("createdAt", "N/A"))
74+
if created_at != "N/A" and isinstance(created_at, str):
75+
try:
76+
from datetime import datetime
77+
created_dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
78+
created_at = created_dt.strftime('%Y-%m-%d %H:%M:%S')
79+
except:
80+
pass # Keep original format if parsing fails
81+
6482
console.print(
6583
Panel(
66-
f"Name: [cyan]{workflow_info.name}[/cyan]\n"
67-
f"Workflow ID: [cyan]{workflow_info.workflowId}[/cyan]\n"
68-
f"Run ID: [cyan]{workflow_info.runId or 'N/A'}[/cyan]\n"
69-
f"Created: [cyan]{workflow_info.createdAt.strftime('%Y-%m-%d %H:%M:%S')}[/cyan]\n"
70-
f"Creator: [cyan]{workflow_info.principalId}[/cyan]\n"
71-
f"Status: [cyan]{_execution_status_text(workflow_info.executionStatus)}[/cyan]",
84+
f"Name: [cyan]{name}[/cyan]\n"
85+
f"Workflow ID: [cyan]{workflow_id}[/cyan]\n"
86+
f"Run ID: [cyan]{run_id}[/cyan]\n"
87+
f"Created: [cyan]{created_at}[/cyan]\n"
88+
f"Status: [cyan]{_format_status(status)}[/cyan]",
7289
title="Workflow",
7390
border_style="blue",
7491
expand=False,
7592
)
7693
)
7794

7895

79-
def _workflow_to_dict(workflow_info: WorkflowInfo) -> dict:
80-
"""Convert workflow info to dictionary for JSON/YAML output."""
81-
return {
82-
"name": workflow_info.name,
83-
"workflowId": workflow_info.workflowId,
84-
"runId": workflow_info.runId,
85-
"createdAt": workflow_info.createdAt.isoformat(),
86-
"creator": workflow_info.principalId,
87-
"executionStatus": workflow_info.executionStatus,
88-
"status": _execution_status_text(workflow_info.executionStatus),
89-
}
90-
91-
92-
def _execution_status_text(status: Optional[str]) -> str:
96+
def _format_status(status: str) -> str:
9397
"""Format the execution status text."""
94-
match status:
95-
case "WORKFLOW_EXECUTION_STATUS_RUNNING":
96-
return "🔄 Running"
97-
case "WORKFLOW_EXECUTION_STATUS_FAILED":
98-
return "❌ Failed"
99-
case "WORKFLOW_EXECUTION_STATUS_TIMED_OUT":
100-
return "⌛ Timed Out"
101-
case "WORKFLOW_EXECUTION_STATUS_CANCELED":
102-
return "🚫 Cancelled"
103-
case "WORKFLOW_EXECUTION_STATUS_TERMINATED":
104-
return "🛑 Terminated"
105-
case "WORKFLOW_EXECUTION_STATUS_COMPLETED":
106-
return "✅ Completed"
107-
case "WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW":
108-
return "🔁 Continued as New"
109-
case _:
110-
return "❓ Unknown"
98+
status_lower = str(status).lower()
99+
100+
if "running" in status_lower:
101+
return "🔄 Running"
102+
elif "failed" in status_lower or "error" in status_lower:
103+
return "❌ Failed"
104+
elif "timeout" in status_lower or "timed_out" in status_lower:
105+
return "⌛ Timed Out"
106+
elif "cancel" in status_lower:
107+
return "🚫 Cancelled"
108+
elif "terminat" in status_lower:
109+
return "🛑 Terminated"
110+
elif "complet" in status_lower:
111+
return "✅ Completed"
112+
elif "continued" in status_lower:
113+
return "🔁 Continued as New"
114+
else:
115+
return f"❓ {status}"

0 commit comments

Comments
 (0)