Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/temporal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ An example showcasing a workflow that iteratively improves content based on eval

A more complex example that demonstrates how to orchestrate multiple agents:

- Uses the @app.async_tool decorator instead of explicit workflow/run definitions
- Uses a combination of finder, writer, proofreader, fact-checker and style enforcer agents
- Orchestrates these agents to collaboratively complete a task
- Dynamically plans each step of the workflow
Expand Down
160 changes: 80 additions & 80 deletions examples/temporal/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,99 +6,99 @@

import asyncio
import os
from typing import Optional

from main import app

from mcp_agent.agents.agent import Agent
from mcp_agent.core.context import Context as AppContext
from mcp_agent.executor.temporal import TemporalExecutor
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.workflows.llm.augmented_llm import RequestParams
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.orchestrator.orchestrator import Orchestrator

from main import app
"""
A more complex example that demonstrates how to orchestrate multiple agents.
This example uses the @app.async_tool decorator instead of traditional workflow/run definitions
and will have a workflow created behind the scenes.
"""


@app.workflow
class OrchestratorWorkflow(Workflow[str]):
"""
A simple workflow that demonstrates the basic structure of a Temporal workflow.
@app.async_tool(name="OrchestratorWorkflow")
async def run_orchestrator(input: str, app_ctx: Optional[AppContext]) -> str:
"""
Orchestrate multiple agents to collaboratively complete the given task.

@app.workflow_run
async def run(self, input: str) -> WorkflowResult[str]:
"""
Run the workflow, processing the input data.
Args:
input: Task description or instruction text.
app_ctx: Optional application context for the workflow.

Args:
input_data: The data to process

Returns:
A WorkflowResult containing the processed data
"""

context = app.context
context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])

finder_agent = Agent(
name="finder",
instruction="""You are an agent with access to the filesystem,
as well as the ability to fetch URLs. Your job is to identify
the closest match to a user's request, make the appropriate tool calls,
and return the URI and CONTENTS of the closest match.""",
server_names=["fetch", "filesystem"],
)

writer_agent = Agent(
name="writer",
instruction="""You are an agent that can write to the filesystem.
You are tasked with taking the user's input, addressing it, and
writing the result to disk in the appropriate location.""",
server_names=["filesystem"],
)

proofreader = Agent(
name="proofreader",
instruction=""""Review the short story for grammar, spelling, and punctuation errors.
Identify any awkward phrasing or structural issues that could improve clarity.
Provide detailed feedback on corrections.""",
server_names=["fetch"],
)

fact_checker = Agent(
name="fact_checker",
instruction="""Verify the factual consistency within the story. Identify any contradictions,
logical inconsistencies, or inaccuracies in the plot, character actions, or setting.
Highlight potential issues with reasoning or coherence.""",
server_names=["fetch"],
)

style_enforcer = Agent(
name="style_enforcer",
instruction="""Analyze the story for adherence to style guidelines.
Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to
enhance storytelling, readability, and engagement.""",
server_names=["fetch"],
)

orchestrator = Orchestrator(
llm_factory=OpenAIAugmentedLLM,
available_agents=[
finder_agent,
writer_agent,
proofreader,
fact_checker,
style_enforcer,
],
# We will let the orchestrator iteratively plan the task at every step
plan_type="full",
context=app.context,
)

result = await orchestrator.generate_str(
message=input,
request_params=RequestParams(model="gpt-4o", max_iterations=100),
)
Returns:
A string containing the orchestrator's final generated output.
"""

return WorkflowResult(value=result)
context = app_ctx or app.context
context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])

finder_agent = Agent(
name="finder",
instruction="""You are an agent with access to the filesystem,
as well as the ability to fetch URLs. Your job is to identify
the closest match to a user's request, make the appropriate tool calls,
and return the URI and CONTENTS of the closest match.""",
server_names=["fetch", "filesystem"],
)

writer_agent = Agent(
name="writer",
instruction="""You are an agent that can write to the filesystem.
You are tasked with taking the user's input, addressing it, and
writing the result to disk in the appropriate location.""",
server_names=["filesystem"],
)

proofreader = Agent(
name="proofreader",
instruction="""Review the short story for grammar, spelling, and punctuation errors.
Identify any awkward phrasing or structural issues that could improve clarity.
Provide detailed feedback on corrections.""",
server_names=["fetch"],
)

fact_checker = Agent(
name="fact_checker",
instruction="""Verify the factual consistency within the story. Identify any contradictions,
logical inconsistencies, or inaccuracies in the plot, character actions, or setting.
Highlight potential issues with reasoning or coherence.""",
server_names=["fetch"],
)

style_enforcer = Agent(
name="style_enforcer",
instruction="""Analyze the story for adherence to style guidelines.
Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to
enhance storytelling, readability, and engagement.""",
server_names=["fetch"],
)

orchestrator = Orchestrator(
llm_factory=OpenAIAugmentedLLM,
available_agents=[
finder_agent,
writer_agent,
proofreader,
fact_checker,
style_enforcer,
],
# We will let the orchestrator iteratively plan the task at every step
plan_type="full",
context=context,
)

return await orchestrator.generate_str(
message=input,
request_params=RequestParams(model="gpt-4o", max_iterations=100),
)


async def main():
Expand Down
6 changes: 4 additions & 2 deletions examples/temporal/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Core framework dependency
mcp-agent @ file://../../ # Link to the local mcp-agent project root
mcp-agent @ file://../../ # Link to the local mcp-agent project root. Remove @ file://../../ for cloud deployment

# Additional dependencies specific to this example
temporalio
anthropic
openai
temporalio
7 changes: 2 additions & 5 deletions examples/temporal/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ class RouterWorkflow(Workflow[str]):
@app.workflow_run
async def run(self) -> WorkflowResult[str]:
"""
Run the workflow, processing the input data.

