Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mcp_agent/cli/cloud/commands/servers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from .list.main import list_servers
from .describe.main import describe_server
from .delete.main import delete_server
from .workflows.main import list_workflows_for_server

__all__ = [
"list_servers",
"describe_server",
"delete_server",
"list_workflows_for_server",
]
5 changes: 1 addition & 4 deletions src/mcp_agent/cli/cloud/commands/servers/list/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,13 @@ def list_servers(
"""List MCP Servers with optional filtering and sorting.

Examples:
# Filter servers containing 'api'

mcp-agent cloud servers list --filter api

# Sort by creation date (newest first)
mcp-agent cloud servers list --sort-by -created

# Filter active servers and sort by name
mcp-agent cloud servers list --filter active --sort-by name

# Get JSON output with filtering
mcp-agent cloud servers list --filter production --format json
"""
validate_output_format(format)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Workflows subcommand for servers."""

from .main import list_workflows_for_server

__all__ = [
"list_workflows_for_server",
]
180 changes: 180 additions & 0 deletions src/mcp_agent/cli/cloud/commands/servers/workflows/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""Server workflows command implementation."""

import json
from typing import Optional

import typer
import yaml
from rich.table import Table

from mcp_agent.cli.core.utils import run_async
from mcp_agent.cli.mcp_app.api_client import WorkflowExecutionStatus
from ...utils import (
setup_authenticated_client,
validate_output_format,
handle_server_api_errors,
resolve_server,
)
from mcp_agent.cli.utils.ux import console, print_info


@handle_server_api_errors
def list_workflows_for_server(
server_id_or_url: str = typer.Argument(..., help="Server ID, app config ID, or server URL to list workflows for"),
limit: Optional[int] = typer.Option(None, "--limit", help="Maximum number of results to return"),
status: Optional[str] = typer.Option(None, "--status", help="Filter by status: running|failed|timed_out|canceled|terminated|completed|continued"),
format: Optional[str] = typer.Option("text", "--format", help="Output format (text|json|yaml)"),
) -> None:
"""List workflows for an MCP Server.
Examples:
mcp-agent cloud servers workflows app_abc123
mcp-agent cloud servers workflows https://server.example.com --status running
mcp-agent cloud servers workflows apcnf_xyz789 --limit 10 --format json
"""
validate_output_format(format)
client = setup_authenticated_client()

if server_id_or_url.startswith(('http://', 'https://')):
resolved_server = resolve_server(client, server_id_or_url)

if hasattr(resolved_server, 'appId'):
app_id_or_config_id = resolved_server.appId
elif hasattr(resolved_server, 'appConfigurationId'):
app_id_or_config_id = resolved_server.appConfigurationId
else:
raise ValueError(f"Could not extract app ID or config ID from server: {server_id_or_url}")
else:
app_id_or_config_id = server_id_or_url

max_results = limit or 100

status_filter = None
if status:
status_map = {
"running": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING,
"failed": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED,
"timed_out": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
"timeout": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, # alias
"canceled": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED,
"cancelled": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, # alias
"terminated": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED,
"completed": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED,
"continued": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
"continued_as_new": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
}
status_filter = status_map.get(status.lower())
if not status_filter:
valid_statuses = "running|failed|timed_out|timeout|canceled|cancelled|terminated|completed|continued|continued_as_new"
raise typer.BadParameter(f"Invalid status '{status}'. Valid options: {valid_statuses}")

async def list_workflows_async():
return await client.list_workflows(
app_id_or_config_id=app_id_or_config_id,
max_results=max_results
)

response = run_async(list_workflows_async())
workflows = response.workflows or []

if status_filter:
workflows = [w for w in workflows if w.execution_status == status_filter]
Comment on lines +83 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The status filtering is currently applied client-side after fetching up to max_results workflows from the API. This approach may miss relevant workflows if there are more matching workflows than the limit allows.

Consider modifying the API client to pass the status filter directly to the API endpoint, which would:

  1. Reduce unnecessary data transfer
  2. Ensure all matching workflows are considered before pagination limits are applied
  3. Leverage server-side filtering capabilities

If the API doesn't support status filtering, consider increasing the max_results value when a status filter is applied to reduce the chance of missing relevant workflows.

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


if format == "json":
_print_workflows_json(workflows)
elif format == "yaml":
_print_workflows_yaml(workflows)
else:
_print_workflows_text(workflows, status, server_id_or_url)


def _print_workflows_text(workflows, status_filter, server_id_or_url):
"""Print workflows in text format."""
server_name = server_id_or_url

console.print(f"\n[bold blue]📊 Workflows for Server: {server_name}[/bold blue]")

if not workflows:
print_info("No workflows found for this server.")
return

console.print(f"\nFound {len(workflows)} workflow(s):")

table = Table(show_header=True, header_style="bold blue")
table.add_column("Workflow ID", style="cyan", width=20)
table.add_column("Name", style="green", width=20)
table.add_column("Status", style="yellow", width=15)
table.add_column("Run ID", style="dim", width=15)
table.add_column("Created", style="dim", width=20)
table.add_column("Principal", style="dim", width=15)

for workflow in workflows:
status_display = _get_status_display(workflow.execution_status)
created_display = workflow.created_at.strftime('%Y-%m-%d %H:%M:%S') if workflow.created_at else "N/A"
run_id_display = _truncate_string(workflow.run_id or "N/A", 15)

table.add_row(
_truncate_string(workflow.workflow_id, 20),
_truncate_string(workflow.name, 20),
status_display,
run_id_display,
created_display,
_truncate_string(workflow.principal_id, 15),
)

console.print(table)

if status_filter:
console.print(f"\n[dim]Filtered by status: {status_filter}[/dim]")


def _print_workflows_json(workflows):
"""Print workflows in JSON format."""
workflows_data = [_workflow_to_dict(workflow) for workflow in workflows]
print(json.dumps({"workflows": workflows_data}, indent=2, default=str))


def _print_workflows_yaml(workflows):
"""Print workflows in YAML format."""
workflows_data = [_workflow_to_dict(workflow) for workflow in workflows]
print(yaml.dump({"workflows": workflows_data}, default_flow_style=False))


def _workflow_to_dict(workflow):
"""Convert WorkflowInfo to dictionary."""
return {
"workflow_id": workflow.workflow_id,
"run_id": workflow.run_id,
"name": workflow.name,
"created_at": workflow.created_at.isoformat() if workflow.created_at else None,
"principal_id": workflow.principal_id,
"execution_status": workflow.execution_status.value if workflow.execution_status else None,
}


def _truncate_string(text: str, max_length: int) -> str:
"""Truncate string to max_length, adding ellipsis if truncated."""
if len(text) <= max_length:
return text
return text[:max_length-3] + "..."


def _get_status_display(status):
"""Convert WorkflowExecutionStatus to display string with emoji."""
if not status:
return "❓ Unknown"

status_map = {
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING: "[green]🟢 Running[/green]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED: "[blue]✅ Completed[/blue]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED: "[red]❌ Failed[/red]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED: "[yellow]🟡 Canceled[/yellow]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED: "[red]🔴 Terminated[/red]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT: "[orange]⏰ Timed Out[/orange]",
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW: "[purple]🔄 Continued[/purple]",
}

return status_map.get(status, "❓ Unknown")
4 changes: 3 additions & 1 deletion src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ def cancel_workflow(
cannot be resumed and will be marked as cancelled.

Examples:

mcp-agent cloud workflows cancel app_abc123 run_xyz789
mcp-agent cloud workflows cancel https://server.example.com run_xyz789 --reason "User requested cancellation"

mcp-agent cloud workflows cancel app_abc123 run_xyz789 --reason "User requested"
"""
run_async(_cancel_workflow_async(server_id_or_url, run_id, reason))
4 changes: 3 additions & 1 deletion src/mcp_agent/cli/cloud/commands/workflows/describe/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ def describe_workflow(
creation time, and other metadata.

Examples:

mcp-agent cloud workflows describe app_abc123 run_xyz789
mcp-agent cloud workflows describe https://server.example.com run_xyz789 --format json

mcp-agent cloud workflows describe app_abc123 run_xyz789 --format json
"""
if format not in ["text", "json", "yaml"]:
console.print("[red]Error: --format must be 'text', 'json', or 'yaml'[/red]")
Expand Down
5 changes: 4 additions & 1 deletion src/mcp_agent/cli/cloud/commands/workflows/resume/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ def resume_workflow(
a payload (JSON or text) to pass data to the resumed workflow.

Examples:

mcp-agent cloud workflows resume app_abc123 run_xyz789
mcp-agent cloud workflows resume https://server.example.com run_xyz789 --payload '{"data": "value"}'

mcp-agent cloud workflows resume app_abc123 run_xyz789 --payload '{"data": "value"}'

mcp-agent cloud workflows resume app_abc123 run_xyz789 --payload "simple text"
"""
if payload:
Expand Down
2 changes: 2 additions & 0 deletions src/mcp_agent/cli/cloud/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
list_servers,
describe_server,
delete_server,
list_workflows_for_server,
)
from mcp_agent.cli.exceptions import CLIError
from mcp_agent.cli.utils.ux import print_error
Expand Down Expand Up @@ -163,6 +164,7 @@ def invoke(self, ctx):
app_cmd_servers.command(name="list")(list_servers)
app_cmd_servers.command(name="describe")(describe_server)
app_cmd_servers.command(name="delete")(delete_server)
app_cmd_servers.command(name="workflows")(list_workflows_for_server)
app.add_typer(app_cmd_servers, name="servers", help="Manage MCP Servers")

# Alias for servers - apps should behave identically
Expand Down
71 changes: 71 additions & 0 deletions src/mcp_agent/cli/mcp_app/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional, Union
from urllib.parse import urlparse
from enum import Enum

from pydantic import BaseModel

Expand Down Expand Up @@ -63,6 +64,35 @@ class CanDoActionsResponse(BaseModel):
canDoActions: Optional[List[CanDoActionCheck]] = []


class WorkflowExecutionStatus(Enum):
"""From temporal.api.enums.v1.WorkflowExecutionStatus"""
WORKFLOW_EXECUTION_STATUS_UNSPECIFIED = "WORKFLOW_EXECUTION_STATUS_UNSPECIFIED"
WORKFLOW_EXECUTION_STATUS_RUNNING = "WORKFLOW_EXECUTION_STATUS_RUNNING"
WORKFLOW_EXECUTION_STATUS_FAILED = "WORKFLOW_EXECUTION_STATUS_FAILED"
WORKFLOW_EXECUTION_STATUS_TIMED_OUT = "WORKFLOW_EXECUTION_STATUS_TIMED_OUT"
WORKFLOW_EXECUTION_STATUS_CANCELED = "WORKFLOW_EXECUTION_STATUS_CANCELED"
WORKFLOW_EXECUTION_STATUS_TERMINATED = "WORKFLOW_EXECUTION_STATUS_TERMINATED"
WORKFLOW_EXECUTION_STATUS_COMPLETED = "WORKFLOW_EXECUTION_STATUS_COMPLETED"
WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW = "WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW"


class WorkflowInfo(BaseModel):
"""Information about a workflow execution instance"""
workflow_id: str
run_id: Optional[str] = None
name: str
created_at: datetime
principal_id: str
execution_status: Optional[WorkflowExecutionStatus] = None


class ListWorkflowsResponse(BaseModel):
"""Response for listing workflows"""
workflows: Optional[List[WorkflowInfo]] = []
next_page_token: Optional[str] = None
total_count: Optional[int] = 0


APP_ID_PREFIX = "app_"
APP_CONFIG_ID_PREFIX = "apcnf_"

Expand Down Expand Up @@ -464,6 +494,47 @@ async def list_app_configurations(
response = await self.post("/mcp_app/list_app_configurations", payload)
return ListAppConfigurationsResponse(**response.json())

async def list_workflows(
self,
app_id_or_config_id: str,
name_filter: Optional[str] = None,
max_results: int = 100,
page_token: Optional[str] = None,
) -> ListWorkflowsResponse:
"""List workflows for a specific app or app configuration.

Args:
app_id_or_config_id: The app ID (e.g. app_abc123) or app config ID (e.g. apcnf_xyz789)
name_filter: Optional workflow name filter
max_results: Maximum number of results to return
page_token: Pagination token

Returns:
ListWorkflowsResponse: The list of workflows

Raises:
ValueError: If the app_id_or_config_id is invalid
httpx.HTTPError: If the API request fails
"""
payload: Dict[str, Any] = {
"max_results": max_results,
}

if is_valid_app_id_format(app_id_or_config_id):
payload["app_specifier"] = {"app_id": app_id_or_config_id}
elif is_valid_app_config_id_format(app_id_or_config_id):
payload["app_specifier"] = {"app_config_id": app_id_or_config_id}
else:
raise ValueError(f"Invalid app ID or app config ID format: {app_id_or_config_id}. Expected format: app_xxx or apcnf_xxx")

if name_filter:
payload["name"] = name_filter
if page_token:
payload["page_token"] = page_token

response = await self.post("/workflow/list", payload)
return ListWorkflowsResponse(**response.json())

async def delete_app(self, app_id: str) -> str:
"""Delete an MCP App via the API.

Expand Down
Loading