diff --git a/docs/advanced/temporal.mdx b/docs/advanced/temporal.mdx index 6b108aef3..953b36d8c 100644 --- a/docs/advanced/temporal.mdx +++ b/docs/advanced/temporal.mdx @@ -64,7 +64,16 @@ mcp-agent supports both `asyncio` and `temporal` execution engines. While asynci Update your `mcp_agent.config.yaml`: ```yaml execution_engine: temporal - + + # Optional: preload modules that register @workflow_task activities + workflow_task_modules: + - my_package.custom_temporal_tasks + + # Optional: override retry behaviour for specific workflow tasks/activities + workflow_task_retry_policies: + my_package.custom_temporal_tasks.my_activity: + maximum_attempts: 1 + temporal: host: localhost port: 7233 @@ -72,6 +81,17 @@ mcp-agent supports both `asyncio` and `temporal` execution engines. While asynci task_queue: mcp-agent max_concurrent_activities: 10 ``` + `mcp-agent` preloads its built-in LLM providers automatically. Add extra modules + when you register custom `@workflow_task` activities outside the core packages so + the worker can discover them before starting. Entries are standard Python import paths. + The optional `workflow_task_retry_policies` mapping lets you tune Temporal retry + behaviour per activity (supports exact names, wildcards like `prefix*`, or `*`). + For provider SDKs, common non-retryable error types include: + - OpenAI/Azure OpenAI: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`. + - Anthropic: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`. + - Azure AI Inference: `HttpResponseError` (400/401/403/404/422). + - Google GenAI: `InvalidArgument`, `FailedPrecondition`, `PermissionDenied`, `NotFound`, `Unauthenticated`. + mcp-agent raises a `WorkflowApplicationError` for these cases so Temporal (or the asyncio executor) avoids retry loops even when the Temporal SDK is not installed locally. diff --git a/docs/mcp-agent-sdk/advanced/durable-agents.mdx b/docs/mcp-agent-sdk/advanced/durable-agents.mdx index bf6a2beb4..5dbd297a6 100644 --- a/docs/mcp-agent-sdk/advanced/durable-agents.mdx +++ b/docs/mcp-agent-sdk/advanced/durable-agents.mdx @@ -150,6 +150,48 @@ The same helper works on the asyncio executor via `app.context.executor.signal_b The Temporal server example also shows how durable workflows call nested MCP servers and trigger [MCP elicitation](https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation) when a human response is required. Activities such as `call_nested_elicitation` log progress via `app.app.logger` so the request trace and Temporal history stay aligned. 
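Before the configuration reference below, it helps to have a mental model of how retry-policy keys are resolved against activity names. The sketch below is a simplified, self-contained restatement of the matching rules implemented in `MCPApp._get_configured_retry_policy` later in this diff; treat it as an illustration, not the exact code path.

```python
# Simplified sketch of retry-policy key matching (see
# MCPApp._get_configured_retry_policy in src/mcp_agent/app.py below).
# Precedence when several keys match: exact full name > dotted suffix >
# prefix wildcard (longest prefix wins) > global "*" fallback.
def matches(key: str, activity_name: str) -> bool:
    if key == "*":  # global fallback
        return True
    if key.endswith("*"):  # prefix wildcard, e.g. "custom_tasks.*"
        return activity_name.startswith(key[:-1])
    if "." in key:  # exact full name or dotted suffix
        return activity_name == key or activity_name.endswith(f".{key}")
    return activity_name.split(".")[-1] == key  # bare task name


assert matches("custom_tasks.*", "custom_tasks.generate_summary")
assert matches("generate_summary", "my_project.tasks.generate_summary")
assert matches("*", "anything.at.all")
```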
+## Configure workflow-task modules and retry policies + +Add optional top-level overrides to preload custom workflow tasks and refine retry behaviour: + +```yaml +execution_engine: temporal + +workflow_task_modules: + - my_project.temporal_tasks # importable module path + +workflow_task_retry_policies: + my_project.temporal_tasks.generate_summary: + maximum_attempts: 1 + mcp_agent.workflows.llm.augmented_llm_openai.OpenAICompletionTasks.request_completion_task: + maximum_attempts: 2 + non_retryable_error_types: + - AuthenticationError + - PermissionDeniedError + - BadRequestError + - NotFoundError + - UnprocessableEntityError + custom_tasks.*: + initial_interval: 1.5 # seconds (number, string, or timedelta) + backoff_coefficient: 1.2 + *: + maximum_attempts: 3 +``` + +- `workflow_task_modules` entries are standard Python import paths; they are imported before the worker begins polling so `@workflow_task` functions register globally. +- `workflow_task_retry_policies` accepts exact activity names, module or class suffixes (`prefix.suffix`), trailing wildcards like `custom_tasks.*`, or the global `*`. The most specific match wins. +- Retry intervals accept seconds (`1.5`), strings (`"2"`), or `timedelta` objects. +- Marking error `type`s in `non_retryable_error_types` prevents Temporal from re-running an activity when the failure is not recoverable (see the [Temporal failure reference](https://docs.temporal.io/references/failures#application-failure)). For provider SDKs, useful values include: + - OpenAI/Azure OpenAI: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`. + - Anthropic: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`. + - Azure AI Inference: `HttpResponseError` (raised with non-retryable status codes such as 400/401/403/404/422). + - Google GenAI: `InvalidArgument`, `FailedPrecondition`, `PermissionDenied`, `NotFound`, `Unauthenticated`. +- mcp-agent raises `WorkflowApplicationError` (wrapping Temporal's `ApplicationError` when available) for known non-retryable provider failures, so these policies work even if you run without the Temporal extra installed. +- Inspect an activity’s fully-qualified name via `func.execution_metadata["activity_name"]` or through the Temporal UI history when adding a mapping. +- Temporal matches `non_retryable_error_types` using the exception class name string you supply (see the [RetryPolicy reference](https://docs.temporal.io/references/sdk-apis/python/temporalio.common/#temporalio-common-RetryPolicy)). Use the narrowest names possible—overly generic entries such as `NotFoundError` can suppress legitimate retries if a workflow expects to handle that condition and try again. + +With these pieces in place you can gradually introduce durability: start on asyncio, flip the config once you need retries/pause/resume, then iterate on policies and module preloading as your workflow surface grows. + ## Operating durable agents - **Temporal Web UI** (http://localhost:8233) lets you inspect history, replay workflow code, and emit signals. 
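When adding a `workflow_task_retry_policies` entry, confirm the exact activity name first. One quick way to do that from a Python shell uses the `execution_metadata` bookkeeping described above (the module and task names here come from the cloud agent-factory example later in this diff; substitute your own):

```python
# Print the fully-qualified activity name to use as a retry-policy key.
from custom_tasks import knowledge_base_lookup_task

print(knowledge_base_lookup_task.execution_metadata["activity_name"])
# -> cloud_agent_factory.knowledge_base_lookup
```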
diff --git a/examples/basic/agent_factory/README.md b/examples/basic/agent_factory/README.md index 669540681..7b2386ee9 100644 --- a/examples/basic/agent_factory/README.md +++ b/examples/basic/agent_factory/README.md @@ -1,13 +1,14 @@ -Agent Factory +# Agent Factory -This folder shows how to define agents and compose powerful LLM workflows using the functional helpers in [`factory.py`](https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/workflows/factory.py). +This folder shows how to define agents and compose powerful LLM workflows using the helpers in [`mcp_agent.workflows.factory`](https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/workflows/factory.py). What's included - `agents.yaml`: simple YAML agents - `mcp_agent.config.yaml`: enables auto-loading subagents from inline definitions and directories - `mcp_agent.secrets.yaml.example`: template for API keys -- `load_and_route.py`: load agents and route via an LLM +- `main.py`: load agents, register the `route_prompt` tool, and route requests +- `run_worker.py`: Temporal worker (set `execution_engine: temporal` and run this in another terminal) - `auto_loaded_subagents.py`: discover subagents from config (Claude-style markdown and others) - `orchestrator_demo.py`: orchestrator-workers pattern - `parallel_demo.py`: parallel fan-out/fan-in pattern @@ -21,10 +22,23 @@ cp examples/basic/agent_factory/mcp_agent.secrets.yaml.example examples/basic/ag # Fill in your provider API keys (OpenAI/Anthropic/etc.) ``` -2. Run an example +2. Run the main demo + +```bash +uv run examples/basic/agent_factory/main.py +``` + +To exercise the same workflow via Temporal, update `mcp_agent.config.yaml` to set `execution_engine: temporal`, start the worker in another terminal, then invoke the workflow: + +```bash +uv run examples/basic/agent_factory/run_worker.py +# ...in another terminal +uv run examples/basic/agent_factory/main.py +``` + +Other demos in this folder remain available: ```bash -uv run examples/basic/agent_factory/load_and_route.py uv run examples/basic/agent_factory/orchestrator_demo.py uv run examples/basic/agent_factory/parallel_demo.py uv run examples/basic/agent_factory/auto_loaded_subagents.py diff --git a/examples/basic/agent_factory/main.py b/examples/basic/agent_factory/main.py new file mode 100644 index 000000000..a92f13e37 --- /dev/null +++ b/examples/basic/agent_factory/main.py @@ -0,0 +1,44 @@ +import asyncio +from pathlib import Path + +from mcp_agent.core.context import Context + +from mcp_agent.app import MCPApp +from mcp_agent.workflows.factory import ( + create_router_llm, + load_agent_specs_from_file, +) + +app = MCPApp(name="factory_demo", description="Demo of agent factory with LLM routing") + + +@app.async_tool() +async def route_prompt( + prompt: str = "Find the README and summarize it", app_ctx: Context | None = None +) -> str: + """Route a prompt to the appropriate agent using an LLMRouter.""" + context = app_ctx or app.context + + agents_path = Path(__file__).resolve().parent / "agents.yaml" + specs = load_agent_specs_from_file(str(agents_path), context=context) + + router = await create_router_llm( + server_names=["filesystem", "fetch"], + agents=specs, + provider="openai", + context=context, + ) + + return await router.generate_str(prompt) + + +async def main(): + async with app.run() as agent_app: + result = await route_prompt( + prompt="Find the README and summarize it", app_ctx=agent_app.context + ) + print("Routing result:", result) + + +if __name__ == "__main__": + asyncio.run(main()) 
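One detail worth noting in the new `main.py`: the tool resolves its context defensively, so the same function works when invoked through an MCP server (which supplies `app_ctx`) and when called directly from the script:

```python
# Inside route_prompt: prefer the caller-supplied context and fall back to
# the app's own. This pattern recurs across the examples in this change.
context = app_ctx or app.context
```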
diff --git a/examples/basic/agent_factory/run_worker.py b/examples/basic/agent_factory/run_worker.py new file mode 100644 index 000000000..08fcfea38 --- /dev/null +++ b/examples/basic/agent_factory/run_worker.py @@ -0,0 +1,22 @@ +"""Run a Temporal worker for the agent factory demo.""" + +import asyncio +import logging + +from mcp_agent.executor.temporal import create_temporal_worker_for_app + +from main import app + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def main(): + logger.info("Starting Temporal worker for agent factory demo") + async with create_temporal_worker_for_app(app) as worker: + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/cloud/agent_factory/README.md b/examples/cloud/agent_factory/README.md new file mode 100644 index 000000000..dba565735 --- /dev/null +++ b/examples/cloud/agent_factory/README.md @@ -0,0 +1,69 @@ +# Cloud Agent Factory (Temporal + Custom Workflow Tasks) + +This example routes customer-facing questions to specialized agents, augments +responses with in-code knowledge-base snippets, and shows how to preload custom +`@workflow_task` modules via `workflow_task_modules`. + +## What's included + +- `main.py` – exposes an `@app.async_tool` (`route_customer_request`) that looks up + knowledge-base context via a workflow task and then routes the enriched + question through an LLMRouter. +- `custom_tasks.py` – defines `knowledge_base_lookup_task` using the + `@workflow_task` decorator. The task provides deterministic answers drawn from + an embedded support knowledge base. +- `agents.yaml` – two sample agents (`support_specialist`, `product_expert`) that + the router can delegate to. +- `run_worker.py` – Temporal worker entry point. +- `mcp_agent.config.yaml` – configures Temporal, lists + `workflow_task_modules: [custom_tasks]` so the worker imports the module before + polling, and sets `workflow_task_retry_policies` to limit retries for the custom + activity. Entries should be importable module paths (here `custom_tasks` lives + alongside `main.py`, so we reference it by module name). + +## Quick start + +1. Install dependencies and add secrets: + ```bash + cd examples/cloud/agent_factory + cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml # add OPENAI_API_KEY + uv pip install -r requirements.txt + ``` + +2. Start Temporal elsewhere: + ```bash + temporal server start-dev + ``` + +3. Launch the worker: + ```bash + uv run run_worker.py + ``` + +4. In another terminal, run the app: + ```bash + uv run main.py + ``` + The tool will fetch knowledge-base context via the workflow task (executed as + a Temporal activity) and produce a routed response. + +5. Optional: connect an MCP client while `main.py` is running: + ```bash + npx @modelcontextprotocol/inspector --transport sse --server-url http://127.0.0.1:8000/sse + ``` + +## How it works + +1. `workflow_task_modules` ensures `custom_tasks.py` is imported during worker + startup, registering `knowledge_base_lookup_task` with the app. +2. `route_customer_request` runs as a Temporal workflow (courtesy of + `@app.async_tool`). Inside the workflow we call + `context.executor.execute(knowledge_base_lookup_task, {...})`; this schedules + the task as an activity, returning curated snippets. +3. The prompt is enriched with those snippets and routed through the factory + helper (`create_router_llm`) to select the best agent and compose the final + reply. 
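Concretely, the workflow-side call in step 2 looks like this in `main.py` (trimmed):

```python
# Schedule the custom task as a Temporal activity from inside the workflow.
kb_snippets = await context.executor.execute(
    knowledge_base_lookup_task,
    {"query": prompt, "limit": context_hits},
)
# The executor returns failures as exception objects instead of raising them,
# so surface any error explicitly.
if isinstance(kb_snippets, BaseException):
    raise kb_snippets
```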
+ +You can expand the example by adding more entries to the knowledge base or by +introducing additional workflow tasks. Simply place them in `custom_tasks.py` +and keep the module listed in `workflow_task_modules`. diff --git a/examples/cloud/agent_factory/agents.yaml b/examples/cloud/agent_factory/agents.yaml new file mode 100644 index 000000000..dc9e5128e --- /dev/null +++ b/examples/cloud/agent_factory/agents.yaml @@ -0,0 +1,18 @@ +agents: + - name: support_specialist + instruction: | + You are a customer support specialist. Provide empathetic answers, + reference available features, and suggest next steps or workarounds. + When relevant, mention how customers can contact support. + server_names: [fetch] + + - name: product_expert + instruction: | + You are a product expert who knows roadmap milestones and integrations. + Provide concise summaries, highlight differentiators, and cite + integrations or security measures when appropriate. + server_names: [] + +# Note: you could alternatively inline these AgentSpec definitions under +# `agents.definitions` in `mcp_agent.config.yaml`. We keep them in a separate +# YAML file here to highlight loading specs via the factory helpers. diff --git a/examples/cloud/agent_factory/custom_tasks.py b/examples/cloud/agent_factory/custom_tasks.py new file mode 100644 index 000000000..d7fa0d1a3 --- /dev/null +++ b/examples/cloud/agent_factory/custom_tasks.py @@ -0,0 +1,88 @@ +"""Custom workflow tasks for the cloud agent factory demo.""" + +from __future__ import annotations + +from typing import Dict, List, Tuple + +from mcp_agent.executor.workflow_task import workflow_task + + +_KNOWLEDGE_BASE: Tuple[Dict[str, str], ...] = ( + { + "topic": "pricing", + "summary": "Current pricing tiers: Free, Pro ($29/mo), Enterprise (custom).", + "faq": ( + "Pro tier includes 3 seats, Enterprise supports SSO and audit logging. " + "Discounts available for annual billing." + ), + }, + { + "topic": "availability", + "summary": "The service offers 99.9% uptime backed by regional failover.", + "faq": ( + "Scheduled maintenance occurs Sundays 02:00-03:00 UTC. " + "Status page: https://status.example.com" + ), + }, + { + "topic": "integrations", + "summary": "Native integrations include Slack, Jira, and Salesforce connectors.", + "faq": ( + "Slack integration supports slash commands. Jira integration syncs tickets " + "bi-directionally every 5 minutes." + ), + }, + { + "topic": "security", + "summary": "SOC 2 Type II certified, data encrypted in transit and at rest.", + "faq": ( + "Role-based access control is available on Pro+. Admins can require MFA. " + "Security whitepaper: https://example.com/security" + ), + }, +) + + +@workflow_task(name="cloud_agent_factory.knowledge_base_lookup") +async def knowledge_base_lookup_task(request: dict) -> List[str]: + """ + Return the most relevant knowledge-base snippets for a customer query. + + The knowledge base is embedded in the code so the example works identically + in local and hosted environments. 
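+ + Expected keys in "request": "query" (str) and "limit" (int, default 3), matching the payload sent from main.py.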
+ """ + + query = str(request.get("query", "")).lower() + limit = max(1, int(request.get("limit", 3))) + + if not query.strip(): + return [] + + ranked = sorted( + _KNOWLEDGE_BASE, + key=lambda entry: _score(query, entry), + reverse=True, + ) + top_entries = ranked[:limit] + + formatted: List[str] = [] + for entry in top_entries: + formatted.append( + f"*Topic*: {entry['topic']}\nSummary: {entry['summary']}\nFAQ: {entry['faq']}" + ) + return formatted + + +def _score(query: str, entry: Dict[str, str]) -> int: + score = 0 + for token in query.split(): + if len(token) < 3: + continue + token_lower = token.lower() + if token_lower in entry["topic"].lower(): + score += 3 + if token_lower in entry["summary"].lower(): + score += 2 + if token_lower in entry["faq"].lower(): + score += 1 + return score diff --git a/examples/cloud/agent_factory/main.py b/examples/cloud/agent_factory/main.py new file mode 100644 index 000000000..76fdf2192 --- /dev/null +++ b/examples/cloud/agent_factory/main.py @@ -0,0 +1,77 @@ +"""Temporal cloud agent factory example with custom workflow tasks.""" + +from __future__ import annotations + +import asyncio +from pathlib import Path + +from mcp_agent.core.context import Context + +from mcp_agent.app import MCPApp +from mcp_agent.server.app_server import create_mcp_server_for_app +from mcp_agent.workflows.factory import ( + create_router_llm, + load_agent_specs_from_file, +) + +try: + from .custom_tasks import knowledge_base_lookup_task +except ImportError: # pragma: no cover - executed when run as a script + from custom_tasks import knowledge_base_lookup_task + +app = MCPApp( + name="cloud_agent_factory", + description="Temporal agent factory demo that uses custom workflow tasks", +) + + +@app.async_tool() +async def route_customer_request( + prompt: str = "A customer is asking about our pricing and security posture.", + context_hits: int = 3, + app_ctx: Context | None = None, +) -> str: + """Route customer-facing questions and seed the LLM with KB context.""" + context = app_ctx or app.context + + kb_snippets = await context.executor.execute( + knowledge_base_lookup_task, + {"query": prompt, "limit": context_hits}, + ) + if isinstance(kb_snippets, BaseException): + raise kb_snippets + + kb_context = "\n\n".join(kb_snippets) if kb_snippets else "No knowledge-base hits." + agents_path = Path(__file__).resolve().parent / "agents.yaml" + specs = load_agent_specs_from_file(str(agents_path), context=context) + + router = await create_router_llm( + server_names=["filesystem", "fetch"], + agents=specs, + provider="openai", + context=context, + ) + + enriched_prompt = ( + "You are triaging a customer request.\n" + f"Customer question:\n{prompt}\n\n" + f"Knowledge-base snippets:\n{kb_context}\n\n" + "Compose a helpful, empathetic reply that references the most relevant details." 
+ ) + return await router.generate_str(enriched_prompt) + + +# async def main(): +# async with app.run() as agent_app: +# result = await route_customer_request(app_ctx=agent_app.context) +# print("Routing result:", result) + + +async def main(): + async with app.run() as agent_app: + mcp_server = create_mcp_server_for_app(agent_app) + await mcp_server.run_sse_async() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/cloud/agent_factory/mcp_agent.config.yaml b/examples/cloud/agent_factory/mcp_agent.config.yaml new file mode 100644 index 000000000..bfa8a9132 --- /dev/null +++ b/examples/cloud/agent_factory/mcp_agent.config.yaml @@ -0,0 +1,38 @@ +# Temporal configuration for the cloud agent factory demo +$schema: ../../schema/mcp-agent.config.schema.json + +execution_engine: temporal + +workflow_task_modules: + - custom_tasks # module path relative to sys.path (here, alongside main.py) + +workflow_task_retry_policies: + cloud_agent_factory.knowledge_base_lookup: + maximum_attempts: 1 + +# Temporal settings +temporal: + host: "localhost:7233" # Default Temporal server address + namespace: "default" # Default Temporal namespace + task_queue: "mcp-agent" # Task queue for workflows and activities + max_concurrent_activities: 10 # Maximum number of concurrent activities + rpc_metadata: + X-Client-Name: "mcp-agent" + +logger: + transports: [console] + level: info + +mcp: + servers: + fetch: + command: "uvx" + args: ["mcp-server-fetch"] + description: "Fetch content from the web" + filesystem: + command: "npx" + args: ["-y", "@modelcontextprotocol/server-filesystem", "."] + description: "Read local files" + +openai: + default_model: gpt-4o-mini diff --git a/examples/cloud/agent_factory/mcp_agent.secrets.yaml.example b/examples/cloud/agent_factory/mcp_agent.secrets.yaml.example new file mode 100644 index 000000000..c8892e61e --- /dev/null +++ b/examples/cloud/agent_factory/mcp_agent.secrets.yaml.example @@ -0,0 +1,2 @@ +openai: + api_key: "your-openai-api-key" diff --git a/examples/cloud/agent_factory/requirements.txt b/examples/cloud/agent_factory/requirements.txt new file mode 100644 index 000000000..33519dd75 --- /dev/null +++ b/examples/cloud/agent_factory/requirements.txt @@ -0,0 +1,6 @@ +# Core framework dependency +mcp-agent @ file://../../../ + +# LLM providers used in this demo +openai +anthropic diff --git a/examples/cloud/agent_factory/run_worker.py b/examples/cloud/agent_factory/run_worker.py new file mode 100644 index 000000000..3cec15b40 --- /dev/null +++ b/examples/cloud/agent_factory/run_worker.py @@ -0,0 +1,21 @@ +"""Temporal worker for the cloud agent factory example.""" + +import asyncio +import logging + +from mcp_agent.executor.temporal import create_temporal_worker_for_app + +from main import app + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def main(): + logger.info("Starting Temporal worker for cloud agent factory demo") + async with create_temporal_worker_for_app(app) as worker: + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/cloud/temporal/README.md b/examples/cloud/temporal/README.md index 8237f2d5b..278d79462 100644 --- a/examples/cloud/temporal/README.md +++ b/examples/cloud/temporal/README.md @@ -74,6 +74,8 @@ openai: api_key: "your-openai-api-key" ``` +The provided `mcp_agent.config.yaml` already targets the local Temporal dev server. 
If you register additional `@workflow_task` activities in your own modules, uncomment the top-level `workflow_task_modules` list in that file and add your module paths so the worker imports them at startup. + ## Test Locally Before running this example, you need to have a Temporal server running: diff --git a/examples/cloud/temporal/mcp_agent.config.yaml b/examples/cloud/temporal/mcp_agent.config.yaml index 32a2bc8db..566b82d01 100644 --- a/examples/cloud/temporal/mcp_agent.config.yaml +++ b/examples/cloud/temporal/mcp_agent.config.yaml @@ -4,6 +4,15 @@ $schema: ../../schema/mcp-agent.config.schema.json # Set the execution engine to Temporal execution_engine: "temporal" +# Optional: preload modules that declare @workflow_task activities +# workflow_task_modules: +# - my_project.custom_tasks + +# Optional: override retry behaviour for specific activities +# workflow_task_retry_policies: +# my_project.custom_tasks.my_activity: +# maximum_attempts: 1 + # Temporal settings temporal: host: "localhost:7233" # Default Temporal server address diff --git a/examples/mcp_agent_server/temporal/README.md b/examples/mcp_agent_server/temporal/README.md index 82b2d2a62..dd4f4820f 100644 --- a/examples/mcp_agent_server/temporal/README.md +++ b/examples/mcp_agent_server/temporal/README.md @@ -107,10 +107,12 @@ The `mcp_agent.config.yaml` file contains paths to executables. For Claude Deskt ``` 2. Edit `mcp_agent.secrets.yaml` to add your API keys: - ```yaml - openai: - api_key: "your-openai-api-key" - ``` + ```yaml + openai: + api_key: "your-openai-api-key" + ``` + +The included `mcp_agent.config.yaml` is wired for the local Temporal dev server. If you define extra `@workflow_task` functions in your own modules, uncomment the top-level `workflow_task_modules` list in that config and add your module paths so the worker pre-imports them when it starts. ## How to Run diff --git a/examples/mcp_agent_server/temporal/mcp_agent.config.yaml b/examples/mcp_agent_server/temporal/mcp_agent.config.yaml index e1ce60ba5..ef00aeee5 100644 --- a/examples/mcp_agent_server/temporal/mcp_agent.config.yaml +++ b/examples/mcp_agent_server/temporal/mcp_agent.config.yaml @@ -4,6 +4,15 @@ $schema: ../../schema/mcp-agent.config.schema.json # Set the execution engine to Temporal execution_engine: "temporal" +# Optional: preload modules that declare @workflow_task activities +# workflow_task_modules: +# - my_project.custom_tasks + +# Optional: override retry behaviour for specific activities +# workflow_task_retry_policies: +# my_project.custom_tasks.my_activity: +# maximum_attempts: 1 + # Temporal settings temporal: host: "localhost:7233" # Default Temporal server address diff --git a/pyproject.toml b/pyproject.toml index 2d6f5052a..7de6decf9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,13 +110,15 @@ mcpc = "mcp_agent.cli.cloud.main:run" include = ["mcp-agent"] [tool.setuptools.package-data] -mcp-agent = [ # TODO: should this be mcp_agent? 
+mcp_agent = [ "data/*.json", + "data/templates/**/*", + "data/examples/**/*", "resources/examples/**/*.py", "resources/examples/**/*.yaml", "resources/examples/**/*.yml", "resources/examples/**/*.csv", - "resources/examples/**/mount-point/*.csv" + "resources/examples/**/mount-point/*.csv", ] [tool.pytest.ini_options] diff --git a/src/mcp_agent/app.py b/src/mcp_agent/app.py index ed4a1d18c..dcb6fba19 100644 --- a/src/mcp_agent/app.py +++ b/src/mcp_agent/app.py @@ -1122,6 +1122,62 @@ def decorator(fn: Callable[P, R]) -> Callable[P, R]: return decorator + def _get_configured_retry_policy(self, activity_name: str) -> Dict[str, Any] | None: + """ + Compute the retry policy override for a workflow task. + + Matching precedence (highest first): + - Exact full activity name (e.g., ``package.module.task``) + - Dotted suffix match (``task`` or ``module.task``) + - Prefix wildcard (``package.*``), with longest prefix winning + - Global fallback (``*``) + """ + overrides = getattr(self.config, "workflow_task_retry_policies", None) + if not overrides: + return None + + def coerce(policy: Any) -> Dict[str, Any]: + if policy is None: + return {} + if hasattr(policy, "to_temporal_kwargs"): + return policy.to_temporal_kwargs() + return dict(policy) + + best_match: tuple[int, int, Dict[str, Any]] | None = None + + def record(priority: int, length: int, policy_dict: Dict[str, Any]): + nonlocal best_match + candidate = (priority, length, policy_dict) + if best_match is None or candidate > best_match: + best_match = candidate + + for key, policy_obj in overrides.items(): + policy_dict = coerce(policy_obj) + if not policy_dict: + continue + + if key == "*": + record(0, 0, policy_dict) + continue + + if key.endswith("*"): + prefix = key[:-1] + if activity_name.startswith(prefix): + record(1, len(prefix), policy_dict) + continue + + if "." 
in key: + if activity_name == key: + record(3, len(key), policy_dict) + elif activity_name.endswith(f".{key}"): + record(2, len(key), policy_dict) + continue + + if activity_name.split(".")[-1] == key: + record(2, len(key), policy_dict) + + return best_match[2] if best_match else None + def workflow_task( self, name: str | None = None, @@ -1162,6 +1218,11 @@ def decorator(target: Callable[..., R]) -> Callable[..., R]: **meta_kwargs, } + override_policy = self._get_configured_retry_policy(activity_name) + if override_policy: + existing_policy = metadata.get("retry_policy") or {} + metadata["retry_policy"] = {**existing_policy, **override_policy} + # bookkeeping that survives partial/bound wrappers func.is_workflow_task = True func.execution_metadata = metadata @@ -1236,6 +1297,14 @@ def _register_global_workflow_tasks(self): self._registered_global_workflow_tasks.add(activity_name) continue + override_policy = self._get_configured_retry_policy(activity_name) + if override_policy: + existing_policy = metadata.get("retry_policy") or {} + metadata["retry_policy"] = {**existing_policy, **override_policy} + + func.is_workflow_task = True + func.execution_metadata = metadata + # Apply the engine-specific decorator if available task_defn = self._decorator_registry.get_workflow_task_decorator( self.config.execution_engine diff --git a/src/mcp_agent/cli/commands/init.py b/src/mcp_agent/cli/commands/init.py index 81c4aad33..883681edd 100644 --- a/src/mcp_agent/cli/commands/init.py +++ b/src/mcp_agent/cli/commands/init.py @@ -133,8 +133,7 @@ def init( scaffolding_templates = { "basic": "Simple agent with filesystem and fetch capabilities", "server": "MCP server with workflow and parallel agents", - # "token": "Token counting example with monitoring", - # "factory": "Agent factory with router-based selection", + "factory": "Agent factory with router-based selection", "minimal": "Minimal configuration files only", } @@ -364,28 +363,11 @@ def init( if created: files_created.append(created) - elif template == "token": - token_path = dir / "token_example.py" - token_content = _load_template("token_counter.py") - if token_content and _write(token_path, token_content, force): - files_created.append("token_example.py") - # Make executable - try: - token_path.chmod(token_path.stat().st_mode | 0o111) - except Exception: - pass - - readme_content = _load_template("README_token.md") - if readme_content: - created = _write_readme(dir, readme_content, force) - if created: - files_created.append(created) - elif template == "factory": - factory_path = dir / "factory.py" + factory_path = dir / "main.py" factory_content = _load_template("agent_factory.py") if factory_content and _write(factory_path, factory_content, force): - files_created.append("factory.py") + files_created.append("main.py") # Make executable try: factory_path.chmod(factory_path.stat().st_mode | 0o111) @@ -398,6 +380,15 @@ def init( if agents_content and _write(agents_path, agents_content, force): files_created.append("agents.yaml") + run_worker_path = dir / "run_worker.py" + run_worker_content = _load_template("agent_factory_run_worker.py") + if run_worker_content and _write(run_worker_path, run_worker_content, force): + files_created.append("run_worker.py") + try: + run_worker_path.chmod(run_worker_path.stat().st_mode | 0o111) + except Exception: + pass + readme_content = _load_template("README_factory.md") if readme_content: created = _write_readme(dir, readme_content, force) @@ -425,12 +416,15 @@ def init( console.print( " Or serve: 
[cyan]mcp-agent dev serve --script main.py[/cyan]" ) - elif template == "token": - console.print("3. Run the example: [cyan]uv run token_example.py[/cyan]") - console.print(" Watch token usage in real-time!") elif template == "factory": console.print("3. Customize agents in [cyan]agents.yaml[/cyan]") - console.print("4. Run the factory: [cyan]uv run factory.py[/cyan]") + console.print("4. Run the factory: [cyan]uv run main.py[/cyan]") + console.print( + " Optional: to exercise Temporal locally, run [cyan]temporal server start-dev[/cyan]" + ) + console.print( + " in another terminal and start the worker with [cyan]uv run run_worker.py[/cyan]." + ) elif template == "minimal": console.print("3. Create your agent script") console.print(" See examples: [cyan]mcp-agent init --list[/cyan]") @@ -459,9 +453,8 @@ def interactive( templates = { "1": ("basic", "Simple agent with filesystem and fetch"), "2": ("server", "MCP server with workflows"), - "3": ("token", "Token counting with monitoring"), - "4": ("factory", "Agent factory with routing"), - "5": ("minimal", "Config files only"), + "3": ("factory", "Agent factory with routing"), + "4": ("minimal", "Config files only"), } console.print("\n[bold]Choose a template:[/bold]") diff --git a/src/mcp_agent/config.py b/src/mcp_agent/config.py index 09711ef2f..367236a96 100644 --- a/src/mcp_agent/config.py +++ b/src/mcp_agent/config.py @@ -8,6 +8,7 @@ from io import StringIO from pathlib import Path from typing import Any, Dict, List, Literal, Optional, Set, Union +from datetime import timedelta import threading import warnings @@ -633,10 +634,63 @@ class TemporalSettings(BaseModel): "reject_duplicate", "terminate_if_running", ] = "allow_duplicate" + workflow_task_modules: List[str] = Field(default_factory=list) + """Additional module paths to import before creating a Temporal worker. Each should be importable.""" model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True) +class WorkflowTaskRetryPolicy(BaseModel): + """ + Declarative retry policy for workflow tasks / activities (mirrors Temporal RetryPolicy fields). + Durations can be specified as a timedelta or as seconds (a number or numeric string); all are + coerced to datetime.timedelta instances. + """ + + maximum_attempts: int | None = None + initial_interval: timedelta | float | str | None = None + backoff_coefficient: float | None = None + maximum_interval: timedelta | float | str | None = None + non_retryable_error_types: List[str] | None = None + + model_config = ConfigDict(extra="forbid") + + @field_validator("initial_interval", "maximum_interval", mode="before") + @classmethod + def _coerce_interval(cls, value): + if value is None: + return None + if isinstance(value, timedelta): + return value + if isinstance(value, (int, float)): + return timedelta(seconds=value) + if isinstance(value, str): + try: + seconds = float(value) + return timedelta(seconds=seconds) + except Exception: + raise TypeError( + "Retry interval strings must be parseable as seconds." + ) from None + raise TypeError( + "Retry interval must be seconds (number or string) or a timedelta."
+ ) + + def to_temporal_kwargs(self) -> Dict[str, Any]: + data: Dict[str, Any] = {} + if self.maximum_attempts is not None: + data["maximum_attempts"] = self.maximum_attempts + if self.initial_interval is not None: + data["initial_interval"] = self.initial_interval + if self.backoff_coefficient is not None: + data["backoff_coefficient"] = self.backoff_coefficient + if self.maximum_interval is not None: + data["maximum_interval"] = self.maximum_interval + if self.non_retryable_error_types: + data["non_retryable_error_types"] = list(self.non_retryable_error_types) + return data + + class UsageTelemetrySettings(BaseModel): """ Settings for usage telemetry in the MCP Agent application. @@ -1077,6 +1131,14 @@ class Settings(BaseSettings): openai: OpenAISettings | None = Field(default_factory=OpenAISettings) """Settings for using OpenAI models in the MCP Agent application""" + workflow_task_modules: List[str] = Field(default_factory=list) + """Optional list of modules to import at startup so workflow tasks register globally.""" + + workflow_task_retry_policies: Dict[str, WorkflowTaskRetryPolicy] = Field( + default_factory=dict + ) + """Optional mapping of activity names (supports '*' and 'prefix*') to retry policies.""" + azure: AzureSettings | None = Field(default_factory=AzureSettings) """Settings for using Azure models in the MCP Agent application""" @@ -1258,10 +1320,19 @@ def deep_merge(base: dict, update: dict, path: tuple = ()) -> dict: key in merged and isinstance(merged[key], list) and isinstance(value, list) - and current_path == ("otel", "exporters") + and current_path + in { + ("otel", "exporters"), + ("workflow_task_modules",), + } ): - # Concatenate exporters lists from config and secrets - merged[key] = merged[key] + value + # Concatenate list-based settings while preserving order and removing duplicates + combined = merged[key] + value + deduped = [] + for item in combined: + if not any(existing == item for existing in deduped): + deduped.append(item) + merged[key] = deduped else: merged[key] = value return merged diff --git a/src/mcp_agent/data/examples/cloud/agent_factory/README.md b/src/mcp_agent/data/examples/cloud/agent_factory/README.md new file mode 100644 index 000000000..dba565735 --- /dev/null +++ b/src/mcp_agent/data/examples/cloud/agent_factory/README.md @@ -0,0 +1,69 @@ +# Cloud Agent Factory (Temporal + Custom Workflow Tasks) + +This example routes customer-facing questions to specialized agents, augments +responses with in-code knowledge-base snippets, and shows how to preload custom +`@workflow_task` modules via `workflow_task_modules`. + +## What's included + +- `main.py` – exposes an `@app.async_tool` (`route_customer_request`) that looks up + knowledge-base context via a workflow task and then routes the enriched + question through an LLMRouter. +- `custom_tasks.py` – defines `knowledge_base_lookup_task` using the + `@workflow_task` decorator. The task provides deterministic answers drawn from + an embedded support knowledge base. +- `agents.yaml` – two sample agents (`support_specialist`, `product_expert`) that + the router can delegate to. +- `run_worker.py` – Temporal worker entry point. +- `mcp_agent.config.yaml` – configures Temporal, lists + `workflow_task_modules: [custom_tasks]` so the worker imports the module before + polling, and sets `workflow_task_retry_policies` to limit retries for the custom + activity. Entries should be importable module paths (here `custom_tasks` lives + alongside `main.py`, so we reference it by module name). 
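The registration itself is a single decorator; excerpted from `custom_tasks.py`:

```python
from typing import List

from mcp_agent.executor.workflow_task import workflow_task


@workflow_task(name="cloud_agent_factory.knowledge_base_lookup")
async def knowledge_base_lookup_task(request: dict) -> List[str]:
    """Return the most relevant knowledge-base snippets for a customer query."""
    ...
```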
+ +## Quick start + +1. Install dependencies and add secrets: + ```bash + cd examples/cloud/agent_factory + cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml # add OPENAI_API_KEY + uv pip install -r requirements.txt + ``` + +2. Start Temporal elsewhere: + ```bash + temporal server start-dev + ``` + +3. Launch the worker: + ```bash + uv run run_worker.py + ``` + +4. In another terminal, run the app: + ```bash + uv run main.py + ``` + The tool will fetch knowledge-base context via the workflow task (executed as + a Temporal activity) and produce a routed response. + +5. Optional: connect an MCP client while `main.py` is running: + ```bash + npx @modelcontextprotocol/inspector --transport sse --server-url http://127.0.0.1:8000/sse + ``` + +## How it works + +1. `workflow_task_modules` ensures `custom_tasks.py` is imported during worker + startup, registering `knowledge_base_lookup_task` with the app. +2. `route_customer_request` runs as a Temporal workflow (courtesy of + `@app.async_tool`). Inside the workflow we call + `context.executor.execute(knowledge_base_lookup_task, {...})`; this schedules + the task as an activity, returning curated snippets. +3. The prompt is enriched with those snippets and routed through the factory + helper (`create_router_llm`) to select the best agent and compose the final + reply. + +You can expand the example by adding more entries to the knowledge base or by +introducing additional workflow tasks. Simply place them in `custom_tasks.py` +and keep the module listed in `workflow_task_modules`. diff --git a/src/mcp_agent/data/examples/cloud/agent_factory/agents.yaml b/src/mcp_agent/data/examples/cloud/agent_factory/agents.yaml new file mode 100644 index 000000000..73c5fec15 --- /dev/null +++ b/src/mcp_agent/data/examples/cloud/agent_factory/agents.yaml @@ -0,0 +1,17 @@ +agents: + - name: support_specialist + instruction: | + You are a customer support specialist. Provide empathetic answers, + reference available features, and suggest next steps or workarounds. + When relevant, mention how customers can contact support. + server_names: [fetch] + + - name: product_expert + instruction: | + You are a product expert who knows roadmap milestones and integrations. + Provide concise summaries, highlight differentiators, and cite + integrations or security measures when appropriate. + server_names: [] + +# You can also inline these specs in mcp_agent.config.yaml under agents.definitions; +# this file keeps them separate to showcase loading AgentSpecs from disk via the factory helpers. diff --git a/src/mcp_agent/data/examples/cloud/agent_factory/custom_tasks.py b/src/mcp_agent/data/examples/cloud/agent_factory/custom_tasks.py new file mode 100644 index 000000000..d7fa0d1a3 --- /dev/null +++ b/src/mcp_agent/data/examples/cloud/agent_factory/custom_tasks.py @@ -0,0 +1,88 @@ +"""Custom workflow tasks for the cloud agent factory demo.""" + +from __future__ import annotations + +from typing import Dict, List, Tuple + +from mcp_agent.executor.workflow_task import workflow_task + + +_KNOWLEDGE_BASE: Tuple[Dict[str, str], ...] = ( + { + "topic": "pricing", + "summary": "Current pricing tiers: Free, Pro ($29/mo), Enterprise (custom).", + "faq": ( + "Pro tier includes 3 seats, Enterprise supports SSO and audit logging. " + "Discounts available for annual billing." + ), + }, + { + "topic": "availability", + "summary": "The service offers 99.9% uptime backed by regional failover.", + "faq": ( + "Scheduled maintenance occurs Sundays 02:00-03:00 UTC. 
" + "Status page: https://status.example.com" + ), + }, + { + "topic": "integrations", + "summary": "Native integrations include Slack, Jira, and Salesforce connectors.", + "faq": ( + "Slack integration supports slash commands. Jira integration syncs tickets " + "bi-directionally every 5 minutes." + ), + }, + { + "topic": "security", + "summary": "SOC 2 Type II certified, data encrypted in transit and at rest.", + "faq": ( + "Role-based access control is available on Pro+. Admins can require MFA. " + "Security whitepaper: https://example.com/security" + ), + }, +) + + +@workflow_task(name="cloud_agent_factory.knowledge_base_lookup") +async def knowledge_base_lookup_task(request: dict) -> List[str]: + """ + Return the most relevant knowledge-base snippets for a customer query. + + The knowledge base is embedded in the code so the example works identically + in local and hosted environments. + """ + + query = str(request.get("query", "")).lower() + limit = max(1, int(request.get("limit", 3))) + + if not query.strip(): + return [] + + ranked = sorted( + _KNOWLEDGE_BASE, + key=lambda entry: _score(query, entry), + reverse=True, + ) + top_entries = ranked[:limit] + + formatted: List[str] = [] + for entry in top_entries: + formatted.append( + f"*Topic*: {entry['topic']}\nSummary: {entry['summary']}\nFAQ: {entry['faq']}" + ) + return formatted + + +def _score(query: str, entry: Dict[str, str]) -> int: + score = 0 + for token in query.split(): + if len(token) < 3: + continue + token_lower = token.lower() + if token_lower in entry["topic"].lower(): + score += 3 + if token_lower in entry["summary"].lower(): + score += 2 + if token_lower in entry["faq"].lower(): + score += 1 + return score diff --git a/src/mcp_agent/data/examples/cloud/agent_factory/main.py b/src/mcp_agent/data/examples/cloud/agent_factory/main.py new file mode 100644 index 000000000..554867d93 --- /dev/null +++ b/src/mcp_agent/data/examples/cloud/agent_factory/main.py @@ -0,0 +1,70 @@ +"""Temporal cloud agent factory example with custom workflow tasks.""" + +from __future__ import annotations + +import asyncio +from pathlib import Path + +from mcp_agent.core.context import Context + +from mcp_agent.app import MCPApp +from mcp_agent.workflows.factory import ( + create_router_llm, + load_agent_specs_from_file, +) + +try: + from .custom_tasks import knowledge_base_lookup_task +except ImportError: # pragma: no cover - executed when run as a script + from custom_tasks import knowledge_base_lookup_task + +app = MCPApp( + name="cloud_agent_factory", + description="Temporal agent factory demo that uses custom workflow tasks", +) + + +@app.async_tool() +async def route_customer_request( + prompt: str = "A customer is asking about our pricing and security posture.", + context_hits: int = 3, + app_ctx: Context | None = None, +) -> str: + """Route customer-facing questions and seed the LLM with KB context.""" + context = app_ctx or app.context + + kb_snippets = await context.executor.execute( + knowledge_base_lookup_task, + {"query": prompt, "limit": context_hits}, + ) + if isinstance(kb_snippets, BaseException): + raise kb_snippets + + kb_context = "\n\n".join(kb_snippets) if kb_snippets else "No knowledge-base hits." 
+ agents_path = Path(__file__).resolve().parent / "agents.yaml" + specs = load_agent_specs_from_file(str(agents_path), context=context) + + router = await create_router_llm( + server_names=["filesystem", "fetch"], + agents=specs, + provider="openai", + context=context, + ) + + enriched_prompt = ( + "You are triaging a customer request.\n" + f"Customer question:\n{prompt}\n\n" + f"Knowledge-base snippets:\n{kb_context}\n\n" + "Compose a helpful, empathetic reply that references the most relevant details." + ) + return await router.generate_str(enriched_prompt) + + +async def main(): + async with app.run() as agent_app: + result = await route_customer_request(app_ctx=agent_app.context) + print("Routing result:", result) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/mcp_agent/data/examples/cloud/agent_factory/mcp_agent.config.yaml b/src/mcp_agent/data/examples/cloud/agent_factory/mcp_agent.config.yaml new file mode 100644 index 000000000..bc95758c8 --- /dev/null +++ b/src/mcp_agent/data/examples/cloud/agent_factory/mcp_agent.config.yaml @@ -0,0 +1,35 @@ +# Temporal configuration for the cloud agent factory demo +$schema: ../../schema/mcp-agent.config.schema.json + +execution_engine: temporal + +workflow_task_modules: + - custom_tasks # module path relative to the example package + +workflow_task_retry_policies: + cloud_agent_factory.knowledge_base_lookup: + maximum_attempts: 1 + +temporal: + host: "localhost:7233" + namespace: "default" + task_queue: "mcp-agent" + max_concurrent_activities: 10 + +logger: + transports: [console] + level: info + +mcp: + servers: + fetch: + command: "uvx" + args: ["mcp-server-fetch"] + description: "Fetch content from the web" + filesystem: + command: "npx" + args: ["-y", "@modelcontextprotocol/server-filesystem", "."] + description: "Read local files" + +openai: + default_model: gpt-4o-mini diff --git a/src/mcp_agent/data/examples/cloud/agent_factory/mcp_agent.secrets.yaml.example b/src/mcp_agent/data/examples/cloud/agent_factory/mcp_agent.secrets.yaml.example new file mode 100644 index 000000000..c8892e61e --- /dev/null +++ b/src/mcp_agent/data/examples/cloud/agent_factory/mcp_agent.secrets.yaml.example @@ -0,0 +1,2 @@ +openai: + api_key: "your-openai-api-key" diff --git a/src/mcp_agent/data/examples/cloud/agent_factory/requirements.txt b/src/mcp_agent/data/examples/cloud/agent_factory/requirements.txt new file mode 100644 index 000000000..33519dd75 --- /dev/null +++ b/src/mcp_agent/data/examples/cloud/agent_factory/requirements.txt @@ -0,0 +1,6 @@ +# Core framework dependency +mcp-agent @ file://../../../ + +# LLM providers used in this demo +openai +anthropic diff --git a/src/mcp_agent/data/examples/cloud/agent_factory/run_worker.py b/src/mcp_agent/data/examples/cloud/agent_factory/run_worker.py new file mode 100644 index 000000000..3cec15b40 --- /dev/null +++ b/src/mcp_agent/data/examples/cloud/agent_factory/run_worker.py @@ -0,0 +1,21 @@ +"""Temporal worker for the cloud agent factory example.""" + +import asyncio +import logging + +from mcp_agent.executor.temporal import create_temporal_worker_for_app + +from main import app + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def main(): + logger.info("Starting Temporal worker for cloud agent factory demo") + async with create_temporal_worker_for_app(app) as worker: + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/mcp_agent/data/examples/cloud/mcp/README.md 
b/src/mcp_agent/data/examples/cloud/mcp/README.md index ece7e90b2..afb5ba5d4 100644 --- a/src/mcp_agent/data/examples/cloud/mcp/README.md +++ b/src/mcp_agent/data/examples/cloud/mcp/README.md @@ -184,3 +184,12 @@ This will launch the MCP Inspector UI where you can: - See all available tools - Test workflow execution - View request/response details + +Make sure Inspector is configured with the following settings: + +| Setting | Value | +| ---------------- | --------------------------------------------------- | +| _Transport Type_ | _SSE_ | +| _SSE_ | _https://[server_id].deployments.mcp-agent.com/sse_ | +| _Header Name_ | _Authorization_ | +| _Bearer Token_ | _your-mcp-agent-cloud-api-token_ | diff --git a/src/mcp_agent/data/examples/cloud/temporal/README.md b/src/mcp_agent/data/examples/cloud/temporal/README.md index 8237f2d5b..441620a11 100644 --- a/src/mcp_agent/data/examples/cloud/temporal/README.md +++ b/src/mcp_agent/data/examples/cloud/temporal/README.md @@ -74,6 +74,8 @@ openai: api_key: "your-openai-api-key" ``` +The bundled `mcp_agent.config.yaml` is configured for the local Temporal dev server. If you add additional `@workflow_task` modules, uncomment the top-level `workflow_task_modules` list in that config and add your module paths so the worker imports them when it boots. + ## Test Locally Before running this example, you need to have a Temporal server running: diff --git a/src/mcp_agent/data/examples/cloud/temporal/mcp_agent.config.yaml b/src/mcp_agent/data/examples/cloud/temporal/mcp_agent.config.yaml index 94ad20581..974b5251a 100644 --- a/src/mcp_agent/data/examples/cloud/temporal/mcp_agent.config.yaml +++ b/src/mcp_agent/data/examples/cloud/temporal/mcp_agent.config.yaml @@ -3,6 +3,15 @@ $schema: https://raw.githubusercontent.com/lastmile-ai/mcp-agent/refs/heads/main # Set the execution engine to Temporal execution_engine: "temporal" +# Optional: preload modules that declare @workflow_task activities +# workflow_task_modules: +# - my_project.custom_tasks + +# Optional: override retry behaviour for specific activities +# workflow_task_retry_policies: +# my_project.custom_tasks.my_activity: +# maximum_attempts: 1 + # Temporal settings temporal: host: "localhost:7233" # Default Temporal server address diff --git a/src/mcp_agent/data/templates/README_basic.md b/src/mcp_agent/data/templates/README_basic.md index 17dc0a6ed..0835b26ec 100644 --- a/src/mcp_agent/data/templates/README_basic.md +++ b/src/mcp_agent/data/templates/README_basic.md @@ -1,4 +1,4 @@ -# MCP-Agent Starter +# mcp-agent Starter Welcome! This project was generated by `mcp-agent init`. It’s a minimal, readable starting point you can run locally or expose as an MCP server. @@ -14,24 +14,15 @@ Welcome! This project was generated by `mcp-agent init`. It’s a minimal, reada - Decorated with `@app.async_tool`: when serving, returns a workflow ID; when run in this script, it awaits and returns the string result. ## Quick start +1. Add your OpenAI API key to `mcp_agent.secrets.yaml` (or set `OPENAI_API_KEY` env var). -1. Add API keys to `mcp_agent.secrets.yaml` (or set env vars): +NOTE: You can use another supported provider (e.g. Anthropic) instead, just be sure to set its API key in the `mcp_agent.secrets.yaml` (or set its env var) and update the provider configuration in `main.py`. -NOTE: You can use another supported provider (e.g. Anthropic) instead, just be sure to set its API key in the `mcp_agent.secrets.yaml` (or set its env var) and import/use the relevant `AugmentedLLM` in `main.py`. 
- -- `OPENAI_API_KEY` (recommended) -- `ANTHROPIC_API_KEY` (optional) - -2. Review `mcp_agent.config.yaml`: - - - Execution engine: `asyncio` - - Logger settings - - MCP servers: `filesystem`, `fetch` - - `agents.definitions`: sample agents (`filesystem_helper`, `web_helper`) - -3. Run locally: +2. Install dependencies and run locally: ```bash +uv init +uv add "mcp-agent[openai]" uv run main.py ``` @@ -40,7 +31,7 @@ You’ll see two summaries printed: - A summary of `README.md` from your current directory. - A summary of the intro page at modelcontextprotocol.io. -4. Run locally as an MCP server: +3. Run locally as an MCP server: - In `main.py`, UNCOMMENT the server lines that call `create_mcp_server_for_app(agent_app)` and `run_sse_async()`. @@ -54,7 +45,7 @@ You’ll see two summaries printed: npx @modelcontextprotocol/inspector --transport sse --server-url http://127.0.0.1:8000/sse ``` -5. Deploy a remote MCP server: +4. Deploy a remote MCP server: When you're ready to deploy, ensure the required API keys are set in `mcp_agent.secrets.yaml` and then run: @@ -74,7 +65,7 @@ Please enter your API key 🔑: In your terminal, deploy the MCP app: ```bash -uv run mcp-agent deploy hello_world --no-auth +uv run mcp-agent deploy hello_world ``` The `deploy` command will bundle the app files and deploy them, wrapping your app as a hosted MCP SSE server with a URL of the form: @@ -89,6 +80,15 @@ like any other MCP server. For example, you can inspect and test the server usin npx @modelcontextprotocol/inspector --transport sse --server-url https://.deployments.mcp-agent.com/sse ``` +Make sure Inspector is configured with the following settings: + +| Setting | Value | +| ---------------- | --------------------------------------------------- | +| _Transport Type_ | _SSE_ | +| _SSE_ | _https://[server_id].deployments.mcp-agent.com/sse_ | +| _Header Name_ | _Authorization_ | +| _Bearer Token_ | _your-mcp-agent-cloud-api-token_ | + ## Notes - `app_ctx` is the MCPApp Context (configuration, logger, upstream session, etc.). @@ -105,12 +105,4 @@ npx @modelcontextprotocol/inspector --transport sse --server-url https://.deployments.mcp-agent.com`. + +Anything decorated with `@app.async_tool` (or `@app.tool`) runs as a Temporal workflow in the cloud. + +Since the mcp-agent app is exposed as an MCP server, it can be used in any MCP client just +like any other MCP server. For example, you can inspect and test the server using MCP Inspector: + +```bash +npx @modelcontextprotocol/inspector --transport sse --server-url https://.deployments.mcp-agent.com/sse +``` + +Make sure Inspector is configured with the following settings: + +| Setting | Value | +| ---------------- | --------------------------------------------------- | +| _Transport Type_ | _SSE_ | +| _SSE_ | _https://[server_id].deployments.mcp-agent.com/sse_ | +| _Header Name_ | _Authorization_ | +| _Bearer Token_ | _your-mcp-agent-cloud-api-token_ | + +## Next steps + +- Tweak the agent definitions in `agents.yaml` to fit your use case. +- Try other factory workflows, such as Orchestrator. +- Add tools with `@app.tool` or `@app.async_tool` as you grow the app. +- Read the docs and explore examples: + - GitHub: https://github.com/lastmile-ai/mcp-agent + - Docs: https://docs.mcp-agent.com/ + - Discord: https://lmai.link/discord/mcp-agent + +Happy building! 
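As a rough sketch of that last point (assuming the same decorator pattern as `route_prompt` in `main.py`; the helper below is hypothetical):

```python
# Hypothetical extra tool: the decorator registers it on the app, and the
# docstring becomes the tool description shown to MCP clients.
@app.async_tool()
async def summarize_file(path: str, app_ctx: Context | None = None) -> str:
    """Summarize a local file by reusing the routing workflow."""
    return await route_prompt(
        prompt=f"Summarize the file at {path}", app_ctx=app_ctx
    )
```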
diff --git a/src/mcp_agent/data/templates/README_server.md b/src/mcp_agent/data/templates/README_server.md index f7b21b154..f22c05cd8 100644 --- a/src/mcp_agent/data/templates/README_server.md +++ b/src/mcp_agent/data/templates/README_server.md @@ -1,4 +1,4 @@ -# MCP-Agent Server Starter +# mcp-agent Server Starter Welcome! This project was generated by `mcp-agent init`. It demonstrates how to expose your mcp-agent application as an MCP server, making your agentic workflows available to any MCP client. @@ -30,16 +30,11 @@ When you run `main.py`, your MCP server exposes: NOTE: You can use another supported provider (e.g. Anthropic) instead, just be sure to set its API key in the `mcp_agent.secrets.yaml` (or set its env var) and import/use the relevant `AugmentedLLM` in `main.py`. -2. Review `mcp_agent.config.yaml`: - - - Execution engine: `asyncio` - - Logger settings - - MCP servers: `filesystem`, `fetch` - -3. Install dependencies and run the server: +2. Install dependencies and run the server: ```bash -uv pip install -r requirements.txt +uv init +uv add "mcp-agent[openai]" uv run main.py ``` @@ -66,7 +61,7 @@ This will launch the inspector UI where you can: - Test workflow execution - View request/response details -5. Deploy as a remote MCP server: +4. Deploy as a remote MCP server: When you're ready to deploy, ensure the required API keys are set in `mcp_agent.secrets.yaml` and then run: @@ -103,6 +98,15 @@ like any other MCP server. For example, you can inspect and test the server usin npx @modelcontextprotocol/inspector --transport sse --server-url https://.deployments.mcp-agent.com/sse ``` +Make sure Inspector is configured with the following settings: + +| Setting | Value | +| ---------------- | --------------------------------------------------- | +| _Transport Type_ | _SSE_ | +| _SSE_ | _https://[server_id].deployments.mcp-agent.com/sse_ | +| _Header Name_ | _Authorization_ | +| _Bearer Token_ | _your-mcp-agent-cloud-api-token_ | + ## Notes - `app_ctx` is the MCPApp Context (configuration, logger, upstream session, etc.). 
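For reference, the serving entry point used across these examples reduces to the following (mirroring `examples/cloud/agent_factory/main.py` earlier in this diff):

```python
# Expose the MCPApp as an SSE MCP server.
from mcp_agent.server.app_server import create_mcp_server_for_app


async def main():
    async with app.run() as agent_app:
        mcp_server = create_mcp_server_for_app(agent_app)
        await mcp_server.run_sse_async()
```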
@@ -131,13 +135,4 @@ npx @modelcontextprotocol/inspector --transport sse --server-url https:// str: + """Route a prompt to the appropriate agent using an LLMRouter.""" + context = app_ctx or app.context + + agents_path = Path(__file__).resolve().parent / "agents.yaml" + specs = load_agent_specs_from_file(str(agents_path), context=context) + + router = await create_router_llm( + server_names=["filesystem", "fetch"], + agents=specs, + provider="openai", + context=context, + ) + + response = await router.generate_str(prompt) + return response + async def main(): - async with MCPApp(name="factory_demo").run() as agent_app: - context = agent_app.context - # Add current directory to filesystem server (if needed by your setup) - context.config.mcp.servers["filesystem"].args.extend(["."]) - - agents_path = Path(__file__).resolve().parent / "agents.yaml" - specs = load_agent_specs_from_file(str(agents_path), context=context) - - router = await create_router_llm( - server_names=["filesystem", "fetch"], - agents=specs, - provider="openai", - context=context, + async with app.run() as agent_app: + route_res = await route_prompt( + prompt="Find the README and summarize it", app_ctx=agent_app.context ) - res = await router.generate_str("Find the README and summarize it") - print("Routing result:", res) + print("Routing result:", route_res) if __name__ == "__main__": diff --git a/src/mcp_agent/data/templates/agent_factory_run_worker.py b/src/mcp_agent/data/templates/agent_factory_run_worker.py new file mode 100644 index 000000000..db57f0214 --- /dev/null +++ b/src/mcp_agent/data/templates/agent_factory_run_worker.py @@ -0,0 +1,25 @@ +""" +Temporal worker script for the factory demo. +Run this in a separate terminal when using the Temporal execution engine. +""" + +import asyncio +import logging + +from mcp_agent.executor.temporal import create_temporal_worker_for_app + +from main import app + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def main(): + logger.info("Starting Temporal worker for factory demo") + async with create_temporal_worker_for_app(app) as worker: + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/mcp_agent/data/templates/agents.yaml b/src/mcp_agent/data/templates/agents.yaml index f505732f1..31599ff04 100644 --- a/src/mcp_agent/data/templates/agents.yaml +++ b/src/mcp_agent/data/templates/agents.yaml @@ -2,8 +2,7 @@ # This file defines multiple specialized agents that can be dynamically selected # File system agent - searches and reads local files -filesystem_agent: - name: filesystem_agent +- name: filesystem_agent instruction: | You are a filesystem expert. Your role is to: - Search for files and directories @@ -15,8 +14,7 @@ filesystem_agent: - filesystem # Web research agent - fetches and analyzes web content -web_agent: - name: web_agent +- name: web_agent instruction: | You are a web research specialist. Your role is to: - Fetch content from URLs @@ -28,8 +26,7 @@ web_agent: - fetch # Code analysis agent - analyzes code structure and quality -code_analyst: - name: code_analyst +- name: code_analyst instruction: | You are a code analysis expert. Your role is to: - Review code for best practices @@ -41,8 +38,7 @@ code_analyst: - filesystem # Documentation agent - generates and maintains documentation -doc_writer: - name: doc_writer +- name: doc_writer instruction: | You are a documentation specialist. 
Your role is to: - Write clear, concise documentation @@ -54,8 +50,7 @@ doc_writer: - filesystem # General assistant - handles miscellaneous tasks -general_assistant: - name: general_assistant +- name: general_assistant instruction: | You are a helpful general assistant. Your role is to: - Answer questions @@ -63,4 +58,4 @@ general_assistant: - Assist with various tasks - Route complex requests to specialized agents Be helpful, accurate, and concise in your responses. - server_names: [] \ No newline at end of file + server_names: [] diff --git a/src/mcp_agent/data/templates/mcp_agent.config.yaml b/src/mcp_agent/data/templates/mcp_agent.config.yaml index ecf88c520..b04f272e8 100644 --- a/src/mcp_agent/data/templates/mcp_agent.config.yaml +++ b/src/mcp_agent/data/templates/mcp_agent.config.yaml @@ -6,6 +6,15 @@ $schema: https://raw.githubusercontent.com/lastmile-ai/mcp-agent/refs/heads/main # For temporal mode, see: https://github.com/lastmile-ai/mcp-agent/blob/main/examples/temporal/README.md execution_engine: asyncio +# Optional: preload modules that register @workflow_task functions +# workflow_task_modules: +# - my_project.custom_tasks + +# Optional: configure retry policies for workflow tasks / activities +# workflow_task_retry_policies: +# my_project.custom_tasks.my_activity: +# maximum_attempts: 1 + logger: transports: [console, file] level: info diff --git a/src/mcp_agent/executor/errors.py b/src/mcp_agent/executor/errors.py new file mode 100644 index 000000000..2a0634520 --- /dev/null +++ b/src/mcp_agent/executor/errors.py @@ -0,0 +1,106 @@ +"""Shared error helpers for workflow/task execution.""" + +from __future__ import annotations + +try: # Temporal optional dependency + from temporalio.exceptions import ApplicationError as TemporalApplicationError + + _TEMPORAL_AVAILABLE = True +except Exception: # pragma: no cover + _TEMPORAL_AVAILABLE = False + + class TemporalApplicationError(RuntimeError): + """Fallback ApplicationError used when Temporal SDK is not installed.""" + + def __init__( + self, + message: str, + *, + type: str | None = None, + non_retryable: bool = False, + details: object | None = None, + ): + super().__init__(message) + self.type = type + self.non_retryable = non_retryable + self.details = details + + +class WorkflowApplicationError(TemporalApplicationError): + """ApplicationError wrapper compatible with and without Temporal installed.""" + + def __init__( + self, + message: str, + *, + type: str | None = None, + non_retryable: bool = False, + details: object | None = None, + **kwargs: object, + ): + normalized_details = details + if isinstance(normalized_details, tuple): + normalized_details = list(normalized_details) + + self._workflow_details_fallback = normalized_details + + if _TEMPORAL_AVAILABLE: + detail_args: tuple = () + if normalized_details is not None: + if isinstance(normalized_details, list): + detail_args = tuple(normalized_details) + else: + detail_args = (normalized_details,) + + super().__init__( + message, + *detail_args, + type=type, + non_retryable=non_retryable, + **kwargs, + ) + + if not hasattr(self, "non_retryable"): + setattr(self, "non_retryable", non_retryable) + else: + super().__init__( + message, + type=type, + non_retryable=non_retryable, + details=normalized_details, + ) + + @property + def workflow_details(self): + details = getattr(self, "details", None) + if details: + if isinstance(details, tuple): + return list(details) + return details + return self._workflow_details_fallback + + +def to_application_error( + error: 
BaseException, + *, + message: str | None = None, + type: str | None = None, + non_retryable: bool | None = None, + details: object | None = None, +) -> WorkflowApplicationError: + """Wrap an existing exception as a WorkflowApplicationError.""" + + msg = message or str(error) + err_type = type or getattr(error, "type", None) or error.__class__.__name__ + nr = non_retryable + if nr is None: + nr = bool(getattr(error, "non_retryable", False)) + det = details + if det is None: + det = getattr(error, "details", None) + if isinstance(det, tuple): + det = list(det) + return WorkflowApplicationError(msg, type=err_type, non_retryable=nr, details=det) + + +__all__ = ["WorkflowApplicationError", "to_application_error"] diff --git a/src/mcp_agent/executor/temporal/__init__.py b/src/mcp_agent/executor/temporal/__init__.py index 01889b464..04f9d848f 100644 --- a/src/mcp_agent/executor/temporal/__init__.py +++ b/src/mcp_agent/executor/temporal/__init__.py @@ -6,6 +6,7 @@ """ import asyncio +import importlib from contextlib import asynccontextmanager from datetime import timedelta import functools @@ -27,7 +28,7 @@ from temporalio.client import Client as TemporalClient, WorkflowHandle from temporalio.contrib.opentelemetry import TracingInterceptor from temporalio.contrib.pydantic import pydantic_data_converter -from temporalio.common import WorkflowIDReusePolicy +from temporalio.common import RetryPolicy, WorkflowIDReusePolicy from temporalio.worker import Worker from mcp_agent.config import TemporalSettings @@ -48,6 +49,24 @@ logger = get_logger(__name__) +DEFAULT_TEMPORAL_WORKFLOW_TASK_MODULES: tuple[str, ...] = ( + "mcp_agent.workflows.llm.augmented_llm_openai", + "mcp_agent.workflows.llm.augmented_llm_anthropic", + "mcp_agent.workflows.llm.augmented_llm_azure", + "mcp_agent.workflows.llm.augmented_llm_bedrock", + "mcp_agent.workflows.llm.augmented_llm_google", + "mcp_agent.workflows.llm.augmented_llm_ollama", +) + +MODULE_OPTIONAL_EXTRAS: dict[str, str] = { + "mcp_agent.workflows.llm.augmented_llm_openai": "openai", + "mcp_agent.workflows.llm.augmented_llm_anthropic": "anthropic", + "mcp_agent.workflows.llm.augmented_llm_azure": "azure", + "mcp_agent.workflows.llm.augmented_llm_bedrock": "bedrock", + "mcp_agent.workflows.llm.augmented_llm_google": "google", + "mcp_agent.workflows.llm.augmented_llm_ollama": "ollama", +} + class TemporalExecutorConfig(ExecutorConfig, TemporalSettings): """Configuration for Temporal executors.""" @@ -191,6 +210,15 @@ async def _execute_task( schedule_to_close = timedelta(seconds=schedule_to_close) retry_policy = execution_metadata.get("retry_policy", None) + if isinstance(retry_policy, dict): + try: + retry_policy = RetryPolicy(**retry_policy) + except TypeError as exc: + logger.warning( + "Invalid retry policy configuration; falling back to default", + data={"activity": activity_name, "error": str(exc)}, + ) + retry_policy = None try: # Temporal's execute_activity accepts at most one positional arg; @@ -494,6 +522,60 @@ def random(self) -> "Random": return super().random() +def _preload_workflow_task_modules(app: "MCPApp") -> None: + """ + Import modules that define @workflow_task activities so they register with the app + before we hand the activity list to the Temporal worker. 
+ """ + + module_names = set(DEFAULT_TEMPORAL_WORKFLOW_TASK_MODULES) + + try: + global_modules = getattr( + getattr(app.context, "config", None), "workflow_task_modules", None + ) + if global_modules: + module_names.update(module for module in global_modules if module) + except Exception: + pass + + try: + temporal_settings = getattr( + getattr(app.context, "config", None), "temporal", None + ) + if temporal_settings and getattr( + temporal_settings, "workflow_task_modules", None + ): + module_names.update( + module for module in temporal_settings.workflow_task_modules if module + ) + except Exception: + # Best-effort only + pass + + for module_name in sorted(module_names): + try: + importlib.import_module(module_name) + except ModuleNotFoundError as exc: + missing_dep = exc.name or module_name + extra_hint = MODULE_OPTIONAL_EXTRAS.get(module_name) + logger.warning( + "Workflow task module import skipped; install optional dependency", + data={ + "module": module_name, + "missing_dependency": missing_dep, + "install_hint": f'pip install "mcp-agent[{extra_hint}]"' + if extra_hint + else "Install the matching optional extras for your provider", + }, + ) + except Exception as exc: + logger.warning( + "Failed to import workflow task module", + data={"module": module_name, "error": str(exc)}, + ) + + @asynccontextmanager async def create_temporal_worker_for_app(app: "MCPApp"): """ @@ -507,6 +589,7 @@ async def create_temporal_worker_for_app(app: "MCPApp"): raise ValueError("App executor is not a TemporalExecutor.") await running_app.executor.ensure_client() + _preload_workflow_task_modules(running_app) from mcp_agent.agents.agent import AgentTasks @@ -531,6 +614,9 @@ async def create_temporal_worker_for_app(app: "MCPApp"): app.workflow_task(name="mcp_relay_notify")(system_activities.relay_notify) app.workflow_task(name="mcp_relay_request")(system_activities.relay_request) + # Ensure any newly-imported @workflow_task functions are attached to the app + running_app._register_global_workflow_tasks() + for name in activity_registry.list_activities(): activities.append(activity_registry.get_activity(name)) diff --git a/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py b/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py index a2f31e72c..345b6fa9f 100644 --- a/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py +++ b/src/mcp_agent/workflows/llm/augmented_llm_anthropic.py @@ -4,7 +4,17 @@ from pydantic import BaseModel -from anthropic import Anthropic, AnthropicBedrock, AnthropicVertex, AsyncAnthropic +from anthropic import ( + Anthropic, + AnthropicBedrock, + AnthropicVertex, + AsyncAnthropic, + AuthenticationError, + BadRequestError, + NotFoundError, + PermissionDeniedError, + UnprocessableEntityError, +) from anthropic.types import ( ContentBlock, DocumentBlockParam, @@ -38,6 +48,7 @@ # from mcp_agent.agents.agent import HUMAN_INPUT_TOOL_NAME from mcp_agent.config import AnthropicSettings from mcp_agent.executor.workflow_task import workflow_task +from mcp_agent.executor.errors import to_application_error from mcp_agent.tracing.semconv import ( GEN_AI_AGENT_NAME, GEN_AI_REQUEST_MODEL, @@ -61,6 +72,14 @@ from mcp_agent.logging.logger import get_logger from mcp_agent.workflows.llm.multipart_converter_anthropic import AnthropicConverter +_NON_RETRYABLE_ANTHROPIC_ERRORS = ( + AuthenticationError, + PermissionDeniedError, + BadRequestError, + NotFoundError, + UnprocessableEntityError, +) + MessageParamContent = Union[ str, Iterable[ @@ -102,6 +121,13 @@ def create_anthropic_instance(settings: 
AnthropicSettings): return anthropic +async def _execute_anthropic_async(client: AsyncAnthropic, payload: dict) -> Message: + try: + return await client.messages.create(**payload) + except _NON_RETRYABLE_ANTHROPIC_ERRORS as exc: + raise to_application_error(exc, non_retryable=True) from exc + + class AnthropicAugmentedLLM(AugmentedLLM[MessageParam, Message]): """ The basic building block of agentic systems is an LLM enhanced with augmentations @@ -769,7 +795,7 @@ def _annotate_span_for_completion_response( class AnthropicCompletionTasks: @staticmethod - @workflow_task + @workflow_task(retry_policy={"maximum_attempts": 3}) @telemetry.traced() async def request_completion_task( request: RequestCompletionRequest, @@ -777,22 +803,23 @@ async def request_completion_task( """ Request a completion from Anthropic's API. """ - # Prefer async client where available to avoid blocking the event loop + payload = request.payload + if request.config.provider in (None, "", "anthropic"): client = AsyncAnthropic(api_key=request.config.api_key) - payload = request.payload - response = await client.messages.create(**payload) - response = ensure_serializable(response) - return response + response = await _execute_anthropic_async(client, payload) else: anthropic = create_anthropic_instance(request.config) - payload = request.payload loop = asyncio.get_running_loop() - response = await loop.run_in_executor( - None, functools.partial(anthropic.messages.create, **payload) - ) - response = ensure_serializable(response) - return response + try: + response = await loop.run_in_executor( + None, functools.partial(anthropic.messages.create, **payload) + ) + except _NON_RETRYABLE_ANTHROPIC_ERRORS as exc: + raise to_application_error(exc, non_retryable=True) from exc + + response = ensure_serializable(response) + return response class AnthropicMCPTypeConverter(ProviderToMCPConverter[MessageParam, Message]): diff --git a/src/mcp_agent/workflows/llm/augmented_llm_azure.py b/src/mcp_agent/workflows/llm/augmented_llm_azure.py index a6d27062d..a5d894267 100644 --- a/src/mcp_agent/workflows/llm/augmented_llm_azure.py +++ b/src/mcp_agent/workflows/llm/augmented_llm_azure.py @@ -30,7 +30,14 @@ from pydantic import BaseModel -from openai import AsyncAzureOpenAI +from openai import ( + AsyncAzureOpenAI, + AuthenticationError as AzureOpenAIAuthenticationError, + BadRequestError as AzureOpenAIBadRequestError, + NotFoundError as AzureOpenAINotFoundError, + PermissionDeniedError as AzureOpenAIPermissionDeniedError, + UnprocessableEntityError as AzureOpenAIUnprocessableEntityError, +) from openai.types.chat import ChatCompletion from mcp.types import ( @@ -65,6 +72,17 @@ ) from mcp_agent.logging.logger import get_logger from mcp_agent.workflows.llm.multipart_converter_azure import AzureConverter +from mcp_agent.executor.errors import to_application_error + +_NON_RETRYABLE_AZURE_STATUS_CODES = {400, 401, 403, 404, 422} + +_NON_RETRYABLE_AZURE_OPENAI_ERRORS = ( + AzureOpenAIAuthenticationError, + AzureOpenAIPermissionDeniedError, + AzureOpenAIBadRequestError, + AzureOpenAINotFoundError, + AzureOpenAIUnprocessableEntityError, +) MessageParam = Union[ SystemMessage, UserMessage, AssistantMessage, ToolMessage, DeveloperMessage @@ -538,9 +556,22 @@ def extract_response_message_attributes_for_tracing( return attrs +def _raise_non_retryable_azure( + error: Exception, status_code: int | None = None +) -> None: + message = str(error) + if status_code is not None: + message = f"{status_code}: {message}" + raise to_application_error( + error, + 
message=message, + non_retryable=True, + ) from error + + class AzureCompletionTasks: @staticmethod - @workflow_task + @workflow_task(retry_policy={"maximum_attempts": 3}) async def request_completion_task( request: RequestCompletionRequest, ) -> ChatCompletions: @@ -573,33 +604,33 @@ async def request_completion_task( except HttpResponseError as e: logger = get_logger(__name__) - if e.status_code != 400: - logger.error(f"Azure API call failed: {e}") - raise - - logger.warning( - f"Initial Azure API call failed: {e}. Retrying with fallback parameters." - ) - - # Create a new payload with fallback values for commonly problematic parameters - fallback_payload = {**payload, "max_tokens": None, "temperature": 1} - - try: - response = await loop.run_in_executor( - None, functools.partial(azure_client.complete, **fallback_payload) + if e.status_code == 400: + logger.warning( + "Initial Azure API call failed with status 400; retrying with fallback parameters." ) - except Exception as retry_error: - # If retry also fails, raise a more informative error - raise RuntimeError( - f"Azure API call failed even with fallback parameters. " - f"Original error: {e}. Retry error: {retry_error}" - ) from retry_error + fallback_payload = {**payload, "max_tokens": None, "temperature": 1} + try: + response = await loop.run_in_executor( + None, + functools.partial(azure_client.complete, **fallback_payload), + ) + except HttpResponseError as retry_error: + if retry_error.status_code in _NON_RETRYABLE_AZURE_STATUS_CODES: + _raise_non_retryable_azure(retry_error, retry_error.status_code) + raise + except Exception as retry_error: + _raise_non_retryable_azure(retry_error) + elif e.status_code in _NON_RETRYABLE_AZURE_STATUS_CODES: + _raise_non_retryable_azure(e, e.status_code) + else: + logger.error("Azure API call failed: %s", e) + raise return response class AzureOpenAICompletionTasks: @staticmethod - @workflow_task + @workflow_task(retry_policy={"maximum_attempts": 3}) async def request_completion_task( request: RequestCompletionRequest, ) -> ChatCompletion: @@ -655,8 +686,11 @@ def _openai_reasoning(model: str): # otherwise use the model name as deployment name deployment = request.config.azure_deployment or payload.get("model") payload["model"] = deployment + try: + response = await client.chat.completions.create(**payload) + except _NON_RETRYABLE_AZURE_OPENAI_ERRORS as exc: + _raise_non_retryable_azure(exc) - response = await client.chat.completions.create(**payload) return response diff --git a/src/mcp_agent/workflows/llm/augmented_llm_google.py b/src/mcp_agent/workflows/llm/augmented_llm_google.py index cd238d495..e86c1bfc0 100644 --- a/src/mcp_agent/workflows/llm/augmented_llm_google.py +++ b/src/mcp_agent/workflows/llm/augmented_llm_google.py @@ -5,6 +5,13 @@ from google.genai import Client from google.genai import types +from mcp_agent.executor.errors import to_application_error + +try: + from google.api_core import exceptions as google_exceptions +except Exception: # pragma: no cover + google_exceptions = None + from mcp.types import ( CallToolRequestParams, CallToolRequest, @@ -32,6 +39,17 @@ from mcp_agent.workflows.llm.multipart_converter_google import GoogleConverter from mcp_agent.tracing.token_tracking_decorator import track_tokens +if google_exceptions: + _NON_RETRYABLE_GOOGLE_ERRORS = ( + google_exceptions.InvalidArgument, + google_exceptions.FailedPrecondition, + google_exceptions.PermissionDenied, + google_exceptions.NotFound, + google_exceptions.Unauthenticated, + ) +else: # pragma: no cover + 
_NON_RETRYABLE_GOOGLE_ERRORS = tuple() + class GoogleAugmentedLLM( AugmentedLLM[ @@ -370,7 +388,7 @@ class RequestStructuredCompletionRequest(BaseModel): class GoogleCompletionTasks: @staticmethod - @workflow_task + @workflow_task(retry_policy={"maximum_attempts": 3}) async def request_completion_task( request: RequestCompletionRequest, ) -> types.GenerateContentResponse: @@ -388,7 +406,12 @@ async def request_completion_task( google_client = Client(api_key=request.config.api_key) payload = request.payload - response = google_client.models.generate_content(**payload) + + try: + response = google_client.models.generate_content(**payload) + except _NON_RETRYABLE_GOOGLE_ERRORS as exc: + raise to_application_error(exc, non_retryable=True) from exc + return response @staticmethod diff --git a/src/mcp_agent/workflows/llm/augmented_llm_openai.py b/src/mcp_agent/workflows/llm/augmented_llm_openai.py index e3193c08e..90f7bedba 100644 --- a/src/mcp_agent/workflows/llm/augmented_llm_openai.py +++ b/src/mcp_agent/workflows/llm/augmented_llm_openai.py @@ -6,7 +6,14 @@ from pydantic import BaseModel -from openai import AsyncOpenAI +from openai import ( + AsyncOpenAI, + AuthenticationError, + BadRequestError, + NotFoundError, + PermissionDeniedError, + UnprocessableEntityError, +) from openai.types.chat import ( ChatCompletionAssistantMessageParam, ChatCompletionContentPartParam, @@ -63,6 +70,16 @@ ) from mcp_agent.logging.logger import get_logger from mcp_agent.workflows.llm.multipart_converter_openai import OpenAIConverter +from mcp_agent.executor.errors import to_application_error + + +_NON_RETRYABLE_OPENAI_ERRORS = ( + AuthenticationError, + PermissionDeniedError, + BadRequestError, + NotFoundError, + UnprocessableEntityError, +) class RequestCompletionRequest(BaseModel): @@ -80,6 +97,15 @@ class RequestStructuredCompletionRequest(BaseModel): strict: bool = False +async def _execute_openai_request( + client: AsyncOpenAI, payload: Dict[str, Any] +) -> ChatCompletion: + try: + return await client.chat.completions.create(**payload) + except _NON_RETRYABLE_OPENAI_ERRORS as exc: + raise to_application_error(exc, non_retryable=True) from exc + + class OpenAIAugmentedLLM( AugmentedLLM[ChatCompletionMessageParam, ChatCompletionMessage] ): @@ -928,7 +954,7 @@ def _extract_chat_completion_attributes_for_tracing( class OpenAICompletionTasks: @staticmethod - @workflow_task + @workflow_task(retry_policy={"maximum_attempts": 3}) @telemetry.traced() async def request_completion_task( request: RequestCompletionRequest, @@ -947,12 +973,12 @@ async def request_completion_task( else None, ) as async_openai_client: payload = request.payload - response = await async_openai_client.chat.completions.create(**payload) + response = await _execute_openai_request(async_openai_client, payload) response = ensure_serializable(response) return response @staticmethod - @workflow_task + @workflow_task(retry_policy={"maximum_attempts": 3}) @telemetry.traced() async def request_structured_completion_task( request: RequestStructuredCompletionRequest, @@ -999,7 +1025,7 @@ async def request_structured_completion_task( if request.user: payload["user"] = request.user - completion = await async_openai_client.chat.completions.create(**payload) + completion = await _execute_openai_request(async_openai_client, payload) if not completion.choices or completion.choices[0].message.content is None: raise ValueError("No structured content returned by model") diff --git a/tests/executor/test_errors.py b/tests/executor/test_errors.py new file mode 
100644 index 000000000..1b2aa5e5e --- /dev/null +++ b/tests/executor/test_errors.py @@ -0,0 +1,42 @@ +import pytest + +from mcp_agent.executor.errors import WorkflowApplicationError, to_application_error + + +def test_workflow_application_error_attributes(): + err = WorkflowApplicationError("message", type="CustomType", non_retryable=True) + assert isinstance(err, Exception) + assert getattr(err, "type", None) == "CustomType" + assert getattr(err, "non_retryable", None) is True + + +@pytest.mark.parametrize("extra_kw", [{"details": ["foo"]}, {}]) +def test_workflow_application_error_accepts_additional_kwargs(extra_kw): + # Temporal's ApplicationError accepts details; ensure our wrapper tolerates it + err = WorkflowApplicationError("msg", type="T", non_retryable=False, **extra_kw) + msg_attr = getattr(err, "message", None) + if msg_attr is None and err.args: + msg_attr = err.args[0] + assert "msg" in str(err) + if msg_attr is not None: + assert "msg" in str(msg_attr) + assert getattr(err, "type", None) == "T" + if "details" in extra_kw: + details = getattr(err, "workflow_details", None) + assert details == extra_kw["details"] + + +def test_to_application_error_from_exception(): + class CustomError(Exception): + def __init__(self, message): + super().__init__(message) + self.type = "Custom" + self.non_retryable = True + self.details = ["detail"] + + original = CustomError("boom") + converted = to_application_error(original) + assert isinstance(converted, WorkflowApplicationError) + assert converted.type == "Custom" + assert converted.non_retryable is True + assert converted.workflow_details == ["detail"] diff --git a/tests/workflows/test_llm_provider_errors.py b/tests/workflows/test_llm_provider_errors.py new file mode 100644 index 000000000..78963a402 --- /dev/null +++ b/tests/workflows/test_llm_provider_errors.py @@ -0,0 +1,79 @@ +import types + +import pytest + +from mcp_agent.executor.errors import WorkflowApplicationError + + +@pytest.mark.asyncio +async def test_execute_openai_request_non_retryable(monkeypatch): + from mcp_agent.workflows.llm import augmented_llm_openai as mod + + class DummyError(Exception): + pass + + async def create(**kwargs): + raise DummyError("boom") + + dummy_client = types.SimpleNamespace( + chat=types.SimpleNamespace(completions=types.SimpleNamespace(create=create)) + ) + + monkeypatch.setattr(mod, "_NON_RETRYABLE_OPENAI_ERRORS", (DummyError,)) + + with pytest.raises(WorkflowApplicationError) as excinfo: + await mod._execute_openai_request(dummy_client, {"foo": "bar"}) + + err = excinfo.value + assert err.non_retryable is True + assert err.type == "DummyError" + + +@pytest.mark.asyncio +async def test_execute_openai_request_propagates_rate_limit(monkeypatch): + from mcp_agent.workflows.llm import augmented_llm_openai as mod + + class DummyRateLimitError(Exception): + pass + + monkeypatch.setattr(mod, "RateLimitError", DummyRateLimitError, raising=False) + + async def create(**kwargs): + raise mod.RateLimitError("slow down") + + dummy_client = types.SimpleNamespace( + chat=types.SimpleNamespace(completions=types.SimpleNamespace(create=create)) + ) + + with pytest.raises(mod.RateLimitError): + await mod._execute_openai_request(dummy_client, {}) + + +def test_raise_non_retryable_azure(): + from mcp_agent.workflows.llm import augmented_llm_azure as mod + + with pytest.raises(WorkflowApplicationError) as excinfo: + mod._raise_non_retryable_azure(ValueError("bad"), status_code=400) + + err = excinfo.value + assert err.non_retryable is True + assert err.type == 
"ValueError" + assert "400" in str(err) + + +@pytest.mark.asyncio +async def test_execute_anthropic_async_non_retryable(monkeypatch): + from mcp_agent.workflows.llm import augmented_llm_anthropic as mod + + class DummyError(Exception): + pass + + async def create(**kwargs): + raise DummyError("bad") + + dummy_client = types.SimpleNamespace(messages=types.SimpleNamespace(create=create)) + + monkeypatch.setattr(mod, "_NON_RETRYABLE_ANTHROPIC_ERRORS", (DummyError,)) + + with pytest.raises(WorkflowApplicationError): + await mod._execute_anthropic_async(dummy_client, {})