-
Notifications
You must be signed in to change notification settings - Fork 768
Query temporal directly for workflow runs #471
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
dc2abb4
084cb16
a09f09b
32dd041
445210c
6d13d24
4f06ad5
b0facff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,5 +1,6 @@ | ||||||||||||||||||||||||||
import asyncio | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
import base64 | ||||||||||||||||||||||||||
from datetime import datetime, timedelta | ||||||||||||||||||||||||||
from typing import ( | ||||||||||||||||||||||||||
Any, | ||||||||||||||||||||||||||
Dict, | ||||||||||||||||||||||||||
|
@@ -9,7 +10,7 @@ | |||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
from mcp_agent.logging.logger import get_logger | ||||||||||||||||||||||||||
from mcp_agent.executor.workflow_registry import WorkflowRegistry | ||||||||||||||||||||||||||
from mcp_agent.executor.workflow_registry import WorkflowRegistry, WorkflowRunsPage | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
if TYPE_CHECKING: | ||||||||||||||||||||||||||
from mcp_agent.executor.temporal import TemporalExecutor | ||||||||||||||||||||||||||
|
@@ -216,23 +217,178 @@ async def get_workflow_status( | |||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return status_dict | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
async def list_workflow_statuses(self) -> List[Dict[str, Any]]: | ||||||||||||||||||||||||||
result = [] | ||||||||||||||||||||||||||
for run_id, workflow in self._local_workflows.items(): | ||||||||||||||||||||||||||
# Get the workflow status directly to have consistent behavior | ||||||||||||||||||||||||||
status = await workflow.get_status() | ||||||||||||||||||||||||||
workflow_id = workflow.id or workflow.name | ||||||||||||||||||||||||||
async def list_workflow_statuses( | ||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||
*, | ||||||||||||||||||||||||||
query: str | None = None, | ||||||||||||||||||||||||||
limit: int | None = None, | ||||||||||||||||||||||||||
page_size: int | None = None, | ||||||||||||||||||||||||||
next_page_token: bytes | None = None, | ||||||||||||||||||||||||||
rpc_metadata: Dict[str, str] | None = None, | ||||||||||||||||||||||||||
rpc_timeout: timedelta | None = None, | ||||||||||||||||||||||||||
) -> List[Dict[str, Any]] | WorkflowRunsPage: | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
List workflow runs by querying Temporal visibility (preferred). | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
# Query Temporal for the status | ||||||||||||||||||||||||||
temporal_status = await self._get_temporal_workflow_status( | ||||||||||||||||||||||||||
workflow_id=workflow_id, run_id=run_id | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
- When Temporal listing succeeds, only runs returned by Temporal are included; local | ||||||||||||||||||||||||||
cache is used to enrich entries where possible. | ||||||||||||||||||||||||||
- On failure or when listing is unsupported, fall back to locally tracked runs. | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
status["temporal"] = temporal_status | ||||||||||||||||||||||||||
Args: | ||||||||||||||||||||||||||
query: Optional Temporal visibility list filter; defaults to newest first when unset. | ||||||||||||||||||||||||||
limit: Maximum number of runs to return; enforced locally if backend doesn't apply it. | ||||||||||||||||||||||||||
page_size: Page size to request from Temporal, if supported by SDK version. | ||||||||||||||||||||||||||
next_page_token: Opaque pagination token from prior call, if supported by SDK version. | ||||||||||||||||||||||||||
rpc_metadata: Optional per-RPC headers for Temporal (not exposed via server tool). | ||||||||||||||||||||||||||
rpc_timeout: Optional per-RPC timeout (not exposed via server tool). | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
result.append(status) | ||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||
A list of dictionaries with workflow information, or a WorkflowRunsPage object. | ||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||
results: List[Dict[str, Any]] = [] | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
# Collect all executions for this task queue (best effort) | ||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||
await self._executor.ensure_client() | ||||||||||||||||||||||||||
client = self._executor.client | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
# TODO(saqadri): Multi-user auth scoping | ||||||||||||||||||||||||||
# When supporting multiple users on one server, auth scoping should be enforced | ||||||||||||||||||||||||||
# by the proxy layer using RPC metadata (e.g., API key). This client code should | ||||||||||||||||||||||||||
# simply pass through rpc_metadata and let the backend filter results and manage | ||||||||||||||||||||||||||
# pagination accordingly. | ||||||||||||||||||||||||||
iterator = client.list_workflows( | ||||||||||||||||||||||||||
query=query, | ||||||||||||||||||||||||||
limit=limit, | ||||||||||||||||||||||||||
page_size=page_size or 1000, | ||||||||||||||||||||||||||
next_page_token=next_page_token, | ||||||||||||||||||||||||||
rpc_metadata=rpc_metadata or {}, | ||||||||||||||||||||||||||
rpc_timeout=rpc_timeout, | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return result | ||||||||||||||||||||||||||
# Build quick lookup from local cache by (workflow_id, run_id) | ||||||||||||||||||||||||||
in_memory_workflows: Dict[tuple[str, str], "Workflow"] = {} | ||||||||||||||||||||||||||
for run_id, wf in self._local_workflows.items(): | ||||||||||||||||||||||||||
workflow_id = wf.id or wf.name | ||||||||||||||||||||||||||
if workflow_id and run_id: | ||||||||||||||||||||||||||
in_memory_workflows[(workflow_id, run_id)] = wf | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
count = 0 | ||||||||||||||||||||||||||
max_count = limit if isinstance(limit, int) and limit > 0 else None | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
async for workflow_info in iterator: | ||||||||||||||||||||||||||
# Extract workflow_id and run_id robustly from various shapes | ||||||||||||||||||||||||||
workflow_id = workflow_info.id | ||||||||||||||||||||||||||
run_id = workflow_info.run_id | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
if not workflow_id or not run_id: | ||||||||||||||||||||||||||
# Can't build a handle without both IDs | ||||||||||||||||||||||||||
continue | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
# If we have a local workflow, start with its detailed status | ||||||||||||||||||||||||||
wf = in_memory_workflows.get((workflow_id, run_id)) | ||||||||||||||||||||||||||
if wf is not None: | ||||||||||||||||||||||||||
status_dict = await wf.get_status() | ||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||
# Create a minimal status when not tracked locally | ||||||||||||||||||||||||||
status_dict = { | ||||||||||||||||||||||||||
"id": run_id, | ||||||||||||||||||||||||||
"workflow_id": workflow_id, | ||||||||||||||||||||||||||
"run_id": run_id, | ||||||||||||||||||||||||||
"name": workflow_info.workflow_type or workflow_id, | ||||||||||||||||||||||||||
"status": "unknown", | ||||||||||||||||||||||||||
"running": False, | ||||||||||||||||||||||||||
"state": {"status": "unknown", "metadata": {}, "error": None}, | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
temporal_status: Dict[str, Any] = {} | ||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||
status: str | None = None | ||||||||||||||||||||||||||
if workflow_info.status: | ||||||||||||||||||||||||||
status = ( | ||||||||||||||||||||||||||
workflow_info.status.name | ||||||||||||||||||||||||||
if workflow_info.status.name | ||||||||||||||||||||||||||
else str(workflow_info.status) | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
start_time = workflow_info.start_time | ||||||||||||||||||||||||||
close_time = workflow_info.close_time | ||||||||||||||||||||||||||
execution_time = workflow_info.execution_time | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def _to_timestamp(dt: datetime): | ||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||
return dt.timestamp() if dt is not None else None | ||||||||||||||||||||||||||
except Exception: | ||||||||||||||||||||||||||
return dt | ||||||||||||||||||||||||||
|
def _to_timestamp(dt: datetime): | |
try: | |
return dt.timestamp() if dt is not None else None | |
except Exception: | |
return dt | |
def _to_timestamp(dt: datetime): | |
if dt is None: | |
return None | |
try: | |
return dt.timestamp() | |
except Exception: | |
return None |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential encoding/decoding mismatch: The code base64-encodes iterator.next_page_token
assuming it's bytes, but later in the server code (lines 1095-1097) it's decoded back to bytes. If iterator.next_page_token
is already a string, this will cause a TypeError on b64encode()
. The code should verify the type of next_page_token
before encoding or handle both string and bytes cases.
next_page_token=base64.b64encode(iterator.next_page_token).decode( | |
"ascii" | |
), | |
next_page_token=base64.b64encode( | |
iterator.next_page_token.encode('utf-8') if isinstance(iterator.next_page_token, str) else iterator.next_page_token | |
).decode("ascii"), |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just noting for later -- when we push support for multiple users using a server, we'll need to add the auth checks in the proxy layer for this request, which will have implications for results returned (e.g. we'll only return those that the user has permission to see)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, let me mark that as a TODO comment in the code as well. I think this is reasonable for the time being but agreed we'll want this to be user-scoped in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fwiw, this client code shouldn't really need to know/care about it, as long as our rpc metadata passes through the api key. Our backend proxy layer will handle the auth checks, but will need to handle pagination and such with some in-memory processing