1 change: 1 addition & 0 deletions examples/temporal/README.md
@@ -107,6 +107,7 @@ An example showcasing a workflow that iteratively improves content based on eval

A more complex example that demonstrates how to orchestrate multiple agents:

- Uses the @app.async_tool decorator instead of explicit workflow/run definitions
- Uses a combination of finder, writer, proofreader, fact-checker and style enforcer agents
- Orchestrates these agents to collaboratively complete a task
- Dynamically plans each step of the workflow
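To make the switch described above concrete, here is a minimal sketch of the two registration styles this PR moves between, based on the orchestrator.py diff below. The class, function, and tool names are illustrative, and the `= None` default on app_ctx is an assumption (the diff declares the parameter without a default).

```python
from typing import Optional

from main import app  # the MCPApp instance defined by this example
from mcp_agent.core.context import Context as AppContext
from mcp_agent.executor.workflow import Workflow, WorkflowResult


# Before: an explicit Workflow subclass with a decorated run method.
@app.workflow
class EchoWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        return WorkflowResult(value=input.upper())


# After: a plain async function. @app.async_tool creates an equivalent
# workflow behind the scenes and exposes it as a tool.
@app.async_tool(name="EchoTool")
async def echo_tool(input: str, app_ctx: Optional[AppContext] = None) -> str:
    return input.upper()
```

Either form is registered with the worker simply by being imported (see workflows.py at the end of this diff).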
159 changes: 79 additions & 80 deletions examples/temporal/orchestrator.py
@@ -6,99 +6,98 @@

import asyncio
import os
from typing import Optional

from main import app

from mcp_agent.agents.agent import Agent
from mcp_agent.core.context import Context as AppContext
from mcp_agent.executor.temporal import TemporalExecutor
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.workflows.llm.augmented_llm import RequestParams
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.orchestrator.orchestrator import Orchestrator

from main import app
"""
A more complex example that demonstrates how to orchestrate multiple agents.
This example uses the @app.async_tool decorator instead of an explicit workflow/run definition;
a workflow is created behind the scenes automatically.
"""


@app.workflow
class OrchestratorWorkflow(Workflow[str]):
"""
A simple workflow that demonstrates the basic structure of a Temporal workflow.
@app.async_tool(name="OrchestratorWorkflow")
async def run_orchestrator(input: str, app_ctx: Optional[AppContext]) -> str:
"""
Run the workflow, processing the input data.
@app.workflow_run
async def run(self, input: str) -> WorkflowResult[str]:
"""
Run the workflow, processing the input data.
Args:
input_data: The data to process
Args:
input_data: The data to process
Returns:
A WorkflowResult containing the processed data
"""

context = app.context
context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])

finder_agent = Agent(
name="finder",
instruction="""You are an agent with access to the filesystem,
as well as the ability to fetch URLs. Your job is to identify
the closest match to a user's request, make the appropriate tool calls,
and return the URI and CONTENTS of the closest match.""",
server_names=["fetch", "filesystem"],
)

writer_agent = Agent(
name="writer",
instruction="""You are an agent that can write to the filesystem.
You are tasked with taking the user's input, addressing it, and
writing the result to disk in the appropriate location.""",
server_names=["filesystem"],
)

proofreader = Agent(
name="proofreader",
instruction=""""Review the short story for grammar, spelling, and punctuation errors.
Identify any awkward phrasing or structural issues that could improve clarity.
Provide detailed feedback on corrections.""",
server_names=["fetch"],
)

fact_checker = Agent(
name="fact_checker",
instruction="""Verify the factual consistency within the story. Identify any contradictions,
logical inconsistencies, or inaccuracies in the plot, character actions, or setting.
Highlight potential issues with reasoning or coherence.""",
server_names=["fetch"],
)

style_enforcer = Agent(
name="style_enforcer",
instruction="""Analyze the story for adherence to style guidelines.
Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to
enhance storytelling, readability, and engagement.""",
server_names=["fetch"],
)

orchestrator = Orchestrator(
llm_factory=OpenAIAugmentedLLM,
available_agents=[
finder_agent,
writer_agent,
proofreader,
fact_checker,
style_enforcer,
],
# We will let the orchestrator iteratively plan the task at every step
plan_type="full",
context=app.context,
)

result = await orchestrator.generate_str(
message=input,
request_params=RequestParams(model="gpt-4o", max_iterations=100),
)
Returns:
The processed result as a string.
"""

return WorkflowResult(value=result)
context = app_ctx or app.context
context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])

finder_agent = Agent(
name="finder",
instruction="""You are an agent with access to the filesystem,
as well as the ability to fetch URLs. Your job is to identify
the closest match to a user's request, make the appropriate tool calls,
and return the URI and CONTENTS of the closest match.""",
server_names=["fetch", "filesystem"],
)

writer_agent = Agent(
name="writer",
instruction="""You are an agent that can write to the filesystem.
You are tasked with taking the user's input, addressing it, and
writing the result to disk in the appropriate location.""",
server_names=["filesystem"],
)

proofreader = Agent(
name="proofreader",
instruction=""""Review the short story for grammar, spelling, and punctuation errors.
Identify any awkward phrasing or structural issues that could improve clarity.
Provide detailed feedback on corrections.""",
server_names=["fetch"],
)

fact_checker = Agent(
name="fact_checker",
instruction="""Verify the factual consistency within the story. Identify any contradictions,
logical inconsistencies, or inaccuracies in the plot, character actions, or setting.
Highlight potential issues with reasoning or coherence.""",
server_names=["fetch"],
)

style_enforcer = Agent(
name="style_enforcer",
instruction="""Analyze the story for adherence to style guidelines.
Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to
enhance storytelling, readability, and engagement.""",
server_names=["fetch"],
)

orchestrator = Orchestrator(
llm_factory=OpenAIAugmentedLLM,
available_agents=[
finder_agent,
writer_agent,
proofreader,
fact_checker,
style_enforcer,
],
# Generate the complete plan up front; plan_type="iterative" would plan one step at a time
plan_type="full",
context=app.context,
)

return await orchestrator.generate_str(
message=input,
request_params=RequestParams(model="gpt-4o", max_iterations=100),
)


async def main():
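The example's main() is collapsed in this diff, so as a hedged sketch, this is how the generated workflow might be started from a client, assuming the same pattern as the router example below (an executor obtained from the running app, and the tool's input forwarded to start_workflow). The app.run()/executor wiring and the input string are assumptions, not taken from the hidden code.

```python
import asyncio

from main import app
from mcp_agent.executor.temporal import TemporalExecutor


async def main():
    async with app.run() as agent_app:
        # Assumption: the app's executor is the TemporalExecutor configured for this example.
        executor: TemporalExecutor = agent_app.executor

        # The generated workflow keeps the name passed to @app.async_tool;
        # the string below is a placeholder task for the orchestrator.
        handle = await executor.start_workflow(
            "OrchestratorWorkflow",
            "Review the short story and write the feedback to disk",
        )
        print(await handle.result())


if __name__ == "__main__":
    asyncio.run(main())
```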
6 changes: 4 additions & 2 deletions examples/temporal/requirements.txt
@@ -1,5 +1,7 @@
# Core framework dependency
mcp-agent @ file://../../ # Link to the local mcp-agent project root
mcp-agent @ file://../../ # Link to the local mcp-agent project root. Remove @ file://../../ for cloud deployment

# Additional dependencies specific to this example
temporalio
anthropic
openai
temporalio
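Reading the updated comment concretely, the dependency line would differ between local development and cloud deployment roughly as follows; the bare package pin for cloud is an assumption inferred from the comment, not shown in the diff.

```
# Local development: install mcp-agent from the repository root
mcp-agent @ file://../../

# Cloud deployment: depend on the published mcp-agent package instead
mcp-agent
```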
10 changes: 3 additions & 7 deletions examples/temporal/router.py
@@ -38,12 +38,9 @@ class RouterWorkflow(Workflow[str]):
"""

@app.workflow_run
async def run(self, input: str) -> WorkflowResult[str]:
async def run(self) -> WorkflowResult[str]:
"""
Run the workflow, processing the input data.

Args:
input_data: The data to process
Run the workflow, routing to the correct agents.

Returns:
A WorkflowResult containing the processed data
@@ -81,7 +78,7 @@ async def run(self, input: str) -> WorkflowResult[str]:
# You can use any LLM with an LLMRouter
llm = OpenAIAugmentedLLM(name="openai_router", instruction="You are a router")
router = LLMRouter(
llm=llm,
llm_factory=lambda _agent: llm,
agents=[finder_agent, writer_agent, reasoning_agent],
functions=[print_to_console, print_hello_world],
context=app.context,
@@ -150,7 +147,6 @@ async def main():

handle = await executor.start_workflow(
"RouterWorkflow",
None,
)
a = await handle.result()
print(a)
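The router change replaces LLMRouter's llm= argument with an llm_factory= callable that receives an agent and returns an LLM. Below is a brief sketch of two ways such a factory might be written, assuming only the callable shape shown in the diff (lambda _agent: llm); the per-agent variant is illustrative, not part of the example.

```python
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

# Reuse a single LLM for every agent, exactly as the diff does:
shared_llm = OpenAIAugmentedLLM(name="openai_router", instruction="You are a router")
reuse_factory = lambda _agent: shared_llm


# Or build a fresh LLM per agent (illustrative):
def per_agent_factory(agent):
    return OpenAIAugmentedLLM(
        name=f"{agent.name}_router_llm",
        instruction="You are a router",
    )
```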
7 changes: 1 addition & 6 deletions examples/temporal/run_worker.py
@@ -9,13 +9,8 @@
import asyncio
import logging

import workflows # noqa: F401
from main import app
from basic import SimpleWorkflow # noqa: F401
from evaluator_optimizer import EvaluatorOptimizerWorkflow # noqa: F401
from orchestrator import OrchestratorWorkflow # noqa: F401
from parallel import ParallelWorkflow # noqa: F401
from router import RouterWorkflow # noqa: F401
from interactive import WorkflowWithInteraction # noqa: F401

from mcp_agent.executor.temporal import create_temporal_worker_for_app

6 changes: 6 additions & 0 deletions examples/temporal/workflows.py
@@ -0,0 +1,6 @@
from basic import SimpleWorkflow # noqa: F401
from evaluator_optimizer import EvaluatorOptimizerWorkflow # noqa: F401
from orchestrator import run_orchestrator # noqa: F401
from parallel import ParallelWorkflow # noqa: F401
from router import RouterWorkflow # noqa: F401
from interactive import WorkflowWithInteraction # noqa: F401
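The new workflows.py gathers the imports whose side effect is registering every workflow and tool on the app, so run_worker.py only needs a single import workflows line. A hedged sketch of the resulting worker entry point follows; the create_temporal_worker_for_app usage is an assumption based on the import that run_worker.py keeps, and may differ from the code elided by this diff.

```python
import asyncio
import logging

import workflows  # noqa: F401  (side effect: registers all workflows/tools on `app`)
from main import app
from mcp_agent.executor.temporal import create_temporal_worker_for_app

logging.basicConfig(level=logging.INFO)


async def main():
    # Assumption: the helper yields a worker that polls the Temporal task queue.
    async with create_temporal_worker_for_app(app) as worker:
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```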