diff --git a/examples/workflows/workflow_evaluator_optimizer/main.py b/examples/workflows/workflow_evaluator_optimizer/main.py index cc15813b6..72505caca 100644 --- a/examples/workflows/workflow_evaluator_optimizer/main.py +++ b/examples/workflows/workflow_evaluator_optimizer/main.py @@ -17,19 +17,22 @@ # The cycle continues until the letter meets a predefined quality standard. app = MCPApp(name="cover_letter_writer") -@app.async_tool(name="cover_letter_writer_tool", - description="This tool implements an evaluator-optimizer workflow for generating " - "high-quality cover letters. It takes job postings, candidate details, " - "and company information as input, then iteratively generates and refines " - "cover letters until they meet excellent quality standards through " - "automated evaluation and feedback.") + +@app.async_tool( + name="cover_letter_writer_tool", + description="This tool implements an evaluator-optimizer workflow for generating " + "high-quality cover letters. It takes job postings, candidate details, " + "and company information as input, then iteratively generates and refines " + "cover letters until they meet excellent quality standards through " + "automated evaluation and feedback.", +) async def example_usage( job_posting: str = "Software Engineer at LastMile AI. Responsibilities include developing AI systems, " - "collaborating with cross-functional teams, and enhancing scalability. Skills required: " - "Python, distributed systems, and machine learning.", + "collaborating with cross-functional teams, and enhancing scalability. Skills required: " + "Python, distributed systems, and machine learning.", candidate_details: str = "Alex Johnson, 3 years in machine learning, contributor to open-source AI projects, " - "proficient in Python and TensorFlow. Motivated by building scalable AI systems to solve real-world problems.", - company_information: str = "Look up from the LastMile AI About page: https://lastmileai.dev/about" + "proficient in Python and TensorFlow. Motivated by building scalable AI systems to solve real-world problems.", + company_information: str = "Look up from the LastMile AI About page: https://lastmileai.dev/about", ): async with app.run() as cover_letter_app: context = cover_letter_app.context diff --git a/src/mcp_agent/cli/auth/constants.py b/src/mcp_agent/cli/auth/constants.py index d90fcab1e..621eab02b 100644 --- a/src/mcp_agent/cli/auth/constants.py +++ b/src/mcp_agent/cli/auth/constants.py @@ -1,4 +1,12 @@ """Constants for the MCP Agent auth utilities.""" -# Default values +import os + +# Default credentials location (legacy) DEFAULT_CREDENTIALS_PATH = "~/.mcp-agent/credentials.json" + +# Additional locations to search (XDG-compatible and documented path) +XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser("~/.config") +ALTERNATE_CREDENTIALS_PATHS = [ + os.path.join(XDG_CONFIG_HOME, "mcp-agent", "credentials.json"), +] diff --git a/src/mcp_agent/cli/auth/main.py b/src/mcp_agent/cli/auth/main.py index e169bf1d7..bf54dd214 100644 --- a/src/mcp_agent/cli/auth/main.py +++ b/src/mcp_agent/cli/auth/main.py @@ -1,8 +1,10 @@ import json import os +import tempfile from typing import Optional -from .constants import DEFAULT_CREDENTIALS_PATH +from .constants import DEFAULT_CREDENTIALS_PATH, ALTERNATE_CREDENTIALS_PATHS +from mcp_agent.cli.utils.ux import print_warning from .models import UserCredentials @@ -23,10 +25,35 @@ def save_credentials(credentials: UserCredentials) -> None: except OSError: pass - # Create file with restricted permissions (0600) to prevent leakage - fd = os.open(credentials_path, os.O_WRONLY | os.O_CREAT, 0o600) - with os.fdopen(fd, "w") as f: - f.write(credentials.to_json()) + # Write atomically to avoid partial or trailing content issues + # Use a temp file in the same directory, then replace + tmp_fd, tmp_path = tempfile.mkstemp( + prefix=".credentials.json.", dir=cred_dir, text=True + ) + try: + with os.fdopen(tmp_fd, "w") as f: + f.write(credentials.to_json()) + f.flush() + os.fsync(f.fileno()) + # Ensure restricted permissions (0600) + try: + os.chmod(tmp_path, 0o600) + except OSError: + pass + # Atomic replace + os.replace(tmp_path, credentials_path) + # Ensure final file perms in case replace inherited different mode + try: + os.chmod(credentials_path, 0o600) + except OSError: + pass + finally: + # Clean up temp if replace failed + try: + if os.path.exists(tmp_path): + os.remove(tmp_path) + except OSError: + pass def load_credentials() -> Optional[UserCredentials]: @@ -35,14 +62,26 @@ def load_credentials() -> Optional[UserCredentials]: Returns: UserCredentials object if it exists, None otherwise """ - credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH) - if os.path.exists(credentials_path): - try: - with open(credentials_path, "r", encoding="utf-8") as f: - return UserCredentials.from_json(f.read()) - except (json.JSONDecodeError, KeyError, ValueError): - # Handle corrupted or old format credentials - return None + # Try primary location + primary_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH) + paths_to_try = [primary_path] + [ + os.path.expanduser(p) for p in ALTERNATE_CREDENTIALS_PATHS + ] + + for path in paths_to_try: + if os.path.exists(path): + try: + with open(path, "r", encoding="utf-8") as f: + return UserCredentials.from_json(f.read()) + except (json.JSONDecodeError, KeyError, ValueError): + # Corrupted credentials; warn and continue to other locations + try: + print_warning( + f"Detected corrupted credentials file at {path}. Please run 'mcp-agent login' again to re-authenticate." + ) + except Exception: + pass + continue return None @@ -52,11 +91,18 @@ def clear_credentials() -> bool: Returns: bool: True if credentials were cleared, False if none existed """ - credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH) - if os.path.exists(credentials_path): - os.remove(credentials_path) - return True - return False + removed = False + paths = [os.path.expanduser(DEFAULT_CREDENTIALS_PATH)] + [ + os.path.expanduser(p) for p in ALTERNATE_CREDENTIALS_PATHS + ] + for path in paths: + if os.path.exists(path): + try: + os.remove(path) + removed = True + except OSError: + pass + return removed def load_api_key_credentials() -> Optional[str]: diff --git a/src/mcp_agent/cli/cloud/commands/auth/whoami/main.py b/src/mcp_agent/cli/cloud/commands/auth/whoami/main.py index 5c5b8cafb..bd2fc37f8 100644 --- a/src/mcp_agent/cli/cloud/commands/auth/whoami/main.py +++ b/src/mcp_agent/cli/cloud/commands/auth/whoami/main.py @@ -4,7 +4,8 @@ from rich.panel import Panel from rich.table import Table -from mcp_agent.cli.auth import load_credentials +from mcp_agent.cli.auth import load_credentials, UserCredentials +from mcp_agent.cli.config import settings as _settings from mcp_agent.cli.exceptions import CLIError @@ -13,11 +14,22 @@ def whoami() -> None: Shows the authenticated user information and organization memberships. """ + console = Console() credentials = load_credentials() - + # If no stored credentials, allow environment variable key + if not credentials and _settings.API_KEY: + credentials = UserCredentials(api_key=_settings.API_KEY) + # Print a brief note that this is env-based auth + console.print( + Panel( + "Using MCP_API_KEY environment variable for authentication.", + title="Auth Source", + border_style="green", + ) + ) if not credentials: raise CLIError( - "Not logged in. Use 'mcp-agent login' to authenticate.", exit_code=4 + "Not authenticated. Set MCP_API_KEY or run 'mcp-agent login'.", exit_code=4 ) if credentials.is_token_expired: @@ -26,8 +38,6 @@ def whoami() -> None: exit_code=4, ) - console = Console() - user_table = Table(show_header=False, box=None) user_table.add_column("Field", style="bold") user_table.add_column("Value") diff --git a/src/mcp_agent/cli/cloud/commands/logger/tail/main.py b/src/mcp_agent/cli/cloud/commands/logger/tail/main.py index 4a8ee476c..05a2ff8ba 100644 --- a/src/mcp_agent/cli/cloud/commands/logger/tail/main.py +++ b/src/mcp_agent/cli/cloud/commands/logger/tail/main.py @@ -19,6 +19,7 @@ from mcp_agent.cli.exceptions import CLIError from mcp_agent.cli.auth import load_credentials, UserCredentials +from mcp_agent.cli.config import settings as _settings from mcp_agent.cli.cloud.commands.utils import ( setup_authenticated_client, resolve_server, @@ -103,8 +104,13 @@ def tail_logs( """ credentials = load_credentials() + # Prefer environment variable if present + if not credentials and _settings.API_KEY: + credentials = UserCredentials(api_key=_settings.API_KEY) if not credentials: - print_error("Not authenticated. Run 'mcp-agent login' first.") + print_error( + "Not authenticated. Set MCP_API_KEY environment variable or run 'mcp-agent login'." + ) raise typer.Exit(4) # Validate conflicting options diff --git a/src/mcp_agent/cli/cloud/commands/utils.py b/src/mcp_agent/cli/cloud/commands/utils.py index 1e78c085e..fa0a3fbf6 100644 --- a/src/mcp_agent/cli/cloud/commands/utils.py +++ b/src/mcp_agent/cli/cloud/commands/utils.py @@ -4,6 +4,7 @@ from typing import Union from mcp_agent.cli.auth import load_api_key_credentials +from mcp_agent.cli.config import settings from mcp_agent.cli.core.api_client import UnauthenticatedError from mcp_agent.cli.core.constants import DEFAULT_API_BASE_URL from mcp_agent.cli.core.utils import run_async @@ -24,10 +25,13 @@ def setup_authenticated_client() -> MCPAppClient: Raises: CLIError: If authentication fails """ - effective_api_key = load_api_key_credentials() + # Prefer environment-provided key, then fall back to stored credentials + effective_api_key = settings.API_KEY or load_api_key_credentials() if not effective_api_key: - raise CLIError("Must be logged in to access servers. Run 'mcp-agent login'.") + raise CLIError( + "Must be authenticated. Set MCP_API_KEY or run 'mcp-agent login'." + ) return MCPAppClient(api_url=DEFAULT_API_BASE_URL, api_key=effective_api_key) @@ -48,10 +52,10 @@ def validate_output_format(format: str) -> None: ) -def resolve_server( +async def resolve_server_async( client: MCPAppClient, id_or_url: str ) -> Union[MCPApp, MCPAppConfiguration]: - """Resolve server from ID or URL. + """Resolve server from ID or URL (async). Args: client: Authenticated MCP App client @@ -64,12 +68,22 @@ def resolve_server( CLIError: If server resolution fails """ try: - return run_async(client.get_app_or_config(id_or_url)) - + return await client.get_app_or_config(id_or_url) except Exception as e: raise CLIError(f"Failed to resolve server '{id_or_url}': {str(e)}") from e +def resolve_server( + client: MCPAppClient, id_or_url: str +) -> Union[MCPApp, MCPAppConfiguration]: + """Resolve server from ID or URL (sync wrapper). + + Safe for synchronous CLI contexts. For async code paths, prefer + using resolve_server_async to avoid nested event loops. + """ + return run_async(resolve_server_async(client, id_or_url)) + + def handle_server_api_errors(func): """Decorator to handle common API errors for server commands. diff --git a/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py b/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py index cc2adbd2b..763070573 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py @@ -12,7 +12,7 @@ from ...utils import ( setup_authenticated_client, handle_server_api_errors, - resolve_server, + resolve_server_async, ) @@ -24,7 +24,7 @@ async def _cancel_workflow_async( server_url = server_id_or_url else: client = setup_authenticated_client() - server = resolve_server(client, server_id_or_url) + server = await resolve_server_async(client, server_id_or_url) if hasattr(server, "appServerInfo") and server.appServerInfo: server_url = server.appServerInfo.serverUrl @@ -36,7 +36,9 @@ async def _cancel_workflow_async( if not server_url: raise CLIError(f"No server URL found for server '{server_id_or_url}'") - effective_api_key = load_api_key_credentials() + from mcp_agent.cli.config import settings as _settings + + effective_api_key = _settings.API_KEY or load_api_key_credentials() if not effective_api_key: raise CLIError("Must be logged in to access server. Run 'mcp-agent login'.") diff --git a/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py b/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py index 2766b961f..85b034487 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py @@ -16,7 +16,7 @@ from ...utils import ( handle_server_api_errors, - resolve_server, + resolve_server_async, setup_authenticated_client, ) @@ -29,7 +29,7 @@ async def _describe_workflow_async( server_url = server_id_or_url else: client = setup_authenticated_client() - server = resolve_server(client, server_id_or_url) + server = await resolve_server_async(client, server_id_or_url) if hasattr(server, "appServerInfo") and server.appServerInfo: server_url = server.appServerInfo.serverUrl @@ -41,7 +41,9 @@ async def _describe_workflow_async( if not server_url: raise CLIError(f"No server URL found for server '{server_id_or_url}'") - effective_api_key = load_api_key_credentials() + from mcp_agent.cli.config import settings as _settings + + effective_api_key = _settings.API_KEY or load_api_key_credentials() if not effective_api_key: raise CLIError("Must be logged in to access server. Run 'mcp-agent login'.") diff --git a/src/mcp_agent/cli/cloud/commands/workflows/list/main.py b/src/mcp_agent/cli/cloud/commands/workflows/list/main.py index c8076458c..4aa8d9915 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/list/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/list/main.py @@ -14,7 +14,7 @@ from mcp_agent.cli.utils.ux import console, print_error from ...utils import ( setup_authenticated_client, - resolve_server, + resolve_server_async, handle_server_api_errors, validate_output_format, ) @@ -26,7 +26,7 @@ async def _list_workflows_async(server_id_or_url: str, format: str = "text") -> server_url = server_id_or_url else: client = setup_authenticated_client() - server = resolve_server(client, server_id_or_url) + server = await resolve_server_async(client, server_id_or_url) if hasattr(server, "appServerInfo") and server.appServerInfo: server_url = server.appServerInfo.serverUrl @@ -38,7 +38,9 @@ async def _list_workflows_async(server_id_or_url: str, format: str = "text") -> if not server_url: raise CLIError(f"No server URL found for server '{server_id_or_url}'") - effective_api_key = load_api_key_credentials() + from mcp_agent.cli.config import settings as _settings + + effective_api_key = _settings.API_KEY or load_api_key_credentials() if not effective_api_key: raise CLIError("Must be logged in to access server. Run 'mcp-agent login'.") diff --git a/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py b/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py index 741e1e25c..1bb9b56cc 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py @@ -13,7 +13,7 @@ from ...utils import ( setup_authenticated_client, handle_server_api_errors, - resolve_server, + resolve_server_async, ) @@ -28,7 +28,7 @@ async def _signal_workflow_async( server_url = server_id_or_url else: client = setup_authenticated_client() - server = resolve_server(client, server_id_or_url) + server = await resolve_server_async(client, server_id_or_url) if hasattr(server, "appServerInfo") and server.appServerInfo: server_url = server.appServerInfo.serverUrl @@ -40,7 +40,9 @@ async def _signal_workflow_async( if not server_url: raise CLIError(f"No server URL found for server '{server_id_or_url}'") - effective_api_key = load_api_key_credentials() + from mcp_agent.cli.config import settings as _settings + + effective_api_key = _settings.API_KEY or load_api_key_credentials() if not effective_api_key: raise CLIError("Must be logged in to access server. Run 'mcp-agent login'.") diff --git a/src/mcp_agent/cli/cloud/commands/workflows/runs/main.py b/src/mcp_agent/cli/cloud/commands/workflows/runs/main.py index 90e6aedcd..2d3a10aaf 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/runs/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/runs/main.py @@ -16,7 +16,7 @@ from mcp_agent.cli.utils.ux import console, print_error from ...utils import ( - resolve_server, + resolve_server_async, setup_authenticated_client, validate_output_format, ) @@ -30,7 +30,7 @@ async def _list_workflow_runs_async( server_url = server_id_or_url else: client = setup_authenticated_client() - server = resolve_server(client, server_id_or_url) + server = await resolve_server_async(client, server_id_or_url) if hasattr(server, "appServerInfo") and server.appServerInfo: server_url = server.appServerInfo.serverUrl @@ -42,7 +42,9 @@ async def _list_workflow_runs_async( if not server_url: raise CLIError(f"No server URL found for server '{server_id_or_url}'") - effective_api_key = load_api_key_credentials() + from mcp_agent.cli.config import settings as _settings + + effective_api_key = _settings.API_KEY or load_api_key_credentials() if not effective_api_key: raise CLIError("Must be logged in to access server. Run 'mcp-agent login'.") diff --git a/src/mcp_agent/cli/cloud/main.py b/src/mcp_agent/cli/cloud/main.py index 4dc218ddf..6ac0ae6cb 100644 --- a/src/mcp_agent/cli/cloud/main.py +++ b/src/mcp_agent/cli/cloud/main.py @@ -21,7 +21,6 @@ get_app_status, list_app_workflows, ) -from mcp_agent.cli.cloud.commands.apps import list_apps from mcp_agent.cli.cloud.commands.logger import tail_logs from mcp_agent.cli.cloud.commands.servers import ( delete_server, @@ -78,16 +77,6 @@ deploy_config ) - -# Sub-typer for `mcp-agent apps` commands -app_cmd_apps = typer.Typer( - help="Management commands for multiple MCP Apps", - no_args_is_help=True, - cls=HelpfulTyperGroup, -) -app_cmd_apps.command(name="list")(list_apps) -app.add_typer(app_cmd_apps, name="apps", help="Manage MCP Apps") - # Sub-typer for `mcp-agent app` commands app_cmd_app = typer.Typer( help="Management commands for an MCP App", @@ -98,7 +87,7 @@ app_cmd_app.command(name="delete")(delete_app) app_cmd_app.command(name="status")(get_app_status) app_cmd_app.command(name="workflows")(list_app_workflows) -app.add_typer(app_cmd_app, name="app", help="Manage an MCP App") +app.add_typer(app_cmd_app, name="apps", help="Manage an MCP App") # Sub-typer for `mcp-agent workflows` commands app_cmd_workflows = typer.Typer( diff --git a/src/mcp_agent/cli/mcp_app/mcp_client.py b/src/mcp_agent/cli/mcp_app/mcp_client.py index b77ddbefa..583efd7a3 100644 --- a/src/mcp_agent/cli/mcp_app/mcp_client.py +++ b/src/mcp_agent/cli/mcp_app/mcp_client.py @@ -16,6 +16,7 @@ console, print_success, ) +from mcp_agent.executor.workflow_registry import WorkflowRunsPage DEFAULT_CLIENT_INFO = types.Implementation(name="mcp", version="0.1.0") @@ -142,6 +143,7 @@ class ListWorkflowRunsResult(BaseModel): """Processed server response to a workflows-runs-list request from the client.""" workflow_runs: list[WorkflowRun] + next_page_token: Optional[str] = None class MCPClientSession(ClientSession): @@ -176,9 +178,26 @@ async def list_workflows(self) -> ListWorkflowsResult: return ListWorkflowsResult(workflows=workflows) - async def list_workflow_runs(self) -> ListWorkflowRunsResult: - """Send a workflows-runs-list request.""" - runs_response = await self.call_tool("workflows-runs-list", {}) + async def list_workflow_runs( + self, + *, + limit: Optional[int] = None, + page_size: Optional[int] = None, + next_page_token: Optional[str] = None, + ) -> ListWorkflowRunsResult: + """Send a workflows-runs-list request. + + Parses either a paginated WorkflowRunsPage shape or a legacy list/single-run shape. + """ + params: dict[str, Any] = {} + if limit is not None: + params["limit"] = limit + if page_size is not None: + params["page_size"] = page_size + if next_page_token: + params["next_page_token"] = next_page_token + + runs_response = await self.call_tool("workflows-runs-list", params) if runs_response.isError: error_message = ( runs_response.content[0].text @@ -188,17 +207,67 @@ async def list_workflow_runs(self) -> ListWorkflowRunsResult: ) raise Exception(error_message) - runs = [] + runs: list[WorkflowRun] = [] + next_token: Optional[str] = None + + text_items = [ + c for c in runs_response.content if isinstance(c, types.TextContent) + ] + if not text_items: + return ListWorkflowRunsResult(workflow_runs=runs, next_page_token=None) + for item in runs_response.content: - if isinstance(item, types.TextContent): - # Assuming the content is a JSON string representing a WorkflowRun item dict + if not isinstance(item, types.TextContent): + continue + + text = item.text + # Try JSON first + try: + data = json.loads(text) + except json.JSONDecodeError: + # Not JSON; ignore this content item + continue + + # Prefer paginated page shape when present + if isinstance(data, dict) and ("runs" in data or "next_page_token" in data): + try: + page = WorkflowRunsPage.model_validate(data) + for r in page.runs or []: + try: + runs.append( + MCPClientSession.deserialize_workflow_run(json.dumps(r)) + ) + except Exception: + pass + if page.next_page_token: + next_token = page.next_page_token + continue + except Exception: + # Fall through to normal handling if not a valid page + pass + + # Plain list or dict of runs + if isinstance(data, list): # List[Dict[str, Any]] + for r in data: + try: + runs.append( + MCPClientSession.deserialize_workflow_run(json.dumps(r)) + ) + except Exception: + pass + else: # Dict[str, Any] try: - workflow_run = MCPClientSession.deserialize_workflow_run(item.text) - runs.append(workflow_run) - except (json.JSONDecodeError, ValueError) as e: - raise ValueError(f"Invalid workflow run data: {e}") from e + runs.append( + MCPClientSession.deserialize_workflow_run(json.dumps(data)) + ) + except Exception: + # Last-ditch: attempt full deserialize of the original text + try: + runs.append(MCPClientSession.deserialize_workflow_run(text)) + except (json.JSONDecodeError, ValueError) as e: + raise ValueError(f"Invalid workflow run data: {e}") from e - return ListWorkflowRunsResult(workflow_runs=runs) + return ListWorkflowRunsResult(workflow_runs=runs, next_page_token=next_token) @staticmethod def deserialize_workflow_run(text: str) -> WorkflowRun: diff --git a/src/mcp_agent/executor/temporal/workflow_registry.py b/src/mcp_agent/executor/temporal/workflow_registry.py index 2cadd56df..2438468f0 100644 --- a/src/mcp_agent/executor/temporal/workflow_registry.py +++ b/src/mcp_agent/executor/temporal/workflow_registry.py @@ -1,5 +1,6 @@ import asyncio - +import base64 +from datetime import datetime, timedelta from typing import ( Any, Dict, @@ -9,7 +10,7 @@ ) from mcp_agent.logging.logger import get_logger -from mcp_agent.executor.workflow_registry import WorkflowRegistry +from mcp_agent.executor.workflow_registry import WorkflowRegistry, WorkflowRunsPage if TYPE_CHECKING: from mcp_agent.executor.temporal import TemporalExecutor @@ -216,23 +217,187 @@ async def get_workflow_status( return status_dict - async def list_workflow_statuses(self) -> List[Dict[str, Any]]: - result = [] - for run_id, workflow in self._local_workflows.items(): - # Get the workflow status directly to have consistent behavior - status = await workflow.get_status() - workflow_id = workflow.id or workflow.name + async def list_workflow_statuses( + self, + *, + query: str | None = None, + limit: int | None = None, + page_size: int | None = None, + next_page_token: bytes | None = None, + rpc_metadata: Dict[str, str] | None = None, + rpc_timeout: timedelta | None = None, + ) -> List[Dict[str, Any]] | WorkflowRunsPage: + """ + List workflow runs by querying Temporal visibility (preferred). - # Query Temporal for the status - temporal_status = await self._get_temporal_workflow_status( - workflow_id=workflow_id, run_id=run_id - ) + - When Temporal listing succeeds, only runs returned by Temporal are included; local + cache is used to enrich entries where possible. + - On failure or when listing is unsupported, fall back to locally tracked runs. - status["temporal"] = temporal_status + Args: + query: Optional Temporal visibility list filter; defaults to newest first when unset. + limit: Maximum number of runs to return; enforced locally if backend doesn't apply it. + page_size: Page size to request from Temporal, if supported by SDK version. + next_page_token: Opaque pagination token from prior call, if supported by SDK version. + rpc_metadata: Optional per-RPC headers for Temporal (not exposed via server tool). + rpc_timeout: Optional per-RPC timeout (not exposed via server tool). - result.append(status) + Returns: + A list of dictionaries with workflow information, or a WorkflowRunsPage object. + """ + results: List[Dict[str, Any]] = [] + + # Collect all executions for this task queue (best effort) + try: + await self._executor.ensure_client() + client = self._executor.client + + # TODO(saqadri): Multi-user auth scoping + # When supporting multiple users on one server, auth scoping should be enforced + # by the proxy layer using RPC metadata (e.g., API key). This client code should + # simply pass through rpc_metadata and let the backend filter results and manage + # pagination accordingly. + iterator = client.list_workflows( + query=query, + limit=limit, + page_size=page_size or 1000, + next_page_token=next_page_token, + rpc_metadata=rpc_metadata or {}, + rpc_timeout=rpc_timeout, + ) - return result + # Build quick lookup from local cache by (workflow_id, run_id) + in_memory_workflows: Dict[tuple[str, str], "Workflow"] = {} + for run_id, wf in self._local_workflows.items(): + workflow_id = wf.id or wf.name + if workflow_id and run_id: + in_memory_workflows[(workflow_id, run_id)] = wf + + count = 0 + max_count = limit if isinstance(limit, int) and limit > 0 else None + + async for workflow_info in iterator: + # Extract workflow_id and run_id robustly from various shapes + workflow_id = workflow_info.id + run_id = workflow_info.run_id + + if not workflow_id or not run_id: + # Can't build a handle without both IDs + continue + + # If we have a local workflow, start with its detailed status + wf = in_memory_workflows.get((workflow_id, run_id)) + if wf is not None: + status_dict = await wf.get_status() + else: + # Create a minimal status when not tracked locally + status_dict = { + "id": run_id, + "workflow_id": workflow_id, + "run_id": run_id, + "name": workflow_info.workflow_type or workflow_id, + "status": "unknown", + "running": False, + "state": {"status": "unknown", "metadata": {}, "error": None}, + } + + temporal_status: Dict[str, Any] = {} + try: + status: str | None = None + if workflow_info.status: + status = ( + workflow_info.status.name + if workflow_info.status.name + else str(workflow_info.status) + ) + + start_time = workflow_info.start_time + close_time = workflow_info.close_time + execution_time = workflow_info.execution_time + + def _to_timestamp(dt: datetime | None): + if dt is None: + return None + try: + if isinstance(dt, (int, float)): + return float(dt) + return dt.timestamp() + except Exception: + return None + + workflow_type = workflow_info.workflow_type + + temporal_status = { + "id": workflow_id, + "workflow_id": workflow_id, + "run_id": run_id, + "name": workflow_info.id, + "type": workflow_type, + "status": status, + "start_time": _to_timestamp(start_time), + "execution_time": _to_timestamp(execution_time), + "close_time": _to_timestamp(close_time), + "history_length": workflow_info.history_length, + "parent_workflow_id": workflow_info.parent_id, + "parent_run_id": workflow_info.parent_run_id, + } + except Exception: + temporal_status = await self._get_temporal_workflow_status( + workflow_id=workflow_id, run_id=run_id + ) + + status_dict["temporal"] = temporal_status + + # Reflect Temporal status into top-level summary + try: + ts = ( + temporal_status.get("status") + if isinstance(temporal_status, dict) + else None + ) + if isinstance(ts, str): + status_dict["status"] = ts.lower() + status_dict["running"] = ts.upper() in {"RUNNING", "OPEN"} + except Exception: + pass + + results.append(status_dict) + count += 1 + if max_count is not None and count >= max_count: + break + + token = getattr(iterator, "next_page_token", None) + if token: + if isinstance(token, str): + try: + token = token.encode("utf-8") + except Exception: + token = None + if token: + return WorkflowRunsPage( + runs=results, + next_page_token=base64.b64encode(token).decode("ascii"), + ) + else: + return results + except Exception as e: + logger.warning( + f"Error listing workflows from Temporal; falling back to local cache: {e}" + ) + # Fallback – return local cache augmented with Temporal describe where possible + for run_id, wf in self._local_workflows.items(): + status = await wf.get_status() + workflow_id = wf.id or wf.name + try: + status["temporal"] = await self._get_temporal_workflow_status( + workflow_id=workflow_id, run_id=run_id + ) + except Exception: + # This is expected if we couldn't get a hold of the temporal client + pass + + results.append(status) + return results async def list_workflows(self) -> List["Workflow"]: """ diff --git a/src/mcp_agent/executor/workflow.py b/src/mcp_agent/executor/workflow.py index 7e0eed92d..723c99bb9 100644 --- a/src/mcp_agent/executor/workflow.py +++ b/src/mcp_agent/executor/workflow.py @@ -586,6 +586,8 @@ async def get_status(self) -> Dict[str, Any]: """ status = { "id": self._run_id, + "workflow_id": self.id, + "run_id": self._run_id, "name": self.name, "status": self.state.status, "running": self._run_task is not None and not self._run_task.done() diff --git a/src/mcp_agent/executor/workflow_registry.py b/src/mcp_agent/executor/workflow_registry.py index 343de922d..1898135fa 100644 --- a/src/mcp_agent/executor/workflow_registry.py +++ b/src/mcp_agent/executor/workflow_registry.py @@ -1,9 +1,13 @@ import asyncio +from datetime import timedelta + +from pydantic import BaseModel from abc import ABC, abstractmethod from typing import ( Any, Dict, + Mapping, Optional, List, TYPE_CHECKING, @@ -17,6 +21,11 @@ logger = get_logger(__name__) +class WorkflowRunsPage(BaseModel): + runs: List[Dict[str, Any]] + next_page_token: str | None + + class WorkflowRegistry(ABC): """ Abstract base class for registry tracking workflow instances. @@ -127,12 +136,34 @@ async def get_workflow_status( pass @abstractmethod - async def list_workflow_statuses(self) -> List[Dict[str, Any]]: + async def list_workflow_statuses( + self, + *, + query: str | None = None, + limit: int | None = None, + page_size: int | None = None, + next_page_token: bytes | None = None, + rpc_metadata: Mapping[str, str] | None = None, + rpc_timeout: timedelta | None = None, + ) -> List[Dict[str, Any]] | WorkflowRunsPage: """ - List all registered workflow instances with their status. + List workflow runs with their status. + + Implementations may query an external backend (e.g., Temporal) or use local state. + The server tool defaults limit to 100 if not provided here. + + Args: + query: Optional backend-specific visibility filter (advanced). + limit: Maximum number of results to return. + page_size: Page size for backends that support paging. + next_page_token: Opaque pagination token from a prior call. + rpc_metadata: Optional per-RPC headers for backends. + rpc_timeout: Optional per-RPC timeout for backends. Returns: - A list of dictionaries with workflow information + A list of dictionaries with workflow information. + Implementations should only return the WorkflowRunsPage when a next_page_token exists. The token + should be base64-encoded for JSON transport. """ pass @@ -267,12 +298,33 @@ async def get_workflow_status( return await workflow.get_status() - async def list_workflow_statuses(self) -> List[Dict[str, Any]]: - result = [] - for workflow in self._workflows.values(): - # Get the workflow status directly to have consistent behavior - status = await workflow.get_status() + async def list_workflow_statuses( + self, + *, + query: str | None = None, + limit: int | None = None, + page_size: int | None = None, + next_page_token: bytes | None = None, + rpc_metadata: Mapping[str, str] | None = None, + rpc_timeout: timedelta | None = None, + ) -> List[Dict[str, Any]] | WorkflowRunsPage: + # For in-memory engine, ignore query/paging tokens; apply simple limit and recency sort + workflows = list(self._workflows.values()) if self._workflows else [] + try: + workflows.sort( + key=lambda wf: (wf.state.updated_at if wf.state else None) or 0, + reverse=True, + ) + except Exception: + pass + + result: List[Dict[str, Any]] = [] + max_count = limit if isinstance(limit, int) and limit > 0 else None + for wf in workflows: + status = await wf.get_status() result.append(status) + if max_count is not None and len(result) >= max_count: + break return result diff --git a/src/mcp_agent/server/app_server.py b/src/mcp_agent/server/app_server.py index 1b4e8b712..2b6126d5d 100644 --- a/src/mcp_agent/server/app_server.py +++ b/src/mcp_agent/server/app_server.py @@ -22,8 +22,9 @@ from mcp_agent.core.context_dependent import ContextDependent from mcp_agent.executor.workflow import Workflow from mcp_agent.executor.workflow_registry import ( - WorkflowRegistry, InMemoryWorkflowRegistry, + WorkflowRegistry, + WorkflowRunsPage, ) from mcp_agent.logging.logger import get_logger @@ -1053,7 +1054,12 @@ def list_workflows(ctx: MCPContext) -> Dict[str, Dict[str, Any]]: return result @mcp.tool(name="workflows-runs-list") - async def list_workflow_runs(ctx: MCPContext) -> List[Dict[str, Any]]: + async def list_workflow_runs( + ctx: MCPContext, + limit: int = 100, + page_size: int | None = 100, + next_page_token: str | None = None, + ) -> List[Dict[str, Any]] | WorkflowRunsPage: """ List all workflow instances (runs) with their detailed status information. @@ -1061,8 +1067,14 @@ async def list_workflow_runs(ctx: MCPContext) -> List[Dict[str, Any]]: For each running workflow, returns its ID, name, current state, and available operations. This helps in identifying and managing active workflow instances. + + Args: + limit: Maximum number of runs to return. Default: 100. + page_size: Page size for paginated backends. Default: 100. + next_page_token: Optional Base64-encoded token for pagination resume. Only provide if you received a next_page_token from a previous call. + Returns: - A dictionary mapping workflow instance IDs to their detailed status information. + A list of workflow run status dictionaries with detailed workflow information. """ # Ensure upstream session is set for any logs emitted during this call try: @@ -1076,10 +1088,26 @@ async def list_workflow_runs(ctx: MCPContext) -> List[Dict[str, Any]]: if server_context is None or not hasattr(server_context, "workflow_registry"): raise ToolError("Server context not available for MCPApp Server.") - # Get all workflow statuses from the registry + # Decode next_page_token if provided (base64-encoded string -> bytes) + token_bytes = None + if next_page_token: + try: + import base64 as _b64 + + token_bytes = _b64.b64decode(next_page_token) + except Exception: + token_bytes = None + + # Get workflow statuses from the registry with pagination/query hints workflow_statuses = ( - await server_context.workflow_registry.list_workflow_statuses() + await server_context.workflow_registry.list_workflow_statuses( + query=None, + limit=limit, + page_size=page_size, + next_page_token=token_bytes, + ) ) + return workflow_statuses @mcp.tool(name="workflows-run") diff --git a/uv.lock b/uv.lock index 96e962b6f..3260869a6 100644 --- a/uv.lock +++ b/uv.lock @@ -2041,7 +2041,7 @@ wheels = [ [[package]] name = "mcp-agent" -version = "0.1.21" +version = "0.1.22" source = { editable = "." } dependencies = [ { name = "aiohttp" },