-
Notifications
You must be signed in to change notification settings - Fork 781
Get agent factory to work with Temporal, and also allow configurable activity policies #595
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| import asyncio | ||
| from pathlib import Path | ||
|
|
||
| from mcp_agent.core.context import Context | ||
|
|
||
| from mcp_agent.app import MCPApp | ||
| from mcp_agent.workflows.factory import ( | ||
| create_router_llm, | ||
| load_agent_specs_from_file, | ||
| ) | ||
|
|
||
| app = MCPApp(name="factory_demo", description="Demo of agent factory with LLM routing") | ||
|
|
||
|
|
||
| @app.async_tool() | ||
| async def route_prompt( | ||
| prompt: str = "Find the README and summarize it", app_ctx: Context | None = None | ||
| ) -> str: | ||
| """Route a prompt to the appropriate agent using an LLMRouter.""" | ||
| context = app_ctx or app.context | ||
|
|
||
| agents_path = Path(__file__).resolve().parent / "agents.yaml" | ||
| specs = load_agent_specs_from_file(str(agents_path), context=context) | ||
|
|
||
| router = await create_router_llm( | ||
| server_names=["filesystem", "fetch"], | ||
| agents=specs, | ||
| provider="openai", | ||
| context=context, | ||
| ) | ||
|
|
||
| return await router.generate_str(prompt) | ||
|
|
||
|
|
||
| async def main(): | ||
| async with app.run() as agent_app: | ||
| result = await route_prompt( | ||
| prompt="Find the README and summarize it", app_ctx=agent_app.context | ||
| ) | ||
| print("Routing result:", result) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| """Run a Temporal worker for the agent factory demo.""" | ||
|
|
||
| import asyncio | ||
| import logging | ||
|
|
||
| from mcp_agent.executor.temporal import create_temporal_worker_for_app | ||
|
|
||
| from main import app | ||
|
|
||
|
|
||
| logging.basicConfig(level=logging.INFO) | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| async def main(): | ||
| logger.info("Starting Temporal worker for agent factory demo") | ||
| async with create_temporal_worker_for_app(app) as worker: | ||
| await worker.run() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| # Cloud Agent Factory (Temporal + Custom Workflow Tasks) | ||
|
|
||
| This example routes customer-facing questions to specialized agents, augments | ||
| responses with in-code knowledge-base snippets, and shows how to preload custom | ||
| `@workflow_task` modules via `workflow_task_modules`. | ||
|
|
||
| ## What's included | ||
|
|
||
| - `main.py` – exposes an `@app.async_tool` (`route_customer_request`) that looks up | ||
| knowledge-base context via a workflow task and then routes the enriched | ||
| question through an LLMRouter. | ||
| - `custom_tasks.py` – defines `knowledge_base_lookup_task` using the | ||
| `@workflow_task` decorator. The task provides deterministic answers drawn from | ||
| an embedded support knowledge base. | ||
| - `agents.yaml` – two sample agents (`support_specialist`, `product_expert`) that | ||
| the router can delegate to. | ||
| - `run_worker.py` – Temporal worker entry point. | ||
| - `mcp_agent.config.yaml` – configures Temporal, lists | ||
| `workflow_task_modules: [custom_tasks]` so the worker imports the module before | ||
| polling, and sets `workflow_task_retry_policies` to limit retries for the custom | ||
| activity. Entries should be importable module paths (here `custom_tasks` lives | ||
| alongside `main.py`, so we reference it by module name). | ||
|
|
||
| ## Quick start | ||
|
|
||
| 1. Install dependencies and add secrets: | ||
| ```bash | ||
| cd examples/cloud/agent_factory | ||
| cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml # add OPENAI_API_KEY | ||
| uv pip install -r requirements.txt | ||
| ``` | ||
|
|
||
| 2. Start Temporal elsewhere: | ||
| ```bash | ||
| temporal server start-dev | ||
| ``` | ||
|
|
||
| 3. Launch the worker: | ||
| ```bash | ||
| uv run run_worker.py | ||
| ``` | ||
|
|
||
| 4. In another terminal, run the app: | ||
| ```bash | ||
| uv run main.py | ||
| ``` | ||
| The tool will fetch knowledge-base context via the workflow task (executed as | ||
| a Temporal activity) and produce a routed response. | ||
|
|
||
| 5. Optional: connect an MCP client while `main.py` is running: | ||
| ```bash | ||
| npx @modelcontextprotocol/inspector --transport sse --server-url http://127.0.0.1:8000/sse | ||
| ``` | ||
|
|
||
| ## How it works | ||
|
|
||
| 1. `workflow_task_modules` ensures `custom_tasks.py` is imported during worker | ||
| startup, registering `knowledge_base_lookup_task` with the app. | ||
| 2. `route_customer_request` runs as a Temporal workflow (courtesy of | ||
| `@app.async_tool`). Inside the workflow we call | ||
| `context.executor.execute(knowledge_base_lookup_task, {...})`; this schedules | ||
| the task as an activity, returning curated snippets. | ||
| 3. The prompt is enriched with those snippets and routed through the factory | ||
| helper (`create_router_llm`) to select the best agent and compose the final | ||
| reply. | ||
|
|
||
| You can expand the example by adding more entries to the knowledge base or by | ||
| introducing additional workflow tasks. Simply place them in `custom_tasks.py` | ||
| and keep the module listed in `workflow_task_modules`. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| agents: | ||
| - name: support_specialist | ||
| instruction: | | ||
| You are a customer support specialist. Provide empathetic answers, | ||
| reference available features, and suggest next steps or workarounds. | ||
| When relevant, mention how customers can contact support. | ||
| server_names: [fetch] | ||
|
|
||
| - name: product_expert | ||
| instruction: | | ||
| You are a product expert who knows roadmap milestones and integrations. | ||
| Provide concise summaries, highlight differentiators, and cite | ||
| integrations or security measures when appropriate. | ||
| server_names: [] | ||
|
|
||
| # Note: you could alternatively inline these AgentSpec definitions under | ||
| # `agents.definitions` in `mcp_agent.config.yaml`. We keep them in a separate | ||
| # YAML file here to highlight loading specs via the factory helpers. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,88 @@ | ||||||
| """Custom workflow tasks for the cloud agent factory demo.""" | ||||||
|
|
||||||
| from __future__ import annotations | ||||||
|
|
||||||
| from typing import Dict, List, Tuple | ||||||
|
|
||||||
| from mcp_agent.executor.workflow_task import workflow_task | ||||||
|
|
||||||
|
|
||||||
| _KNOWLEDGE_BASE: Tuple[Dict[str, str], ...] = ( | ||||||
| { | ||||||
| "topic": "pricing", | ||||||
| "summary": "Current pricing tiers: Free, Pro ($29/mo), Enterprise (custom).", | ||||||
| "faq": ( | ||||||
| "Pro tier includes 3 seats, Enterprise supports SSO and audit logging. " | ||||||
| "Discounts available for annual billing." | ||||||
| ), | ||||||
| }, | ||||||
| { | ||||||
| "topic": "availability", | ||||||
| "summary": "The service offers 99.9% uptime backed by regional failover.", | ||||||
| "faq": ( | ||||||
| "Scheduled maintenance occurs Sundays 02:00-03:00 UTC. " | ||||||
| "Status page: https://status.example.com" | ||||||
| ), | ||||||
| }, | ||||||
| { | ||||||
| "topic": "integrations", | ||||||
| "summary": "Native integrations include Slack, Jira, and Salesforce connectors.", | ||||||
| "faq": ( | ||||||
| "Slack integration supports slash commands. Jira integration syncs tickets " | ||||||
| "bi-directionally every 5 minutes." | ||||||
| ), | ||||||
| }, | ||||||
| { | ||||||
| "topic": "security", | ||||||
| "summary": "SOC 2 Type II certified, data encrypted in transit and at rest.", | ||||||
| "faq": ( | ||||||
| "Role-based access control is available on Pro+. Admins can require MFA. " | ||||||
| "Security whitepaper: https://example.com/security" | ||||||
| ), | ||||||
| }, | ||||||
| ) | ||||||
|
|
||||||
|
|
||||||
| @workflow_task(name="cloud_agent_factory.knowledge_base_lookup") | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Should this add the _task suffix to match the locations referencing it? |
||||||
| async def knowledge_base_lookup_task(request: dict) -> List[str]: | ||||||
| """ | ||||||
| Return the most relevant knowledge-base snippets for a customer query. | ||||||
| The knowledge base is embedded in the code so the example works identically | ||||||
| in local and hosted environments. | ||||||
| """ | ||||||
|
|
||||||
| query = str(request.get("query", "")).lower() | ||||||
| limit = max(1, int(request.get("limit", 3))) | ||||||
|
|
||||||
| if not query.strip(): | ||||||
| return [] | ||||||
|
|
||||||
| ranked = sorted( | ||||||
| _KNOWLEDGE_BASE, | ||||||
| key=lambda entry: _score(query, entry), | ||||||
| reverse=True, | ||||||
| ) | ||||||
| top_entries = ranked[:limit] | ||||||
|
|
||||||
| formatted: List[str] = [] | ||||||
| for entry in top_entries: | ||||||
| formatted.append( | ||||||
| f"*Topic*: {entry['topic']}\nSummary: {entry['summary']}\nFAQ: {entry['faq']}" | ||||||
| ) | ||||||
| return formatted | ||||||
|
|
||||||
|
|
||||||
| def _score(query: str, entry: Dict[str, str]) -> int: | ||||||
| score = 0 | ||||||
| for token in query.split(): | ||||||
| if len(token) < 3: | ||||||
| continue | ||||||
| token_lower = token.lower() | ||||||
| if token_lower in entry["topic"].lower(): | ||||||
| score += 3 | ||||||
| if token_lower in entry["summary"].lower(): | ||||||
| score += 2 | ||||||
| if token_lower in entry["faq"].lower(): | ||||||
| score += 1 | ||||||
| return score | ||||||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.