Merged
Changes from 1 commit
22 changes: 21 additions & 1 deletion docs/advanced/temporal.mdx
Original file line number Diff line number Diff line change
@@ -64,14 +64,34 @@ mcp-agent supports both `asyncio` and `temporal` execution engines. While asynci
Update your `mcp_agent.config.yaml`:
```yaml
execution_engine: temporal

# Optional: preload modules that register @workflow_task activities
workflow_task_modules:
  - my_package.custom_temporal_tasks

# Optional: override retry behaviour for specific workflow tasks/activities
workflow_task_retry_policies:
  my_package.custom_temporal_tasks.my_activity:
    maximum_attempts: 1

temporal:
  host: localhost
  port: 7233
  namespace: default
  task_queue: mcp-agent
  max_concurrent_activities: 10
```
`mcp-agent` preloads its built-in LLM providers automatically. Add extra modules
when you register custom `@workflow_task` activities outside the core packages so
the worker can discover them before starting. Entries are standard Python import paths.
The optional `workflow_task_retry_policies` mapping lets you tune Temporal retry
behaviour per activity (supports exact names, wildcards like `prefix*`, or `*`).
For provider SDKs, common non-retryable error types include:
- OpenAI/Azure OpenAI: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`.
- Anthropic: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`.
- Azure AI Inference: `HttpResponseError` (400/401/403/404/422).
- Google GenAI: `InvalidArgument`, `FailedPrecondition`, `PermissionDenied`, `NotFound`, `Unauthenticated`.
mcp-agent raises a `WorkflowApplicationError` for these cases so Temporal (or the asyncio executor) avoids retry loops even when the Temporal SDK is not installed locally.
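
As a rough illustration of how a list of non-retryable error types can cut a retry loop short, the sketch below compares an exception's class name against a configured set. The names `NON_RETRYABLE`, `is_non_retryable`, and `call_with_retries` are hypothetical, and `AuthenticationError` here is a local stand-in class rather than a provider SDK type. This is not how mcp-agent or Temporal implement retries; it only mirrors the idea.

```python
from typing import Callable, FrozenSet, TypeVar

T = TypeVar("T")

# Hypothetical set mirroring the OpenAI/Anthropic names listed above.
NON_RETRYABLE: FrozenSet[str] = frozenset(
    {
        "AuthenticationError",
        "PermissionDeniedError",
        "BadRequestError",
        "NotFoundError",
        "UnprocessableEntityError",
    }
)


class AuthenticationError(Exception):
    """Local stand-in for a provider SDK error (not the real SDK class)."""


def is_non_retryable(exc: BaseException, names: FrozenSet[str] = NON_RETRYABLE) -> bool:
    """Return True when the exception's class name is in the configured set."""
    return type(exc).__name__ in names


def call_with_retries(fn: Callable[[], T], maximum_attempts: int = 3) -> T:
    """Call fn up to maximum_attempts times, stopping early on listed errors."""
    if maximum_attempts < 1:
        raise ValueError("maximum_attempts must be at least 1")
    for attempt in range(1, maximum_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Give up immediately on a listed error, or once attempts run out.
            if is_non_retryable(exc) or attempt == maximum_attempts:
                raise
    raise AssertionError("unreachable")
```

With this sketch, a function that raises `AuthenticationError` is called once, while one that raises `TimeoutError` is retried until the attempt budget is spent.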
</Step>

<Step title="Create Worker">
41 changes: 41 additions & 0 deletions docs/mcp-agent-sdk/advanced/durable-agents.mdx
@@ -150,6 +150,47 @@ The same helper works on the asyncio executor via `app.context.executor.signal_b

The Temporal server example also shows how durable workflows call nested MCP servers and trigger [MCP elicitation](https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation) when a human response is required. Activities such as `call_nested_elicitation` log progress via `app.app.logger` so the request trace and Temporal history stay aligned.

## Configure workflow-task modules and retry policies

Add optional top-level overrides to preload custom workflow tasks and refine retry behaviour:

```yaml
execution_engine: temporal

workflow_task_modules:
  - my_project.temporal_tasks  # importable module path

workflow_task_retry_policies:
  my_project.temporal_tasks.generate_summary:
    maximum_attempts: 1
  mcp_agent.workflows.llm.augmented_llm_openai.OpenAICompletionTasks.request_completion_task:
    maximum_attempts: 2
    non_retryable_error_types:
      - AuthenticationError
      - PermissionDeniedError
      - BadRequestError
      - NotFoundError
      - UnprocessableEntityError
  custom_tasks.*:
    initial_interval: 1.5  # seconds (number, string, or timedelta)
    backoff_coefficient: 1.2
  "*":  # quoted, because a bare * starts a YAML alias
    maximum_attempts: 3
```

- `workflow_task_modules` entries are standard Python import paths; they are imported before the worker begins polling so `@workflow_task` functions register globally.
- `workflow_task_retry_policies` accepts exact activity names, module or class suffixes (`prefix.suffix`), trailing wildcards like `custom_tasks.*`, or the global `*`. The most specific match wins.
- Retry intervals accept seconds (`1.5`), strings (`"2"`), or `timedelta` objects.
- Marking error `type`s in `non_retryable_error_types` prevents Temporal from re-running an activity when the failure is not recoverable (see the [Temporal failure reference](https://docs.temporal.io/references/failures#application-failure)). For provider SDKs, useful values include:
  - OpenAI/Azure OpenAI: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`.
  - Anthropic: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`.
  - Azure AI Inference: `HttpResponseError` (raised with non-retryable status codes such as 400/401/403/404/422).
  - Google GenAI: `InvalidArgument`, `FailedPrecondition`, `PermissionDenied`, `NotFound`, `Unauthenticated`.
- mcp-agent raises `WorkflowApplicationError` (wrapping Temporal's `ApplicationError` when available) for known non-retryable provider failures, so these policies work even if you run without the Temporal extra installed.
- Inspect an activity’s fully-qualified name via `func.execution_metadata["activity_name"]` or through the Temporal UI history when adding a mapping.
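
To make the "most specific match wins" rule more concrete, here is a minimal sketch of one way such a lookup could work. It assumes exact keys beat wildcard keys and that, among wildcards, the key with the longest literal part wins. `resolve_policy` and `POLICIES` are hypothetical names, the suffix form (`prefix.suffix`) is not covered, and mcp-agent's actual resolution logic may differ.

```python
from fnmatch import fnmatchcase
from typing import Any, Dict, Optional

# Hypothetical policy mapping, shaped like the YAML above.
POLICIES: Dict[str, Dict[str, Any]] = {
    "my_project.temporal_tasks.generate_summary": {"maximum_attempts": 1},
    "custom_tasks.*": {"backoff_coefficient": 1.2},
    "*": {"maximum_attempts": 3},
}


def resolve_policy(
    activity_name: str, policies: Dict[str, Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Pick the policy whose key best matches activity_name (assumed precedence)."""
    # 1. An exact key always wins.
    if activity_name in policies:
        return policies[activity_name]

    # 2. Otherwise collect wildcard keys that match, e.g. "custom_tasks.*" or "*".
    matches = [
        key for key in policies if "*" in key and fnmatchcase(activity_name, key)
    ]
    if not matches:
        return None

    # 3. Assumption: "most specific" means the longest non-wildcard part.
    best = max(matches, key=lambda key: len(key.replace("*", "")))
    return policies[best]
```

Under these assumptions, `custom_tasks.lookup` resolves to the `custom_tasks.*` entry, and any name with no closer match falls through to the global `*` entry.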

With these pieces in place you can gradually introduce durability: start on asyncio, flip the config once you need retries/pause/resume, then iterate on policies and module preloading as your workflow surface grows.
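
The retry-interval formats mentioned above (seconds as a number, a numeric string, or a `timedelta`) can all be reduced to a single `timedelta`. The helper below is a hypothetical sketch of that normalization, not mcp-agent's own parser; `to_timedelta` is an invented name, and rejecting booleans is an assumption of this example.

```python
from datetime import timedelta
from typing import Union

IntervalLike = Union[int, float, str, timedelta]


def to_timedelta(value: IntervalLike) -> timedelta:
    """Convert seconds (number or numeric string) or a timedelta into a timedelta."""
    if isinstance(value, timedelta):
        return value
    # bool is a subclass of int; reject it so True is not read as 1 second.
    if isinstance(value, bool):
        raise TypeError("interval must be a number, string, or timedelta")
    if isinstance(value, (int, float, str)):
        # float() raises ValueError for non-numeric strings such as "soon".
        return timedelta(seconds=float(value))
    raise TypeError("interval must be a number, string, or timedelta")
```

Normalizing early keeps later comparisons and arithmetic on intervals independent of how the value was written in the config.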

## Operating durable agents

- **Temporal Web UI** (http://localhost:8233) lets you inspect history, replay workflow code, and emit signals.
24 changes: 20 additions & 4 deletions examples/basic/agent_factory/README.md
@@ -1,13 +1,15 @@
-Agent Factory
+# Agent Factory

-This folder shows how to define agents and compose powerful LLM workflows using the functional helpers in [`factory.py`](https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/workflows/factory.py).
+This folder shows how to define agents and compose powerful LLM workflows using the helpers in [`mcp_agent.workflows.factory`](https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/workflows/factory.py).

What's included

- `agents.yaml`: simple YAML agents
- `mcp_agent.config.yaml`: enables auto-loading subagents from inline definitions and directories
- `mcp_agent.secrets.yaml.example`: template for API keys
-- `load_and_route.py`: load agents and route via an LLM
+- `main.py`: load agents, register the `route_prompt` tool, and route requests
+- `run_worker.py`: Temporal worker (set `execution_engine: temporal` and run this in another terminal)
+- `load_and_route.py`: legacy script version of the routing demo
Review comment (Member): Do we need to maintain legacy support for examples?

- `auto_loaded_subagents.py`: discover subagents from config (Claude-style markdown and others)
- `orchestrator_demo.py`: orchestrator-workers pattern
- `parallel_demo.py`: parallel fan-out/fan-in pattern
@@ -21,7 +23,21 @@ cp examples/basic/agent_factory/mcp_agent.secrets.yaml.example examples/basic/ag
# Fill in your provider API keys (OpenAI/Anthropic/etc.)
```

-2. Run an example
+2. Run the main demo

```bash
uv run examples/basic/agent_factory/main.py
```

To exercise the same workflow via Temporal, update `mcp_agent.config.yaml` to set `execution_engine: temporal`, start the worker in another terminal, then invoke the workflow:

```bash
uv run examples/basic/agent_factory/run_worker.py
# ...in another terminal
uv run examples/basic/agent_factory/main.py
```

Other demos in this folder remain available:

```bash
uv run examples/basic/agent_factory/load_and_route.py
44 changes: 44 additions & 0 deletions examples/basic/agent_factory/main.py
@@ -0,0 +1,44 @@
import asyncio
from pathlib import Path

from mcp.server.fastmcp import Context

from mcp_agent.app import MCPApp
from mcp_agent.workflows.factory import (
    create_router_llm,
    load_agent_specs_from_file,
)

app = MCPApp(name="factory_demo", description="Demo of agent factory with LLM routing")


@app.async_tool()
async def route_prompt(
    prompt: str = "Find the README and summarize it", app_ctx: Context | None = None
) -> str:
    """Route a prompt to the appropriate agent using an LLMRouter."""
    context = app_ctx or app.context

    agents_path = Path(__file__).resolve().parent / "agents.yaml"
    specs = load_agent_specs_from_file(str(agents_path), context=context)

    router = await create_router_llm(
        server_names=["filesystem", "fetch"],
        agents=specs,
        provider="openai",
        context=context,
    )

    return await router.generate_str(prompt)


async def main():
    async with app.run() as agent_app:
        result = await route_prompt(
            prompt="Find the README and summarize it", app_ctx=agent_app.context
        )
        print("Routing result:", result)


if __name__ == "__main__":
    asyncio.run(main())
22 changes: 22 additions & 0 deletions examples/basic/agent_factory/run_worker.py
@@ -0,0 +1,22 @@
"""Run a Temporal worker for the agent factory demo."""

import asyncio
import logging

from mcp_agent.executor.temporal import create_temporal_worker_for_app

from main import app


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def main():
    logger.info("Starting Temporal worker for agent factory demo")
    async with create_temporal_worker_for_app(app) as worker:
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
69 changes: 69 additions & 0 deletions examples/cloud/agent_factory/README.md
@@ -0,0 +1,69 @@
# Cloud Agent Factory (Temporal + Custom Workflow Tasks)

This example routes customer-facing questions to specialized agents, augments
responses with in-code knowledge-base snippets, and shows how to preload custom
`@workflow_task` modules via `workflow_task_modules`.

## What's included

- `main.py` – exposes an `@app.async_tool` (`route_customer_request`) that looks up
knowledge-base context via a workflow task and then routes the enriched
question through an LLMRouter.
- `custom_tasks.py` – defines `knowledge_base_lookup_task` using the
`@workflow_task` decorator. The task provides deterministic answers drawn from
an embedded support knowledge base.
- `agents.yaml` – two sample agents (`support_specialist`, `product_expert`) that
the router can delegate to.
- `run_worker.py` – Temporal worker entry point.
- `mcp_agent.config.yaml` – configures Temporal, lists
`workflow_task_modules: [custom_tasks]` so the worker imports the module before
polling, and sets `workflow_task_retry_policies` to limit retries for the custom
activity. Entries should be importable module paths (here `custom_tasks` lives
alongside `main.py`, so we reference it by module name).

## Quick start

1. Install dependencies and add secrets:
   ```bash
   cd examples/cloud/agent_factory
   cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml # add OPENAI_API_KEY
   uv pip install -r requirements.txt
   ```

2. Start a local Temporal dev server in a separate terminal:
   ```bash
   temporal server start-dev
   ```

3. Launch the worker:
   ```bash
   uv run run_worker.py
   ```

4. In another terminal, run the app:
   ```bash
   uv run main.py
   ```
   The tool fetches knowledge-base context via the workflow task (executed as
   a Temporal activity) and produces a routed response.

5. Optional: connect an MCP client while `main.py` is running:
   ```bash
   npx @modelcontextprotocol/inspector --transport sse --server-url http://127.0.0.1:8000/sse
   ```

## How it works

1. `workflow_task_modules` ensures `custom_tasks.py` is imported during worker
startup, registering `knowledge_base_lookup_task` with the app.
2. `route_customer_request` runs as a Temporal workflow (courtesy of
`@app.async_tool`). Inside the workflow we call
`context.executor.execute(knowledge_base_lookup_task, {...})`; this schedules
the task as an activity, returning curated snippets.
3. The prompt is enriched with those snippets and routed through the factory
helper (`create_router_llm`) to select the best agent and compose the final
reply.

You can expand the example by adding more entries to the knowledge base or by
introducing additional workflow tasks. Simply place them in `custom_tasks.py`
and keep the module listed in `workflow_task_modules`.
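
Step 3 above, enriching the prompt with the returned snippets, could look roughly like the following. This is a minimal sketch: `build_enriched_prompt` and the exact prompt wording are assumptions for illustration, not code taken from this example's `main.py`.

```python
from typing import List


def build_enriched_prompt(question: str, snippets: List[str]) -> str:
    """Prepend knowledge-base snippets to the customer question."""
    if not snippets:
        # Nothing to add: pass the question through unchanged.
        return question
    context_block = "\n\n".join(snippets)
    return (
        "Use the following knowledge-base context when answering.\n\n"
        f"{context_block}\n\n"
        f"Customer question: {question}"
    )
```

The resulting string is what would be handed to the router, so the selected agent sees both the curated context and the original question.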
18 changes: 18 additions & 0 deletions examples/cloud/agent_factory/agents.yaml
@@ -0,0 +1,18 @@
agents:
  - name: support_specialist
    instruction: |
      You are a customer support specialist. Provide empathetic answers,
      reference available features, and suggest next steps or workarounds.
      When relevant, mention how customers can contact support.
    server_names: [fetch]

  - name: product_expert
    instruction: |
      You are a product expert who knows roadmap milestones and integrations.
      Provide concise summaries, highlight differentiators, and cite
      integrations or security measures when appropriate.
    server_names: []

# Note: you could alternatively inline these AgentSpec definitions under
# `agents.definitions` in `mcp_agent.config.yaml`. We keep them in a separate
# YAML file here to highlight loading specs via the factory helpers.
88 changes: 88 additions & 0 deletions examples/cloud/agent_factory/custom_tasks.py
@@ -0,0 +1,88 @@
"""Custom workflow tasks for the cloud agent factory demo."""

from __future__ import annotations

from typing import Dict, List, Tuple

from mcp_agent.executor.workflow_task import workflow_task


_KNOWLEDGE_BASE: Tuple[Dict[str, str], ...] = (
    {
        "topic": "pricing",
        "summary": "Current pricing tiers: Free, Pro ($29/mo), Enterprise (custom).",
        "faq": (
            "Pro tier includes 3 seats, Enterprise supports SSO and audit logging. "
            "Discounts available for annual billing."
        ),
    },
    {
        "topic": "availability",
        "summary": "The service offers 99.9% uptime backed by regional failover.",
        "faq": (
            "Scheduled maintenance occurs Sundays 02:00-03:00 UTC. "
            "Status page: https://status.example.com"
        ),
    },
    {
        "topic": "integrations",
        "summary": "Native integrations include Slack, Jira, and Salesforce connectors.",
        "faq": (
            "Slack integration supports slash commands. Jira integration syncs tickets "
            "bi-directionally every 5 minutes."
        ),
    },
    {
        "topic": "security",
        "summary": "SOC 2 Type II certified, data encrypted in transit and at rest.",
        "faq": (
            "Role-based access control is available on Pro+. Admins can require MFA. "
            "Security whitepaper: https://example.com/security"
        ),
    },
)


@workflow_task(name="cloud_agent_factory.knowledge_base_lookup")
Review comment (Member), suggested change:
-@workflow_task(name="cloud_agent_factory.knowledge_base_lookup")
+@workflow_task(name="cloud_agent_factory.knowledge_base_lookup_task")

Should this add the `_task` suffix to match the locations referencing it?

async def knowledge_base_lookup_task(request: dict) -> List[str]:
    """
    Return the most relevant knowledge-base snippets for a customer query.
    The knowledge base is embedded in the code so the example works identically
    in local and hosted environments.
    """

    query = str(request.get("query", "")).lower()
    limit = max(1, int(request.get("limit", 3)))

    if not query.strip():
        return []

    ranked = sorted(
        _KNOWLEDGE_BASE,
        key=lambda entry: _score(query, entry),
        reverse=True,
    )
    top_entries = ranked[:limit]

    formatted: List[str] = []
    for entry in top_entries:
        formatted.append(
            f"*Topic*: {entry['topic']}\nSummary: {entry['summary']}\nFAQ: {entry['faq']}"
        )
    return formatted


def _score(query: str, entry: Dict[str, str]) -> int:
    score = 0
    for token in query.split():
        if len(token) < 3:
            continue
        token_lower = token.lower()
        if token_lower in entry["topic"].lower():
            score += 3
        if token_lower in entry["summary"].lower():
            score += 2
        if token_lower in entry["faq"].lower():
            score += 1
    return score
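
To see how the weighted substring scoring behaves, the standalone sketch below reimplements the same idea on a trimmed two-entry knowledge base. `KB`, `score`, and `rank` are names made up for this illustration, and the entry text is abbreviated from the data above. It mirrors the topic=3, summary=2, faq=1 weighting but is not the module's own code.

```python
from typing import Dict, List

# Two trimmed entries; wording abbreviated from the knowledge base above.
KB: List[Dict[str, str]] = [
    {
        "topic": "pricing",
        "summary": "Current pricing tiers: Free, Pro, Enterprise.",
        "faq": "Discounts available for annual billing.",
    },
    {
        "topic": "security",
        "summary": "SOC 2 Type II certified.",
        "faq": "Admins can require MFA.",
    },
]


def score(query: str, entry: Dict[str, str]) -> int:
    """Weighted substring match: topic=3, summary=2, faq=1 per query token."""
    total = 0
    for token in query.lower().split():
        if len(token) < 3:
            continue  # skip very short words such as "is"
        total += 3 * (token in entry["topic"].lower())
        total += 2 * (token in entry["summary"].lower())
        total += 1 * (token in entry["faq"].lower())
    return total


def rank(query: str) -> List[str]:
    """Return topics by descending score; ties keep their original order."""
    ordered = sorted(KB, key=lambda entry: score(query, entry), reverse=True)
    return [entry["topic"] for entry in ordered]
```

Two properties of this approach are worth noting. Tokens are matched as raw substrings, so trailing punctuation (`security?`) prevents a match. And because the sort is stable, a query that matches nothing still returns entries in their original order rather than an empty list.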