diff --git a/examples/temporal_plugin/.gitignore b/examples/temporal_plugin/.gitignore new file mode 100644 index 000000000..807a8d6f8 --- /dev/null +++ b/examples/temporal_plugin/.gitignore @@ -0,0 +1,93 @@ +# MCP-Agent +mcp_agent.secrets.yaml +*.secrets.yaml +.mcp-agent/ + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST +pip-log.txt +pip-delete-this-directory.txt + +# Virtual Environment +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# PyCharm +.idea/ + +# VS Code +.vscode/ +*.code-workspace + +# Vim +[._]*.s[a-v][a-z] +[._]*.sw[a-p] +[._]s[a-rt-v][a-z] +[._]ss[a-gi-z] +[._]sw[a-p] +*~ + +# Logs +logs/ +*.log +*.jsonl + +# OS +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.hypothesis/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Local environment variables +.env.local +.env.*.local \ No newline at end of file diff --git a/examples/temporal_plugin/README.md b/examples/temporal_plugin/README.md new file mode 100644 index 000000000..519b9ad1e --- /dev/null +++ b/examples/temporal_plugin/README.md @@ -0,0 +1,217 @@ +# MCP-Agent with Temporal Plugin + +This example demonstrates multiple ways to use the Temporal plugin with MCP-Agent for workflow orchestration. + +## Prerequisites + +1. **Temporal Server**: Ensure you have a Temporal server running locally: + ```bash + temporal server start-dev + ``` + This starts a development server at `localhost:7233` + +2. **API Keys**: Add your API keys to `mcp_agent.secrets.yaml`: + ```yaml + OPENAI_API_KEY: "your-key-here" + ANTHROPIC_API_KEY: "your-key-here" # optional + ``` + +3. **Configuration**: Set the execution engine to `temporal` in `mcp_agent.config.yaml`: + ```yaml + execution_engine: temporal + + temporal: + host: "localhost:7233" + namespace: "default" + task_queue: "mcp-agent" + ``` + +## Usage Methods + +### Method 1: MCP Server with Temporal Workflows + +This approach exposes Temporal workflows as MCP tools that can be called by Claude Desktop or other MCP clients. + +**Step 1: Define your workflow** (`basic_workflow.py`): +```python +from temporalio import workflow +from mcp_agent.agents.agent import Agent +from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM + +@workflow.defn +class BasicWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + simple_agent = Agent( + name="finder", + instruction="You are a helpful agent", + server_names=["fetch"], + ) + + async with simple_agent: + llm = await simple_agent.attach_llm(OpenAIAugmentedLLM) + result = await llm.generate_str(prompt) + return result +``` + +**Step 2: Start the MCP server with worker** (`basic_agent_server.py`): +```bash +uv run basic_agent_server.py +``` +This starts both a Temporal worker and an MCP server that exposes your workflows as tools. + +**Step 3: Test with MCP Inspector**: + +```bash +# UI mode - Opens a interactive visual interface +npx @modelcontextprotocol/inspector + +# or CLI mode - Connect to a remote MCP server (with SSE transport) +npx @modelcontextprotocol/inspector --cli http://127.0.0.1:8000/sse --transport sse --method tools/list +``` + +In inspector, you can: +- Call `workflows-list` to see available workflows +- Call `workflows-BasicWorkflow-run` with a prompt parameter to execute the workflow +- Monitor workflow execution in the Temporal UI at http://localhost:8233 + +### Method 2: Separate Worker and Workflow Files + +This approach separates the worker and workflow execution into different processes, useful for distributed systems. + +**Step 1: Define your workflow** (already shown above) + +**Step 2: Run the worker** (`run_worker.py`): +```bash +uv run run_worker.py +``` + +**Step 3: Execute the workflow** (in another terminal): +```bash +uv run run_basic_workflow.py +``` + +### Method 3: Single File Execution (temporal_agent.py) + +This approach combines worker and workflow execution in a single file, ideal for simpler deployments or testing. + +```bash +uv run temporal_agent.py +``` + +This file: +- Defines the workflow +- Starts the worker +- Executes the workflow +- All within the same process using `async with Worker(...)` + +**Key difference**: The single-file approach runs both the worker and client in the same process: +```python +async with Worker( + client, + task_queue=running_app.config.temporal.task_queue, + workflows=[BasicWorkflow], +): + # Execute workflow while worker is running + output = await client.execute_workflow(...) +``` + +## Important Configuration Notes + +### Execution Engine Setting + +The `execution_engine` in `mcp_agent.config.yaml` **MUST** be set to `temporal` for the Temporal plugin to work: + +```yaml +execution_engine: temporal # Required for Temporal plugin +``` + +Without this setting, MCP-Agent will use the default `asyncio` engine and Temporal features won't be available. + +### Temporal Configuration + +Configure Temporal settings in `mcp_agent.config.yaml`: + +```yaml +temporal: + host: "localhost:7233" # Temporal server address + namespace: "default" # Temporal namespace + task_queue: "mcp-agent" # Task queue name + max_concurrent_activities: 10 # Concurrency limit + rpc_metadata: + X-Client-Name: "mcp-agent" # Client identification +``` + +## File Structure + +``` +temporal_plugin/ +├── basic_workflow.py # Workflow definitions +├── basic_agent_server.py # MCP server with integrated worker +├── run_worker.py # Standalone worker process +├── run_basic_workflow.py # Workflow client (direct execution) +├── temporal_agent.py # Single-file approach +├── replay.py # Workflow replay testing for safe deployments +├── main.py # MCP-Agent app setup +├── mcp_agent.config.yaml # Configuration (MUST set execution_engine: temporal) +└── mcp_agent.secrets.yaml # API keys +``` + +## When to Use Each Method + +- **MCP Server with Worker (Method 1)**: Use when you need: + - Integration with Claude Desktop or other MCP clients + - Exposing workflows as callable tools + - Simplified deployment with fewer processes + +- **Separate Files (Method 2)**: Use when you need: + - Distributed workers across multiple machines + - Independent scaling of workers and clients + - Clear separation of concerns + - Production deployments with direct workflow execution + +- **Single File (Method 3)**: Use when you need: + - Quick prototyping and testing + - Simple deployments + - All-in-one execution for demos + - Development and debugging + +## Safe Deployments with Workflow Replay + +The `replay.py` script provides a two-phase deployment strategy for safe Temporal deployments: + +### Phase 1: Verification +Test workflow determinism before deployment: +```bash +uv run replay.py verify +``` +Replays recent workflow histories (last 10 hours by default) to ensure code changes maintain determinism. + +### Phase 2: Deployment +Run the worker in production: +```bash +uv run replay.py run +``` + +### Why Use Replay Testing? +- **Prevents Breaking Changes**: Catches non-deterministic changes before they affect production +- **Ensures Durability**: Maintains Temporal's guarantees for workflow recovery +- **Safe Iteration**: Allows confident updates to workflow logic + +The replay script tests workflows: +- `BasicWorkflow` +- `OrchestratorWorkflow` +- `ParallelAgentWorkflow` + +## Troubleshooting + +1. **Temporal not working**: Ensure `execution_engine: temporal` in config +2. **Connection refused**: Start Temporal server with `temporal server start-dev` +3. **Task queue mismatch**: Verify task queue names match between worker and client +4. **Workflow errors**: Run `uv run replay.py verify` to check for determinism issues + +## Further Resources + +- [Temporal Documentation](https://docs.temporal.io/) +- [MCP-Agent Documentation](https://docs.mcp-agent.com/) +- [MCP-Agent GitHub](https://github.com/lastmile-ai/mcp-agent) diff --git a/examples/temporal_plugin/basic_agent_server.py b/examples/temporal_plugin/basic_agent_server.py new file mode 100644 index 000000000..bbe67cf1c --- /dev/null +++ b/examples/temporal_plugin/basic_agent_server.py @@ -0,0 +1,35 @@ +import asyncio +from temporalio.client import Client +from mcp_agent.executor.temporal.plugin import MCPAgentPlugin +from mcp_agent.app import MCPApp +from temporalio.worker import Worker +from basic_workflow import BasicWorkflow +from mcp_agent.server.app_server import create_mcp_server_for_app + +app = MCPApp(name="mcp_agent_server") + + +async def main(): + async with app.run() as running_app: + plugin = MCPAgentPlugin(running_app) + + client = await Client.connect( + running_app.config.temporal.host, + plugins=[plugin], + ) + + async with Worker( + client, + task_queue=running_app.config.temporal.task_queue, + workflows=[BasicWorkflow], + ): + print("Registered workflows:") + for workflow_id in running_app.workflows: + print(f" - {workflow_id}") + + mcp_server = create_mcp_server_for_app(running_app) + await mcp_server.run_sse_async() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/temporal_plugin/basic_workflow.py b/examples/temporal_plugin/basic_workflow.py new file mode 100644 index 000000000..9b42189c0 --- /dev/null +++ b/examples/temporal_plugin/basic_workflow.py @@ -0,0 +1,19 @@ +from temporalio import workflow +from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM +from mcp_agent.agents.agent import Agent + + +@workflow.defn +class BasicWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + simple_agent = Agent( + name="finder", + instruction="You are a helpful agent", + server_names=["fetch"], + ) + + async with simple_agent: + llm = await simple_agent.attach_llm(OpenAIAugmentedLLM) + result = await llm.generate_str(prompt) + return result diff --git a/examples/temporal_plugin/evaluator_optimizer.py b/examples/temporal_plugin/evaluator_optimizer.py new file mode 100644 index 000000000..38f90f880 --- /dev/null +++ b/examples/temporal_plugin/evaluator_optimizer.py @@ -0,0 +1,115 @@ +import asyncio +from uuid import uuid4 +from temporalio import workflow +from mcp_agent.core.context import get_current_context +from mcp_agent.workflows.evaluator_optimizer.evaluator_optimizer import ( + EvaluatorOptimizerLLM, + QualityRating, +) +from mcp_agent.workflows.llm.augmented_llm import RequestParams +from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM +from mcp_agent.agents.agent import Agent +from temporalio.client import Client +from mcp_agent.executor.temporal.plugin import MCPAgentPlugin +from mcp_agent.app import MCPApp +from temporalio.worker import Worker + +app = MCPApp(name="mcp_basic_agent") + + +@workflow.defn +class EvaluatorOptimizerWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + context = get_current_context() + logger = context.app.logger + + logger.info("Current config:", data=context.config.model_dump()) + + optimizer = Agent( + name="optimizer", + instruction="""You are a career coach specializing in cover letter writing. + You are tasked with generating a compelling cover letter given the job posting, + candidate details, and company information. Tailor the response to the company and job requirements. + """, + server_names=["fetch"], + ) + + evaluator = Agent( + name="evaluator", + instruction="""Evaluate the following response based on the criteria below: + 1. Clarity: Is the language clear, concise, and grammatically correct? + 2. Specificity: Does the response include relevant and concrete details tailored to the job description? + 3. Relevance: Does the response align with the prompt and avoid unnecessary information? + 4. Tone and Style: Is the tone professional and appropriate for the context? + 5. Persuasiveness: Does the response effectively highlight the candidate's value? + 6. Grammar and Mechanics: Are there any spelling or grammatical issues? + 7. Feedback Alignment: Has the response addressed feedback from previous iterations? + + For each criterion: + - Provide a rating (EXCELLENT, GOOD, FAIR, or POOR). + - Offer specific feedback or suggestions for improvement. + + Summarize your evaluation as a structured response with: + - Overall quality rating. + - Specific feedback and areas for improvement.""", + ) + + evaluator_optimizer = EvaluatorOptimizerLLM( + optimizer=optimizer, + evaluator=evaluator, + llm_factory=OpenAIAugmentedLLM, + min_rating=QualityRating.EXCELLENT, + context=context, + ) + + result = await evaluator_optimizer.generate_str( + message=prompt, + request_params=RequestParams(model="gpt-4o"), + ) + + return result + + +async def main(): + async with app.run() as running_app: + plugin = MCPAgentPlugin(running_app) + + client = await Client.connect( + running_app.config.temporal.host, + plugins=[plugin], + ) + + async with Worker( + client, + task_queue=running_app.config.temporal.task_queue, + workflows=[EvaluatorOptimizerWorkflow], + ): + job_posting = ( + "Software Engineer at LastMile AI. Responsibilities include developing AI systems, " + "collaborating with cross-functional teams, and enhancing scalability. Skills required: " + "Python, distributed systems, and machine learning." + ) + candidate_details = ( + "Alex Johnson, 3 years in machine learning, contributor to open-source AI projects, " + "proficient in Python and TensorFlow. Motivated by building scalable AI systems to solve real-world problems." + ) + + # This should trigger a 'fetch' call to get the company information + company_information = ( + "Look up from the LastMile AI page: https://lastmileai.dev" + ) + + task = f"Write a cover letter for the following job posting: {job_posting}\n\nCandidate Details: {candidate_details}\n\nCompany information: {company_information}" + + output = await client.execute_workflow( + EvaluatorOptimizerWorkflow.run, + task, + id=f"basic-workflow-{uuid4()}", + task_queue=running_app.config.temporal.task_queue, + ) + print(output) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/temporal_plugin/graded_report.md b/examples/temporal_plugin/graded_report.md new file mode 100644 index 000000000..328fa7c14 --- /dev/null +++ b/examples/temporal_plugin/graded_report.md @@ -0,0 +1,36 @@ +## Graded Report: The Battle of Glimmerwood + +### Proofreading Feedback: +1. **Sentence Structure:** + - Correction: "Amidst the chaos, a young girl named Elara stood her ground. She rallied the villagers and devised a clever plan." + - **Feedback:** Correct use of punctuation for independent clauses. + +2. **Clause Clarity:** + - Improved Version: "Elara and the villagers cleverly used the forest's natural defenses to lure the marauders into a trap." + - **Feedback:** Enhances clarity and strategic emphasis. + +3. **Sentence Flow and Punctuation:** + - Correction: "As the bandits approached the village square, a herd of Glimmerfoxes emerged and blinded the marauders with their dazzling light. The villagers then seized the opportunity to capture the invaders." + - **Feedback:** Breaks run-on for clearer sequence. + +Overall, the story is well-written; improvements focus on sentence structure and clarity for better readability. + +### Factual Consistency & Logical Coherence: +1. **Glimmerwood Setting:** Consistent depiction as a mystical forest. +2. **Conflict and Elara’s Strategy:** Plausible yet could explore villagers' preparation. +3. **Capture Aftermath:** Lacks detail on what happens to marauders. +4. **Glimmerstones:** Intriguing but underexplored. +5. **Character Consistency:** Elara remains consistent in her role. + +While internally consistent, expanding underdeveloped areas can enhance depth. + +### APA Style Adherence: +1. **Title/Sections:** Missing APA elements like a title page. +2. **Formatting:** Ensure compliance with APA font/margins. +3. **Narrative Flow:** Clear sequence, can benefit from improved transitions. + +### Suggestions: +- Add a title page following APA. +- Clarify ambiguous plot elements for better coherence. + +In conclusion, the story maintains good narrative quality and internal logic, but formatting and APA compliance need attention to meet academic standards. \ No newline at end of file diff --git a/examples/temporal_plugin/main.py b/examples/temporal_plugin/main.py new file mode 100644 index 000000000..4d80e7d37 --- /dev/null +++ b/examples/temporal_plugin/main.py @@ -0,0 +1,4 @@ +from mcp_agent.app import MCPApp + +# Initialize the app to get context +app = MCPApp(name="mcp_basic_agent") diff --git a/examples/temporal_plugin/mcp_agent.config.yaml b/examples/temporal_plugin/mcp_agent.config.yaml new file mode 100644 index 000000000..cfe4ddb10 --- /dev/null +++ b/examples/temporal_plugin/mcp_agent.config.yaml @@ -0,0 +1,77 @@ +# MCP-Agent Configuration File +# Config definition: https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/config.py +$schema: https://raw.githubusercontent.com/lastmile-ai/mcp-agent/refs/heads/main/schema/mcp-agent.config.schema.json + +# Execution engine: asyncio or temporal +# For temporal mode, see: https://github.com/lastmile-ai/mcp-agent/blob/main/examples/temporal/README.md +execution_engine: temporal + +temporal: + host: "localhost:7233" # Default Temporal server address + namespace: "default" # Default Temporal namespace + task_queue: "mcp-agent" # Task queue for workflows and activities + max_concurrent_activities: 10 # Maximum number of concurrent activities + rpc_metadata: + X-Client-Name: "mcp-agent" + +logger: + transports: [console, file] + level: info + path: logs/mcp-agent.log + +# Configure MCP Servers connections (supports stdio, sse, streamable_http, and websockets) +mcp: + servers: + # Filesystem access server + filesystem: + command: npx + args: ["-y", "@modelcontextprotocol/server-filesystem", "."] + + # Web fetch server + fetch: + command: uvx + args: ["mcp-server-fetch"] + #env: # Environment variables passed to the stdio server + # ROOT_PATH: "/workspace" + + # sse_server: + # transport: "sse" + # url: "https://api.example.com/sse" + # headers: + # Authorization: "Bearer ${API_TOKEN}" + + # streamable_http_server: + # transport: streamable_http + # url: "https://api.example.com/mcp" + # headers: + # Authorization: "Bearer ${API_TOKEN}" + # Content-Type: "application/json" + # http_timeout_seconds: 30 + # read_timeout_seconds: 120 + # terminate_on_close: true + +# Optional: Define Agent definitions in config +agents: + definitions: + - name: filesystem_helper + instruction: "You can read files and summarize their contents." + server_names: [filesystem] + - name: web_helper + instruction: "You can fetch web pages and summarize their content." + server_names: [fetch] + +# Model provider defaults (API keys go in mcp_agent.secrets.yaml) +openai: + default_model: gpt-4o-mini + +anthropic: + default_model: claude-sonnet-4-0 +# google: +# default_model: "gemini-1.5-pro" + +# OpenTelemetry configuration (optional) +# otel: +# enabled: true +# exporters: ["file", "otlp"] +# otlp_settings: +# endpoint: "http://localhost:4318/v1/traces" diff --git a/examples/temporal_plugin/orchestrator.py b/examples/temporal_plugin/orchestrator.py new file mode 100644 index 000000000..47857e1b9 --- /dev/null +++ b/examples/temporal_plugin/orchestrator.py @@ -0,0 +1,121 @@ +import asyncio +import os +from uuid import uuid4 +from temporalio import workflow +from mcp_agent.core.context import get_current_context +from mcp_agent.workflows.llm.augmented_llm import RequestParams +from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM +from mcp_agent.agents.agent import Agent +from temporalio.client import Client +from mcp_agent.executor.temporal.plugin import MCPAgentPlugin +from mcp_agent.app import MCPApp +from temporalio.worker import Worker +from mcp_agent.workflows.orchestrator.orchestrator import Orchestrator + +app = MCPApp(name="mcp_basic_agent") + + +@workflow.defn +class OrchestratorWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + context = get_current_context() + + finder_agent = Agent( + name="finder", + instruction="""You are an agent with access to the filesystem, + as well as the ability to fetch URLs. Your job is to identify + the closest match to a user's request, make the appropriate tool calls, + and return the URI and CONTENTS of the closest match.""", + server_names=["fetch", "filesystem"], + ) + + writer_agent = Agent( + name="writer", + instruction="""You are an agent that can write to the filesystem. + You are tasked with taking the user's input, addressing it, and + writing the result to disk in the appropriate location.""", + server_names=["filesystem"], + ) + + proofreader = Agent( + name="proofreader", + instruction="""Review the short story for grammar, spelling, and punctuation errors. + Identify any awkward phrasing or structural issues that could improve clarity. + Provide detailed feedback on corrections.""", + server_names=["fetch"], + ) + + fact_checker = Agent( + name="fact_checker", + instruction="""Verify the factual consistency within the story. Identify any contradictions, + logical inconsistencies, or inaccuracies in the plot, character actions, or setting. + Highlight potential issues with reasoning or coherence.""", + server_names=["fetch"], + ) + + style_enforcer = Agent( + name="style_enforcer", + instruction="""Analyze the story for adherence to style guidelines. + Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to + enhance storytelling, readability, and engagement.""", + server_names=["fetch"], + ) + + orchestrator = Orchestrator( + llm_factory=OpenAIAugmentedLLM, + available_agents=[ + finder_agent, + writer_agent, + proofreader, + fact_checker, + style_enforcer, + ], + # We will let the orchestrator iteratively plan the task at every step + plan_type="full", + context=context, + ) + + result = await orchestrator.generate_str( + message=prompt, + request_params=RequestParams(model="gpt-4o", max_iterations=100), + ) + + return result + + +async def main(): + async with app.run() as running_app: + plugin = MCPAgentPlugin(running_app) + + client = await Client.connect( + running_app.config.temporal.host, + plugins=[plugin], + ) + + async with Worker( + client, + task_queue=running_app.config.temporal.task_queue, + workflows=[OrchestratorWorkflow], + ): + running_app.context.config.mcp.servers["filesystem"].args.extend( + [os.getcwd()] + ) + + task = """Load the student's short story from short_story.md, + and generate a report with feedback across proofreading, + factuality/logical consistency and style adherence. Use the style rules from + https://owl.purdue.edu/owl/research_and_citation/apa_style/apa_formatting_and_style_guide/general_format.html. + Write the graded report to graded_report.md as soon as you complete your task. Don't take too many steps.""" + + output = await client.execute_workflow( + OrchestratorWorkflow.run, + task, + id=f"basic-workflow-{uuid4()}", + task_queue=running_app.config.temporal.task_queue, + ) + print(output) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/temporal_plugin/parallel_agent.py b/examples/temporal_plugin/parallel_agent.py new file mode 100644 index 000000000..7d75eff65 --- /dev/null +++ b/examples/temporal_plugin/parallel_agent.py @@ -0,0 +1,112 @@ +import asyncio +from uuid import uuid4 +from temporalio import workflow +from mcp_agent.core.context import get_current_context +from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM +from mcp_agent.agents.agent import Agent +from temporalio.client import Client +from mcp_agent.executor.temporal.plugin import MCPAgentPlugin +from mcp_agent.app import MCPApp +from temporalio.worker import Worker +from mcp_agent.workflows.parallel.parallel_llm import ParallelLLM + +SHORT_STORY = """ +The Battle of Glimmerwood + +In the heart of Glimmerwood, a mystical forest knowed for its radiant trees, a small village thrived. +The villagers, who were live peacefully, shared their home with the forest's magical creatures, +especially the Glimmerfoxes whose fur shimmer like moonlight. + +One fateful evening, the peace was shaterred when the infamous Dark Marauders attack. +Lead by the cunning Captain Thorn, the bandits aim to steal the precious Glimmerstones which was believed to grant immortality. + +Amidst the choas, a young girl named Elara stood her ground, she rallied the villagers and devised a clever plan. +Using the forests natural defenses they lured the marauders into a trap. +As the bandits aproached the village square, a herd of Glimmerfoxes emerged, blinding them with their dazzling light, +the villagers seized the opportunity to captured the invaders. + +Elara's bravery was celebrated and she was hailed as the "Guardian of Glimmerwood". +The Glimmerstones were secured in a hidden grove protected by an ancient spell. + +However, not all was as it seemed. The Glimmerstones true power was never confirm, +and whispers of a hidden agenda linger among the villagers. +""" + + +app = MCPApp(name="mcp_basic_agent") + + +@workflow.defn +class ParallelAgentWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + context = get_current_context() + + proofreader = Agent( + name="proofreader", + instruction="""Review the short story for grammar, spelling, and punctuation errors. + Identify any awkward phrasing or structural issues that could improve clarity. + Provide detailed feedback on corrections.""", + ) + + fact_checker = Agent( + name="fact_checker", + instruction="""Verify the factual consistency within the story. Identify any contradictions, + logical inconsistencies, or inaccuracies in the plot, character actions, or setting. + Highlight potential issues with reasoning or coherence.""", + ) + + style_enforcer = Agent( + name="style_enforcer", + instruction="""Analyze the story for adherence to style guidelines. + Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to + enhance storytelling, readability, and engagement.""", + ) + + grader = Agent( + name="grader", + instruction="""Compile the feedback from the Proofreader, Fact Checker, and Style Enforcer + into a structured report. Summarize key issues and categorize them by type. + Provide actionable recommendations for improving the story, + and give an overall grade based on the feedback.""", + ) + + parallel = ParallelLLM( + fan_in_agent=grader, + fan_out_agents=[proofreader, fact_checker, style_enforcer], + llm_factory=OpenAIAugmentedLLM, + context=context, + ) + + result = await parallel.generate_str( + message=f"Student short story submission: {prompt}", + ) + + return result + + +async def main(): + async with app.run() as running_app: + plugin = MCPAgentPlugin(running_app) + + client = await Client.connect( + running_app.config.temporal.host, + plugins=[plugin], + ) + + async with Worker( + client, + task_queue=running_app.config.temporal.task_queue, + workflows=[ParallelAgentWorkflow], + ): + output = await client.execute_workflow( + ParallelAgentWorkflow.run, + args=[SHORT_STORY], + id=f"basic-workflow-{uuid4()}", + task_queue=running_app.config.temporal.task_queue, + ) + print(output) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/temporal_plugin/replay.py b/examples/temporal_plugin/replay.py new file mode 100644 index 000000000..21e1a0e45 --- /dev/null +++ b/examples/temporal_plugin/replay.py @@ -0,0 +1,83 @@ +""" +Temporal safe deployment example with workflow replay verification. + +This example demonstrates the recommended pattern for safe Temporal deployments by +implementing a two-phase deployment strategy: + +Phase 1 - Verification: + Run replay tests against recent workflow histories to ensure code changes maintain + determinism. This catches breaking changes before they affect production. + +Phase 2 - Deployment: + Deploy the verified worker code to handle new workflow executions. + +Usage: + uv run replay.py verify # Test workflow determinism before deployment + uv run replay.py run # Run the worker in production + +This pattern ensures that workflow code changes don't break existing executions, +which is critical for Temporal's durability guarantees. By separating verification +from production deployment, you can safely iterate on workflow logic. +""" + +import asyncio +import argparse +from datetime import datetime, timedelta, timezone +from temporalio.client import Client +from temporalio.worker import Replayer, Worker +from basic_workflow import BasicWorkflow +from orchestrator import OrchestratorWorkflow +from parallel_agent import ParallelAgentWorkflow +from router import RouterWorkflow +from evaluator_optimizer import EvaluatorOptimizerWorkflow +from mcp_agent.executor.temporal.plugin import MCPAgentPlugin +from mcp_agent.app import MCPApp + + +async def main(): + parser = argparse.ArgumentParser(prog="MyTemporalWorker") + parser.add_argument("mode", choices=["verify", "run"]) + args = parser.parse_args() + + temporal_url = "localhost:7233" + task_queue = "mcp-agent" + my_workflows = [ + BasicWorkflow, + OrchestratorWorkflow, + ParallelAgentWorkflow, + RouterWorkflow, + EvaluatorOptimizerWorkflow, + ] + my_activities = [] + + app = MCPApp(name="mcp_basic_agent") + async with app.run() as running_app: + plugin = MCPAgentPlugin(running_app) + + client = await Client.connect( + temporal_url, + plugins=[plugin], + ) + + if args.mode == "verify": + start_time = (datetime.now(timezone.utc) - timedelta(hours=10)).isoformat() + workflows = client.list_workflows( + f"TaskQueue='{task_queue}' AND StartTime > '{start_time}' AND ExecutionStatus='Completed'", + limit=100, + ) + histories = workflows.map_histories() + replayer = Replayer(workflows=my_workflows, plugins=[plugin]) + results = await replayer.replay_workflows(histories) + return results + else: + worker = Worker( + client, + task_queue=task_queue, + workflows=my_workflows, + activities=my_activities, + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/temporal_plugin/router.py b/examples/temporal_plugin/router.py new file mode 100644 index 000000000..a49bae4f3 --- /dev/null +++ b/examples/temporal_plugin/router.py @@ -0,0 +1,158 @@ +import asyncio +import os +from uuid import uuid4 +from temporalio import workflow +from mcp_agent.core.context import get_current_context +from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM +from mcp_agent.agents.agent import Agent +from temporalio.client import Client +from mcp_agent.executor.temporal.plugin import MCPAgentPlugin +from mcp_agent.app import MCPApp +from temporalio.worker import Worker +from mcp_agent.workflows.router.router_llm import LLMRouter +from mcp_agent.workflows.router.router_llm_anthropic import AnthropicLLMRouter + +app = MCPApp(name="mcp_basic_agent") + + +def print_to_console(message: str): + """ + A simple function that prints a message to the console. + """ + print(message) + + +def print_hello_world(): + """ + A simple function that prints "Hello, world!" to the console. + """ + print_to_console("Hello, world!") + + +@workflow.defn() +class RouterWorkflow: + @workflow.run + async def run(self) -> str: + context = get_current_context() + logger = context.app.logger + + finder_agent = Agent( + name="finder", + instruction="""You are an agent with access to the filesystem, + as well as the ability to fetch URLs. Your job is to identify + the closest match to a user's request, make the appropriate tool calls, + and return the URI and CONTENTS of the closest match.""", + server_names=["fetch", "filesystem"], + ) + + writer_agent = Agent( + name="writer", + instruction="""You are an agent that can write to the filesystem. + You are tasked with taking the user's input, addressing it, and + writing the result to disk in the appropriate location.""", + server_names=["filesystem"], + ) + + reasoning_agent = Agent( + name="reasoner", + instruction="""You are a generalist with knowledge about a vast + breadth of subjects. You are tasked with analyzing and reasoning over + the user's query and providing a thoughtful response.""", + server_names=[], + ) + + # You can use any LLM with an LLMRouter + llm = OpenAIAugmentedLLM(name="openai_router", instruction="You are a router") + router = LLMRouter( + llm_factory=lambda _agent: llm, + agents=[finder_agent, writer_agent, reasoning_agent], + functions=[print_to_console, print_hello_world], + context=context, + ) + + # This should route the query to finder agent, and also give an explanation of its decision + results = await router.route_to_agent( + request="Print the contents of mcp_agent.config.yaml verbatim", top_k=1 + ) + + logger.info("Router Results:", data=results) + + # We can use the agent returned by the router + agent = results[0].result + async with agent: + result = await agent.list_tools() + logger.info("Tools available:", data=result.model_dump()) + + with workflow.unsafe.sandbox_unrestricted(): + config_path = str(os.path.join(os.getcwd(), "mcp_agent.config.yaml")) + + result = await agent.call_tool( + name="read_file", + arguments={"path": config_path}, + ) + logger.info("read_file result:", data=result.model_dump()) + + # We can also use a router already configured with a particular LLM + anthropic_router = AnthropicLLMRouter( + server_names=["fetch", "filesystem"], + agents=[finder_agent, writer_agent, reasoning_agent], + functions=[print_to_console, print_hello_world], + context=context, + ) + + # This should route the query to print_to_console function + # Note that even though top_k is 2, it should only return print_to_console and not print_hello_world + results = await anthropic_router.route_to_function( + request="Print the input to console", top_k=2 + ) + logger.info("Router Results:", data=results) + function_to_call = results[0].result + function_to_call("Hello, world!") + + # This should route the query to fetch MCP server (inferring just by the server name alone!) + # You can also specify a server description in mcp_agent.config.yaml to help the router make a more informed decision + results = await anthropic_router.route_to_server( + request="Print the first two paragraphs of https://modelcontextprotocol.io/introduction", + top_k=1, + ) + logger.info("Router Results:", data=results) + + # Using the 'route' function will return the top-k results across all categories the router was initialized with (servers, agents and callables) + # top_k = 3 should likely print: 1. filesystem server, 2. finder agent and possibly 3. print_to_console function + results = await anthropic_router.route( + request="Print the contents of mcp_agent.config.yaml verbatim", + top_k=3, + ) + logger.info("Router Results:", data=results) + + return str(result) + + +async def main(): + async with app.run() as running_app: + plugin = MCPAgentPlugin(running_app) + + client = await Client.connect( + running_app.config.temporal.host, + plugins=[plugin], + ) + + async with Worker( + client, + task_queue=running_app.config.temporal.task_queue, + workflows=[RouterWorkflow], + ): + running_app.context.config.mcp.servers["filesystem"].args.extend( + [os.getcwd()] + ) + + output = await client.execute_workflow( + RouterWorkflow.run, + id=f"basic-workflow-{uuid4()}", + task_queue=running_app.config.temporal.task_queue, + ) + print(output) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/temporal_plugin/run_basic_workflow.py b/examples/temporal_plugin/run_basic_workflow.py new file mode 100644 index 000000000..5f2f51373 --- /dev/null +++ b/examples/temporal_plugin/run_basic_workflow.py @@ -0,0 +1,35 @@ +import asyncio +from basic_workflow import BasicWorkflow +from mcp_agent.executor.temporal.plugin import MCPAgentPlugin +from temporalio.client import Client +from uuid import uuid4 +from main import app + + +async def main(): + async with app.run() as running_app: + plugin = MCPAgentPlugin(running_app) + + # Create client connected to server at the given address + client = await Client.connect( + running_app.config.temporal.host, + plugins=[plugin], + ) + + # Execute a workflow + workflow_id = f"basic-workflow-{uuid4()}" + task_queue = running_app.config.temporal.task_queue + print(f"Starting workflow with ID: {workflow_id}") + print(f"Task queue: {task_queue}") + + result = await client.execute_workflow( + BasicWorkflow.run, + "Print the first 2 paragraphs of https://modelcontextprotocol.io/introduction", + id=workflow_id, + task_queue=task_queue, + ) + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/temporal_plugin/run_worker.py b/examples/temporal_plugin/run_worker.py new file mode 100755 index 000000000..3794e383f --- /dev/null +++ b/examples/temporal_plugin/run_worker.py @@ -0,0 +1,33 @@ +import asyncio +from temporalio.client import Client +from temporalio.worker import Worker +from basic_workflow import BasicWorkflow +from mcp_agent.executor.temporal.plugin import MCPAgentPlugin +from main import app + + +async def main(): + async with app.run() as running_app: + plugin = MCPAgentPlugin(running_app) + + # The plugin will be applied to both client and worker through the client + client = await Client.connect( + running_app.config.temporal.host, + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue=running_app.config.temporal.task_queue, + workflows=[BasicWorkflow], + ) + + print("Running worker with mcp-agent plugin...") + print(f"Task queue: {running_app.config.temporal.task_queue}") + print(f"Namespace: {running_app.config.temporal.namespace}") + + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/temporal_plugin/short_story.md b/examples/temporal_plugin/short_story.md new file mode 100644 index 000000000..b4319323f --- /dev/null +++ b/examples/temporal_plugin/short_story.md @@ -0,0 +1,11 @@ +## The Battle of Glimmerwood + +In the heart of Glimmerwood, a mystical forest known for its radiant trees, a small village thrived. The villagers, who lived peacefully, shared their home with the forest's magical creatures, especially the Glimmerfoxes, whose fur shimmered like moonlight. + +One fateful evening, the peace was shattered when the infamous Dark Marauders attacked. Led by the cunning Captain Thorn, the bandits aimed to steal the precious Glimmerstones, which were believed to grant immortality. + +Amidst the chaos, a young girl named Elara stood her ground; she rallied the villagers and devised a clever plan. Using the forest's natural defenses, Elara and the villagers lured the marauders into a trap. As the bandits approached the village square, a herd of Glimmerfoxes emerged, blinding the marauders with their dazzling light, and the villagers seized the opportunity to capture the invaders. + +Elara's bravery was celebrated, and she was hailed as the Guardian of Glimmerwood. The Glimmerstones were secured in a hidden grove protected by an ancient spell. + +However, not everything was as it seemed. The true power of the Glimmerstones was never confirmed, and whispers of a hidden agenda lingered among the villagers. diff --git a/examples/temporal_plugin/temporal_agent.py b/examples/temporal_plugin/temporal_agent.py new file mode 100644 index 000000000..16ec3166e --- /dev/null +++ b/examples/temporal_plugin/temporal_agent.py @@ -0,0 +1,56 @@ +import asyncio +from uuid import uuid4 +from temporalio import workflow +from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM +from mcp_agent.agents.agent import Agent +from temporalio.client import Client +from mcp_agent.executor.temporal.plugin import MCPAgentPlugin +from mcp_agent.app import MCPApp +from temporalio.worker import Worker + + +@workflow.defn +class BasicWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + simple_agent = Agent( + name="finder", + instruction="You are a helpful agent", + server_names=["fetch"], + ) + + async with simple_agent: + llm = await simple_agent.attach_llm(OpenAIAugmentedLLM) + result = await llm.generate_str(prompt) + return result + + +async def main(): + app = MCPApp(name="mcp_basic_agent") + + async with app.run() as running_app: + plugin = MCPAgentPlugin(running_app) + + client = await Client.connect( + running_app.config.temporal.host, + plugins=[plugin], + ) + + async with Worker( + client, + task_queue=running_app.config.temporal.task_queue, + workflows=[BasicWorkflow], + ): + output = await client.execute_workflow( + BasicWorkflow.run, + args=[ + "Print the first 2 paragraphs of https://modelcontextprotocol.io/introduction" + ], + id=f"basic-workflow-{uuid4()}", + task_queue=running_app.config.temporal.task_queue, + ) + print(output) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 7be2a2d7b..dc181e642 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ dependencies = [ [project.optional-dependencies] temporal = [ - "temporalio[opentelemetry]>=1.10.0", + "temporalio[opentelemetry]>=1.17.0", ] anthropic = [ "anthropic>=0.48.0", diff --git a/src/mcp_agent/app.py b/src/mcp_agent/app.py index 2ada0483b..507aca4ba 100644 --- a/src/mcp_agent/app.py +++ b/src/mcp_agent/app.py @@ -435,6 +435,144 @@ async def run(self): await self.context.token_counter.pop() await self.cleanup() + def _register_temporal_workflows( + self, workflows: list[Type], workflow_ids: Dict[Type, str] | None = None + ) -> None: + """ + Register pure Temporal workflow classes with the application. + + This method is specifically for Temporal workflows that are decorated with @workflow.defn. + It patches them to be compatible with the mcp-agent framework. + + Args: + workflows: A list of Temporal workflow classes (decorated with @workflow.defn) to register. + workflow_ids: Optional mapping of workflow class to custom workflow ID. + If not provided, uses the class name as the ID. + + Example: + ``` + from basic_workflow import BasicWorkflow # Has @workflow.defn + app = MCPApp(name="my_app") + app.register_temporal_workflows([BasicWorkflow]) + ``` + """ + workflow_ids = workflow_ids or {} + + for workflow_cls in workflows: + # Determine the workflow ID + workflow_id = workflow_ids.get(workflow_cls, workflow_cls.__name__) + + # Verify this is actually a Temporal workflow + if not hasattr(workflow_cls, "__temporal_workflow_definition"): + raise ValueError( + f"{workflow_cls.__name__} is not a Temporal workflow. " + "It must be decorated with @workflow.defn before registering." + ) + + # Check if it's already been patched or decorated with @app.workflow + if hasattr(workflow_cls, "_app"): + print( + f"Warning: {workflow_cls.__name__} already has MCPApp integration, skipping..." + ) + continue + + # Patch the Temporal workflow to be compatible with mcp-agent framework + self._patch_temporal_workflow(workflow_cls) + + # Register it in our workflows dictionary + self._workflows[workflow_id] = workflow_cls + + def _patch_temporal_workflow(self, temporal_workflow_cls: Type) -> None: + """ + Patch a pure Temporal workflow class to be compatible with mcp-agent framework. + + This adds the necessary methods that the mcp-agent framework expects: + - create() class method + - run_async() instance method + """ + from mcp_agent.executor.workflow import WorkflowExecution + import uuid + + # Add the _app attribute so it's recognized as an app workflow + temporal_workflow_cls._app = self + + # Add the create class method + @classmethod + async def create(cls, name=None, context=None, **kwargs): + """Factory method to create a workflow instance compatible with mcp-agent framework.""" + # Create a simple instance - pure Temporal workflows are typically stateless + instance = cls() + + # Add the necessary attributes that mcp-agent framework expects + instance.name = name or cls.__name__ + instance._workflow_id = None + instance._run_id = None + + # Set the context if provided + if context: + instance._context = context + + return instance + + # Add the run_async method + async def run_async(self, *args, **kwargs): + """Run the workflow asynchronously and return WorkflowExecution.""" + # Generate IDs for this execution + workflow_id = f"{self.name}-{uuid.uuid4().hex[:8]}" + run_id = f"{workflow_id}-run-{uuid.uuid4().hex[:8]}" + + self._workflow_id = workflow_id + self._run_id = run_id + + # For pure Temporal workflows, we need to use the executor to run them + # But since we're in the mcp-agent context, we'll delegate to the executor + from mcp_agent.core.context import get_current_context + + try: + context = get_current_context() + if context and context.executor: + # Extract special system parameters + workflow_memo = kwargs.pop("__mcp_agent_workflow_memo", None) + provided_workflow_id = kwargs.pop("__mcp_agent_workflow_id", None) + provided_task_queue = kwargs.pop("__mcp_agent_task_queue", None) + + if provided_workflow_id: + workflow_id = provided_workflow_id + self._workflow_id = workflow_id + + # Start the workflow using the executor + handle = await context.executor.start_workflow( + self.__class__.__name__, + *args, + workflow_id=workflow_id, + task_queue=provided_task_queue, + workflow_memo=workflow_memo, + **kwargs, + ) + + return WorkflowExecution( + workflow_id=handle.id, + run_id=handle.id, # Temporal uses the same ID for both + ) + else: + # Fallback - just run the workflow directly (for testing) + await self.run(*args, **kwargs) + return WorkflowExecution( + workflow_id=workflow_id, + run_id=run_id, + ) + except Exception: + # Fallback - just run the workflow directly + await self.run(*args, **kwargs) + return WorkflowExecution( + workflow_id=workflow_id, + run_id=run_id, + ) + + # Patch the class with the new methods + temporal_workflow_cls.create = create + temporal_workflow_cls.run_async = run_async + def workflow( self, cls: Type, *args, workflow_id: str | None = None, **kwargs ) -> Type: diff --git a/src/mcp_agent/executor/temporal/plugin.py b/src/mcp_agent/executor/temporal/plugin.py new file mode 100644 index 000000000..f98b48872 --- /dev/null +++ b/src/mcp_agent/executor/temporal/plugin.py @@ -0,0 +1,342 @@ +""" +MCP Agent Temporal Plugin + +This plugin provides MCP Agent functionality as a Temporal plugin, allowing users to add +MCP Agent capabilities to their Temporal workflows with a single line of configuration. +""" + +from contextlib import AbstractAsyncContextManager +from typing import AsyncIterator, TYPE_CHECKING +import warnings + +from temporalio.worker import ( + Plugin as WorkerPlugin, + Replayer, + ReplayerConfig, + Worker, + WorkerConfig, + WorkflowReplayResult, +) +from temporalio.client import ClientConfig, Plugin as ClientPlugin, WorkflowHistory +from temporalio.contrib.pydantic import ( + PydanticPayloadConverter, + pydantic_data_converter, +) +from temporalio.converter import DataConverter, DefaultPayloadConverter +from temporalio.service import ConnectConfig, ServiceClient +from temporalio.contrib.opentelemetry import TracingInterceptor + +from mcp_agent.executor.temporal.interceptor import ContextPropagationInterceptor +from mcp_agent.executor.temporal.session_proxy import SessionProxy +from mcp_agent.logging.logger import get_logger + +if TYPE_CHECKING: + from mcp_agent.app import MCPApp + +logger = get_logger(__name__) + + +class MCPAgentPlugin(ClientPlugin, WorkerPlugin): + """ + Temporal plugin for integrating MCP Agent with Temporal workflows. + + This plugin provides seamless integration between the MCP Agent SDK and + Temporal workflows. It automatically configures the necessary interceptors, + activities, and data converters to enable MCP Agent to run within + Temporal workflows with proper tracing and model execution. + + This plugin provides: + - Built-in MCP Agent activities + - Pydantic data converter for seamless serialization + - Context propagation interceptors + - Tracing support + - Auto-registration of workflow tasks as activities + + Example: + ``` + app = MCPApp(name="mcp_basic_agent") + async with app.run() as running_app: + plugin = MCPAgentPlugin(app) + client = await Client.connect("localhost:7233", plugins=[plugin]) + worker = Worker(client, task_queue="my-queue", workflows=[MyWorkflow]) + ``` + """ + + def __init__(self, app: "MCPApp"): + """Initialize MCP Agent Temporal plugin. + + Args: + app (MCPApp): MCP Agent app instance + """ + self.app = app + self.temporal_config = self.app.config.temporal + self.context = self.app.context + self._system_activities = None + self._agent_tasks = None + + # Expose a virtual upstream session (passthrough) bound to this run via activities + # This lets any code use context.upstream_session like a real session. + upstream_session = getattr(self.context, "upstream_session", None) + if upstream_session is None: + self.context.upstream_session = SessionProxy( + executor=self.context.executor, + context=self.context, + ) + app = self.context.app + if app: + # Ensure the app's logger is bound to the current context with upstream_session + if app._logger and hasattr(app._logger, "_bound_context"): + app._logger._bound_context = self.context + + # Register activities with the app so they're available in workflow context + self._register_activities() + + def init_client_plugin(self, next: ClientPlugin) -> None: + self.next_client_plugin = next + + def init_worker_plugin(self, next: WorkerPlugin) -> None: + self.next_worker_plugin = next + + def configure_client(self, config: ClientConfig) -> ClientConfig: + """Configure the Temporal client with MCP Agent settings.""" + # Set up data converter + config["data_converter"] = self._get_new_data_converter( + config.get("data_converter") + ) + + # Add interceptors + interceptors = list(config.get("interceptors") or []) + + # Add tracing if enabled + if self.context and getattr(self.context, "tracing_enabled", False): + interceptors.append(TracingInterceptor()) + + # Always add context propagation + interceptors.append(ContextPropagationInterceptor()) + + config["interceptors"] = interceptors + + # Set namespace from config if available + if self.temporal_config and self.temporal_config.namespace: + config["namespace"] = self.temporal_config.namespace + + return self.next_client_plugin.configure_client(config) + + async def connect_service_client(self, config: ConnectConfig) -> ServiceClient: + """Configure service connection with MCP Agent settings from config.""" + # Apply connection settings from TemporalSettings config + if self.temporal_config: + if self.temporal_config.host: + config.target_host = self.temporal_config.host + if self.temporal_config.namespace: + config.namespace = self.temporal_config.namespace + if self.temporal_config.api_key: + config.api_key = self.temporal_config.api_key + if self.temporal_config.tls is not None: + config.tls = self.temporal_config.tls + if self.temporal_config.rpc_metadata: + # Merge existing metadata with config metadata + existing_metadata = getattr(config, "rpc_metadata", {}) or {} + config.rpc_metadata = { + **existing_metadata, + **self.temporal_config.rpc_metadata, + } + + return await self.next_client_plugin.connect_service_client(config) + + def _register_activities(self) -> None: + """Register MCP Agent activities.""" + if not (self.context and self.app): + warnings.warn("No context and app - Activities not registered.") + return + + # Register agent tasks + if not self._agent_tasks: + from mcp_agent.agents.agent import AgentTasks + + self._agent_tasks = AgentTasks(context=self.context) + + self.app.workflow_task()(self._agent_tasks.call_tool_task) + self.app.workflow_task()(self._agent_tasks.get_capabilities_task) + self.app.workflow_task()(self._agent_tasks.get_prompt_task) + self.app.workflow_task()(self._agent_tasks.initialize_aggregator_task) + self.app.workflow_task()(self._agent_tasks.list_prompts_task) + self.app.workflow_task()(self._agent_tasks.list_tools_task) + self.app.workflow_task()(self._agent_tasks.shutdown_aggregator_task) + + # Register system activities + if not self._system_activities: + from mcp_agent.executor.temporal.system_activities import ( + SystemActivities, + ) + + self._system_activities = SystemActivities(context=self.context) + + self.app.workflow_task(name="mcp_forward_log")( + self._system_activities.forward_log + ) + self.app.workflow_task(name="mcp_request_user_input")( + self._system_activities.request_user_input + ) + self.app.workflow_task(name="mcp_relay_notify")( + self._system_activities.relay_notify + ) + self.app.workflow_task(name="mcp_relay_request")( + self._system_activities.relay_request + ) + + def _configure_activities(self, config: dict) -> None: + """Add registered activities to Worker config. + + This method modifies the config dict in place by adding activities + to config["activities"]. + """ + activities = list(config.get("activities") or []) + + if self.context and hasattr(self.context, "task_registry"): + # Collect activities from the task registry + activity_registry = self.context.task_registry + for name in activity_registry.list_activities(): + activities.append(activity_registry.get_activity(name)) + + config["activities"] = activities + + def _configure_workflows(self, config: dict) -> None: + """Add workflows from app to configuration. + + This method modifies the config dict in place. + """ + if self.app and hasattr(self.app, "workflows"): + existing_workflows = list(config.get("workflows") or []) + + # Register Temporal workflows passed to Worker + if existing_workflows: + unregistered_workflows = [] + for workflow_cls in existing_workflows: + # Check if this is a Temporal workflow + if hasattr(workflow_cls, "__temporal_workflow_definition"): + workflow_id = workflow_cls.__name__ + # If not registered with MCPApp, add it to the list + if workflow_id not in self.app.workflows: + unregistered_workflows.append(workflow_cls) + + # Register all unregistered workflows with MCPApp + if unregistered_workflows: + self.app._register_temporal_workflows(unregistered_workflows) + + app_workflows = list(self.app.workflows.values()) + + # Deduplicate workflows by class - avoid registering the same workflow class twice + all_workflows = existing_workflows + app_workflows + unique_workflows = [] + seen_classes = set() + + for workflow_cls in all_workflows: + if workflow_cls not in seen_classes: + unique_workflows.append(workflow_cls) + seen_classes.add(workflow_cls) + + config["workflows"] = unique_workflows + + def _configure_workflow_runner(self, config: dict) -> None: + """Configure workflow sandbox runner with MCP Agent modules. + + This method modifies the config dict in place. + """ + from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner + from dataclasses import replace + + runner = config.get("workflow_runner") + if isinstance(runner, SandboxedWorkflowRunner): + # Disable most restrictions for MCP Agent workflows + # This is necessary because MCP Agent code uses many libraries that aren't workflow-safe by default + config["workflow_runner"] = replace( + runner, + restrictions=runner.restrictions.with_passthrough_modules( + "mcp_agent", + "mcp", + "rich", + "logging", + "opentelemetry", + "httpx", + "httpcore", + "sniffio", + "aiohttp", + "attrs", + "numpy", + "pydantic", + ), + ) + + def _configure_interceptors(self, config: dict) -> None: + """Configure interceptors for tracing and context propagation. + + This method modifies the config dict in place. + """ + interceptors = list(config.get("interceptors") or []) + + # Add tracing if enabled + if self.context and getattr(self.context, "tracing_enabled", False): + interceptors.append(TracingInterceptor()) + + # Always add context propagation + interceptors.append(ContextPropagationInterceptor()) + + config["interceptors"] = interceptors + + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: + """Configure the worker with MCP Agent activities and settings.""" + self._configure_activities(config) + + self._configure_workflows(config) + + self._configure_workflow_runner(config) + + self._configure_interceptors(config) + + # Set task queue from config if available (Worker-specific) + if self.temporal_config and self.temporal_config.task_queue: + config["task_queue"] = self.temporal_config.task_queue + + return self.next_worker_plugin.configure_worker(config) + + async def run_worker(self, worker: Worker) -> None: + """Run the worker with MCP Agent context.""" + # Set up any necessary context before running the worker + await self.next_worker_plugin.run_worker(worker) + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + """Configure the replayer with MCP Agent settings.""" + # Configure data converter + config["data_converter"] = self._get_new_data_converter( + config.get("data_converter") + ) + + self._configure_workflows(config) + + self._configure_workflow_runner(config) + + self._configure_interceptors(config) + + return self.next_worker_plugin.configure_replayer(config) + + def run_replayer( + self, + replayer: Replayer, + histories: AsyncIterator[WorkflowHistory], + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: + """Run the replayer with MCP Agent context.""" + return self.next_worker_plugin.run_replayer(replayer, histories) + + def _get_new_data_converter(self, converter: DataConverter | None) -> DataConverter: + """Get or create a Pydantic data converter, warning if replacing a custom one.""" + if converter and converter.payload_converter_class not in ( + DefaultPayloadConverter, + PydanticPayloadConverter, + ): + warnings.warn( + "A non-default Temporal data converter was provided but has been replaced " + "with the Pydantic data converter for MCP Agent compatibility." + ) + + return pydantic_data_converter diff --git a/src/mcp_agent/executor/workflow.py b/src/mcp_agent/executor/workflow.py index ad5515730..72d736e64 100644 --- a/src/mcp_agent/executor/workflow.py +++ b/src/mcp_agent/executor/workflow.py @@ -780,28 +780,6 @@ async def initialize(self): # Safe to ignore if called outside workflow sandbox or memo unavailable pass - # Expose a virtual upstream session (passthrough) bound to this run via activities - # This lets any code use context.upstream_session like a real session. - try: - from mcp_agent.executor.temporal.session_proxy import SessionProxy - - upstream_session = getattr(self.context, "upstream_session", None) - - if upstream_session is None: - self.context.upstream_session = SessionProxy( - executor=self.executor, - context=self.context, - ) - - app = self.context.app - if app: - # Ensure the app's logger is bound to the current context with upstream_session - if app._logger and hasattr(app._logger, "_bound_context"): - app._logger._bound_context = self.context - except Exception: - # Non-fatal if context is immutable early; will be set after run_id assignment in run_async - pass - self._initialized = True self.state.updated_at = datetime.now(timezone.utc).timestamp() diff --git a/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py b/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py index d6522af5a..c7a51e2a2 100644 --- a/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py +++ b/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py @@ -83,6 +83,11 @@ class RequestCompletionRequest(BaseModel): payload: dict +class RequestStreamingCompletionRequest(BaseModel): + config: AnthropicSettings + payload: dict + + def create_anthropic_instance(settings: AnthropicSettings): """Select and initialise the appropriate anthropic client instance based on settings""" if settings.provider == "bedrock": @@ -465,18 +470,16 @@ async def generate_structured( if params.stopSequences: args["stop_sequences"] = params.stopSequences - # Call Anthropic directly (one-turn streaming for consistency) - base_url = None - if self.context and self.context.config and self.context.config.anthropic: - base_url = self.context.config.anthropic.base_url - api_key = self.context.config.anthropic.api_key - client = AsyncAnthropic(api_key=api_key, base_url=base_url) - else: - client = AsyncAnthropic() + config = self.context.config + request = RequestStreamingCompletionRequest( + config=config.anthropic, + payload=args, + ) - async with client: - async with client.messages.stream(**args) as stream: - final = await stream.get_final_message() + final: Message = await self.executor.execute( + AnthropicCompletionTasks.request_streaming_completion_task, + ensure_serializable(request), + ) # Extract tool_use input and validate for block in final.content: @@ -785,6 +788,39 @@ async def request_completion_task( response = ensure_serializable(response) return response + @staticmethod + @workflow_task + @telemetry.traced() + async def request_streaming_completion_task( + request: RequestStreamingCompletionRequest, + ) -> Message: + """ + Request a streaming completion from Anthropic's API. + """ + # Prefer async client where available to avoid blocking the event loop + if request.config.provider in (None, "", "anthropic"): + client = AsyncAnthropic( + api_key=request.config.api_key, base_url=request.config.base_url + ) + payload = request.payload + async with client: + async with client.messages.stream(**payload) as stream: + final = await stream.get_final_message() + response = ensure_serializable(final) + return response + else: + anthropic = create_anthropic_instance(request.config) + payload = request.payload + + def stream_and_get_final(): + with anthropic.messages.stream(**payload) as stream: + return stream.get_final_message() + + loop = asyncio.get_running_loop() + final = await loop.run_in_executor(None, stream_and_get_final) + response = ensure_serializable(final) + return response + class AnthropicMCPTypeConverter(ProviderToMCPConverter[MessageParam, Message]): """ diff --git a/tests/workflows/llm/test_augmented_llm_anthropic.py b/tests/workflows/llm/test_augmented_llm_anthropic.py index 7a04439f9..3983e5000 100644 --- a/tests/workflows/llm/test_augmented_llm_anthropic.py +++ b/tests/workflows/llm/test_augmented_llm_anthropic.py @@ -212,8 +212,6 @@ async def test_generate_structured(self, mock_llm, default_usage): """ Tests structured output generation using native Anthropic API. """ - from unittest.mock import patch - # Define a simple response model class TestResponseModel(BaseModel): name: str @@ -237,28 +235,16 @@ class TestResponseModel(BaseModel): usage=default_usage, ) - # Mock the AsyncAnthropic client and streaming - with patch( - "mcp_agent.workflows.llm.augmented_llm_anthropic.AsyncAnthropic" - ) as MockAsyncAnthropic: - mock_client = MockAsyncAnthropic.return_value - mock_stream = AsyncMock() - mock_stream.get_final_message = AsyncMock(return_value=mock_message) - mock_stream.__aenter__ = AsyncMock(return_value=mock_stream) - mock_stream.__aexit__ = AsyncMock(return_value=None) - mock_client.messages.stream = MagicMock(return_value=mock_stream) - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - - # Call the method - result = await AnthropicAugmentedLLM.generate_structured( - mock_llm, "Test query", TestResponseModel - ) + # Setup mock executor to return the expected message + mock_llm.executor.execute = AsyncMock(return_value=mock_message) - # Assertions - assert isinstance(result, TestResponseModel) - assert result.name == "Test" - assert result.value == 42 + # Call the method + result = await mock_llm.generate_structured("Test query", TestResponseModel) + + # Assertions + assert isinstance(result, TestResponseModel) + assert result.name == "Test" + assert result.value == 42 # Test 4: With History @pytest.mark.asyncio @@ -803,8 +789,6 @@ async def test_generate_structured_with_mixed_message_types(self, mock_llm): """ Tests generate_structured() method with mixed message types. """ - from unittest.mock import patch - # Define a simple response model class TestResponseModel(BaseModel): name: str @@ -843,26 +827,16 @@ class TestResponseModel(BaseModel): ), ) - # Mock the AsyncAnthropic client and streaming - with patch( - "mcp_agent.workflows.llm.augmented_llm_anthropic.AsyncAnthropic" - ) as MockAsyncAnthropic: - mock_client = MockAsyncAnthropic.return_value - mock_stream = AsyncMock() - mock_stream.get_final_message = AsyncMock(return_value=mock_message) - mock_stream.__aenter__ = AsyncMock(return_value=mock_stream) - mock_stream.__aexit__ = AsyncMock(return_value=None) - mock_client.messages.stream = MagicMock(return_value=mock_stream) - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - - # Call generate_structured with mixed message types - result = await mock_llm.generate_structured(messages, TestResponseModel) - - # Assertions - assert isinstance(result, TestResponseModel) - assert result.name == "MixedTypes" - assert result.value == 123 + # Setup mock executor to return the expected message + mock_llm.executor.execute = AsyncMock(return_value=mock_message) + + # Call generate_structured with mixed message types + result = await mock_llm.generate_structured(messages, TestResponseModel) + + # Assertions + assert isinstance(result, TestResponseModel) + assert result.name == "MixedTypes" + assert result.value == 123 # Test 25: System Prompt Not None in API Call @pytest.mark.asyncio diff --git a/uv.lock b/uv.lock index 7cdf2085f..5e0579622 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.13'", @@ -2041,7 +2041,7 @@ wheels = [ [[package]] name = "mcp-agent" -version = "0.1.27" +version = "0.1.28" source = { editable = "." } dependencies = [ { name = "aiohttp" }, @@ -2156,7 +2156,7 @@ requires-dist = [ { name = "pyyaml", specifier = ">=6.0.2" }, { name = "rich", specifier = ">=13.9.4" }, { name = "scikit-learn", specifier = ">=1.6.0" }, - { name = "temporalio", extras = ["opentelemetry"], marker = "extra == 'temporal'", specifier = ">=1.10.0" }, + { name = "temporalio", extras = ["opentelemetry"], marker = "extra == 'temporal'", specifier = ">=1.17.0" }, { name = "typer", specifier = ">=0.15.3" }, { name = "typer", extras = ["all"], marker = "extra == 'cli'", specifier = ">=0.15.3" }, { name = "watchdog", marker = "extra == 'cli'", specifier = ">=6.0.0" }, @@ -2440,6 +2440,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/eb/8d/776adee7bbf76365fdd7f2552710282c79a4ead5d2a46408c9043a2b70ba/networkx-3.5-py3-none-any.whl", hash = "sha256:0030d386a9a06dee3565298b4a734b68589749a544acbb6c412dc9e2489ec6ec", size = 2034406, upload-time = "2025-05-29T11:35:04.961Z" }, ] +[[package]] +name = "nexus-rpc" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ef/66/540687556bd28cf1ec370cc6881456203dfddb9dab047b8979c6865b5984/nexus_rpc-1.1.0.tar.gz", hash = "sha256:d65ad6a2f54f14e53ebe39ee30555eaeb894102437125733fb13034a04a44553", size = 77383, upload-time = "2025-07-07T19:03:58.368Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bf/2f/9e9d0dcaa4c6ffa22b7aa31069a8a264c753ff8027b36af602cce038c92f/nexus_rpc-1.1.0-py3-none-any.whl", hash = "sha256:d1b007af2aba186a27e736f8eaae39c03aed05b488084ff6c3d1785c9ba2ad38", size = 27743, upload-time = "2025-07-07T19:03:57.556Z" }, +] + [[package]] name = "nodeenv" version = "1.9.1" @@ -4167,21 +4179,22 @@ wheels = [ [[package]] name = "temporalio" -version = "1.11.1" +version = "1.18.1" source = { registry = "https://pypi.org/simple" } dependencies = [ + { name = "nexus-rpc" }, { name = "protobuf" }, { name = "python-dateutil", marker = "python_full_version < '3.11'" }, { name = "types-protobuf" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/b7/2a/bd8cfdd116e65309c6a9f3b72126d658159d6655163802ecf719c5435d06/temporalio-1.11.1.tar.gz", hash = "sha256:d7b5e4fdcdb523fa56979fa7330903b3188980f9aec9a4564c2ad910aec0cb85", size = 1509413, upload-time = "2025-05-09T16:56:29.89Z" } +sdist = { url = "https://files.pythonhosted.org/packages/09/7a/9f7885950cc040d71340a9379134b168d557b0a0e589c75d31e797f5a8bf/temporalio-1.18.1.tar.gz", hash = "sha256:46394498f8822e61b3ce70d6735de7618f5af0501fb90f3f90f4b4f9e7816d77", size = 1787082, upload-time = "2025-09-30T15:00:19.871Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/c4/d9/60b127d9a7e313a94196f10d34bc070c1b4aa9f36d18a27a4641a8f5071a/temporalio-1.11.1-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:747e6562a5de53b9b72cbbf4c731c128e9db7448211df0a53f0ae74cd492e292", size = 11792321, upload-time = "2025-05-09T16:56:09.963Z" }, - { url = "https://files.pythonhosted.org/packages/96/28/f3d1829ef0fa7df8e9421172a16afec382733b3de8d1d92ff521cd548509/temporalio-1.11.1-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:1c98d50520cb145c6219847876db8f8928d2b8f371341184686ebd30371868bf", size = 11480568, upload-time = "2025-05-09T16:56:15.074Z" }, - { url = "https://files.pythonhosted.org/packages/f2/43/245578eaeaddc4166ec5726adfeea7cc356a6600f9996e55d74b13e162e6/temporalio-1.11.1-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:96711fc26387e6308ff68c12653a887230bb86cf26e6bc00c7f9208d3e6641df", size = 11864944, upload-time = "2025-05-09T16:56:19.411Z" }, - { url = "https://files.pythonhosted.org/packages/6e/43/f7fe59b4d2a4ed1b14aadaa8b3a848ea5a36958f51f78d0e32c7d44f0d37/temporalio-1.11.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:78f6d4d1032ede5051a10e41670d6168e2821a5c2e533e2a2eafa0eb8d2d1911", size = 12041100, upload-time = "2025-05-09T16:56:23.186Z" }, - { url = "https://files.pythonhosted.org/packages/88/17/1d58baeccbd4caea47360d623e67165120023ff8bbb7bc1f24e0f300d26a/temporalio-1.11.1-cp39-abi3-win_amd64.whl", hash = "sha256:504f6ddb219bd7b39c67b7648c599edf3626043eef90d11da952af5db8c7f1a5", size = 12125170, upload-time = "2025-05-09T16:56:26.87Z" }, + { url = "https://files.pythonhosted.org/packages/82/c0/9bad907dcf968c55acee1b5cc4ec0590a0fca3bc448dc32898785a577f7b/temporalio-1.18.1-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:748c0ec9f48aa1ab612a58fe516d9be28c1dd98194f560fd28a2ab09c6e2ca5e", size = 12809719, upload-time = "2025-09-30T14:59:58.177Z" }, + { url = "https://files.pythonhosted.org/packages/51/c5/490a2726aa67d4b856e8288d36848e7859801889b21d251cae8e8a6c9311/temporalio-1.18.1-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:5a789e7c483582d6d7dd49e7d2d2730d82dc94d9342fe71be76fa67afa4e6865", size = 12393639, upload-time = "2025-09-30T15:00:02.737Z" }, + { url = "https://files.pythonhosted.org/packages/92/89/e500e066df3c0fc1e6ee1a7cadbdfbc9812c62296ac0554fc09779555560/temporalio-1.18.1-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a9f5cf75c4b887476a2b39d022a9c44c495f5eb1668087a022bd9258d3adddf9", size = 12732719, upload-time = "2025-09-30T15:00:07.458Z" }, + { url = "https://files.pythonhosted.org/packages/a4/18/7e5c4082b1550c38c802af02ae60ffe39d87646856aa51909cdd2789b7a6/temporalio-1.18.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f28a69394bf18b4a1c22a6a784d348e93482858c505d054570b278f0f5e13e9c", size = 12926861, upload-time = "2025-09-30T15:00:12.777Z" }, + { url = "https://files.pythonhosted.org/packages/10/49/e021b3205f06a1ec8a533dc8b02dcf5784d003cf99e4fd574eedb7439357/temporalio-1.18.1-cp39-abi3-win_amd64.whl", hash = "sha256:552b360f9ccdac8d5fc5d19c6578c2f6f634399ccc37439c4794aa58487f7fd5", size = 13059005, upload-time = "2025-09-30T15:00:17.586Z" }, ] [package.optional-dependencies] @@ -4423,11 +4436,11 @@ wheels = [ [[package]] name = "types-protobuf" -version = "6.30.2.20250516" +version = "6.32.1.20250918" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/ac/6c/5cf088aaa3927d1cc39910f60f220f5ff573ab1a6485b2836e8b26beb58c/types_protobuf-6.30.2.20250516.tar.gz", hash = "sha256:aecd1881770a9bb225ede66872ef7f0da4505edd0b193108edd9892e48d49a41", size = 62254, upload-time = "2025-05-16T03:06:50.794Z" } +sdist = { url = "https://files.pythonhosted.org/packages/69/5a/bd06c2dbb77ebd4ea764473c9c4c014c7ba94432192cb965a274f8544b9d/types_protobuf-6.32.1.20250918.tar.gz", hash = "sha256:44ce0ae98475909ca72379946ab61a4435eec2a41090821e713c17e8faf5b88f", size = 63780, upload-time = "2025-09-18T02:50:39.391Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/c0/66/06a9c161f5dd5deb4f5c016ba29106a8f1903eb9a1ba77d407dd6588fecb/types_protobuf-6.30.2.20250516-py3-none-any.whl", hash = "sha256:8c226d05b5e8b2623111765fa32d6e648bbc24832b4c2fddf0fa340ba5d5b722", size = 76480, upload-time = "2025-05-16T03:06:49.444Z" }, + { url = "https://files.pythonhosted.org/packages/37/5a/8d93d4f4af5dc3dd62aa4f020deae746b34b1d94fb5bee1f776c6b7e9d6c/types_protobuf-6.32.1.20250918-py3-none-any.whl", hash = "sha256:22ba6133d142d11cc34d3788ad6dead2732368ebb0406eaa7790ea6ae46c8d0b", size = 77885, upload-time = "2025-09-18T02:50:38.028Z" }, ] [[package]]