Args:
input_data: The data to process
Run the workflow, routing to the correct agents.

Returns:
A WorkflowResult containing the processed data
Expand Down Expand Up @@ -81,7 +78,7 @@ async def run(self) -> WorkflowResult[str]:
# You can use any LLM with an LLMRouter
llm = OpenAIAugmentedLLM(name="openai_router", instruction="You are a router")
router = LLMRouter(
llm_factory=lambda agent: llm,
llm_factory=lambda _agent: llm,
agents=[finder_agent, writer_agent, reasoning_agent],
functions=[print_to_console, print_hello_world],
context=app.context,
Expand Down
7 changes: 1 addition & 6 deletions examples/temporal/run_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,8 @@
import asyncio
import logging

import workflows # noqa: F401
from main import app
from basic import SimpleWorkflow # noqa: F401
from evaluator_optimizer import EvaluatorOptimizerWorkflow # noqa: F401
from orchestrator import OrchestratorWorkflow # noqa: F401
from parallel import ParallelWorkflow # noqa: F401
from router import RouterWorkflow # noqa: F401
from interactive import WorkflowWithInteraction # noqa: F401

from mcp_agent.executor.temporal import create_temporal_worker_for_app

Expand Down
6 changes: 6 additions & 0 deletions examples/temporal/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from basic import SimpleWorkflow # noqa: F401
from evaluator_optimizer import EvaluatorOptimizerWorkflow # noqa: F401
from orchestrator import run_orchestrator # noqa: F401
from parallel import ParallelWorkflow # noqa: F401
from router import RouterWorkflow # noqa: F401
from interactive import WorkflowWithInteraction # noqa: F401
4 changes: 2 additions & 2 deletions src/mcp_agent/executor/temporal/interactive_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ async def run(self, input: str) -> WorkflowResult[str]:
# etc.
"""

def __init__(self) -> None:
super().__init__()
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._lock = asyncio.Lock()
self._request: HumanInputRequest = None
self._response: str = None
Expand Down
2 changes: 1 addition & 1 deletion src/mcp_agent/server/app_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ async def resume_workflow(
run_id: str | None = None,
workflow_id: str | None = None,
signal_name: str | None = "resume",
payload: str | None = None,
payload: Dict[str, Any] | None = None,
) -> bool:
Comment on lines +775 to 776
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Broadened payload type is fine; validate input and redact logs.

Two guardrails:

  • Enforce payload to be a JSON object for consistency with signal handlers.
  • Avoid logging full payloads (may contain secrets); log keys or a truncated JSON string.
@@
     async def resume_workflow(
         ctx: MCPContext,
         run_id: str | None = None,
         workflow_id: str | None = None,
         signal_name: str | None = "resume",
-        payload: Dict[str, Any] | None = None,
+        payload: Dict[str, Any] | None = None,
     ) -> bool:
@@
-        logger.info(
-            f"Resuming workflow ID {workflow_id or 'unknown'}, run ID {run_id or 'unknown'} with signal '{signal_name}' and payload '{payload}'"
-        )
+        # Validate input payload
+        if payload is not None and not isinstance(payload, dict):
+            raise ToolError("payload must be a JSON object (dict).")
+
+        # Redact payload in logs (structure only, truncate to avoid giant logs)
+        try:
+            payload_summary = (
+                {"keys": sorted(list(payload.keys()))} if isinstance(payload, dict) else str(type(payload))
+            )
+        except Exception:
+            payload_summary = "uninspectable-payload"
+        logger.info(
+            f"Resuming workflow ID {workflow_id or 'unknown'}, run ID {run_id or 'unknown'} with signal '{signal_name}' and payload_summary '{payload_summary}'"
+        )

Run to find other callers assuming string payloads:


🏁 Script executed:

#!/bin/bash
rg -nP -C2 '\bresume_workflow\s*\('
rg -nP -C2 '\bworkflows-resume\b|\"workflows-resume\"'

Length of output: 12268


🏁 Script executed:

#!/bin/bash
# Inspect CLI command signature and payload type declaration
rg -nP 'def resume_workflow' -C5 --type=py src/mcp_agent/cli/cloud/commands/workflows/resume/main.py
rg -nP 'payload:' -C3 --type=py src/mcp_agent/cli/cloud/commands/workflows/resume/main.py

Length of output: 2354


Normalize and validate payload, redact logs
In src/mcp_agent/server/app_server.py → resume_workflow, accept both str and dict for payload: if it’s a str, do json.loads; if the result isn’t a dict, raise ToolError("payload must be a JSON object"). Then replace the existing logger.info call to log only a redacted payload_summary (e.g. list of keys or truncated JSON), never the full payload.

🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 775 to 776, modify
resume_workflow so payload may be either a str or dict: if payload is a str,
parse it with json.loads (catching JSON errors to raise ToolError("payload must
be a JSON object")); after parsing ensure the result is a dict and if not raise
ToolError("payload must be a JSON object"). Replace the existing logger.info
that prints the full payload with a log of a redacted payload_summary only (for
example, list(payload.keys()) or a truncated/serialized snippet) so the full
payload is never logged.

"""
Resume a paused workflow.
Expand Down
Loading