Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 167 additions & 87 deletions src/mcp_agent/cli/cloud/commands/workflows/runs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,89 @@
import yaml
from rich.table import Table

from mcp_agent.app import MCPApp
from mcp_agent.cli.core.utils import run_async
from mcp_agent.cli.mcp_app.api_client import WorkflowExecutionStatus
from mcp_agent.cli.exceptions import CLIError
from mcp_agent.config import MCPServerSettings, Settings, LoggerSettings
from mcp_agent.mcp.gen_client import gen_client
from ...utils import (
setup_authenticated_client,
validate_output_format,
handle_server_api_errors,
resolve_server,
)
from mcp_agent.cli.utils.ux import console, print_info


@handle_server_api_errors
async def _list_workflow_runs_async(
    server_id_or_url: str, limit: Optional[int], status: Optional[str], format: str
) -> None:
    """List workflow runs using MCP tool calls to a deployed server.

    Args:
        server_id_or_url: Server ID, app config ID, or a full http(s) server URL.
        limit: If truthy, truncate the result list to at most this many runs.
        status: Optional status filter string; normalized via _get_status_filter.
        format: Output format; "json", "yaml", or anything else for a text table.

    Raises:
        CLIError: If the server is not deployed, has no URL, or any error
            occurs while connecting to the server or listing runs.
    """
    # A raw URL is used as-is; anything else is treated as an ID and resolved
    # through the authenticated cloud API.
    if server_id_or_url.startswith(("http://", "https://")):
        server_url = server_id_or_url
    else:
        client = setup_authenticated_client()
        server = resolve_server(client, server_id_or_url)

        # Only deployed servers carry appServerInfo with a reachable URL.
        if hasattr(server, "appServerInfo") and server.appServerInfo:
            server_url = server.appServerInfo.serverUrl
        else:
            raise CLIError(
                f"Server '{server_id_or_url}' is not deployed or has no server URL"
            )

    # Guard against a deployed server record with an empty/None serverUrl.
    if not server_url:
        raise CLIError(f"No server URL found for server '{server_id_or_url}'")

    # Suppress app logging below "error" so only the command output is printed.
    quiet_settings = Settings(logger=LoggerSettings(level="error"))
    app = MCPApp(name="workflows_cli", settings=quiet_settings)

    try:
        async with app.run() as workflow_app:
            context = workflow_app.context

            # The SSE transport expects the /sse endpoint; append it unless the
            # caller already supplied it.
            sse_url = (
                f"{server_url}/sse" if not server_url.endswith("/sse") else server_url
            )
            # Register the target server under a fixed name so gen_client below
            # can connect to it through the app's server registry.
            context.server_registry.registry["workflow_server"] = MCPServerSettings(
                name="workflow_server",
                description=f"Deployed MCP server {server_url}",
                url=sse_url,
                transport="sse",
            )

            async with gen_client(
                "workflow_server", server_registry=context.server_registry
            ) as client:
                result = await client.call_tool("workflows-runs-list", {})

                # The tool returns its payload as text content; a JSON string
                # is decoded here. NOTE(review): assumes the first content item
                # holds the full list — confirm against the server tool.
                workflows_data = result.content[0].text if result.content else []
                if isinstance(workflows_data, str):
                    workflows_data = json.loads(workflows_data)

                # Normalize any falsy payload (None, "", empty list) to [].
                if not workflows_data:
                    workflows_data = []

                workflows = workflows_data
                # Filtering happens client-side on the returned run records.
                if status:
                    status_filter = _get_status_filter(status)
                    workflows = [w for w in workflows if _matches_status(w, status_filter)]

                # Limit is applied after filtering, so it caps matching runs.
                if limit:
                    workflows = workflows[:limit]

                if format == "json":
                    _print_workflows_json(workflows)
                elif format == "yaml":
                    _print_workflows_yaml(workflows)
                else:
                    _print_workflows_text(workflows, status, server_id_or_url)

    except Exception as e:
        # Wrap any failure (connection, tool call, parsing) in a CLIError so
        # the CLI reports a uniform message for this command.
        raise CLIError(
            f"Error listing workflow runs for server {server_id_or_url}: {str(e)}"
        ) from e


