Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 132 additions & 15 deletions src/mcp_agent/executor/temporal/workflow_registry.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio

import base64
from typing import (
Any,
Dict,
Expand All @@ -9,7 +9,7 @@
)

from mcp_agent.logging.logger import get_logger
from mcp_agent.executor.workflow_registry import WorkflowRegistry
from mcp_agent.executor.workflow_registry import WorkflowRegistry, WorkflowRunsPage

if TYPE_CHECKING:
from mcp_agent.executor.temporal import TemporalExecutor
Expand Down Expand Up @@ -216,23 +216,140 @@ async def get_workflow_status(

return status_dict

async def list_workflow_statuses(
    self,
    *,
    query: str | None = None,
    limit: int | None = None,
    page_size: int | None = None,
    next_page_token: bytes | None = None,
    rpc_metadata: Dict[str, str] | None = None,
    rpc_timeout: Any | None = None,
) -> List[Dict[str, Any]] | WorkflowRunsPage:
    """
    List workflow runs by querying Temporal visibility (preferred).

    - When Temporal listing succeeds, only runs returned by Temporal are
      included; the local cache is used to enrich entries where possible.
    - On failure or when listing is unsupported, fall back to locally
      tracked runs.

    Args:
        query: Optional Temporal visibility list filter; defaults to newest
            first when unset.
        limit: Maximum number of runs to return; enforced locally if the
            backend doesn't apply it.
        page_size: Page size to request from Temporal, if supported by the
            SDK version.
        next_page_token: Opaque pagination token from a prior call, if
            supported by the SDK version.
        rpc_metadata: Optional per-RPC headers for Temporal (not exposed via
            the server tool).
        rpc_timeout: Optional per-RPC timeout (not exposed via the server
            tool).

    Returns:
        A plain list of status dictionaries, or a WorkflowRunsPage carrying
        a base64-encoded next_page_token when more results remain.
    """
    results: List[Dict[str, Any]] = []

    try:
        await self._executor.ensure_client()
        client = self._executor.client

        # Use the caller's query if provided; else default to newest first.
        query_local = query or "order by StartTime desc"

        # TODO(auth): once multi-user servers are supported, results must be
        # scoped to what the caller is permitted to see (enforced in the
        # backend proxy layer; this client passes rpc_metadata through).
        iterator = client.list_workflows(
            query=query_local,
            limit=limit,
            page_size=page_size,
            next_page_token=next_page_token,
            rpc_metadata=rpc_metadata,
            rpc_timeout=rpc_timeout,
        )

        # Build a quick lookup from the local cache keyed by
        # (workflow_id, run_id) so Temporal entries can be enriched.
        in_memory_workflows: Dict[tuple[str, str], "Workflow"] = {}
        for run_id, wf in self._local_workflows.items():
            workflow_id = wf.id or wf.name
            if workflow_id and run_id:
                in_memory_workflows[(workflow_id, run_id)] = wf

        count = 0
        max_count = limit if isinstance(limit, int) and limit > 0 else None

        async for workflow_info in iterator:
            workflow_id = workflow_info.id
            run_id = workflow_info.run_id

            if not workflow_id or not run_id:
                # Can't build a handle without both IDs.
                continue

            # If we track this run locally, start from its detailed status.
            wf = in_memory_workflows.get((workflow_id, run_id))
            if wf is not None:
                status_dict = await wf.get_status()
            else:
                # Minimal status for runs not tracked locally.
                status_dict = {
                    "id": run_id,
                    "workflow_id": workflow_id,
                    "run_id": run_id,
                    "name": workflow_info.workflow_type or workflow_id,
                    "status": "unknown",
                    "running": False,
                    "state": {"status": "unknown", "metadata": {}, "error": None},
                }

            # Merge Temporal visibility/describe details.
            temporal_status = await self._get_temporal_workflow_status(
                workflow_id=workflow_id, run_id=run_id
            )
            status_dict["temporal"] = temporal_status

            # Best effort: reflect Temporal's status into the top-level summary.
            try:
                ts = (
                    temporal_status.get("status")
                    if isinstance(temporal_status, dict)
                    else None
                )
                if isinstance(ts, str):
                    status_dict["status"] = ts.lower()
                    status_dict["running"] = ts.upper() in {"RUNNING", "OPEN"}
            except Exception:
                pass

            results.append(status_dict)
            count += 1
            if max_count is not None and count >= max_count:
                break

        raw_token = iterator.next_page_token
        if raw_token:
            # SDK versions differ on whether the token is str or bytes;
            # normalize so b64encode always receives bytes (b64encode raises
            # TypeError on str input).
            token_bytes = (
                raw_token.encode("utf-8") if isinstance(raw_token, str) else raw_token
            )
            return WorkflowRunsPage(
                runs=results,
                next_page_token=base64.b64encode(token_bytes).decode("ascii"),
            )
        return results
    except Exception as e:
        logger.warning(
            f"Error listing workflows from Temporal; falling back to local cache: {e}"
        )
        # Fallback – return the local cache, augmented with Temporal
        # describe details where possible.
        for run_id, wf in self._local_workflows.items():
            status = await wf.get_status()
            workflow_id = wf.id or wf.name
            try:
                status["temporal"] = await self._get_temporal_workflow_status(
                    workflow_id=workflow_id, run_id=run_id
                )
            except Exception:
                # Expected when we couldn't get hold of the Temporal client.
                pass

            results.append(status)
        return results

async def list_workflows(self) -> List["Workflow"]:
"""
Expand Down
2 changes: 2 additions & 0 deletions src/mcp_agent/executor/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ async def get_status(self) -> Dict[str, Any]:
"""
status = {
"id": self._run_id,
"workflow_id": self.id,
"run_id": self._run_id,
"name": self.name,
"status": self.state.status,
"running": self._run_task is not None and not self._run_task.done()
Expand Down
68 changes: 60 additions & 8 deletions src/mcp_agent/executor/workflow_registry.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import asyncio
from datetime import timedelta

from pydantic import BaseModel

from abc import ABC, abstractmethod
from typing import (
Any,
Dict,
Mapping,
Optional,
List,
TYPE_CHECKING,
Expand All @@ -17,6 +21,11 @@
logger = get_logger(__name__)


class WorkflowRunsPage(BaseModel):
    """One page of workflow-run statuses.

    Returned by ``list_workflow_statuses`` implementations only when more
    results remain; the pagination token is base64-encoded so it survives
    JSON transport.
    """

    # Status dictionaries for the runs on this page.
    runs: List[Dict[str, Any]]
    # Base64-encoded opaque pagination token; None when no further pages exist.
    next_page_token: str | None = None


class WorkflowRegistry(ABC):
"""
Abstract base class for registry tracking workflow instances.
Expand Down Expand Up @@ -127,12 +136,34 @@ async def get_workflow_status(
pass

@abstractmethod
async def list_workflow_statuses(
    self,
    *,
    query: str | None = None,
    limit: int | None = None,
    page_size: int | None = None,
    next_page_token: bytes | None = None,
    rpc_metadata: Mapping[str, str] | None = None,
    rpc_timeout: timedelta | None = None,
) -> List[Dict[str, Any]] | WorkflowRunsPage:
    """
    List workflow runs with their status.

    Implementations may query an external backend (e.g., Temporal) or use
    local state. The server tool defaults ``limit`` to 100 if not provided
    here.

    Args:
        query: Optional backend-specific visibility filter (advanced).
        limit: Maximum number of results to return.
        page_size: Page size for backends that support paging.
        next_page_token: Opaque pagination token from a prior call.
        rpc_metadata: Optional per-RPC headers for backends.
        rpc_timeout: Optional per-RPC timeout for backends.

    Returns:
        A list of dictionaries with workflow information. Implementations
        should only return a WorkflowRunsPage when a next_page_token exists;
        the token should be base64-encoded for JSON transport.
    """
    pass

Expand Down Expand Up @@ -267,12 +298,33 @@ async def get_workflow_status(

return await workflow.get_status()

async def list_workflow_statuses(
    self,
    *,
    query: str | None = None,
    limit: int | None = None,
    page_size: int | None = None,
    next_page_token: bytes | None = None,
    rpc_metadata: Mapping[str, str] | None = None,
    rpc_timeout: timedelta | None = None,
) -> List[Dict[str, Any]] | WorkflowRunsPage:
    """
    List locally tracked workflow runs with their status.

    The in-memory engine ignores ``query``, paging tokens, and RPC options;
    it sorts tracked workflows newest-first (when state timestamps allow)
    and applies a simple ``limit``.
    """
    candidates = list(self._workflows.values()) if self._workflows else []

    try:
        # Newest first; falls back to 0 when a workflow has no state/timestamp.
        candidates.sort(
            key=lambda w: (w.state.updated_at if w.state else None) or 0,
            reverse=True,
        )
    except Exception:
        # Mixed or incomparable timestamps — leave the order as-is (best effort).
        pass

    cap = limit if isinstance(limit, int) and limit > 0 else None
    statuses: List[Dict[str, Any]] = []
    for wf in candidates:
        # Delegate to the workflow itself so status shape stays consistent.
        statuses.append(await wf.get_status())
        if cap is not None and len(statuses) >= cap:
            break

    return statuses

Expand Down
38 changes: 33 additions & 5 deletions src/mcp_agent/server/app_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
from mcp_agent.core.context_dependent import ContextDependent
from mcp_agent.executor.workflow import Workflow
from mcp_agent.executor.workflow_registry import (
WorkflowRegistry,
InMemoryWorkflowRegistry,
WorkflowRegistry,
WorkflowRunsPage,
)

from mcp_agent.logging.logger import get_logger
Expand Down Expand Up @@ -1043,16 +1044,27 @@ def list_workflows(ctx: MCPContext) -> Dict[str, Dict[str, Any]]:
return result

@mcp.tool(name="workflows-runs-list")
async def list_workflow_runs(ctx: MCPContext) -> List[Dict[str, Any]]:
async def list_workflow_runs(
ctx: MCPContext,
limit: int = 100,
page_size: int | None = 100,
next_page_token: str | None = None,
) -> List[Dict[str, Any]] | WorkflowRunsPage:
"""
List all workflow instances (runs) with their detailed status information.

This returns information about actual workflow instances (runs), not workflow types.
For each running workflow, returns its ID, name, current state, and available operations.
This helps in identifying and managing active workflow instances.


Args:
limit: Maximum number of runs to return. Default: 100.
page_size: Page size for paginated backends. Default: 100.
next_page_token: Optional Base64-encoded token for pagination resume. Only provide if you received a next_page_token from a previous call.

Returns:
A dictionary mapping workflow instance IDs to their detailed status information.
A list of workflow run status dictionaries with detailed workflow information.
"""
# Ensure upstream session is set for any logs emitted during this call
try:
Expand All @@ -1066,10 +1078,26 @@ async def list_workflow_runs(ctx: MCPContext) -> List[Dict[str, Any]]:
if server_context is None or not hasattr(server_context, "workflow_registry"):
raise ToolError("Server context not available for MCPApp Server.")

# Get all workflow statuses from the registry
# Decode next_page_token if provided (base64-encoded string -> bytes)
token_bytes = None
if next_page_token:
try:
import base64 as _b64

token_bytes = _b64.b64decode(next_page_token)
except Exception:
token_bytes = None

# Get workflow statuses from the registry with pagination/query hints
workflow_statuses = (
await server_context.workflow_registry.list_workflow_statuses()
await server_context.workflow_registry.list_workflow_statuses(
query=None,
limit=limit,
page_size=page_size,
next_page_token=token_bytes,
)
)

return workflow_statuses
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure JSON-serializable return (Pydantic to dict).

FastMCP tools should return JSON-serializable data. If the registry returns a WorkflowRunsPage (Pydantic), convert it to a dict before returning.

-        return workflow_statuses
+        if isinstance(workflow_statuses, WorkflowRunsPage):
+            return workflow_statuses.model_dump(by_alias=True, mode="json", exclude_none=True)
+        return workflow_statuses
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return workflow_statuses
if isinstance(workflow_statuses, WorkflowRunsPage):
return workflow_statuses.model_dump(by_alias=True, mode="json", exclude_none=True)
return workflow_statuses
🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around line 1111, the function currently
returns a Pydantic WorkflowRunsPage object (not JSON-serializable); convert the
Pydantic model to a plain dict before returning (e.g., call the model's .dict()
or .dict(by_alias=True) as appropriate, or serialize nested fields) so the
FastMCP tool returns JSON-serializable data instead of a Pydantic object.


@mcp.tool(name="workflows-run")
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading