diff --git a/examples/mcp_agent_server/README.md b/examples/mcp_agent_server/README.md index 23ba52f4c..12f5b88b4 100644 --- a/examples/mcp_agent_server/README.md +++ b/examples/mcp_agent_server/README.md @@ -27,6 +27,7 @@ This directory includes two implementations of the MCP Agent Server pattern: ### [Asyncio](./asyncio) The asyncio implementation provides: + - In-memory execution with minimal setup - Simple deployment with no external dependencies - Fast startup and execution @@ -35,6 +36,7 @@ The asyncio implementation provides: ### [Temporal](./temporal) The Temporal implementation provides: + - Durable execution of workflows using Temporal as the orchestration engine - Pause/resume capabilities via Temporal signals - Automatic retry and recovery from failures @@ -50,21 +52,49 @@ Each implementation demonstrates: ## Key MCP Agent Server Advantages -| Capability | Description | -|------------|-------------| -| **Protocol Standardization** | Agents communicate via standardized MCP protocol, ensuring interoperability | -| **Workflow Encapsulation** | Complex agent workflows are exposed as simple MCP tools | -| **Execution Flexibility** | Choose between in-memory (asyncio) or durable (Temporal) execution | -| **Client Independence** | Connect from any MCP client: Claude, VSCode, Cursor, MCP Inspector, or custom apps | -| **Multi-Agent Ecosystems** | Build systems where multiple agents can interact and collaborate | +| Capability | Description | +| ---------------------------- | ---------------------------------------------------------------------------------- | +| **Protocol Standardization** | Agents communicate via standardized MCP protocol, ensuring interoperability | +| **Workflow Encapsulation** | Complex agent workflows are exposed as simple MCP tools | +| **Execution Flexibility** | Choose between in-memory (asyncio) or durable (Temporal) execution | +| **Client Independence** | Connect from any MCP client: Claude, VSCode, Cursor, MCP Inspector, or custom 
apps | +| **Multi-Agent Ecosystems** | Build systems where multiple agents can interact and collaborate | ## Getting Started -Each implementation directory contains its own README with detailed instructions: +Each implementation directory contains its own README with detailed instructions. Prefer the decorator-based tool definition (`@app.tool` / `@app.async_tool`) for the simplest developer experience: - [Asyncio Implementation](./asyncio/README.md) - [Temporal Implementation](./temporal/README.md) +### Preferred: Declare tools with decorators + +Instead of only defining workflow classes, you can expose tools directly from functions: + +```python +from mcp_agent.app import MCPApp + +app = MCPApp(name="my_agent_server") + +@app.tool +async def do_something(arg: str) -> str: + """Do something synchronously and return the final result.""" + return "done" + +@app.async_tool(name="do_something_async") +async def do_something_async(arg: str) -> str: + """ + Start work asynchronously. + + Returns 'workflow_id' and 'run_id'. Use 'workflows-get_status' with the returned + IDs to retrieve status and results. + """ + return "started" +``` + +- Sync tool returns the final result; no status polling needed. +- Async tool returns IDs for polling via the generic `workflows-get_status` endpoint. + ## Multi-Agent Interaction Pattern One of the most powerful capabilities enabled by the MCP Agent Server pattern is multi-agent interaction. Here's a conceptual example: @@ -88,6 +118,7 @@ One of the most powerful capabilities enabled by the MCP Agent Server pattern is ``` In this example: + 1. Claude Desktop can use both agent servers 2. The Writing Agent can also use the Research Agent as a tool 3. 
All communication happens via the MCP protocol

diff --git a/examples/mcp_agent_server/asyncio/README.md b/examples/mcp_agent_server/asyncio/README.md
index 8b5b59965..e0f13de70 100644
--- a/examples/mcp_agent_server/asyncio/README.md
+++ b/examples/mcp_agent_server/asyncio/README.md
@@ -13,9 +13,49 @@ https://github.com/user-attachments/assets/f651af86-222d-4df0-8241-616414df66e4
 - Creating workflows with the `Workflow` base class
 - Registering workflows with an `MCPApp`
 - Exposing workflows as MCP tools using `create_mcp_server_for_app`, optionally using custom FastMCP settings
+- Preferred: Declaring MCP tools with `@app.tool` and `@app.async_tool`
 - Connecting to an MCP server using `gen_client`
 - Running workflows remotely and monitoring their status

+## Preferred: Define tools with decorators
+
+You can declare tools directly from plain Python functions using `@app.tool` (sync) and `@app.async_tool` (async). This is the simplest and recommended way to expose agent logic.
+
+```python
+from mcp_agent.app import MCPApp
+from mcp_agent.core.context import Context
+from typing import Optional
+
+app = MCPApp(name="basic_agent_server")
+
+# Synchronous tool – returns the final result to the caller
+@app.tool
+async def grade_story(story: str, app_ctx: Optional[Context] = None) -> str:
+    """
+    Grade a student's short story and return a structured report.
+    """
+    # ... implement using your agents/LLMs ...
+    return "Report..."
+
+# Asynchronous tool – starts a workflow and returns IDs to poll later
+@app.async_tool(name="grade_story_async")
+async def grade_story_async(story: str, app_ctx: Optional[Context] = None) -> str:
+    """
+    Start grading the story asynchronously.
+
+    This tool starts the workflow and returns 'workflow_id' and 'run_id'. Use the
+    generic 'workflows-get_status' tool with the returned IDs to retrieve status/results.
+    """
+    # ... implement using your agents/LLMs ...
+    return "(async run)"
+```
+
+What gets exposed:
+
+- Sync tools appear as `<tool_name>` (the function name by default) and return the final result (no status polling needed).
+- Async tools appear as `<tool_name>` and return `{"workflow_id","run_id"}`; use `workflows-get_status` to query status.
+
+These decorator-based tools are registered automatically when you call `create_mcp_server_for_app(app)`.
+
 ## Components in this Example

 1. **BasicAgentWorkflow**: A simple workflow that demonstrates basic agent functionality:
@@ -34,12 +74,16 @@ https://github.com/user-attachments/assets/f651af86-222d-4df0-8241-616414df66e4

 The MCP agent server exposes the following tools:

-- `workflows-list` - Lists all available workflows
-- `workflows-BasicAgentWorkflow-run` - Runs the BasicAgentWorkflow, returns the wf run ID
-- `workflows-BasicAgentWorkflow-get_status` - Gets the status of a running workflow
-- `workflows-ParallelWorkflow-run` - Runs the ParallelWorkflow, returns the wf run ID
-- `workflows-ParallelWorkflow-get_status` - Gets the status of a running workflow
-- `workflows-cancel` - Cancels a running workflow
+- `workflows-list` - Lists available workflows and their parameter schemas
+- `workflows-get_status` - Gets the status of a running workflow by `run_id` (and optional `workflow_id`)
+- `workflows-cancel` - Cancels a running workflow
+
+If you use the preferred decorator approach:
+
+- Sync tool: `grade_story` (returns the final result)
+- Async tool: `grade_story_async` (returns `workflow_id`/`run_id`; poll with `workflows-get_status`)
+
+The workflow-based endpoints (e.g., `workflows-<WorkflowName>-run`) are still available when you define explicit workflow classes.

 ## Prerequisites

@@ -55,17 +99,18 @@ Before running the example, you'll need to configure the necessary paths and API

 1. Copy the example secrets file:

-   ```bash
-   cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml
-   ```
+```
+cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml
+```

2. 
Edit `mcp_agent.secrets.yaml` to add your API keys: - ```yaml - anthropic: - api_key: "your-anthropic-api-key" - openai: - api_key: "your-openai-api-key" - ``` + +``` +anthropic: + api_key: "your-anthropic-api-key" +openai: + api_key: "your-openai-api-key" +``` ## How to Run @@ -73,7 +118,7 @@ Before running the example, you'll need to configure the necessary paths and API The simplest way to run the example is using the provided client script: -```bash +``` # Make sure you're in the mcp_agent_server/asyncio directory uv run client.py ``` @@ -91,21 +136,52 @@ You can also run the server and client separately: 1. In one terminal, start the server: - ```bash - uv run basic_agent_server.py +``` +uv run basic_agent_server.py - # Optionally, run with the example custom FastMCP settings - uv run basic_agent_server.py --custom-fastmcp-settings - ``` +# Optionally, run with the example custom FastMCP settings +uv run basic_agent_server.py --custom-fastmcp-settings +``` 2. In another terminal, run the client: - ```bash - uv run client.py +``` +uv run client.py + +# Optionally, run with the example custom FastMCP settings +uv run client.py --custom-fastmcp-settings +``` + +## Receiving Server Logs in the Client + +The server advertises the `logging` capability (via `logging/setLevel`) and forwards its structured logs upstream using `notifications/message`. 
To receive these logs in a client session, pass a `logging_callback` when constructing the client session and set the desired level: + +```python +from datetime import timedelta +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +from mcp import ClientSession +from mcp.types import LoggingMessageNotificationParams +from mcp_agent.mcp.mcp_agent_client_session import MCPAgentClientSession + +async def on_server_log(params: LoggingMessageNotificationParams) -> None: + print(f"[SERVER LOG] [{params.level.upper()}] [{params.logger}] {params.data}") + +def make_session(read_stream: MemoryObjectReceiveStream, + write_stream: MemoryObjectSendStream, + read_timeout_seconds: timedelta | None) -> ClientSession: + return MCPAgentClientSession( + read_stream=read_stream, + write_stream=write_stream, + read_timeout_seconds=read_timeout_seconds, + logging_callback=on_server_log, + ) - # Optionally, run with the example custom FastMCP settings - uv run client.py --custom-fastmcp-settings - ``` +# Later, when connecting via gen_client(..., client_session_factory=make_session) +# you can request the minimum server log level: +# await server.set_logging_level("info") +``` + +The example client (`client.py`) demonstrates this end-to-end: it registers a logging callback and calls `set_logging_level("info")` so logs from the server appear in the client's console. ## MCP Clients @@ -116,7 +192,7 @@ like any other MCP server. You can inspect and test the server using [MCP Inspector](https://github.com/modelcontextprotocol/inspector): -```bash +``` npx @modelcontextprotocol/inspector \ uv \ --directory /path/to/mcp-agent/examples/mcp_agent_server/asyncio \ @@ -138,41 +214,41 @@ To use this server with Claude Desktop: 2. 
Add a new server configuration: - ```json - "basic-agent-server": { - "command": "/path/to/uv", - "args": [ - "--directory", - "/path/to/mcp-agent/examples/mcp_agent_server/asyncio", - "run", - "basic_agent_server.py" - ] - } - ``` +```json +"basic-agent-server": { + "command": "/path/to/uv", + "args": [ + "--directory", + "/path/to/mcp-agent/examples/mcp_agent_server/asyncio", + "run", + "basic_agent_server.py" + ] +} +``` 3. Restart Claude Desktop, and you'll see the server available in the tool drawer 4. (**claude desktop workaround**) Update `mcp_agent.config.yaml` file with the full paths to npx/uvx on your system: - Find the full paths to `uvx` and `npx` on your system: - - ```bash - which uvx - which npx - ``` - - Update the `mcp_agent.config.yaml` file with these paths: - - ```yaml - mcp: - servers: - fetch: - command: "/full/path/to/uvx" # Replace with your path - args: ["mcp-server-fetch"] - filesystem: - command: "/full/path/to/npx" # Replace with your path - args: ["-y", "@modelcontextprotocol/server-filesystem"] - ``` +Find the full paths to `uvx` and `npx` on your system: + +``` +which uvx +which npx +``` + +Update the `mcp_agent.config.yaml` file with these paths: + +```yaml +mcp: + servers: + fetch: + command: "/full/path/to/uvx" # Replace with your path + args: ["mcp-server-fetch"] + filesystem: + command: "/full/path/to/npx" # Replace with your path + args: ["-y", "@modelcontextprotocol/server-filesystem"] +``` ## Code Structure diff --git a/examples/mcp_agent_server/asyncio/basic_agent_server.py b/examples/mcp_agent_server/asyncio/basic_agent_server.py index f54f171d9..ff498fcad 100644 --- a/examples/mcp_agent_server/asyncio/basic_agent_server.py +++ b/examples/mcp_agent_server/asyncio/basic_agent_server.py @@ -10,10 +10,10 @@ import argparse import asyncio import os -import logging -from typing import Dict, Any +from typing import Dict, Any, Optional from mcp.server.fastmcp import FastMCP +from mcp_agent.core.context import Context as AppContext 
from mcp_agent.app import MCPApp from mcp_agent.server.app_server import create_mcp_server_for_app @@ -26,15 +26,12 @@ from mcp_agent.executor.workflow import Workflow, WorkflowResult from mcp_agent.tracing.token_counter import TokenNode -# Initialize logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - # Note: This is purely optional: # if not provided, a default FastMCP server will be created by MCPApp using create_mcp_server_for_app() mcp = FastMCP(name="basic_agent_server", description="My basic agent server example.") -# Define the MCPApp instance +# Define the MCPApp instance. The server created for this app will advertise the +# MCP logging capability and forward structured logs upstream to connected clients. app = MCPApp( name="basic_agent_server", description="Basic agent server example", @@ -112,71 +109,146 @@ async def run(self, input: str) -> WorkflowResult[str]: return WorkflowResult(value=result) -@app.workflow -class ParallelWorkflow(Workflow[str]): +@app.tool +async def grade_story(story: str, app_ctx: Optional[AppContext] = None) -> str: """ - This workflow can be used to grade a student's short story submission and generate a report. + This tool can be used to grade a student's short story submission and generate a report. It uses multiple agents to perform different tasks in parallel. The agents include: - Proofreader: Reviews the story for grammar, spelling, and punctuation errors. - Fact Checker: Verifies the factual consistency within the story. - Style Enforcer: Analyzes the story for adherence to style guidelines. - Grader: Compiles the feedback from the other agents into a structured report. 
+
+    Args:
+        story: The student's short story to grade
+        app_ctx: Optional MCPApp context for accessing app resources and logging
+    """
+    # Use the context's app if available for proper logging with upstream_session
+    _app = app_ctx.app if app_ctx else app
+    # Ensure the app's logger is bound to the current context with upstream_session
+    if _app._logger and hasattr(_app._logger, "_bound_context"):
+        _app._logger._bound_context = app_ctx
+    logger = _app.logger
+    logger.info(f"grade_story: Received input: {story}")
+
+    proofreader = Agent(
+        name="proofreader",
+        instruction="""Review the short story for grammar, spelling, and punctuation errors.
+        Identify any awkward phrasing or structural issues that could improve clarity.
+        Provide detailed feedback on corrections.""",
+    )
-    @app.workflow_run
-    async def run(self, input: str) -> WorkflowResult[str]:
-        """
-        Run the workflow, processing the input data.
+    fact_checker = Agent(
+        name="fact_checker",
+        instruction="""Verify the factual consistency within the story. Identify any contradictions,
+        logical inconsistencies, or inaccuracies in the plot, character actions, or setting.
+        Highlight potential issues with reasoning or coherence.""",
+    )
-        Args:
-            input_data: The data to process
+    style_enforcer = Agent(
+        name="style_enforcer",
+        instruction="""Analyze the story for adherence to style guidelines.
+        Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to
+        enhance storytelling, readability, and engagement.""",
+    )
-        Returns:
-            A WorkflowResult containing the processed data
-        """
+    grader = Agent(
+        name="grader",
+        instruction="""Compile the feedback from the Proofreader, Fact Checker, and Style Enforcer
+        into a structured report. Summarize key issues and categorize them by type. 
+ Provide actionable recommendations for improving the story, + and give an overall grade based on the feedback.""", + ) - proofreader = Agent( - name="proofreader", - instruction=""""Review the short story for grammar, spelling, and punctuation errors. - Identify any awkward phrasing or structural issues that could improve clarity. - Provide detailed feedback on corrections.""", - ) + parallel = ParallelLLM( + fan_in_agent=grader, + fan_out_agents=[proofreader, fact_checker, style_enforcer], + llm_factory=OpenAIAugmentedLLM, + context=app_ctx if app_ctx else app.context, + ) - fact_checker = Agent( - name="fact_checker", - instruction="""Verify the factual consistency within the story. Identify any contradictions, - logical inconsistencies, or inaccuracies in the plot, character actions, or setting. - Highlight potential issues with reasoning or coherence.""", + try: + result = await parallel.generate_str( + message=f"Student short story submission: {story}", ) + except Exception as e: + logger.error(f"grade_story: Error generating result: {e}") + return None - style_enforcer = Agent( - name="style_enforcer", - instruction="""Analyze the story for adherence to style guidelines. - Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to - enhance storytelling, readability, and engagement.""", - ) + if not result: + logger.error("grade_story: No result from parallel LLM") + else: + logger.info(f"grade_story: Result: {result}") - grader = Agent( - name="grader", - instruction="""Compile the feedback from the Proofreader, Fact Checker, and Style Enforcer - into a structured report. Summarize key issues and categorize them by type. 
- Provide actionable recommendations for improving the story, - and give an overall grade based on the feedback.""", - ) + return result - parallel = ParallelLLM( - fan_in_agent=grader, - fan_out_agents=[proofreader, fact_checker, style_enforcer], - llm_factory=OpenAIAugmentedLLM, - context=app.context, - ) +@app.async_tool(name="grade_story_async") +async def grade_story_async(story: str, app_ctx: Optional[AppContext] = None) -> str: + """ + Async variant of grade_story that starts a workflow run and returns IDs. + + Args: + story: The student's short story to grade + app_ctx: Optional MCPApp context for accessing app resources and logging + """ + + # Use the context's app if available for proper logging with upstream_session + _app = app_ctx.app if app_ctx else app + # Ensure the app's logger is bound to the current context with upstream_session + if _app._logger and hasattr(_app._logger, "_bound_context"): + _app._logger._bound_context = app_ctx + logger = _app.logger + logger.info(f"grade_story_async: Received input: {story}") + + proofreader = Agent( + name="proofreader", + instruction="""Review the short story for grammar, spelling, and punctuation errors. + Identify any awkward phrasing or structural issues that could improve clarity. + Provide detailed feedback on corrections.""", + ) + + fact_checker = Agent( + name="fact_checker", + instruction="""Verify the factual consistency within the story. Identify any contradictions, + logical inconsistencies, or inaccuracies in the plot, character actions, or setting. + Highlight potential issues with reasoning or coherence.""", + ) + + style_enforcer = Agent( + name="style_enforcer", + instruction="""Analyze the story for adherence to style guidelines. + Evaluate the narrative flow, clarity of expression, and tone. 
Suggest improvements to + enhance storytelling, readability, and engagement.""", + ) + + grader = Agent( + name="grader", + instruction="""Compile the feedback from the Proofreader, Fact Checker, and Style Enforcer + into a structured report. Summarize key issues and categorize them by type. + Provide actionable recommendations for improving the story, + and give an overall grade based on the feedback.""", + ) + + parallel = ParallelLLM( + fan_in_agent=grader, + fan_out_agents=[proofreader, fact_checker, style_enforcer], + llm_factory=OpenAIAugmentedLLM, + context=app_ctx if app_ctx else app.context, + ) + + logger.info("grade_story_async: Starting parallel LLM") + + try: result = await parallel.generate_str( - message=f"Student short story submission: {input}", + message=f"Student short story submission: {story}", ) + except Exception as e: + logger.error(f"grade_story_async: Error generating result: {e}") + return None - return WorkflowResult(value=result) + return result # Add custom tool to get token usage for a workflow @@ -313,11 +385,11 @@ async def main(): context.config.mcp.servers["filesystem"].args.extend([os.getcwd()]) # Log registered workflows and agent configurations - logger.info(f"Creating MCP server for {agent_app.name}") + agent_app.logger.info(f"Creating MCP server for {agent_app.name}") - logger.info("Registered workflows:") + agent_app.logger.info("Registered workflows:") for workflow_id in agent_app.workflows: - logger.info(f" - {workflow_id}") + agent_app.logger.info(f" - {workflow_id}") # Create the MCP server that exposes both workflows and agent configurations, # optionally using custom FastMCP settings @@ -327,7 +399,7 @@ async def main(): else None ) mcp_server = create_mcp_server_for_app(agent_app, **(fast_mcp_settings or {})) - logger.info(f"MCP Server settings: {mcp_server.settings}") + agent_app.logger.info(f"MCP Server settings: {mcp_server.settings}") # Run the server await mcp_server.run_stdio_async() diff --git 
a/examples/mcp_agent_server/asyncio/client.py b/examples/mcp_agent_server/asyncio/client.py index 27ea442ed..73d6a2453 100644 --- a/examples/mcp_agent_server/asyncio/client.py +++ b/examples/mcp_agent_server/asyncio/client.py @@ -2,11 +2,15 @@ import asyncio import json import time -from mcp.types import CallToolResult +from datetime import timedelta +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +from mcp import ClientSession +from mcp.types import CallToolResult, LoggingMessageNotificationParams from mcp_agent.app import MCPApp from mcp_agent.config import MCPServerSettings from mcp_agent.executor.workflow import WorkflowExecution from mcp_agent.mcp.gen_client import gen_client +from mcp_agent.mcp.mcp_agent_client_session import MCPAgentClientSession from rich import print @@ -18,6 +22,12 @@ async def main(): action="store_true", help="Enable custom FastMCP settings for the server", ) + parser.add_argument( + "--server-log-level", + type=str, + default=None, + help="Set initial server logging level (debug, info, notice, warning, error, critical, alert, emergency)", + ) args = parser.parse_args() use_custom_fastmcp_settings = args.custom_fastmcp_settings @@ -45,7 +55,41 @@ async def main(): ) # Connect to the workflow server - async with gen_client("basic_agent_server", context.server_registry) as server: + # Define a logging callback to receive server-side log notifications + async def on_server_log(params: LoggingMessageNotificationParams) -> None: + # Pretty-print server logs locally for demonstration + level = params.level.upper() + name = params.logger or "server" + # params.data can be any JSON-serializable data + print(f"[SERVER LOG] [{level}] [{name}] {params.data}") + + # Provide a client session factory that installs our logging callback + def make_session( + read_stream: MemoryObjectReceiveStream, + write_stream: MemoryObjectSendStream, + read_timeout_seconds: timedelta | None, + ) -> ClientSession: + return 
MCPAgentClientSession( + read_stream=read_stream, + write_stream=write_stream, + read_timeout_seconds=read_timeout_seconds, + logging_callback=on_server_log, + ) + + async with gen_client( + "basic_agent_server", + context.server_registry, + client_session_factory=make_session, + ) as server: + # Ask server to send logs at the requested level (default info) + level = (args.server_log_level or "info").lower() + print(f"[client] Setting server logging level to: {level}") + try: + await server.set_logging_level(level) + except Exception: + # Older servers may not support logging capability + print("[client] Server does not support logging/setLevel") + # List available tools tools_result = await server.list_tools() logger.info( @@ -61,7 +105,7 @@ async def main(): data=_tool_result_to_json(workflows_response) or workflows_response, ) - # Call the BasicAgentWorkflow + # Call the BasicAgentWorkflow (run + status) run_result = await server.call_tool( "workflows-BasicAgentWorkflow-run", arguments={ @@ -71,7 +115,22 @@ async def main(): }, ) - execution = WorkflowExecution(**json.loads(run_result.content[0].text)) + # Tolerant parsing of run IDs from tool result + run_payload = _tool_result_to_json(run_result) + if not run_payload: + sc = getattr(run_result, "structuredContent", None) + if isinstance(sc, dict): + run_payload = sc.get("result") or sc + if not run_payload: + # Last resort: parse unstructured content if present and non-empty + if getattr(run_result, "content", None) and run_result.content[0].text: + run_payload = json.loads(run_result.content[0].text) + else: + raise RuntimeError( + "Unable to extract workflow run IDs from tool result" + ) + + execution = WorkflowExecution(**run_payload) run_id = execution.run_id logger.info( f"Started BasicAgentWorkflow-run. 
workflow ID={execution.workflow_id}, run ID={run_id}" @@ -84,7 +143,12 @@ async def main(): arguments={"run_id": run_id}, ) + # Tolerant parsing of get_status result workflow_status = _tool_result_to_json(get_status_result) + if workflow_status is None: + sc = getattr(get_status_result, "structuredContent", None) + if isinstance(sc, dict): + workflow_status = sc.get("result") or sc if workflow_status is None: logger.error( f"Failed to parse workflow status response: {get_status_result}" @@ -108,7 +172,6 @@ async def main(): f"Workflow run {run_id} completed successfully! Result:", data=workflow_status.get("result"), ) - break elif workflow_status.get("status") == "error": logger.error( @@ -135,12 +198,6 @@ async def main(): await asyncio.sleep(5) - # TODO: UNCOMMENT ME to try out cancellation: - # await server.call_tool( - # "workflows-cancel", - # arguments={"workflow_id": "BasicAgentWorkflow", "run_id": run_id}, - # ) - # Get the token usage summary logger.info("Fetching token usage summary...") token_usage_result = await server.call_tool( @@ -159,6 +216,72 @@ async def main(): # Display the token usage summary print(token_usage_result.structuredContent) + await asyncio.sleep(5) + + # Call the sync tool 'grade_story' separately (no run/status loop) + try: + grade_result = await server.call_tool( + "grade_story", + arguments={"story": "This is a test story."}, + ) + grade_payload = _tool_result_to_json(grade_result) or ( + ( + grade_result.structuredContent.get("result") + if getattr(grade_result, "structuredContent", None) + else None + ) + or (grade_result.content[0].text if grade_result.content else None) + ) + logger.info("grade_story result:", data=grade_payload) + except Exception as e: + logger.error("grade_story call failed", data=str(e)) + + # Call the async tool 'grade_story_async': start then poll status + try: + async_run_result = await server.call_tool( + "grade_story_async", + arguments={"story": "This is a test story."}, + ) + async_ids = ( + 
(getattr(async_run_result, "structuredContent", {}) or {}).get( + "result" + ) + or _tool_result_to_json(async_run_result) + or json.loads(async_run_result.content[0].text) + ) + async_run_id = async_ids["run_id"] + logger.info( + f"Started grade_story_async. run ID={async_run_id}", + ) + + # Poll status until completion + while True: + async_status = await server.call_tool( + "workflows-get_status", + arguments={"run_id": async_run_id}, + ) + async_status_json = ( + getattr(async_status, "structuredContent", {}) or {} + ).get("result") or _tool_result_to_json(async_status) + if async_status_json is None: + logger.error( + "grade_story_async: failed to parse status", + data=async_status, + ) + break + logger.info("grade_story_async status:", data=async_status_json) + if async_status_json.get("status") in ( + "completed", + "error", + "cancelled", + ): + break + await asyncio.sleep(2) + except Exception as e: + logger.error("grade_story_async call failed", data=str(e)) + + await asyncio.sleep(5) + def _tool_result_to_json(tool_result: CallToolResult): if tool_result.content and len(tool_result.content) > 0: diff --git a/examples/mcp_agent_server/temporal/basic_agent_server.py b/examples/mcp_agent_server/temporal/basic_agent_server.py index d56a492fc..5a29ab4f0 100644 --- a/examples/mcp_agent_server/temporal/basic_agent_server.py +++ b/examples/mcp_agent_server/temporal/basic_agent_server.py @@ -14,6 +14,7 @@ from mcp_agent.app import MCPApp from mcp_agent.agents.agent import Agent +from mcp_agent.core.context import Context from mcp_agent.executor.workflow_signal import Signal from mcp_agent.server.app_server import create_mcp_server_for_app from mcp_agent.executor.workflow import Workflow, WorkflowResult @@ -68,6 +69,48 @@ async def run( return WorkflowResult(value=result) +@app.tool +async def finder_tool(request: str, app_ctx: Context | None = None) -> str: + """ + Run the basic agent workflow using the app.tool decorator to set up the workflow. 
+    The code in this function is run in workflow context.
+    LLM calls are executed in the activity context.
+    You can use the app_ctx to access the executor to run activities explicitly.
+    Functions decorated with @app.workflow_task will be run in activity context.
+
+    Args:
+        request: The input string to prompt the agent.
+
+    Returns:
+        The result of the agent call. This tool will be run synchronously and block until workflow completion.
+        To create this as an async tool, use @app.async_tool instead, which will return the workflow ID and run ID.
+    """
+
+    # Use the context's app when available; fall back to the module-level app
+    _app = app_ctx.app if app_ctx else app
+
+    logger = _app.logger
+    logger.info(f"Running finder_tool with input: {request}")
+
+    finder_agent = Agent(
+        name="finder",
+        instruction="""You are a helpful assistant.""",
+        server_names=["fetch", "filesystem"],
+    )
+
+    context = _app.context
+    context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])
+
+    async with finder_agent:
+        finder_llm = await finder_agent.attach_llm(OpenAIAugmentedLLM)
+
+        result = await finder_llm.generate_str(
+            message=request,
+        )
+        logger.info(f"Agent result: {result}")
+
+    return result
+
+
 @app.workflow
 class PauseResumeWorkflow(Workflow[str]):
     """

diff --git a/examples/mcp_agent_server/temporal/client.py b/examples/mcp_agent_server/temporal/client.py
index 634dea5c3..dd5f9b1ee 100644
--- a/examples/mcp_agent_server/temporal/client.py
+++ b/examples/mcp_agent_server/temporal/client.py
@@ -7,6 +7,15 @@
 from mcp_agent.executor.workflow import WorkflowExecution
 from mcp_agent.mcp.gen_client import gen_client

+try:
+    from exceptiongroup import ExceptionGroup as _ExceptionGroup  # Python 3.10 backport
+except Exception:  # pragma: no cover
+    _ExceptionGroup = None  # type: ignore
+try:
+    from anyio import BrokenResourceError as _BrokenResourceError
+except Exception:  # pragma: no cover
+    _BrokenResourceError = None  # type: ignore
+

 async def main():
     # Create MCPApp to get the server registry
@@ -27,88 +36,137 @@ async def main():
         )

     # Connect to the workflow 
server - async with gen_client("basic_agent_server", context.server_registry) as server: - # Call the BasicAgentWorkflow - run_result = await server.call_tool( - "workflows-BasicAgentWorkflow-run", - arguments={ - "run_parameters": { - "input": "Print the first 2 paragraphs of https://modelcontextprotocol.io/introduction" - } - }, - ) - - execution = WorkflowExecution(**json.loads(run_result.content[0].text)) - run_id = execution.run_id - logger.info( - f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}" - ) - - # Wait for the workflow to complete - while True: - get_status_result = await server.call_tool( - "workflows-BasicAgentWorkflow-get_status", - arguments={"run_id": run_id}, + try: + async with gen_client( + "basic_agent_server", context.server_registry + ) as server: + # Call the BasicAgentWorkflow + run_result = await server.call_tool( + "workflows-BasicAgentWorkflow-run", + arguments={ + "run_parameters": { + "input": "Print the first 2 paragraphs of https://modelcontextprotocol.io/introduction" + } + }, ) - workflow_status = _tool_result_to_json(get_status_result) - if workflow_status is None: - logger.error( - f"Failed to parse workflow status response: {get_status_result}" - ) - break - + execution = WorkflowExecution(**json.loads(run_result.content[0].text)) + run_id = execution.run_id logger.info( - f"Workflow run {run_id} status:", - data=workflow_status, + f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}" ) - if not workflow_status.get("status"): - logger.error( - f"Workflow run {run_id} status is empty. get_status_result:", - data=get_status_result, + # Wait for the workflow to complete + while True: + get_status_result = await server.call_tool( + "workflows-BasicAgentWorkflow-get_status", + arguments={"run_id": run_id}, ) - break - if workflow_status.get("status") == "completed": - logger.info( - f"Workflow run {run_id} completed successfully! 
Result:", - data=workflow_status.get("result"), - ) + workflow_status = _tool_result_to_json(get_status_result) + if workflow_status is None: + logger.error( + f"Failed to parse workflow status response: {get_status_result}" + ) + break - break - elif workflow_status.get("status") == "error": - logger.error( - f"Workflow run {run_id} failed with error:", + logger.info( + f"Workflow run {run_id} status:", data=workflow_status, ) - break - elif workflow_status.get("status") == "running": - logger.info( - f"Workflow run {run_id} is still running...", + + if not workflow_status.get("status"): + logger.error( + f"Workflow run {run_id} status is empty. get_status_result:", + data=get_status_result, + ) + break + + if workflow_status.get("status") == "completed": + logger.info( + f"Workflow run {run_id} completed successfully! Result:", + data=workflow_status.get("result"), + ) + + break + elif workflow_status.get("status") == "error": + logger.error( + f"Workflow run {run_id} failed with error:", + data=workflow_status, + ) + break + elif workflow_status.get("status") == "running": + logger.info( + f"Workflow run {run_id} is still running...", + ) + elif workflow_status.get("status") == "cancelled": + logger.error( + f"Workflow run {run_id} was cancelled.", + data=workflow_status, + ) + break + else: + logger.error( + f"Unknown workflow status: {workflow_status.get('status')}", + data=workflow_status, + ) + break + + await asyncio.sleep(5) + + # TODO: UNCOMMENT ME to try out cancellation: + # await server.call_tool( + # "workflows-cancel", + # arguments={"workflow_id": "BasicAgentWorkflow", "run_id": run_id}, + # ) + + print(run_result) + + # Call the sync tool 'finder_tool' (no run/status loop) + try: + finder_result = await server.call_tool( + "finder_tool", + arguments={ + "request": "Summarize the Model Context Protocol introduction from https://modelcontextprotocol.io/introduction." 
+ }, ) - elif workflow_status.get("status") == "cancelled": - logger.error( - f"Workflow run {run_id} was cancelled.", - data=workflow_status, + finder_payload = _tool_result_to_json(finder_result) or ( + ( + finder_result.structuredContent.get("result") + if getattr(finder_result, "structuredContent", None) + else None + ) + or ( + finder_result.content[0].text + if getattr(finder_result, "content", None) + else None + ) ) - break + logger.info("finder_tool result:", data=finder_payload) + except Exception as e: + logger.error("finder_tool call failed", data=str(e)) + except Exception as e: + # Tolerate benign shutdown races from SSE client (BrokenResourceError within ExceptionGroup) + if _ExceptionGroup is not None and isinstance(e, _ExceptionGroup): + subs = getattr(e, "exceptions", []) or [] + if ( + _BrokenResourceError is not None + and subs + and all(isinstance(se, _BrokenResourceError) for se in subs) + ): + logger.debug("Ignored BrokenResourceError from SSE shutdown") else: - logger.error( - f"Unknown workflow status: {workflow_status.get('status')}", - data=workflow_status, - ) - break - - await asyncio.sleep(5) - - # TODO: UNCOMMENT ME to try out cancellation: - # await server.call_tool( - # "workflows-cancel", - # arguments={"workflow_id": "BasicAgentWorkflow", "run_id": run_id}, - # ) - - print(run_result) + raise + elif _BrokenResourceError is not None and isinstance( + e, _BrokenResourceError + ): + logger.debug("Ignored BrokenResourceError from SSE shutdown") + elif "BrokenResourceError" in str(e): + logger.debug( + "Ignored BrokenResourceError from SSE shutdown (string match)" + ) + else: + raise def _tool_result_to_json(tool_result: CallToolResult): diff --git a/pyproject.toml b/pyproject.toml index aa8fdf819..99ead1149 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "mcp-agent" -version = "0.1.14" +version = "0.1.15" description = "Build effective agents with Model Context Protocol (MCP) using simple, composable 
patterns." readme = "README.md" license = { file = "LICENSE" } diff --git a/src/mcp_agent/app.py b/src/mcp_agent/app.py index c4fb8f6c2..925c91d51 100644 --- a/src/mcp_agent/app.py +++ b/src/mcp_agent/app.py @@ -189,7 +189,18 @@ def session_id(self): def logger(self): if self._logger is None: session_id = self._context.session_id if self._context else None + # Do not pass context kwarg to match expected call signature in tests self._logger = get_logger(f"mcp_agent.{self.name}", session_id=session_id) + # Bind context for upstream forwarding and other contextual logging + try: + if self._context is not None: + self._logger._bound_context = self._context # type: ignore[attr-defined] + except Exception: + pass + else: + # Update the logger's bound context in case upstream_session was set after logger creation + if self._context and hasattr(self._logger, "_bound_context"): + self._logger._bound_context = self._context return self._logger async def initialize(self): @@ -543,9 +554,73 @@ def _create_workflow_from_function( import asyncio as _asyncio from mcp_agent.executor.workflow import Workflow as _Workflow - async def _invoke_target(*args, **kwargs): + async def _invoke_target(workflow_self, *args, **kwargs): + # Inject app_ctx (AppContext) and shim ctx (FastMCP Context) if requested by the function + import inspect as _inspect + + call_kwargs = dict(kwargs) + + # If Temporal passed a single positional dict payload, merge into kwargs + if len(args) == 1 and isinstance(args[0], dict): + try: + call_kwargs = {**args[0], **call_kwargs} + args = () + except Exception: + pass + + # Detect if function expects an AppContext parameter (named 'app_ctx' or annotated with our Context) + try: + sig = _inspect.signature(fn) + app_context_param_name = None + + for param_name, param in sig.parameters.items(): + if param_name == "app_ctx": + app_context_param_name = param_name + break + if param.annotation != _inspect.Parameter.empty: + ann_str = str(param.annotation) + if 
"mcp_agent.core.context.Context" in ann_str: + app_context_param_name = param_name + break + # If requested, inject the workflow's context (use property for fallback) + if app_context_param_name: + try: + _ctx_obj = workflow_self.context + except Exception: + _ctx_obj = getattr(workflow_self, "_context", None) + if _ctx_obj is not None: + call_kwargs[app_context_param_name] = _ctx_obj + except Exception: + pass + + # If the function expects a FastMCP Context (ctx/context), ensure it's present (None inside workflow) + try: + from mcp.server.fastmcp import Context as _Ctx # type: ignore + except Exception: + _Ctx = None # type: ignore + + try: + sig = sig if "sig" in locals() else _inspect.signature(fn) + for p in sig.parameters.values(): + if ( + p.annotation is not _inspect._empty + and _Ctx is not None + and p.annotation is _Ctx + ): + if p.name not in call_kwargs: + call_kwargs[p.name] = None + if p.name in ("ctx", "context") and p.name not in call_kwargs: + call_kwargs[p.name] = None + except Exception: + pass + + # If user passed a single positional dict (Temporal AutoWorkflow payload), merge it + if not call_kwargs and len(args) == 1 and isinstance(args[0], dict): + call_kwargs = dict(args[0]) + args = () + # Support both async and sync callables - res = fn(*args, **kwargs) + res = fn(*args, **call_kwargs) if _asyncio.iscoroutine(res): res = await res @@ -562,10 +637,17 @@ async def _invoke_target(*args, **kwargs): return res async def _run(self, *args, **kwargs): # type: ignore[no-redef] - return await _invoke_target(*args, **kwargs) + return await _invoke_target(self, *args, **kwargs) # Decorate run with engine-specific decorator - decorated_run = self.workflow_run(_run) + engine_type = self.config.execution_engine + if engine_type == "temporal": + # Temporal requires the @workflow.run to be applied on a top-level + # class method, not on a local function. We'll assign _run as-is + # for now and decorate it after creating and publishing the class. 
+ decorated_run = _run + else: + decorated_run = self.workflow_run(_run) # Build the Workflow subclass dynamically cls_dict: Dict[str, Any] = { @@ -580,6 +662,40 @@ async def _run(self, *args, **kwargs): # type: ignore[no-redef] auto_cls = type(f"AutoWorkflow_{workflow_name}", (_Workflow,), cls_dict) + # Workaround for Temporal: publish the dynamically created class as a + # top-level (module global) so it is not considered a "local class". + # Temporal requires workflow classes to be importable from a module. + try: + import sys as _sys + + target_module = getattr(fn, "__module__", __name__) + auto_cls.__module__ = target_module + _mod = _sys.modules.get(target_module) + if _mod is not None: + setattr(_mod, auto_cls.__name__, auto_cls) + except Exception: + pass + + # For Temporal, now that the class exists and is published at module-level, + # decorate the run method with the engine-specific run decorator. + if engine_type == "temporal": + try: + run_decorator = self._decorator_registry.get_workflow_run_decorator( + engine_type + ) + if run_decorator: + fn_run = getattr(auto_cls, "run") + # Ensure method appears as top-level for Temporal + target_module = getattr(fn, "__module__", __name__) + try: + fn_run.__module__ = target_module # type: ignore[attr-defined] + fn_run.__qualname__ = f"{auto_cls.__name__}.run" # type: ignore[attr-defined] + except Exception: + pass + setattr(auto_cls, "run", run_decorator(fn_run)) + except Exception: + pass + # Register with app (and apply engine-specific workflow decorator) self.workflow(auto_cls, workflow_id=workflow_name) return auto_cls @@ -624,6 +740,12 @@ def decorator(fn: Callable[..., Any]) -> Callable[..., Any]: return fn + # Support bare usage: @app.tool without parentheses + if callable(name) and description is None and structured_output is None: + fn = name # type: ignore[assignment] + name = None + return decorator(fn) # type: ignore[arg-type] + return decorator def async_tool( @@ -661,6 +783,12 @@ def decorator(fn: 
Callable[..., Any]) -> Callable[..., Any]: ) return fn + # Support bare usage: @app.async_tool without parentheses + if callable(name) and description is None: + fn = name # type: ignore[assignment] + name = None + return decorator(fn) # type: ignore[arg-type] + return decorator def workflow_task( diff --git a/src/mcp_agent/executor/temporal/__init__.py b/src/mcp_agent/executor/temporal/__init__.py index 97ade82db..34984508e 100644 --- a/src/mcp_agent/executor/temporal/__init__.py +++ b/src/mcp_agent/executor/temporal/__init__.py @@ -175,7 +175,7 @@ async def _execute_task( try: result = await workflow.execute_activity( activity_task, - args=args, + *args, task_queue=self.config.task_queue, schedule_to_close_timeout=schedule_to_close, retry_policy=retry_policy, @@ -304,38 +304,45 @@ async def start_workflow( # Inspect the `run(self, …)` signature sig = inspect.signature(wf.run) + # Work with a signature that excludes any leading 'self' for binding/validation params = [p for p in sig.parameters.values() if p.name != "self"] - - # Bind args in declaration order - bound_args = [] - for idx, param in enumerate(params): - if idx < len(args): - bound_args.append(args[idx]) - elif param.name in kwargs: - bound_args.append(kwargs[param.name]) - elif param.default is not inspect.Parameter.empty: - # optional param, skip if not provided - continue - else: - raise ValueError(f"Missing required workflow argument '{param.name}'") - - # Too many positionals? 
- if len(args) > len(params): - raise ValueError( - f"Got {len(args)} positional args but run() only takes {len(params)}" - ) + has_var_positional = any( + p.kind == inspect.Parameter.VAR_POSITIONAL for p in params + ) + has_var_keyword = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params) + sig_no_self = inspect.Signature(parameters=params) # Determine what to pass to the start_workflow function - input_arg = None - if not bound_args: - # zero-arg workflow - pass - elif len(bound_args) == 1: - # single-arg workflow - input_arg = bound_args[0] + # If the workflow run is varargs/kwargs (AutoWorkflow), pass kwargs as a single payload + if has_var_keyword or has_var_positional: + input_arg = kwargs if kwargs else (args[0] if args else None) else: - # multi-arg workflow - pack into a sequence - input_arg = bound_args + # Bind provided args/kwargs to validate and order them against signature without 'self' + try: + bound = sig_no_self.bind_partial(*args, **kwargs) + except TypeError as e: + raise ValueError(str(e)) + + # Check for missing required (non-default) parameters + for p in params: + if p.default is inspect._empty and p.name not in bound.arguments: + raise ValueError(f"Missing required workflow argument '{p.name}'") + + bound_vals = [ + bound.arguments.get(p.name) for p in params if p.name in bound.arguments + ] + if len(bound_vals) == 0: + input_arg = None + elif len(bound_vals) == 1: + input_arg = bound_vals[0] + else: + input_arg = bound_vals + # Too many positionals for strict (non-varargs) run signatures? 
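The binding logic above leans on `inspect.Signature.bind_partial` to validate and order caller-supplied parameters against a `run()` signature with `self` stripped. A minimal sketch of that technique, using a stand-in `run` function rather than a real workflow:

```python
import inspect


# A representative run() signature, standing in for a workflow's run method
def run(self, input: str, retries: int = 3):
    return input


# Rebuild the signature without 'self', as the executor does
params = [p for p in inspect.signature(run).parameters.values() if p.name != "self"]
sig_no_self = inspect.Signature(parameters=params)

# bind_partial validates names/order without requiring every parameter
bound = sig_no_self.bind_partial("hello")
assert bound.arguments == {"input": "hello"}

# Required parameters that were not supplied are simply absent, so they can
# be checked explicitly instead of letting a TypeError escape mid-bind
missing = [
    p.name
    for p in params
    if p.default is inspect.Parameter.empty and p.name not in bound.arguments
]
assert missing == []
```

Using `bind_partial` plus an explicit missing-parameter check (rather than `bind`) lets the caller raise a domain-specific error naming the missing workflow argument.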
+ if not (has_var_positional or has_var_keyword): + if len(args) > len(params): + raise ValueError( + f"Got {len(args)} positional args but run() only takes {len(params)}" + ) # Use provided workflow_id or generate a unique one if workflow_id is None: diff --git a/src/mcp_agent/executor/workflow.py b/src/mcp_agent/executor/workflow.py index a7b2c6ba0..e05b741cb 100644 --- a/src/mcp_agent/executor/workflow.py +++ b/src/mcp_agent/executor/workflow.py @@ -7,6 +7,7 @@ Any, Dict, Generic, + Literal, Optional, TypeVar, TYPE_CHECKING, @@ -53,6 +54,8 @@ def record_error(self, error: Exception) -> None: class WorkflowResult(BaseModel, Generic[T]): + # Discriminator to disambiguate from arbitrary dicts + kind: Literal["workflow_result"] = "workflow_result" value: Optional[T] = None metadata: Dict[str, Any] = Field(default_factory=dict) start_time: float | None = None @@ -94,7 +97,9 @@ def __init__( ContextDependent.__init__(self, context=context) self.name = name or self.__class__.__name__ - self._logger = get_logger(f"workflow.{self.name}") + # Bind workflow logger to the provided context so events can carry + # the current upstream_session even when emitted from background tasks. + self._logger = get_logger(f"workflow.{self.name}", context=context) self._initialized = False self._workflow_id = None # Will be set during run_async self._run_id = None # Will be set during run_async diff --git a/src/mcp_agent/logging/events.py b/src/mcp_agent/logging/events.py index 3f934d427..1244d7e54 100644 --- a/src/mcp_agent/logging/events.py +++ b/src/mcp_agent/logging/events.py @@ -50,6 +50,10 @@ class Event(BaseModel): data: Dict[str, Any] = Field(default_factory=dict) context: EventContext | None = None + # Runtime-only handle for upstream forwarding. Present for listeners to + # use, explicitly excluded from any serialization/dumps. 
+ upstream_session: Any | None = Field(default=None, exclude=True) + # For distributed tracing span_id: str | None = None trace_id: str | None = None diff --git a/src/mcp_agent/logging/listeners.py b/src/mcp_agent/logging/listeners.py index 8d66b6862..0abc4329f 100644 --- a/src/mcp_agent/logging/listeners.py +++ b/src/mcp_agent/logging/listeners.py @@ -7,11 +7,24 @@ import time from abc import ABC, abstractmethod -from typing import Dict, List +from typing import Any, Dict, List, Optional, Protocol, TYPE_CHECKING from mcp_agent.logging.events import Event, EventFilter, EventType from mcp_agent.logging.event_progress import convert_log_event +if TYPE_CHECKING: # pragma: no cover - for type checking only + from mcp.types import LoggingLevel + + +class UpstreamServerSessionProtocol(Protocol): + async def send_log_message( + self, + level: "LoggingLevel", + data: Dict[str, Any], + logger: str | None = None, + related_request_id: str | None = None, + ) -> None: ... + class EventListener(ABC): """Base async listener that processes events.""" @@ -217,3 +230,68 @@ async def flush(self): async def _process_batch(self, events: List[Event]): pass + + +class MCPUpstreamLoggingListener(FilteredListener): + """ + Sends matched log events to the connected MCP client via the upstream_session + carried on each Event (runtime-only field). If no upstream_session is present, + the event is skipped. 
+ """ + + def __init__(self, event_filter: EventFilter | None = None) -> None: + super().__init__(event_filter=event_filter) + + async def handle_matched_event(self, event: Event) -> None: + # Use upstream session provided on the event + upstream_session: Optional[UpstreamServerSessionProtocol] = getattr( + event, "upstream_session", None + ) + + if upstream_session is None: + # No upstream_session available, event cannot be forwarded + return + + # Map our EventType to MCP LoggingLevel; fold progress -> info + mcp_level_map: Dict[str, str] = { + "debug": "debug", + "info": "info", + "warning": "warning", + "error": "error", + "progress": "info", + } + # Use string type to avoid hard dependency; annotated for type checkers + mcp_level: "LoggingLevel" = mcp_level_map.get(event.type, "info") # type: ignore[assignment] + + # Build structured data payload + data: Dict[str, Any] = { + "message": event.message, + "namespace": event.namespace, + "name": event.name, + "timestamp": event.timestamp.isoformat(), + } + if event.data: + # Merge user-provided event data under 'data' + data["data"] = event.data + if event.trace_id or event.span_id: + data["trace"] = {"trace_id": event.trace_id, "span_id": event.span_id} + if event.context is not None: + try: + data["context"] = event.context.dict() + except Exception: + pass + + # Determine logger name (namespace + optional name) + logger_name: str = ( + event.namespace if not event.name else f"{event.namespace}.{event.name}" + ) + + try: + await upstream_session.send_log_message( + level=mcp_level, # type: ignore[arg-type] + data=data, + logger=logger_name, + ) + except Exception as e: + # Avoid raising inside listener; best-effort delivery + _ = e diff --git a/src/mcp_agent/logging/logger.py b/src/mcp_agent/logging/logger.py index dca6940e1..e69327d06 100644 --- a/src/mcp_agent/logging/logger.py +++ b/src/mcp_agent/logging/logger.py @@ -11,11 +11,16 @@ import threading import time -from typing import Any, Dict +from typing 
import Any, Dict, Final from contextlib import asynccontextmanager, contextmanager -from mcp_agent.logging.events import Event, EventContext, EventFilter, EventType +from mcp_agent.logging.events import ( + Event, + EventContext, + EventFilter, + EventType, +) from mcp_agent.logging.listeners import ( BatchingListener, LoggingListener, @@ -31,10 +36,16 @@ class Logger: - `name` can be a custom domain-specific event name, e.g. "ORDER_PLACED". """ - def __init__(self, namespace: str, session_id: str | None = None): + def __init__( + self, namespace: str, session_id: str | None = None, bound_context=None + ): self.namespace = namespace self.session_id = session_id self.event_bus = AsyncEventBus.get() + # Optional reference to an application/context object that may carry + # an "upstream_session" attribute. This allows cached loggers to + # observe the current upstream session without relying on globals. + self._bound_context = bound_context def _ensure_event_loop(self): """Ensure we have an event loop we can use.""" @@ -92,6 +103,25 @@ def event( elif context.session_id is None: context.session_id = self.session_id + # Attach upstream_session to the event so the upstream listener + # can forward reliably, regardless of the current task context. + # 1) Prefer logger-bound app context (set at creation or refreshed by caller) + extra_event_fields: Dict[str, Any] = {} + try: + upstream = ( + getattr(self._bound_context, "upstream_session", None) + if getattr(self, "_bound_context", None) is not None + else None + ) + if upstream is not None: + extra_event_fields["upstream_session"] = upstream + except Exception: + pass + + # No further fallbacks; upstream forwarding must be enabled by passing + # a bound context when creating the logger or by server code attaching + # upstream_session to the application context. 
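The `extra_event_fields` pattern above, combined with the `exclude=True` field this diff adds to `Event`, keeps the upstream session visible to listeners at runtime while never serializing it. A rough sketch with a simplified `Event` stand-in, assuming Pydantic v2's `model_dump`:

```python
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


class Event(BaseModel):
    message: str
    # Runtime-only handle; excluded from any serialization/dumps
    upstream_session: Optional[Any] = Field(default=None, exclude=True)


# Collect optional runtime-only fields, then splat them into the constructor
extra_event_fields: Dict[str, Any] = {}
upstream = object()  # stand-in for a bound context's upstream_session
if upstream is not None:
    extra_event_fields["upstream_session"] = upstream

evt = Event(message="hello", **extra_event_fields)
assert evt.upstream_session is upstream  # visible to listeners at runtime
assert "upstream_session" not in evt.model_dump()  # never serialized
```

The conditional dict means an absent upstream session leaves the constructor call unchanged, so events built outside a server context behave exactly as before.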
+ evt = Event( type=etype, name=ename, @@ -99,6 +129,7 @@ def event( message=message, context=context, data=data, + **extra_event_fields, ) self._emit_event(evt) @@ -212,7 +243,8 @@ async def async_event_context( class LoggingConfig: """Global configuration for the logging system.""" - _initialized = False + _initialized: bool = False + _event_filter_ref: EventFilter | None = None @classmethod async def configure( @@ -233,10 +265,32 @@ async def configure( flush_interval: Default flush interval for batching listener **kwargs: Additional configuration options """ + bus = AsyncEventBus.get(transport=transport) + # Keep a reference to the provided filter so we can update at runtime + if event_filter is not None: + cls._event_filter_ref = event_filter + + # If already initialized, ensure critical listeners exist and return if cls._initialized: - return + # Forward logs upstream via MCP notifications if upstream_session is configured + try: + from mcp_agent.logging.listeners import MCPUpstreamLoggingListener - bus = AsyncEventBus.get(transport=transport) + has_upstream_listener = any( + isinstance(listener, MCPUpstreamLoggingListener) + for listener in bus.listeners.values() + ) + if not has_upstream_listener: + from typing import Final as _Final + + MCP_UPSTREAM_LISTENER_NAME: _Final[str] = "mcp_upstream" + bus.add_listener( + MCP_UPSTREAM_LISTENER_NAME, + MCPUpstreamLoggingListener(event_filter=cls._event_filter_ref), + ) + except Exception: + pass + return # Add standard listeners if "logging" not in bus.listeners: @@ -259,6 +313,25 @@ async def configure( ), ) + # Forward logs upstream via MCP notifications if upstream_session is configured + # Avoid duplicate registration by checking existing instances, not key name. 
+ try: + from mcp_agent.logging.listeners import MCPUpstreamLoggingListener + + has_upstream_listener = any( + isinstance(listener, MCPUpstreamLoggingListener) + for listener in bus.listeners.values() + ) + if not has_upstream_listener: + MCP_UPSTREAM_LISTENER_NAME: Final[str] = "mcp_upstream" + bus.add_listener( + MCP_UPSTREAM_LISTENER_NAME, + MCPUpstreamLoggingListener(event_filter=event_filter), + ) + except Exception: + # Non-fatal if import fails + pass + await bus.start() cls._initialized = True @@ -271,6 +344,31 @@ async def shutdown(cls): await bus.stop() cls._initialized = False + @classmethod + def set_min_level(cls, level: EventType | str) -> None: + """Update the minimum logging level on the shared event filter, if available.""" + if cls._event_filter_ref is None: + return + # Normalize level + normalized = str(level).lower() + # Map synonyms to our EventType scale + mapping: Dict[str, EventType] = { + "debug": "debug", + "info": "info", + "notice": "info", + "warning": "warning", + "warn": "warning", + "error": "error", + "critical": "error", + "alert": "error", + "emergency": "error", + } + cls._event_filter_ref.min_level = mapping.get(normalized, "info") + + @classmethod + def get_event_filter(cls) -> EventFilter | None: + return cls._event_filter_ref + @classmethod @asynccontextmanager async def managed(cls, **config_kwargs): @@ -286,7 +384,7 @@ async def managed(cls, **config_kwargs): _loggers: Dict[str, Logger] = {} -def get_logger(namespace: str, session_id: str | None = None) -> Logger: +def get_logger(namespace: str, session_id: str | None = None, context=None) -> Logger: """ Get a logger instance for a given namespace. Creates a new logger if one doesn't exist for this namespace. @@ -294,13 +392,24 @@ def get_logger(namespace: str, session_id: str | None = None) -> Logger: Args: namespace: The namespace for the logger (e.g. 
"agent.helper", "workflow.demo") session_id: Optional session ID to associate with all events from this logger + context: Deprecated/ignored. Present for backwards compatibility. Returns: A Logger instance for the given namespace """ with _logger_lock: - # Create a new logger if one doesn't exist - if namespace not in _loggers: - _loggers[namespace] = Logger(namespace, session_id) - return _loggers[namespace] + existing = _loggers.get(namespace) + if existing is None: + logger = Logger(namespace, session_id, bound_context=context) + _loggers[namespace] = logger + return logger + # Update session_id/bound context if caller provides them + if session_id is not None: + existing.session_id = session_id + if context is not None: + try: + existing._bound_context = context + except Exception: + pass + return existing diff --git a/src/mcp_agent/logging/transport.py b/src/mcp_agent/logging/transport.py index 1aa732992..2bf78a968 100644 --- a/src/mcp_agent/logging/transport.py +++ b/src/mcp_agent/logging/transport.py @@ -325,7 +325,15 @@ def reset(cls) -> None: # Signal shutdown cls._instance._running = False if hasattr(cls._instance, "_stop_event"): - cls._instance._stop_event.set() + try: + # _stop_event.set() schedules on the event's loop; this can fail if + # the loop is already closed in test teardown. Swallow to ensure + # reset never raises in those cases. 
+ cls._instance._stop_event.set() + except RuntimeError: + pass + except Exception: + pass # Clear the singleton instance cls._instance = None diff --git a/src/mcp_agent/server/app_server.py b/src/mcp_agent/server/app_server.py index 3751cbe1a..023e544ed 100644 --- a/src/mcp_agent/server/app_server.py +++ b/src/mcp_agent/server/app_server.py @@ -21,6 +21,7 @@ InMemoryWorkflowRegistry, ) from mcp_agent.logging.logger import get_logger +from mcp_agent.logging.logger import LoggingConfig from mcp_agent.mcp.mcp_server_registry import ServerRegistry if TYPE_CHECKING: @@ -105,6 +106,34 @@ def _get_attached_server_context(mcp: FastMCP) -> ServerContext | None: return getattr(mcp, "_mcp_agent_server_context", None) +def _set_upstream_from_request_ctx_if_available(ctx: MCPContext) -> None: + """Attach the low-level server session to the app context for upstream log forwarding. + + This ensures logs emitted from background workflow tasks are forwarded to the client + even when the low-level request contextvar is not available in those tasks. 
+ """ + # First, try to use the session property from the FastMCP Context + session = None + try: + session = ( + ctx.session + ) # This accesses the property which returns ctx.request_context.session + except (AttributeError, ValueError): + # ctx.session property might raise ValueError if context not available + pass + + if session is not None: + app: MCPApp | None = _get_attached_app(ctx.fastmcp) + if app is not None and getattr(app, "context", None) is not None: + # Set on global app context so the logger can access it + # Previously captured; no need to keep old value + # Use direct assignment for Pydantic model + app.context.upstream_session = session + return + else: + return + + def _resolve_workflows_and_context( ctx: MCPContext, ) -> Tuple[Dict[str, Type["Workflow"]] | None, Optional["Context"]]: @@ -123,7 +152,13 @@ def _resolve_workflows_and_context( # Fall back to app attached to FastMCP app: MCPApp | None = _get_attached_app(ctx.fastmcp) + if app is not None: + # Ensure the app context has the current request's session set so background logs forward + try: + _set_upstream_from_request_ctx_if_available(ctx) + except Exception: + pass return app.workflows, app.context return None, None @@ -160,29 +195,57 @@ def _get_param_source_function_from_workflow(workflow_cls: Type["Workflow"]): def _build_run_param_tool(workflow_cls: Type["Workflow"]) -> FastTool: - """Return a FastTool built from the proper parameter source, skipping 'self'.""" + """Return a FastTool for schema purposes, filtering internals like 'self', 'app_ctx', and FastMCP Context.""" param_source = _get_param_source_function_from_workflow(workflow_cls) import inspect as _inspect - if param_source is getattr(workflow_cls, "run"): - + def _make_filtered_schema_proxy(fn): def _schema_fn_proxy(*args, **kwargs): return None - sig = _inspect.signature(param_source) + sig = _inspect.signature(fn) params = list(sig.parameters.values()) + + # Drop leading 'self' if present if params and params[0].name 
== "self": params = params[1:] - _schema_fn_proxy.__annotations__ = dict( - getattr(param_source, "__annotations__", {}) - ) - if "self" in _schema_fn_proxy.__annotations__: - _schema_fn_proxy.__annotations__.pop("self", None) + + # Drop internal-only params: app_ctx and any FastMCP Context (ctx/context) + try: + from mcp.server.fastmcp import Context as _Ctx # type: ignore + except Exception: + _Ctx = None # type: ignore + + filtered_params = [] + for p in params: + if p.name == "app_ctx": + continue + if p.name in ("ctx", "context"): + continue + ann = p.annotation + if ann is not _inspect._empty and _Ctx is not None and ann is _Ctx: + continue + filtered_params.append(p) + + # Copy annotations and remove filtered keys + ann_map = dict(getattr(fn, "__annotations__", {})) + for k in ["self", "app_ctx", "ctx", "context"]: + if k in ann_map: + ann_map.pop(k, None) + + _schema_fn_proxy.__annotations__ = ann_map _schema_fn_proxy.__signature__ = _inspect.Signature( - parameters=params, return_annotation=sig.return_annotation + parameters=filtered_params, return_annotation=sig.return_annotation ) - return FastTool.from_function(_schema_fn_proxy) - return FastTool.from_function(param_source) + return _schema_fn_proxy + + # If using run method, filter and drop 'self' + if param_source is getattr(workflow_cls, "run"): + return FastTool.from_function(_make_filtered_schema_proxy(param_source)) + + # Otherwise, param_source is likely the original function from @app.tool/@app.async_tool + # Filter out app_ctx/ctx/context from the schema + return FastTool.from_function(_make_filtered_schema_proxy(param_source)) def create_mcp_server_for_app(app: MCPApp, **kwargs: Any) -> FastMCP: @@ -250,6 +313,25 @@ async def app_specific_lifespan(mcp: FastMCP) -> AsyncIterator[ServerContext]: app.mcp = mcp setattr(mcp, "_mcp_agent_app", app) + # Register logging/setLevel handler so client can adjust verbosity dynamically + # This enables MCP logging capability in 
InitializeResult.capabilities.logging + lowlevel_server = getattr(mcp, "_mcp_server", None) + try: + if lowlevel_server is not None: + + @lowlevel_server.set_logging_level() + async def _set_level( + level: str, + ) -> None: # mcp.types.LoggingLevel is a Literal[str] + try: + LoggingConfig.set_min_level(level) + except Exception: + # Best-effort, do not crash server on invalid level + pass + except Exception: + # If handler registration fails, continue without dynamic level updates + pass + # region Workflow Tools @mcp.tool(name="workflows-list") @@ -259,18 +341,32 @@ def list_workflows(ctx: MCPContext) -> Dict[str, Dict[str, Any]]: Returns information about each workflow type including name, description, and parameters. This helps in making an informed decision about which workflow to run. """ + # Ensure upstream session is set for any logs emitted during this call + try: + _set_upstream_from_request_ctx_if_available(ctx) + except Exception: + pass result: Dict[str, Dict[str, Any]] = {} workflows, _ = _resolve_workflows_and_context(ctx) workflows = workflows or {} for workflow_name, workflow_cls in workflows.items(): - # Get workflow documentation - run_fn_tool = FastTool.from_function(workflow_cls.run) - - # Define common endpoints for all workflows - endpoints = [ - f"workflows-{workflow_name}-run", - f"workflows-{workflow_name}-get_status", - ] + # Determine parameter schema (strip self / prefer original function) + run_fn_tool = _build_run_param_tool(workflow_cls) + + # Determine endpoints based on whether this is an auto sync/async tool + if getattr(workflow_cls, "__mcp_agent_sync_tool__", False): + endpoints = [ + f"{workflow_name}", + ] + elif getattr(workflow_cls, "__mcp_agent_async_tool__", False): + endpoints = [ + f"{workflow_name}", + ] + else: + endpoints = [ + f"workflows-{workflow_name}-run", + f"workflows-{workflow_name}-get_status", + ] result[workflow_name] = { "name": workflow_name, @@ -294,6 +390,12 @@ async def list_workflow_runs(ctx: 
MCPContext) -> List[Dict[str, Any]]:
     Returns:
         A dictionary mapping workflow instance IDs to their detailed status information.
     """
+        # Ensure upstream session is set for any logs emitted during this call
+        try:
+            _set_upstream_from_request_ctx_if_available(ctx)
+        except Exception:
+            pass
+
         server_context = getattr(
             ctx.request_context, "lifespan_context", None
         ) or _get_attached_server_context(ctx.fastmcp)
@@ -326,11 +428,16 @@ async def run_workflow(
             A dict with workflow_id and run_id for the started workflow run, can be passed to
             workflows/get_status, workflows/resume, and workflows/cancel.
         """
+        # Ensure upstream session is set before starting the workflow
+        try:
+            _set_upstream_from_request_ctx_if_available(ctx)
+        except Exception:
+            pass
         return await _workflow_run(ctx, workflow_name, run_parameters, **kwargs)

     @mcp.tool(name="workflows-get_status")
     async def get_workflow_status(
-        ctx: MCPContext, workflow_name: str, run_id: str
+        ctx: MCPContext, run_id: str, workflow_id: str | None = None
     ) -> Dict[str, Any]:
         """
         Get the status of a running workflow.
@@ -339,14 +446,20 @@ async def get_workflow_status(
         whether it's running or completed, and any results or errors encountered.

         Args:
-            workflow_name: The name of the workflow to check.
-            run_id: The ID of the workflow instance to check,
-            received from workflows/run or workflows/runs/list.
+            run_id: The run ID of the workflow to check,
+                received from workflows/run or workflows/runs/list.
+            workflow_id: Optional workflow identifier (usually the tool/workflow name).
+                If omitted, the server will infer it from the run metadata when possible.

         Returns:
             A dictionary with comprehensive information about the workflow status.
""" - return await _workflow_status(ctx, run_id, workflow_name) + # Ensure upstream session is available for any status-related logs + try: + _set_upstream_from_request_ctx_if_available(ctx) + except Exception: + pass + return await _workflow_status(ctx, run_id=run_id, workflow_name=workflow_id) @mcp.tool(name="workflows-resume") async def resume_workflow( @@ -373,6 +486,11 @@ async def resume_workflow( Returns: True if the workflow was resumed, False otherwise. """ + # Ensure upstream session is available for any status-related logs + try: + _set_upstream_from_request_ctx_if_available(ctx) + except Exception: + pass server_context: ServerContext = ctx.request_context.lifespan_context workflow_registry = server_context.workflow_registry @@ -415,6 +533,11 @@ async def cancel_workflow( Returns: True if the workflow was cancelled, False otherwise. """ + # Ensure upstream session is available for any status-related logs + try: + _set_upstream_from_request_ctx_if_available(ctx) + except Exception: + pass server_context: ServerContext = ctx.request_context.lifespan_context workflow_registry = server_context.workflow_registry @@ -498,40 +621,76 @@ def create_declared_function_tools(mcp: FastMCP, server_context: ServerContext): # Utility: build a wrapper function with the same signature and return annotation import inspect import asyncio + import time async def _wait_for_completion( - ctx: MCPContext, run_id: str, timeout: float | None = None + ctx: MCPContext, + run_id: str, + *, + workflow_name: str | None = None, + timeout: float | None = None, + registration_grace: float = 1.0, + poll_initial: float = 0.05, + poll_max: float = 1.0, ): registry = _resolve_workflow_registry(ctx) if not registry: raise ToolError("Workflow registry not found for MCPApp Server.") - # Try to get the workflow and wait on its task if available - start = asyncio.get_event_loop().time() - # Ensure the workflow is registered locally to retrieve the task + + DEFAULT_SYNC_TOOL_TIMEOUT = 120.0 + 
overall_timeout = timeout or DEFAULT_SYNC_TOOL_TIMEOUT + deadline = time.monotonic() + overall_timeout + + def remaining() -> float: + return max(0.0, deadline - time.monotonic()) + + async def _await_task(task: asyncio.Task): + return await asyncio.wait_for(task, timeout=remaining()) + + # Fast path: immediate local task try: - wf = await registry.get_workflow(run_id) - if wf is None and hasattr(registry, "register"): - # Best-effort: some registries need explicit register; try to find by status - # and skip if unavailable. This is a no-op for InMemory which registers at run_async. - pass - except Exception: - pass - while True: wf = await registry.get_workflow(run_id) if wf is not None: task = getattr(wf, "_run_task", None) if isinstance(task, asyncio.Task): - return await asyncio.wait_for(task, timeout=timeout) - # Fallback to polling the status - status = await wf.get_status() - if status.get("completed"): - return status.get("result") - if ( - timeout is not None - and (asyncio.get_event_loop().time() - start) > timeout - ): + return await _await_task(task) + except Exception: + pass + + # Short grace window for registration + sleep = poll_initial + grace_deadline = time.monotonic() + registration_grace + while time.monotonic() < grace_deadline and remaining() > 0: + try: + wf = await registry.get_workflow(run_id) + if wf is not None: + task = getattr(wf, "_run_task", None) + if isinstance(task, asyncio.Task): + return await _await_task(task) + except Exception: + pass + await asyncio.sleep(sleep) + sleep = min(poll_max, sleep * 1.5) + + # Fallback: status polling (works for external/temporal engines) + sleep = poll_initial + while True: + if remaining() <= 0: raise ToolError("Timed out waiting for workflow completion") - await asyncio.sleep(0.1) + + status = await _workflow_status(ctx, run_id, workflow_name) + s = str( + status.get("status") or (status.get("state") or {}).get("status") or "" + ).lower() + + if s in {"completed", "error", "cancelled"}: + if s 
== "completed": + return status.get("result") + err = status.get("error") or status + raise ToolError(f"Workflow ended with status={s}: {err}") + + await asyncio.sleep(sleep) + sleep = min(poll_max, sleep * 2.0) for decl in declared: name = decl["name"] @@ -543,44 +702,53 @@ async def _wait_for_completion( description = decl.get("description") structured_output = decl.get("structured_output") + # Bind per-iteration values to avoid late-binding closure bugs + name_local = name + wname_local = workflow_name + if mode == "sync" and fn is not None: sig = inspect.signature(fn) return_ann = sig.return_annotation - async def _wrapper(**kwargs): - # Context will be injected by FastMCP using the special annotation below - ctx: MCPContext = kwargs.pop( - "__context__" - ) # placeholder, reassigned below via signature name - # Start workflow and wait for completion - result_ids = await _workflow_run(ctx, workflow_name, kwargs) - run_id = result_ids["run_id"] - result = await _wait_for_completion(ctx, run_id) - # Unwrap WorkflowResult to match the original function's return type - try: - from mcp_agent.executor.workflow import WorkflowResult as _WFRes - except Exception: - _WFRes = None # type: ignore - if _WFRes is not None and isinstance(result, _WFRes): - return getattr(result, "value", None) - # If get_status returned dict/str, pass through; otherwise return model - return result + def _make_wrapper(bound_wname: str): + async def _wrapper(**kwargs): + ctx: MCPContext = kwargs.pop("__context__") + result_ids = await _workflow_run(ctx, bound_wname, kwargs) + run_id = result_ids["run_id"] + result = await _wait_for_completion( + ctx, run_id, workflow_name=bound_wname + ) + try: + from mcp_agent.executor.workflow import WorkflowResult as _WFRes + except Exception: + _WFRes = None # type: ignore + if _WFRes is not None and isinstance(result, _WFRes): + return getattr(result, "value", None) + # If status payload returned a dict that looks like WorkflowResult, unwrap safely via 
'kind' + if ( + isinstance(result, dict) + and result.get("kind") == "workflow_result" + ): + return result.get("value") + return result + + return _wrapper + + _wrapper = _make_wrapper(wname_local) - # Attach introspection metadata to match the original function ann = dict(getattr(fn, "__annotations__", {})) + ann.pop("app_ctx", None) - # Choose a context kwarg name unlikely to clash with user params ctx_param_name = "ctx" from mcp.server.fastmcp import Context as _Ctx ann[ctx_param_name] = _Ctx ann["return"] = getattr(fn, "__annotations__", {}).get("return", return_ann) _wrapper.__annotations__ = ann - _wrapper.__name__ = name + _wrapper.__name__ = name_local _wrapper.__doc__ = description or (fn.__doc__ or "") - # Build a fake signature containing original params plus context kwarg - params = list(sig.parameters.values()) + params = [p for p in sig.parameters.values() if p.name != "app_ctx"] ctx_param = inspect.Parameter( ctx_param_name, kind=inspect.Parameter.KEYWORD_ONLY, @@ -590,68 +758,143 @@ async def _wrapper(**kwargs): parameters=params + [ctx_param], return_annotation=return_ann ) - # FastMCP expects the actual kwarg name for context; it detects it by annotation - # We need to map the injected kwarg inside the wrapper body. Achieve this by - # creating a thin adapter that renames the injected context kwarg. 
- async def _adapter(**kw): - # Receive validated args plus injected context kwarg - if ctx_param_name not in kw: - raise ToolError("Context not provided") - # Rename to the placeholder expected by _wrapper - kw["__context__"] = kw.pop(ctx_param_name) - return await _wrapper(**kw) - - # Copy the visible signature/annotations to adapter for correct schema - _adapter.__annotations__ = _wrapper.__annotations__ - _adapter.__name__ = _wrapper.__name__ - _adapter.__doc__ = _wrapper.__doc__ - _adapter.__signature__ = _wrapper.__signature__ - - # Register the main tool with the same signature as original + def _make_adapter(context_param_name: str, inner_wrapper): + async def _adapter(**kw): + if context_param_name not in kw: + raise ToolError("Context not provided") + _ctx_obj = kw.get(context_param_name) + if _ctx_obj is not None: + try: + _set_upstream_from_request_ctx_if_available(_ctx_obj) + except Exception: + pass + kw["__context__"] = kw.pop(context_param_name) + return await inner_wrapper(**kw) + + _adapter.__annotations__ = _wrapper.__annotations__ + _adapter.__name__ = _wrapper.__name__ + _adapter.__doc__ = _wrapper.__doc__ + _adapter.__signature__ = _wrapper.__signature__ + return _adapter + + _adapter = _make_adapter(ctx_param_name, _wrapper) + mcp.add_tool( _adapter, - name=name, + name=name_local, description=description or (fn.__doc__ or ""), structured_output=structured_output, ) - registered.add(name) - - # Also register a per-run status tool: -get_status - status_tool_name = f"{name}-get_status" - if status_tool_name not in registered: - - @mcp.tool(name=status_tool_name) - async def _sync_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]: - return await _workflow_status( - ctx, run_id=run_id, workflow_name=workflow_name - ) - - registered.add(status_tool_name) + registered.add(name_local) elif mode == "async": - # Create named aliases for async: -async-run and -get_status - run_tool_name = f"{name}-async-run" - status_tool_name = 
f"{name}-get_status" + # Use the declared name as the async run endpoint + run_tool_name = f"{name_local}" if run_tool_name not in registered: + # Build a wrapper mirroring original function params (excluding app_ctx/ctx) + async def _async_wrapper(**kwargs): + ctx: MCPContext = kwargs.pop("__context__") + # Start workflow and return workflow_id/run_id (do not wait) + return await _workflow_run(ctx, wname_local, kwargs) + + # Mirror original signature and annotations similar to sync path + ann = dict(getattr(fn, "__annotations__", {})) + ann.pop("app_ctx", None) + try: + from mcp.server.fastmcp import Context as _Ctx + except Exception: + _Ctx = None # type: ignore + + # Choose context kw-only parameter + ctx_param_name = "ctx" + if _Ctx is not None: + ann[ctx_param_name] = _Ctx + + # Async run returns workflow_id/run_id + from typing import Dict as _Dict # type: ignore + + ann["return"] = _Dict[str, str] + _async_wrapper.__annotations__ = ann + _async_wrapper.__name__ = run_tool_name + + # Description: original docstring + async note + base_desc = description or (fn.__doc__ or "") + async_note = ( + f"\n\nThis tool starts the '{wname_local}' workflow asynchronously and returns " + "'workflow_id' and 'run_id'. Use the 'workflows-get_status' tool " + "with the returned 'workflow_id' and the returned " + "'run_id' to retrieve status/results." 
+ ) + full_desc = (base_desc or "").strip() + async_note + _async_wrapper.__doc__ = full_desc - @mcp.tool(name=run_tool_name) - async def _alias_run( - ctx: MCPContext, run_parameters: Dict[str, Any] | None = None - ) -> Dict[str, str]: - return await _workflow_run(ctx, workflow_name, run_parameters or {}) - - registered.add(run_tool_name) - - if status_tool_name not in registered: - - @mcp.tool(name=status_tool_name) - async def _alias_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]: - return await _workflow_status( - ctx, run_id=run_id, workflow_name=workflow_name + # Build mirrored signature: drop app_ctx and any FastMCP Context params + params = [] + try: + sig_async = inspect.signature(fn) + for p in sig_async.parameters.values(): + if p.name == "app_ctx": + continue + if p.name in ("ctx", "context"): + continue + if ( + _Ctx is not None + and p.annotation is not inspect._empty + and p.annotation is _Ctx + ): + continue + params.append(p) + except Exception: + params = [] + + # Append kw-only context param + if _Ctx is not None: + ctx_param = inspect.Parameter( + ctx_param_name, + kind=inspect.Parameter.KEYWORD_ONLY, + annotation=_Ctx, ) + else: + ctx_param = inspect.Parameter( + ctx_param_name, + kind=inspect.Parameter.KEYWORD_ONLY, + ) + + _async_wrapper.__signature__ = inspect.Signature( + parameters=params + [ctx_param], return_annotation=ann.get("return") + ) - registered.add(status_tool_name) + # Adapter to map injected FastMCP context kwarg and set upstream + def _make_async_adapter(context_param_name: str, inner_wrapper): + async def _adapter(**kw): + if context_param_name not in kw: + raise ToolError("Context not provided") + _ctx_obj = kw.get(context_param_name) + if _ctx_obj is not None: + try: + _set_upstream_from_request_ctx_if_available(_ctx_obj) + except Exception: + pass + kw["__context__"] = kw.pop(context_param_name) + return await inner_wrapper(**kw) + + _adapter.__annotations__ = _async_wrapper.__annotations__ + _adapter.__name__ = 
_async_wrapper.__name__ + _adapter.__doc__ = _async_wrapper.__doc__ + _adapter.__signature__ = _async_wrapper.__signature__ + return _adapter + + _async_adapter = _make_async_adapter(ctx_param_name, _async_wrapper) + + # Register the async run tool + mcp.add_tool( + _async_adapter, + name=run_tool_name, + description=full_desc, + structured_output=False, + ) + registered.add(run_tool_name) _set_registered_function_tools(mcp, registered) @@ -705,6 +948,7 @@ async def run( ctx: MCPContext, run_parameters: Dict[str, Any] | None = None, ) -> Dict[str, str]: + _set_upstream_from_request_ctx_if_available(ctx) return await _workflow_run(ctx, workflow_name, run_parameters) @mcp.tool( @@ -717,6 +961,7 @@ async def run( """, ) async def get_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]: + _set_upstream_from_request_ctx_if_available(ctx) return await _workflow_status(ctx, run_id=run_id, workflow_name=workflow_name) @@ -772,6 +1017,7 @@ async def _workflow_run( **kwargs: Any, ) -> Dict[str, str]: # Resolve workflows and app context irrespective of startup mode + # This now returns a context with upstream_session already set workflows_dict, app_context = _resolve_workflows_and_context(ctx) if not workflows_dict or not app_context: raise ToolError("Server context not available for MCPApp Server.") @@ -782,9 +1028,20 @@ async def _workflow_run( # Get the workflow class workflow_cls = workflows_dict[workflow_name] + # Bind the app-level logger (cached) to this per-request context so logs + # emitted from AutoWorkflow path forward upstream even outside request_ctx. 
+ try: + app = _get_attached_app(ctx.fastmcp) + if app is not None and getattr(app, "name", None): + from mcp_agent.logging.logger import get_logger as _get_logger + + _get_logger(f"mcp_agent.{app.name}", context=app_context) + except Exception: + pass + # Create and initialize the workflow instance using the factory method try: - # Create workflow instance + # Create workflow instance with context that has upstream_session workflow = await workflow_cls.create(name=workflow_name, context=app_context) run_parameters = run_parameters or {} @@ -819,6 +1076,11 @@ async def _workflow_run( async def _workflow_status( ctx: MCPContext, run_id: str, workflow_name: str | None = None ) -> Dict[str, Any]: + # Ensure upstream session so status-related logs are forwarded + try: + _set_upstream_from_request_ctx_if_available(ctx) + except Exception: + pass workflow_registry: WorkflowRegistry | None = _resolve_workflow_registry(ctx) if not workflow_registry: diff --git a/src/mcp_agent/workflows/llm/augmented_llm_azure.py b/src/mcp_agent/workflows/llm/augmented_llm_azure.py index 6c272aec5..726fdcf20 100644 --- a/src/mcp_agent/workflows/llm/augmented_llm_azure.py +++ b/src/mcp_agent/workflows/llm/augmented_llm_azure.py @@ -194,7 +194,7 @@ async def generate(self, message, request_params: RequestParams | None = None): "stop": params.stopSequences, "tools": tools, } - + # Add user parameter if present in params or config user = params.user or getattr(self.context.config.azure, "user", None) if user: diff --git a/src/mcp_agent/workflows/llm/augmented_llm_ollama.py b/src/mcp_agent/workflows/llm/augmented_llm_ollama.py index a77a50db4..df0dd289d 100644 --- a/src/mcp_agent/workflows/llm/augmented_llm_ollama.py +++ b/src/mcp_agent/workflows/llm/augmented_llm_ollama.py @@ -104,9 +104,8 @@ async def request_structured_completion_task( api_key=request.config.api_key, base_url=request.config.base_url, http_client=request.config.http_client - if hasattr(request.config, "http_client") + if 
hasattr(request.config, "http_client") else None, - ) as async_client: client = instructor.from_openai( async_client, diff --git a/tests/logging/test_upstream_logging.py b/tests/logging/test_upstream_logging.py new file mode 100644 index 000000000..75263fddb --- /dev/null +++ b/tests/logging/test_upstream_logging.py @@ -0,0 +1,72 @@ +import asyncio +import pytest + +from types import SimpleNamespace + +from mcp_agent.logging.logger import LoggingConfig, get_logger +from mcp_agent.logging.events import EventFilter +from mcp_agent.logging.transport import AsyncEventBus + + +class DummyUpstreamSession: + def __init__(self): + self.calls = [] + + async def send_log_message(self, level, data, logger, related_request_id=None): + self.calls.append( + { + "level": level, + "data": data, + "logger": logger, + "related_request_id": related_request_id, + } + ) + + +@pytest.mark.asyncio +async def test_upstream_logging_listener_sends_notifications(monkeypatch): + # Ensure clean bus state + AsyncEventBus.reset() + + dummy_session = DummyUpstreamSession() + + # Configure logging with low threshold so our event passes + await LoggingConfig.configure(event_filter=EventFilter(min_level="debug")) + + try: + # Bind a context carrying upstream_session directly to the logger + ctx_with_upstream = SimpleNamespace(upstream_session=dummy_session) + logger = get_logger("tests.logging", context=ctx_with_upstream) + logger.info("hello world", name="unit", foo="bar") + + # Give the async bus a moment to process + await asyncio.sleep(0.05) + + assert len(dummy_session.calls) >= 1 + call = dummy_session.calls[-1] + assert call["level"] in ("info", "debug", "warning", "error") + assert call["logger"].startswith("tests.logging") + # Ensure our message and custom data are included + data = call["data"] + assert data.get("message") == "hello world" + assert data.get("data", {}).get("foo") == "bar" + finally: + await LoggingConfig.shutdown() + AsyncEventBus.reset() + + +@pytest.mark.asyncio +async 
def test_logging_capability_registered_in_fastmcp(): + # Import here to avoid heavy imports at module import time + from mcp_agent.app import MCPApp + from mcp_agent.server.app_server import create_mcp_server_for_app + import mcp.types as types + + app = MCPApp(name="test_app") + mcp = create_mcp_server_for_app(app) + + low = getattr(mcp, "_mcp_server", None) + assert low is not None + + # The presence of a SetLevelRequest handler indicates logging capability will be advertised + assert types.SetLevelRequest in low.request_handlers diff --git a/tests/server/test_tool_decorators.py b/tests/server/test_tool_decorators.py index f39143fad..25206fe26 100644 --- a/tests/server/test_tool_decorators.py +++ b/tests/server/test_tool_decorators.py @@ -75,13 +75,12 @@ async def echo(text: str) -> str: create_workflow_tools(mcp, server_context) create_declared_function_tools(mcp, server_context) - # Verify tool names: sync tool and its status tool - decorated_names = {name for name, _ in mcp.decorated_tools} + # Verify tool names: only the sync tool endpoint is added + _decorated_names = {name for name, _ in mcp.decorated_tools} added_names = {name for name, *_ in mcp.added_tools} - # No workflows-* for sync tools; check echo and echo-get_status + # No workflows-* or per-tool get_status aliases for sync tools; check only echo assert "echo" in added_names # synchronous tool - assert "echo-get_status" in decorated_names # Execute the synchronous tool function and ensure it returns unwrapped value # Find the registered sync tool function @@ -127,10 +126,13 @@ async def long_task(x: int) -> str: create_declared_function_tools(mcp, server_context) decorated_names = {name for name, _ in mcp.decorated_tools} + added_names = {name for name, *_ in mcp.added_tools} - # async aliases only (we suppress workflows-* for async auto tools) - assert "long-async-run" in decorated_names - assert "long-get_status" in decorated_names + # We register the async tool under its given name via add_tool 
+ assert "long" in added_names + # And we suppress workflows-* for async auto tools + assert "workflows-long-run" not in decorated_names + assert "workflows-long-get_status" not in decorated_names @pytest.mark.asyncio diff --git a/uv.lock b/uv.lock index d49cb3afc..218056688 100644 --- a/uv.lock +++ b/uv.lock @@ -2040,7 +2040,7 @@ wheels = [ [[package]] name = "mcp-agent" -version = "0.1.14" +version = "0.1.15" source = { editable = "." } dependencies = [ { name = "aiohttp" },