def list_workflow_runs(
server_id_or_url: str = typer.Argument(
..., help="Server ID, app config ID, or server URL to list workflow runs for"
Expand Down Expand Up @@ -46,62 +117,44 @@ def list_workflow_runs(
mcp-agent cloud workflows runs apcnf_xyz789 --limit 10 --format json
"""
validate_output_format(format)
client = setup_authenticated_client()
run_async(_list_workflow_runs_async(server_id_or_url, limit, status, format))

if server_id_or_url.startswith(("http://", "https://")):
resolved_server = resolve_server(client, server_id_or_url)

if hasattr(resolved_server, "appId"):
app_id_or_config_id = resolved_server.appId
elif hasattr(resolved_server, "appConfigurationId"):
app_id_or_config_id = resolved_server.appConfigurationId
else:
raise ValueError(
f"Could not extract app ID or config ID from server: {server_id_or_url}"
)
else:
app_id_or_config_id = server_id_or_url

max_results = limit or 100

status_filter = None
if status:
status_map = {
"running": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING,
"failed": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED,
"timed_out": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
"timeout": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, # alias
"canceled": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED,
"cancelled": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, # alias
"terminated": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED,
"completed": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED,
"continued": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
"continued_as_new": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
}
status_filter = status_map.get(status.lower())
if not status_filter:
valid_statuses = "running|failed|timed_out|timeout|canceled|cancelled|terminated|completed|continued|continued_as_new"
raise typer.BadParameter(
f"Invalid status '{status}'. Valid options: {valid_statuses}"
)

async def list_workflows_async():
return await client.list_workflows(
app_id_or_config_id=app_id_or_config_id, max_results=max_results
def _get_status_filter(status: str) -> str:
"""Convert status string to normalized status."""
status_map = {
"running": "running",
"failed": "failed",
"timed_out": "timed_out",
"timeout": "timed_out", # alias
"canceled": "canceled",
"cancelled": "canceled", # alias
"terminated": "terminated",
"completed": "completed",
"continued": "continued",
"continued_as_new": "continued",
}
Comment on lines +123 to +136
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like our WorkflowExecutionStatus got nuked? Can you restore?

normalized_status = status_map.get(status.lower())
if not normalized_status:
valid_statuses = "running|failed|timed_out|timeout|canceled|cancelled|terminated|completed|continued|continued_as_new"
raise typer.BadParameter(
f"Invalid status '{status}'. Valid options: {valid_statuses}"
)
return normalized_status

response = run_async(list_workflows_async())
workflows = response.workflows or []

if status_filter:
workflows = [w for w in workflows if w.execution_status == status_filter]

if format == "json":
_print_workflows_json(workflows)
elif format == "yaml":
_print_workflows_yaml(workflows)
else:
_print_workflows_text(workflows, status, server_id_or_url)
def _matches_status(workflow: dict, status_filter: str) -> bool:
"""Check if workflow matches the status filter.

Note: We use string-based matching instead of protobuf enum values because
the MCP tool response format returns status as strings, not enum objects.
This approach is more flexible and doesn't require maintaining sync with
the protobuf definitions.
"""
workflow_status = workflow.get("execution_status", "")
if isinstance(workflow_status, str):
return status_filter.lower() in workflow_status.lower()
return False
Comment on lines +154 to +157
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current status matching logic has a potential issue with false positives. Using status_filter.lower() in workflow_status.lower() performs substring matching, which means a status filter of running would incorrectly match statuses like not_running_anymore.

Consider using exact matching instead:

# More precise matching options:
return workflow_status.lower() == status_filter.lower()

# Or if matching against enum-style values:
return workflow_status.lower().endswith(status_filter.lower())

This would ensure that only workflows with the exact requested status are returned to the user.

Suggested change
workflow_status = workflow.get("execution_status", "")
if isinstance(workflow_status, str):
return status_filter.lower() in workflow_status.lower()
return False
workflow_status = workflow.get("execution_status", "")
if isinstance(workflow_status, str):
return workflow_status.lower() == status_filter.lower()
return False

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.



def _print_workflows_text(workflows, status_filter, server_id_or_url):
Expand All @@ -124,24 +177,44 @@ def _print_workflows_text(workflows, status_filter, server_id_or_url):
table.add_column("Status", style="yellow", width=15)
table.add_column("Run ID", style="dim", width=15)
table.add_column("Created", style="dim", width=20)
table.add_column("Principal", style="dim", width=15)

for workflow in workflows:
status_display = _get_status_display(workflow.execution_status)
created_display = (
workflow.created_at.strftime("%Y-%m-%d %H:%M:%S")
if workflow.created_at
else "N/A"
)
run_id_display = _truncate_string(workflow.run_id or "N/A", 15)
if isinstance(workflow, dict):
workflow_id = workflow.get("workflow_id", "N/A")
name = workflow.get("name", "N/A")
execution_status = workflow.get("execution_status", "N/A")
run_id = workflow.get("run_id", "N/A")
created_at = workflow.get("created_at", "N/A")
else:
workflow_id = getattr(workflow, "workflow_id", "N/A")
name = getattr(workflow, "name", "N/A")
execution_status = getattr(workflow, "execution_status", "N/A")
run_id = getattr(workflow, "run_id", "N/A")
created_at = getattr(workflow, "created_at", "N/A")

status_display = _get_status_display(execution_status)

if created_at and created_at != "N/A":
if hasattr(created_at, "strftime"):
created_display = created_at.strftime("%Y-%m-%d %H:%M:%S")
else:
try:
from datetime import datetime
dt = datetime.fromisoformat(str(created_at).replace("Z", "+00:00"))
created_display = dt.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
created_display = str(created_at)
else:
created_display = "N/A"

run_id_display = _truncate_string(str(run_id) if run_id else "N/A", 15)

table.add_row(
_truncate_string(workflow.workflow_id, 20),
_truncate_string(workflow.name, 20),
_truncate_string(str(workflow_id) if workflow_id else "N/A", 20),
_truncate_string(str(name) if name else "N/A", 20),
status_display,
run_id_display,
created_display,
_truncate_string(workflow.principal_id, 15),
)

console.print(table)
Expand All @@ -163,16 +236,16 @@ def _print_workflows_yaml(workflows):


def _workflow_to_dict(workflow):
"""Convert WorkflowInfo to dictionary."""
"""Convert workflow dict to standardized dictionary format."""
if isinstance(workflow, dict):
return workflow

return {
"workflow_id": workflow.workflow_id,
"run_id": workflow.run_id,
"name": workflow.name,
"created_at": workflow.created_at.isoformat() if workflow.created_at else None,
"principal_id": workflow.principal_id,
"execution_status": workflow.execution_status.value
if workflow.execution_status
else None,
"workflow_id": getattr(workflow, "workflow_id", None),
"run_id": getattr(workflow, "run_id", None),
"name": getattr(workflow, "name", None),
"created_at": getattr(workflow, "created_at", None).isoformat() if getattr(workflow, "created_at", None) else None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line contains a potential bug due to redundant attribute access. The expression calls getattr(workflow, "created_at", None) twice, which is inefficient and could cause errors if the first call returns a falsy value but the attribute doesn't exist.

Consider refactoring to:

created_at_value = getattr(workflow, "created_at", None)
"created_at": created_at_value.isoformat() if created_at_value and hasattr(created_at_value, 'isoformat') else None,

This approach safely handles all cases: when the attribute doesn't exist, when it exists but is None/falsy, and when it exists and has an isoformat method.

Suggested change
"created_at": getattr(workflow, "created_at", None).isoformat() if getattr(workflow, "created_at", None) else None,
"created_at": created_at_value.isoformat() if (created_at_value := getattr(workflow, "created_at", None)) and hasattr(created_at_value, 'isoformat') else None,

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

"execution_status": getattr(workflow, "execution_status", None).value if getattr(workflow, "execution_status", None) else None,
}


Expand All @@ -184,18 +257,25 @@ def _truncate_string(text: str, max_length: int) -> str:


def _get_status_display(status):
"""Convert WorkflowExecutionStatus to display string with emoji."""
"""Convert status to display string with emoji."""
if not status:
return "❓ Unknown"

status_map = {
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING: "[green]🟢 Running[/green]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED: "[blue]✅ Completed[/blue]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED: "[red]❌ Failed[/red]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED: "[yellow]🟡 Canceled[/yellow]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED: "[red]🔴 Terminated[/red]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT: "[orange]⏰ Timed Out[/orange]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW: "[purple]🔄 Continued[/purple]",
}

return status_map.get(status, "❓ Unknown")

status_str = str(status).lower()

if "running" in status_str:
return "[green]🟢 Running[/green]"
elif "completed" in status_str:
return "[blue]✅ Completed[/blue]"
elif "failed" in status_str or "error" in status_str:
return "[red]❌ Failed[/red]"
elif "cancel" in status_str:
return "[yellow]🟡 Canceled[/yellow]"
elif "terminat" in status_str:
return "[red]🔴 Terminated[/red]"
elif "timeout" in status_str or "timed_out" in status_str:
return "[orange]⏰ Timed Out[/orange]"
elif "continued" in status_str:
return "[purple]🔄 Continued[/purple]"
else:
return f"❓ {status}"
Loading
Loading