-
Notifications
You must be signed in to change notification settings - Fork 768
Temporal plugin implementation #518
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 19 commits
9a9b001
95487d7
d211248
29d35c7
2ea85fd
8cd266c
9b03bb9
5051dd3
6a75bde
0e5f157
b3b9f56
5de198a
9494112
b9fd945
b5fca11
b24ec16
ed56e3f
529dee3
d467cda
de7603b
04aab67
e4588cd
9d64710
02735f0
d596208
46f7a00
efb282c
fc3613c
ab21d00
0c453f9
3dd5722
d22223f
af39096
aef8aa3
0a97ce1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
# MCP-Agent | ||
mcp_agent.secrets.yaml | ||
*.secrets.yaml | ||
.mcp-agent/ | ||
|
||
# Python | ||
__pycache__/ | ||
*.py[cod] | ||
*$py.class | ||
*.so | ||
.Python | ||
build/ | ||
develop-eggs/ | ||
dist/ | ||
downloads/ | ||
eggs/ | ||
.eggs/ | ||
lib/ | ||
lib64/ | ||
parts/ | ||
sdist/ | ||
var/ | ||
wheels/ | ||
share/python-wheels/ | ||
*.egg-info/ | ||
.installed.cfg | ||
*.egg | ||
MANIFEST | ||
pip-log.txt | ||
pip-delete-this-directory.txt | ||
|
||
# Virtual Environment | ||
.env | ||
.venv | ||
env/ | ||
venv/ | ||
ENV/ | ||
env.bak/ | ||
venv.bak/ | ||
|
||
# PyCharm | ||
.idea/ | ||
|
||
# VS Code | ||
.vscode/ | ||
*.code-workspace | ||
|
||
# Vim | ||
[._]*.s[a-v][a-z] | ||
[._]*.sw[a-p] | ||
[._]s[a-rt-v][a-z] | ||
[._]ss[a-gi-z] | ||
[._]sw[a-p] | ||
*~ | ||
|
||
# Logs | ||
logs/ | ||
*.log | ||
*.jsonl | ||
|
||
# OS | ||
.DS_Store | ||
.DS_Store? | ||
._* | ||
.Spotlight-V100 | ||
.Trashes | ||
ehthumbs.db | ||
Thumbs.db | ||
|
||
# Testing | ||
.pytest_cache/ | ||
.coverage | ||
htmlcov/ | ||
.tox/ | ||
.hypothesis/ | ||
|
||
# Jupyter Notebook | ||
.ipynb_checkpoints | ||
|
||
# pyenv | ||
.python-version | ||
|
||
# mypy | ||
.mypy_cache/ | ||
.dmypy.json | ||
dmypy.json | ||
|
||
# Pyre type checker | ||
.pyre/ | ||
|
||
# Local environment variables | ||
.env.local | ||
.env.*.local |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
# MCP-Agent with Temporal Plugin | ||
|
||
This example demonstrates multiple ways to use the Temporal plugin with MCP-Agent for workflow orchestration. | ||
|
||
## Prerequisites | ||
|
||
1. **Temporal Server**: Ensure you have a Temporal server running locally: | ||
```bash | ||
temporal server start-dev | ||
``` | ||
This starts a development server at `localhost:7233` | ||
|
||
2. **API Keys**: Add your API keys to `mcp_agent.secrets.yaml`: | ||
```yaml | ||
OPENAI_API_KEY: "your-key-here" | ||
ANTHROPIC_API_KEY: "your-key-here" # optional | ||
``` | ||
|
||
3. **Configuration**: Set the execution engine to `temporal` in `mcp_agent.config.yaml`: | ||
```yaml | ||
execution_engine: temporal | ||
|
||
temporal: | ||
host: "localhost:7233" | ||
namespace: "default" | ||
task_queue: "mcp-agent" | ||
``` | ||
|
||
## Usage Methods | ||
|
||
### Method 1: Separate Worker and Workflow Files | ||
|
||
This approach separates the worker and workflow execution into different processes, useful for distributed systems. | ||
|
||
**Step 1: Define your workflow** (`basic_workflow.py`): | ||
```python | ||
from temporalio import workflow | ||
from mcp_agent.agents.agent import Agent | ||
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM | ||
|
||
@workflow.defn | ||
class BasicWorkflow: | ||
@workflow.run | ||
async def run(self, prompt: str) -> str: | ||
simple_agent = Agent( | ||
name="finder", | ||
instruction="You are a helpful agent", | ||
server_names=["fetch"], | ||
) | ||
|
||
async with simple_agent: | ||
llm = await simple_agent.attach_llm(OpenAIAugmentedLLM) | ||
result = await llm.generate_str(prompt) | ||
return result | ||
``` | ||
|
||
**Step 2: Run the worker** (`run_worker.py`): | ||
```bash | ||
uv run run_worker.py | ||
``` | ||
|
||
**Step 3: Execute the workflow** (in another terminal): | ||
```bash | ||
uv run run_basic_workflow.py | ||
``` | ||
|
||
### Method 2: Single File Execution (temporal_agent.py) | ||
|
||
This approach combines worker and workflow execution in a single file, ideal for simpler deployments or testing. | ||
|
||
```bash | ||
uv run temporal_agent.py | ||
``` | ||
|
||
This file: | ||
- Defines the workflow | ||
- Starts the worker | ||
- Executes the workflow | ||
- All within the same process using `async with Worker(...)` | ||
|
||
**Key difference**: The single-file approach runs both the worker and client in the same process: | ||
```python | ||
async with Worker( | ||
client, | ||
task_queue=running_app.config.temporal.task_queue, | ||
workflows=[BasicWorkflow], | ||
): | ||
# Execute workflow while worker is running | ||
output = await client.execute_workflow(...) | ||
``` | ||
|
||
## Important Configuration Notes | ||
|
||
### Execution Engine Setting | ||
|
||
The `execution_engine` in `mcp_agent.config.yaml` **MUST** be set to `temporal` for the Temporal plugin to work: | ||
|
||
```yaml | ||
execution_engine: temporal # Required for Temporal plugin | ||
``` | ||
|
||
Without this setting, MCP-Agent will use the default `asyncio` engine and Temporal features won't be available. | ||
|
||
### Temporal Configuration | ||
|
||
Configure Temporal settings in `mcp_agent.config.yaml`: | ||
|
||
```yaml | ||
temporal: | ||
host: "localhost:7233" # Temporal server address | ||
namespace: "default" # Temporal namespace | ||
task_queue: "mcp-agent" # Task queue name | ||
max_concurrent_activities: 10 # Concurrency limit | ||
rpc_metadata: | ||
X-Client-Name: "mcp-agent" # Client identification | ||
``` | ||
|
||
## File Structure | ||
|
||
``` | ||
temporal_plugin/ | ||
├── basic_workflow.py # Workflow definition | ||
├── run_worker.py # Worker process (Method 1) | ||
├── run_basic_workflow.py # Workflow client (Method 1) | ||
├── temporal_agent.py # Single-file approach (Method 2) | ||
├── main.py # MCP-Agent app setup | ||
├── mcp_agent.config.yaml # Configuration (MUST set execution_engine: temporal) | ||
└── mcp_agent.secrets.yaml # API keys | ||
``` | ||
|
||
## When to Use Each Method | ||
|
||
- **Separate Files (Method 1)**: Use when you need: | ||
- Distributed workers across multiple machines | ||
- Independent scaling of workers and clients | ||
- Clear separation of concerns | ||
- Production deployments | ||
|
||
- **Single File (Method 2)**: Use when you need: | ||
- Quick prototyping and testing | ||
- Simple deployments | ||
- All-in-one execution for demos | ||
- Development and debugging | ||
|
||
## Troubleshooting | ||
|
||
1. **Temporal not working**: Ensure `execution_engine: temporal` in config | ||
2. **Connection refused**: Start Temporal server with `temporal server start-dev` | ||
3. **Task queue mismatch**: Verify task queue names match between worker and client | ||
|
||
## Further Resources | ||
|
||
- [Temporal Documentation](https://docs.temporal.io/) | ||
- [MCP-Agent Documentation](https://docs.mcp-agent.com/) | ||
- [MCP-Agent GitHub](https://github.com/lastmile-ai/mcp-agent) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from temporalio import workflow | ||
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM | ||
from mcp_agent.agents.agent import Agent | ||
|
||
|
||
@workflow.defn | ||
class BasicWorkflow: | ||
@workflow.run | ||
async def run(self, prompt: str) -> str: | ||
simple_agent = Agent( | ||
name="finder", | ||
instruction="You are a helpful agent", | ||
server_names=["fetch"], | ||
) | ||
|
||
async with simple_agent: | ||
llm = await simple_agent.attach_llm(OpenAIAugmentedLLM) | ||
result = await llm.generate_str(prompt) | ||
return result | ||
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,115 @@ | ||||||||||||||||||||||||
import asyncio | ||||||||||||||||||||||||
from uuid import uuid4 | ||||||||||||||||||||||||
from temporalio import workflow | ||||||||||||||||||||||||
from mcp_agent.core.context import get_current_context | ||||||||||||||||||||||||
from mcp_agent.workflows.evaluator_optimizer.evaluator_optimizer import ( | ||||||||||||||||||||||||
EvaluatorOptimizerLLM, | ||||||||||||||||||||||||
QualityRating, | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
from mcp_agent.workflows.llm.augmented_llm import RequestParams | ||||||||||||||||||||||||
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM | ||||||||||||||||||||||||
from mcp_agent.agents.agent import Agent | ||||||||||||||||||||||||
from temporalio.client import Client | ||||||||||||||||||||||||
from mcp_agent.executor.temporal.plugin import MCPAgentPlugin | ||||||||||||||||||||||||
from mcp_agent.app import MCPApp | ||||||||||||||||||||||||
from temporalio.worker import Worker | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
app = MCPApp(name="mcp_basic_agent") | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
||||||||||||||||||||||||
@workflow.defn | ||||||||||||||||||||||||
class BasicWorkflow: | ||||||||||||||||||||||||
@workflow.run | ||||||||||||||||||||||||
async def run(self, prompt: str) -> str: | ||||||||||||||||||||||||
context = get_current_context() | ||||||||||||||||||||||||
logger = context.app.logger | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
logger.info("Current config:", data=context.config.model_dump()) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
Comment on lines
+27
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid dumping full config to logs (secrets risk) context.config.model_dump() may include API keys. Don’t log entire config. - logger.info("Current config:", data=context.config.model_dump())
+ logger.info("Config initialized") 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||
optimizer = Agent( | ||||||||||||||||||||||||
name="optimizer", | ||||||||||||||||||||||||
instruction="""You are a career coach specializing in cover letter writing. | ||||||||||||||||||||||||
You are tasked with generating a compelling cover letter given the job posting, | ||||||||||||||||||||||||
candidate details, and company information. Tailor the response to the company and job requirements. | ||||||||||||||||||||||||
""", | ||||||||||||||||||||||||
server_names=["fetch"], | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
evaluator = Agent( | ||||||||||||||||||||||||
name="evaluator", | ||||||||||||||||||||||||
instruction="""Evaluate the following response based on the criteria below: | ||||||||||||||||||||||||
1. Clarity: Is the language clear, concise, and grammatically correct? | ||||||||||||||||||||||||
2. Specificity: Does the response include relevant and concrete details tailored to the job description? | ||||||||||||||||||||||||
3. Relevance: Does the response align with the prompt and avoid unnecessary information? | ||||||||||||||||||||||||
4. Tone and Style: Is the tone professional and appropriate for the context? | ||||||||||||||||||||||||
5. Persuasiveness: Does the response effectively highlight the candidate's value? | ||||||||||||||||||||||||
6. Grammar and Mechanics: Are there any spelling or grammatical issues? | ||||||||||||||||||||||||
7. Feedback Alignment: Has the response addressed feedback from previous iterations? | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
For each criterion: | ||||||||||||||||||||||||
- Provide a rating (EXCELLENT, GOOD, FAIR, or POOR). | ||||||||||||||||||||||||
- Offer specific feedback or suggestions for improvement. | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
Summarize your evaluation as a structured response with: | ||||||||||||||||||||||||
- Overall quality rating. | ||||||||||||||||||||||||
- Specific feedback and areas for improvement.""", | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
evaluator_optimizer = EvaluatorOptimizerLLM( | ||||||||||||||||||||||||
optimizer=optimizer, | ||||||||||||||||||||||||
evaluator=evaluator, | ||||||||||||||||||||||||
llm_factory=OpenAIAugmentedLLM, | ||||||||||||||||||||||||
min_rating=QualityRating.EXCELLENT, | ||||||||||||||||||||||||
context=context, | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
Comment on lines
+24
to
+64
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical: Non-deterministic operations in workflow violate Temporal guarantees. Temporal workflows must be deterministic to support replay. This workflow performs several non-deterministic operations directly in the workflow method:
These operations will break Temporal's replay mechanism and cause workflow failures when the worker restarts or the workflow history is replayed. Solution: Move all agent/LLM initialization and API calls into Temporal activities. The workflow should only orchestrate activity calls and handle deterministic control flow. Based on learnings about Temporal SDK best practices: workflows must be deterministic and avoid external I/O or state access. |
||||||||||||||||||||||||
|
||||||||||||||||||||||||
result = await evaluator_optimizer.generate_str( | ||||||||||||||||||||||||
message=input, | ||||||||||||||||||||||||
request_params=RequestParams(model="gpt-4o"), | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
return result | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
||||||||||||||||||||||||
async def main(): | ||||||||||||||||||||||||
async with app.run() as running_app: | ||||||||||||||||||||||||
plugin = MCPAgentPlugin(running_app) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
client = await Client.connect( | ||||||||||||||||||||||||
running_app.config.temporal.host, | ||||||||||||||||||||||||
plugins=[plugin], | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
async with Worker( | ||||||||||||||||||||||||
client, | ||||||||||||||||||||||||
task_queue=running_app.config.temporal.task_queue, | ||||||||||||||||||||||||
workflows=[BasicWorkflow], | ||||||||||||||||||||||||
): | ||||||||||||||||||||||||
Comment on lines
83
to
87
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Worker plugin not attached — activities won’t be registered Same issue as in router example; attach the plugin to Worker. - async with Worker(
- client,
- task_queue=running_app.config.temporal.task_queue,
- workflows=[BasicWorkflow],
- ):
+ async with Worker(
+ client,
+ task_queue=running_app.config.temporal.task_queue,
+ workflows=[BasicWorkflow],
+ plugins=[plugin],
+ ): 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||
job_posting = ( | ||||||||||||||||||||||||
"Software Engineer at LastMile AI. Responsibilities include developing AI systems, " | ||||||||||||||||||||||||
"collaborating with cross-functional teams, and enhancing scalability. Skills required: " | ||||||||||||||||||||||||
"Python, distributed systems, and machine learning." | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
candidate_details = ( | ||||||||||||||||||||||||
"Alex Johnson, 3 years in machine learning, contributor to open-source AI projects, " | ||||||||||||||||||||||||
"proficient in Python and TensorFlow. Motivated by building scalable AI systems to solve real-world problems." | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
# This should trigger a 'fetch' call to get the company information | ||||||||||||||||||||||||
company_information = ( | ||||||||||||||||||||||||
"Look up from the LastMile AI page: https://lastmileai.dev" | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
task = f"Write a cover letter for the following job posting: {job_posting}\n\nCandidate Details: {candidate_details}\n\nCompany information: {company_information}" | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
output = await client.execute_workflow( | ||||||||||||||||||||||||
BasicWorkflow.run, | ||||||||||||||||||||||||
task, | ||||||||||||||||||||||||
id=f"basic-workflow-{uuid4()}", | ||||||||||||||||||||||||
task_queue=running_app.config.temporal.task_queue, | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
print(output) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
||||||||||||||||||||||||
if __name__ == "__main__": | ||||||||||||||||||||||||
asyncio.run(main()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Workflow determinism/I-O safety: ensure all LLM/network calls run as activities (not in workflow code).
Agent(...); async with simple_agent; llm.generate_str(...)
will perform I/O unless your executor reroutes to activities underexecution_engine: temporal
. If not fully offloaded, this violates Temporal workflow rules and will fail nondeterminism/sandbox checks.Please confirm your executor uses
workflow.execute_activity
/local activities for model calls and MCP tool use whenexecution_engine=temporal
. If not, we should refactor the workflow to call explicit activities.🌐 Web query:
💡 Result:
Short answer: No — Temporal Python Workflows must not perform network I/O directly. Use Activities (or Local Activities / child workflows / async Activities) to do external I/O such as HTTP calls or LLM requests. Workflows remain deterministic and only orchestrate; Activities execute non‑deterministic, blocking, or network work. [1][2][3]
Recommended pattern (concise):
Sources:
Critical: Move LLM/HTTP/network I/O out of the workflow — invoke via Activities
Temporal Python workflows must not perform network I/O. examples/temporal_plugin/basic_workflow.py (lines 10–19) calls simple_agent.attach_llm(...) and llm.generate_str(...) inside the workflow; this performs external I/O and will break Temporal determinism/sandbox checks. Refactor those LLM/HTTP calls into Activities (or execute_local_activity) and invoke them from the workflow using workflow.execute_activity / workflow.execute_local_activity. If an executor already transparently offloads these calls to Temporal Activities, point to that implementation; otherwise refactor.
🤖 Prompt for AI Agents