Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/mcp_agent/cli/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ def run_async(coro):
raise


def load_user_app(script_path: Path | None, settings_override: Optional[Settings] = None) -> MCPApp:
def load_user_app(
script_path: Path | None, settings_override: Optional[Settings] = None
) -> MCPApp:
"""Import a user script and return an MCPApp instance.

Resolution order within module globals:
1) variable named 'app' that is MCPApp
2) callable 'create_app' or 'get_app' that returns MCPApp
3) first MCPApp instance found in globals

Args:
script_path: Path to the Python script containing the MCPApp
settings_override: Optional settings to override the app's configuration
Expand Down
91 changes: 80 additions & 11 deletions src/mcp_agent/cli/mcp_app/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
console,
print_success,
)
from mcp_agent.executor.workflow_registry import WorkflowRunsPage

DEFAULT_CLIENT_INFO = types.Implementation(name="mcp", version="0.1.0")

Expand Down Expand Up @@ -142,6 +143,7 @@ class ListWorkflowRunsResult(BaseModel):
"""Processed server response to a workflows-runs-list request from the client."""

workflow_runs: list[WorkflowRun]
next_page_token: Optional[str] = None


class MCPClientSession(ClientSession):
Expand Down Expand Up @@ -176,9 +178,26 @@ async def list_workflows(self) -> ListWorkflowsResult:

return ListWorkflowsResult(workflows=workflows)

async def list_workflow_runs(self) -> ListWorkflowRunsResult:
"""Send a workflows-runs-list request."""
runs_response = await self.call_tool("workflows-runs-list", {})
async def list_workflow_runs(
self,
*,
limit: Optional[int] = None,
page_size: Optional[int] = None,
next_page_token: Optional[str] = None,
) -> ListWorkflowRunsResult:
"""Send a workflows-runs-list request.

Parses either a paginated WorkflowRunsPage shape or a legacy list/single-run shape.
"""
params: dict[str, Any] = {}
if limit is not None:
params["limit"] = limit
if page_size is not None:
params["page_size"] = page_size
if next_page_token:
params["next_page_token"] = next_page_token

runs_response = await self.call_tool("workflows-runs-list", params)
if runs_response.isError:
error_message = (
runs_response.content[0].text
Expand All @@ -188,17 +207,67 @@ async def list_workflow_runs(self) -> ListWorkflowRunsResult:
)
raise Exception(error_message)

runs = []
runs: list[WorkflowRun] = []
next_token: Optional[str] = None

text_items = [
c for c in runs_response.content if isinstance(c, types.TextContent)
]
if not text_items:
return ListWorkflowRunsResult(workflow_runs=runs, next_page_token=None)

for item in runs_response.content:
if isinstance(item, types.TextContent):
# Assuming the content is a JSON string representing a WorkflowRun item dict
if not isinstance(item, types.TextContent):
continue

text = item.text
# Try JSON first
try:
data = json.loads(text)
except json.JSONDecodeError:
# Not JSON; ignore this content item
continue

# Prefer paginated page shape when present
if isinstance(data, dict) and ("runs" in data or "next_page_token" in data):
try:
page = WorkflowRunsPage.model_validate(data)
for r in page.runs or []:
try:
runs.append(
MCPClientSession.deserialize_workflow_run(json.dumps(r))
)
except Exception:
pass
if page.next_page_token:
next_token = page.next_page_token
continue
except Exception:
# Fall through to normal handling if not a valid page
pass

# Plain list or dict of runs
if isinstance(data, list): # List[Dict[str, Any]]
for r in data:
try:
runs.append(
MCPClientSession.deserialize_workflow_run(json.dumps(r))
)
except Exception:
pass
else: # Dict[str, Any]
try:
workflow_run = MCPClientSession.deserialize_workflow_run(item.text)
runs.append(workflow_run)
except (json.JSONDecodeError, ValueError) as e:
raise ValueError(f"Invalid workflow run data: {e}") from e
runs.append(
MCPClientSession.deserialize_workflow_run(json.dumps(data))
)
except Exception:
# Last-ditch: attempt full deserialize of the original text
try:
runs.append(MCPClientSession.deserialize_workflow_run(text))
except (json.JSONDecodeError, ValueError) as e:
raise ValueError(f"Invalid workflow run data: {e}") from e

return ListWorkflowRunsResult(workflow_runs=runs)
return ListWorkflowRunsResult(workflow_runs=runs, next_page_token=next_token)

@staticmethod
def deserialize_workflow_run(text: str) -> WorkflowRun:
Expand Down
189 changes: 174 additions & 15 deletions src/mcp_agent/executor/temporal/workflow_registry.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio

import base64
from datetime import datetime
from typing import (
Any,
Dict,
Expand All @@ -9,7 +10,7 @@
)

from mcp_agent.logging.logger import get_logger
from mcp_agent.executor.workflow_registry import WorkflowRegistry
from mcp_agent.executor.workflow_registry import WorkflowRegistry, WorkflowRunsPage

if TYPE_CHECKING:
from mcp_agent.executor.temporal import TemporalExecutor
Expand Down Expand Up @@ -216,23 +217,181 @@ async def get_workflow_status(

return status_dict

async def list_workflow_statuses(self) -> List[Dict[str, Any]]:
result = []
for run_id, workflow in self._local_workflows.items():
# Get the workflow status directly to have consistent behavior
status = await workflow.get_status()
workflow_id = workflow.id or workflow.name
async def list_workflow_statuses(
self,
*,
query: str | None = None,
limit: int | None = None,
page_size: int | None = None,
next_page_token: bytes | None = None,
rpc_metadata: Dict[str, str] | None = None,
rpc_timeout: Any | None = None,
) -> List[Dict[str, Any]] | WorkflowRunsPage:
"""
List workflow runs by querying Temporal visibility (preferred).

