Skip to content

Commit cdfd5f5

Browse files
authored
Get agent factory to work with Temporal, and also allow configurable activity policies (#595)
* Get agent factory to work with Temporal, and also allow configurable activity policies * update with PR feedback
1 parent d70ce31 commit cdfd5f5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1653
-161
lines changed

docs/advanced/temporal.mdx

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,34 @@ mcp-agent supports both `asyncio` and `temporal` execution engines. While asynci
6464
Update your `mcp_agent.config.yaml`:
6565
```yaml
6666
execution_engine: temporal
67-
67+
68+
# Optional: preload modules that register @workflow_task activities
69+
workflow_task_modules:
70+
- my_package.custom_temporal_tasks
71+
72+
# Optional: override retry behaviour for specific workflow tasks/activities
73+
workflow_task_retry_policies:
74+
my_package.custom_temporal_tasks.my_activity:
75+
maximum_attempts: 1
76+
6877
temporal:
6978
host: localhost
7079
port: 7233
7180
namespace: default
7281
task_queue: mcp-agent
7382
max_concurrent_activities: 10
7483
```
84+
`mcp-agent` preloads its built-in LLM providers automatically. Add extra modules
85+
when you register custom `@workflow_task` activities outside the core packages so
86+
the worker can discover them before starting. Entries are standard Python import paths.
87+
The optional `workflow_task_retry_policies` mapping lets you tune Temporal retry
88+
behaviour per activity (supports exact names, wildcards like `prefix*`, or `*`).
89+
For provider SDKs, common non-retryable error types include:
90+
- OpenAI/Azure OpenAI: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`.
91+
- Anthropic: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`.
92+
- Azure AI Inference: `HttpResponseError` (400/401/403/404/422).
93+
- Google GenAI: `InvalidArgument`, `FailedPrecondition`, `PermissionDenied`, `NotFound`, `Unauthenticated`.
94+
mcp-agent raises a `WorkflowApplicationError` for these cases so Temporal (or the asyncio executor) avoids retry loops even when the Temporal SDK is not installed locally.
7595
</Step>
7696

7797
<Step title="Create Worker">

docs/mcp-agent-sdk/advanced/durable-agents.mdx

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,48 @@ The same helper works on the asyncio executor via `app.context.executor.signal_b
150150

151151
The Temporal server example also shows how durable workflows call nested MCP servers and trigger [MCP elicitation](https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation) when a human response is required. Activities such as `call_nested_elicitation` log progress via `app.app.logger` so the request trace and Temporal history stay aligned.
152152

153+
## Configure workflow-task modules and retry policies
154+
155+
Add optional top-level overrides to preload custom workflow tasks and refine retry behaviour:
156+
157+
```yaml
158+
execution_engine: temporal
159+
160+
workflow_task_modules:
161+
- my_project.temporal_tasks # importable module path
162+
163+
workflow_task_retry_policies:
164+
my_project.temporal_tasks.generate_summary:
165+
maximum_attempts: 1
166+
mcp_agent.workflows.llm.augmented_llm_openai.OpenAICompletionTasks.request_completion_task:
167+
maximum_attempts: 2
168+
non_retryable_error_types:
169+
- AuthenticationError
170+
- PermissionDeniedError
171+
- BadRequestError
172+
- NotFoundError
173+
- UnprocessableEntityError
174+
custom_tasks.*:
175+
initial_interval: 1.5 # seconds (number, string, or timedelta)
176+
backoff_coefficient: 1.2
177+
*:
178+
maximum_attempts: 3
179+
```
180+
181+
- `workflow_task_modules` entries are standard Python import paths; they are imported before the worker begins polling so `@workflow_task` functions register globally.
182+
- `workflow_task_retry_policies` accepts exact activity names, module or class suffixes (`prefix.suffix`), trailing wildcards like `custom_tasks.*`, or the global `*`. The most specific match wins.
183+
- Retry intervals accept seconds (`1.5`), strings (`"2"`), or `timedelta` objects.
184+
- Marking error `type`s in `non_retryable_error_types` prevents Temporal from re-running an activity when the failure is not recoverable (see the [Temporal failure reference](https://docs.temporal.io/references/failures#application-failure)). For provider SDKs, useful values include:
185+
- OpenAI/Azure OpenAI: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`.
186+
- Anthropic: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`.
187+
- Azure AI Inference: `HttpResponseError` (raised with non-retryable status codes such as 400/401/403/404/422).
188+
- Google GenAI: `InvalidArgument`, `FailedPrecondition`, `PermissionDenied`, `NotFound`, `Unauthenticated`.
189+
- mcp-agent raises `WorkflowApplicationError` (wrapping Temporal's `ApplicationError` when available) for known non-retryable provider failures, so these policies work even if you run without the Temporal extra installed.
190+
- Inspect an activity’s fully-qualified name via `func.execution_metadata["activity_name"]` or through the Temporal UI history when adding a mapping.
191+
- Temporal matches `non_retryable_error_types` using the exception class name string you supply (see the [RetryPolicy reference](https://docs.temporal.io/references/sdk-apis/python/temporalio.common/#temporalio-common-RetryPolicy)). Use the narrowest names possible—overly generic entries such as `NotFoundError` can suppress legitimate retries if a workflow expects to handle that condition and try again.
192+
193+
With these pieces in place you can gradually introduce durability: start on asyncio, flip the config once you need retries/pause/resume, then iterate on policies and module preloading as your workflow surface grows.
194+
153195
## Operating durable agents
154196

155197
- **Temporal Web UI** (http://localhost:8233) lets you inspect history, replay workflow code, and emit signals.

examples/basic/agent_factory/README.md

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
Agent Factory
1+
# Agent Factory
22

3-
This folder shows how to define agents and compose powerful LLM workflows using the functional helpers in [`factory.py`](https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/workflows/factory.py).
3+
This folder shows how to define agents and compose powerful LLM workflows using the helpers in [`mcp_agent.workflows.factory`](https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/workflows/factory.py).
44

55
What's included
66

77
- `agents.yaml`: simple YAML agents
88
- `mcp_agent.config.yaml`: enables auto-loading subagents from inline definitions and directories
99
- `mcp_agent.secrets.yaml.example`: template for API keys
10-
- `load_and_route.py`: load agents and route via an LLM
10+
- `main.py`: load agents, register the `route_prompt` tool, and route requests
11+
- `run_worker.py`: Temporal worker (set `execution_engine: temporal` and run this in another terminal)
1112
- `auto_loaded_subagents.py`: discover subagents from config (Claude-style markdown and others)
1213
- `orchestrator_demo.py`: orchestrator-workers pattern
1314
- `parallel_demo.py`: parallel fan-out/fan-in pattern
@@ -21,10 +22,23 @@ cp examples/basic/agent_factory/mcp_agent.secrets.yaml.example examples/basic/ag
2122
# Fill in your provider API keys (OpenAI/Anthropic/etc.)
2223
```
2324

24-
2. Run an example
25+
2. Run the main demo
26+
27+
```bash
28+
uv run examples/basic/agent_factory/main.py
29+
```
30+
31+
To exercise the same workflow via Temporal, update `mcp_agent.config.yaml` to set `execution_engine: temporal`, start the worker in another terminal, then invoke the workflow:
32+
33+
```bash
34+
uv run examples/basic/agent_factory/run_worker.py
35+
# ...in another terminal
36+
uv run examples/basic/agent_factory/main.py
37+
```
38+
39+
Other demos in this folder remain available:
2540

2641
```bash
27-
uv run examples/basic/agent_factory/load_and_route.py
2842
uv run examples/basic/agent_factory/orchestrator_demo.py
2943
uv run examples/basic/agent_factory/parallel_demo.py
3044
uv run examples/basic/agent_factory/auto_loaded_subagents.py
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import asyncio
2+
from pathlib import Path
3+
4+
from mcp_agent.core.context import Context
5+
6+
from mcp_agent.app import MCPApp
7+
from mcp_agent.workflows.factory import (
8+
create_router_llm,
9+
load_agent_specs_from_file,
10+
)
11+
12+
app = MCPApp(name="factory_demo", description="Demo of agent factory with LLM routing")
13+
14+
15+
@app.async_tool()
16+
async def route_prompt(
17+
prompt: str = "Find the README and summarize it", app_ctx: Context | None = None
18+
) -> str:
19+
"""Route a prompt to the appropriate agent using an LLMRouter."""
20+
context = app_ctx or app.context
21+
22+
agents_path = Path(__file__).resolve().parent / "agents.yaml"
23+
specs = load_agent_specs_from_file(str(agents_path), context=context)
24+
25+
router = await create_router_llm(
26+
server_names=["filesystem", "fetch"],
27+
agents=specs,
28+
provider="openai",
29+
context=context,
30+
)
31+
32+
return await router.generate_str(prompt)
33+
34+
35+
async def main():
36+
async with app.run() as agent_app:
37+
result = await route_prompt(
38+
prompt="Find the README and summarize it", app_ctx=agent_app.context
39+
)
40+
print("Routing result:", result)
41+
42+
43+
if __name__ == "__main__":
44+
asyncio.run(main())
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
"""Run a Temporal worker for the agent factory demo."""
2+
3+
import asyncio
4+
import logging
5+
6+
from mcp_agent.executor.temporal import create_temporal_worker_for_app
7+
8+
from main import app
9+
10+
11+
logging.basicConfig(level=logging.INFO)
12+
logger = logging.getLogger(__name__)
13+
14+
15+
async def main():
16+
logger.info("Starting Temporal worker for agent factory demo")
17+
async with create_temporal_worker_for_app(app) as worker:
18+
await worker.run()
19+
20+
21+
if __name__ == "__main__":
22+
asyncio.run(main())
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Cloud Agent Factory (Temporal + Custom Workflow Tasks)
2+
3+
This example routes customer-facing questions to specialized agents, augments
4+
responses with in-code knowledge-base snippets, and shows how to preload custom
5+
`@workflow_task` modules via `workflow_task_modules`.
6+
7+
## What's included
8+
9+
- `main.py` – exposes an `@app.async_tool` (`route_customer_request`) that looks up
10+
knowledge-base context via a workflow task and then routes the enriched
11+
question through an LLMRouter.
12+
- `custom_tasks.py` – defines `knowledge_base_lookup_task` using the
13+
`@workflow_task` decorator. The task provides deterministic answers drawn from
14+
an embedded support knowledge base.
15+
- `agents.yaml` – two sample agents (`support_specialist`, `product_expert`) that
16+
the router can delegate to.
17+
- `run_worker.py` – Temporal worker entry point.
18+
- `mcp_agent.config.yaml` – configures Temporal, lists
19+
`workflow_task_modules: [custom_tasks]` so the worker imports the module before
20+
polling, and sets `workflow_task_retry_policies` to limit retries for the custom
21+
activity. Entries should be importable module paths (here `custom_tasks` lives
22+
alongside `main.py`, so we reference it by module name).
23+
24+
## Quick start
25+
26+
1. Install dependencies and add secrets:
27+
```bash
28+
cd examples/cloud/agent_factory
29+
cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml # add OPENAI_API_KEY
30+
uv pip install -r requirements.txt
31+
```
32+
33+
2. Start Temporal elsewhere:
34+
```bash
35+
temporal server start-dev
36+
```
37+
38+
3. Launch the worker:
39+
```bash
40+
uv run run_worker.py
41+
```
42+
43+
4. In another terminal, run the app:
44+
```bash
45+
uv run main.py
46+
```
47+
The tool will fetch knowledge-base context via the workflow task (executed as
48+
a Temporal activity) and produce a routed response.
49+
50+
5. Optional: connect an MCP client while `main.py` is running:
51+
```bash
52+
npx @modelcontextprotocol/inspector --transport sse --server-url http://127.0.0.1:8000/sse
53+
```
54+
55+
## How it works
56+
57+
1. `workflow_task_modules` ensures `custom_tasks.py` is imported during worker
58+
startup, registering `knowledge_base_lookup_task` with the app.
59+
2. `route_customer_request` runs as a Temporal workflow (courtesy of
60+
`@app.async_tool`). Inside the workflow we call
61+
`context.executor.execute(knowledge_base_lookup_task, {...})`; this schedules
62+
the task as an activity, returning curated snippets.
63+
3. The prompt is enriched with those snippets and routed through the factory
64+
helper (`create_router_llm`) to select the best agent and compose the final
65+
reply.
66+
67+
You can expand the example by adding more entries to the knowledge base or by
68+
introducing additional workflow tasks. Simply place them in `custom_tasks.py`
69+
and keep the module listed in `workflow_task_modules`.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
agents:
2+
- name: support_specialist
3+
instruction: |
4+
You are a customer support specialist. Provide empathetic answers,
5+
reference available features, and suggest next steps or workarounds.
6+
When relevant, mention how customers can contact support.
7+
server_names: [fetch]
8+
9+
- name: product_expert
10+
instruction: |
11+
You are a product expert who knows roadmap milestones and integrations.
12+
Provide concise summaries, highlight differentiators, and cite
13+
integrations or security measures when appropriate.
14+
server_names: []
15+
16+
# Note: you could alternatively inline these AgentSpec definitions under
17+
# `agents.definitions` in `mcp_agent.config.yaml`. We keep them in a separate
18+
# YAML file here to highlight loading specs via the factory helpers.
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""Custom workflow tasks for the cloud agent factory demo."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Dict, List, Tuple
6+
7+
from mcp_agent.executor.workflow_task import workflow_task
8+
9+
10+
_KNOWLEDGE_BASE: Tuple[Dict[str, str], ...] = (
11+
{
12+
"topic": "pricing",
13+
"summary": "Current pricing tiers: Free, Pro ($29/mo), Enterprise (custom).",
14+
"faq": (
15+
"Pro tier includes 3 seats, Enterprise supports SSO and audit logging. "
16+
"Discounts available for annual billing."
17+
),
18+
},
19+
{
20+
"topic": "availability",
21+
"summary": "The service offers 99.9% uptime backed by regional failover.",
22+
"faq": (
23+
"Scheduled maintenance occurs Sundays 02:00-03:00 UTC. "
24+
"Status page: https://status.example.com"
25+
),
26+
},
27+
{
28+
"topic": "integrations",
29+
"summary": "Native integrations include Slack, Jira, and Salesforce connectors.",
30+
"faq": (
31+
"Slack integration supports slash commands. Jira integration syncs tickets "
32+
"bi-directionally every 5 minutes."
33+
),
34+
},
35+
{
36+
"topic": "security",
37+
"summary": "SOC 2 Type II certified, data encrypted in transit and at rest.",
38+
"faq": (
39+
"Role-based access control is available on Pro+. Admins can require MFA. "
40+
"Security whitepaper: https://example.com/security"
41+
),
42+
},
43+
)
44+
45+
46+
@workflow_task(name="cloud_agent_factory.knowledge_base_lookup")
47+
async def knowledge_base_lookup_task(request: dict) -> List[str]:
48+
"""
49+
Return the most relevant knowledge-base snippets for a customer query.
50+
51+
The knowledge base is embedded in the code so the example works identically
52+
in local and hosted environments.
53+
"""
54+
55+
query = str(request.get("query", "")).lower()
56+
limit = max(1, int(request.get("limit", 3)))
57+
58+
if not query.strip():
59+
return []
60+
61+
ranked = sorted(
62+
_KNOWLEDGE_BASE,
63+
key=lambda entry: _score(query, entry),
64+
reverse=True,
65+
)
66+
top_entries = ranked[:limit]
67+
68+
formatted: List[str] = []
69+
for entry in top_entries:
70+
formatted.append(
71+
f"*Topic*: {entry['topic']}\nSummary: {entry['summary']}\nFAQ: {entry['faq']}"
72+
)
73+
return formatted
74+
75+
76+
def _score(query: str, entry: Dict[str, str]) -> int:
77+
score = 0
78+
for token in query.split():
79+
if len(token) < 3:
80+
continue
81+
token_lower = token.lower()
82+
if token_lower in entry["topic"].lower():
83+
score += 3
84+
if token_lower in entry["summary"].lower():
85+
score += 2
86+
if token_lower in entry["faq"].lower():
87+
score += 1
88+
return score

0 commit comments

Comments
 (0)