# Query Temporal for the status
temporal_status = await self._get_temporal_workflow_status(
workflow_id=workflow_id, run_id=run_id
)
- When Temporal listing succeeds, only runs returned by Temporal are included; local
cache is used to enrich entries where possible.
- On failure or when listing is unsupported, fall back to locally tracked runs.

status["temporal"] = temporal_status
Args:
query: Optional Temporal visibility list filter; defaults to newest first when unset.
limit: Maximum number of runs to return; enforced locally if backend doesn't apply it.
page_size: Page size to request from Temporal, if supported by SDK version.
next_page_token: Opaque pagination token from prior call, if supported by SDK version.
rpc_metadata: Optional per-RPC headers for Temporal (not exposed via server tool).
rpc_timeout: Optional per-RPC timeout (not exposed via server tool).

result.append(status)
Returns:
A list of dictionaries with workflow information, or a WorkflowRunsPage object.
"""
results: List[Dict[str, Any]] = []

# Collect all executions for this task queue (best effort)
try:
await self._executor.ensure_client()
client = self._executor.client

# Use caller query if provided; else default to newest first
query_local = query or "order by StartTime desc"

# TODO(saqadri): Multi-user auth scoping
# When supporting multiple users on one server, auth scoping should be enforced
# by the proxy layer using RPC metadata (e.g., API key). This client code should
# simply pass through rpc_metadata and let the backend filter results and manage
# pagination accordingly.
iterator = client.list_workflows(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting for later -- when we push support for multiple users using a server, we'll need to add the auth checks in the proxy layer for this request, which will have implications for results returned (e.g. we'll only return those that the user has permission to see)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, let me mark that as a TODO comment in the code as well. I think this is reasonable for the time being but agreed we'll want this to be user-scoped in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fwiw, this client code shouldn't really need to know/care about it, as long as our rpc metadata passes through the api key. Our backend proxy layer will handle the auth checks, but will need to handle pagination and such with some in-memory processing

query=query_local,
limit=limit,
page_size=page_size,
next_page_token=next_page_token,
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
)

return result
# Build quick lookup from local cache by (workflow_id, run_id)
in_memory_workflows: Dict[tuple[str, str], "Workflow"] = {}
for run_id, wf in self._local_workflows.items():
workflow_id = wf.id or wf.name
if workflow_id and run_id:
in_memory_workflows[(workflow_id, run_id)] = wf

count = 0
max_count = limit if isinstance(limit, int) and limit > 0 else None

async for workflow_info in iterator:
# Extract workflow_id and run_id robustly from various shapes
workflow_id = workflow_info.id
run_id = workflow_info.run_id

if not workflow_id or not run_id:
# Can't build a handle without both IDs
continue

# If we have a local workflow, start with its detailed status
wf = in_memory_workflows.get((workflow_id, run_id))
if wf is not None:
status_dict = await wf.get_status()
else:
# Create a minimal status when not tracked locally
status_dict = {
"id": run_id,
"workflow_id": workflow_id,
"run_id": run_id,
"name": workflow_info.workflow_type or workflow_id,
"status": "unknown",
"running": False,
"state": {"status": "unknown", "metadata": {}, "error": None},
}

temporal_status: Dict[str, Any] = {}
try:
status: str | None = None
if workflow_info.status:
status = (
workflow_info.status.name
if workflow_info.status.name
else str(workflow_info.status)
)

start_time = workflow_info.start_time
close_time = workflow_info.close_time
execution_time = workflow_info.execution_time

def _to_timestamp(dt: datetime):
try:
return dt.timestamp() if dt is not None else None
except Exception:
return dt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsafe exception handling in timestamp conversion: The _to_timestamp() function catches all exceptions and returns the original datetime object on error, but this could cause type inconsistency issues downstream where code expects a numeric timestamp. If dt.timestamp() fails, returning the datetime object could cause runtime errors in code that expects a number. Fix: Return None or a default numeric value on conversion failure.

Suggested change
def _to_timestamp(dt: datetime):
try:
return dt.timestamp() if dt is not None else None
except Exception:
return dt
def _to_timestamp(dt: datetime):
if dt is None:
return None
try:
return dt.timestamp()
except Exception:
return None

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.


workflow_type = workflow_info.workflow_type

temporal_status = {
"id": workflow_id,
"workflow_id": workflow_id,
"run_id": run_id,
"name": workflow_info.id,
"type": workflow_type,
"status": status,
"start_time": _to_timestamp(start_time),
"execution_time": _to_timestamp(execution_time),
"close_time": _to_timestamp(close_time),
"history_length": workflow_info.history_length,
"parent_workflow_id": workflow_info.parent_id,
"parent_run_id": workflow_info.parent_run_id,
}
except Exception:
temporal_status = await self._get_temporal_workflow_status(
workflow_id=workflow_id, run_id=run_id
)

status_dict["temporal"] = temporal_status

# Reflect Temporal status into top-level summary
try:
ts = (
temporal_status.get("status")
if isinstance(temporal_status, dict)
else None
)
if isinstance(ts, str):
status_dict["status"] = ts.lower()
status_dict["running"] = ts.upper() in {"RUNNING", "OPEN"}
except Exception:
pass

results.append(status_dict)
count += 1
if max_count is not None and count >= max_count:
break

if iterator.next_page_token:
return WorkflowRunsPage(
runs=results,
next_page_token=base64.b64encode(iterator.next_page_token).decode(
"ascii"
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential encoding/decoding mismatch: The code base64-encodes iterator.next_page_token assuming it's bytes, but later in the server code (lines 1095-1097) it's decoded back to bytes. If iterator.next_page_token is already a string, this will cause a TypeError on b64encode(). The code should verify the type of next_page_token before encoding or handle both string and bytes cases.

Suggested change
next_page_token=base64.b64encode(iterator.next_page_token).decode(
"ascii"
),
next_page_token=base64.b64encode(
iterator.next_page_token.encode('utf-8') if isinstance(iterator.next_page_token, str) else iterator.next_page_token
).decode("ascii"),

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

)
else:
return results
except Exception as e:
logger.warning(
f"Error listing workflows from Temporal; falling back to local cache: {e}"
)
# Fallback – return local cache augmented with Temporal describe where possible
for run_id, wf in self._local_workflows.items():
status = await wf.get_status()
workflow_id = wf.id or wf.name
try:
status["temporal"] = await self._get_temporal_workflow_status(
workflow_id=workflow_id, run_id=run_id
)
except Exception:
# This is expected if we couldn't get a hold of the temporal client
pass

results.append(status)
return results

async def list_workflows(self) -> List["Workflow"]:
"""
Expand Down
2 changes: 2 additions & 0 deletions src/mcp_agent/executor/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ async def get_status(self) -> Dict[str, Any]:
"""
status = {
"id": self._run_id,
"workflow_id": self.id,
"run_id": self._run_id,
"name": self.name,
"status": self.state.status,
"running": self._run_task is not None and not self._run_task.done()
Expand Down
Loading
Loading