diff --git a/src/mcp_agent/app.py b/src/mcp_agent/app.py
index 4294a4012..c4fb8f6c2 100644
--- a/src/mcp_agent/app.py
+++ b/src/mcp_agent/app.py
@@ -115,6 +115,17 @@ def __init__(
         self._model_selector = model_selector
         self._workflows: Dict[str, Type["Workflow"]] = {}  # id to workflow class
+        # Deferred tool declarations to register with MCP server when available
+        # Each entry: {
+        #     "name": str,
+        #     "mode": "sync" | "async",
+        #     "workflow_name": str,
+        #     "workflow_cls": Type[Workflow],
+        #     "source_fn": Callable | None,
+        #     "structured_output": bool | None,
+        #     "description": str | None,
+        # }
+        self._declared_tools: list[dict[str, Any]] = []
 
         self._logger = None
         self._context: Optional[Context] = None
@@ -512,6 +523,146 @@ async def wrapper(*args, **kwargs):
 
         return wrapper
 
+    def _create_workflow_from_function(
+        self,
+        fn: Callable[..., Any],
+        *,
+        workflow_name: str,
+        description: str | None = None,
+        mark_sync_tool: bool = False,
+    ) -> Type:
+        """
+        Create a Workflow subclass dynamically from a plain function.
+
+        The generated workflow class will:
+        - Have `run` implemented to call the provided function
+        - Be decorated with engine-specific run decorators via workflow_run
+        - Expose the original function for parameter schema generation
+        """
+
+        import asyncio as _asyncio
+        from mcp_agent.executor.workflow import Workflow as _Workflow
+
+        async def _invoke_target(*args, **kwargs):
+            # Support both async and sync callables
+            res = fn(*args, **kwargs)
+            if _asyncio.iscoroutine(res):
+                res = await res
+
+            # Ensure WorkflowResult return type
+            try:
+                from mcp_agent.executor.workflow import (
+                    WorkflowResult as _WorkflowResult,
+                )
+            except Exception:
+                _WorkflowResult = None  # type: ignore[assignment]
+
+            if _WorkflowResult is not None and not isinstance(res, _WorkflowResult):
+                return _WorkflowResult(value=res)
+            return res
+
+        async def _run(self, *args, **kwargs):  # type: ignore[no-redef]
+            return await _invoke_target(*args, **kwargs)
+
+        # Decorate run with engine-specific decorator
+        decorated_run = self.workflow_run(_run)
+
+        # Build the Workflow subclass dynamically
+        cls_dict: Dict[str, Any] = {
+            "__doc__": description or (fn.__doc__ or ""),
+            "run": decorated_run,
+            "__mcp_agent_param_source_fn__": fn,
+        }
+        if mark_sync_tool:
+            cls_dict["__mcp_agent_sync_tool__"] = True
+        else:
+            cls_dict["__mcp_agent_async_tool__"] = True
+
+        auto_cls = type(f"AutoWorkflow_{workflow_name}", (_Workflow,), cls_dict)
+
+        # Register with app (and apply engine-specific workflow decorator)
+        self.workflow(auto_cls, workflow_id=workflow_name)
+        return auto_cls
+
+    def tool(
+        self,
+        name: str | None = None,
+        *,
+        description: str | None = None,
+        structured_output: bool | None = None,
+    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
+        """
+        Decorator to declare a synchronous MCP tool that runs via an auto-generated
+        Workflow and waits for completion before returning.
+
+        Also registers an async Workflow under the same name so that run/get_status
+        endpoints are available.
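+
+        Example (illustrative; `greet` is a hypothetical tool):
+
+            @app.tool(name="greet")
+            async def greet(name: str) -> str:
+                return f"Hello, {name}!"
+
+        The server then exposes a blocking `greet` tool plus a companion
+        `greet-get_status` tool for inspecting past runs.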
+ """ + + def decorator(fn: Callable[..., Any]) -> Callable[..., Any]: + tool_name = name or fn.__name__ + # Construct the workflow from function + workflow_cls = self._create_workflow_from_function( + fn, + workflow_name=tool_name, + description=description, + mark_sync_tool=True, + ) + + # Defer tool registration until the MCP server is created + self._declared_tools.append( + { + "name": tool_name, + "mode": "sync", + "workflow_name": tool_name, + "workflow_cls": workflow_cls, + "source_fn": fn, + "structured_output": structured_output, + "description": description or (fn.__doc__ or ""), + } + ) + + return fn + + return decorator + + def async_tool( + self, + name: str | None = None, + *, + description: str | None = None, + ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """ + Decorator to declare an asynchronous MCP tool. + + Creates a Workflow class from the function and registers it so that + the standard per-workflow tools (run/get_status) are exposed by the server. + """ + + def decorator(fn: Callable[..., Any]) -> Callable[..., Any]: + workflow_name = name or fn.__name__ + workflow_cls = self._create_workflow_from_function( + fn, + workflow_name=workflow_name, + description=description, + mark_sync_tool=False, + ) + # Defer alias tool registration for run/get_status + self._declared_tools.append( + { + "name": workflow_name, + "mode": "async", + "workflow_name": workflow_name, + "workflow_cls": workflow_cls, + "source_fn": fn, + "structured_output": None, + "description": description or (fn.__doc__ or ""), + } + ) + return fn + + return decorator + def workflow_task( self, name: str | None = None, diff --git a/src/mcp_agent/cli/cloud/main.py b/src/mcp_agent/cli/cloud/main.py index 4de6dd7a3..a883c7c46 100644 --- a/src/mcp_agent/cli/cloud/main.py +++ b/src/mcp_agent/cli/cloud/main.py @@ -14,7 +14,11 @@ from typer.core import TyperGroup from mcp_agent.cli.cloud.commands import configure_app, deploy_config, login -from mcp_agent.cli.cloud.commands.app import delete_app, get_app_status, list_app_workflows +from mcp_agent.cli.cloud.commands.app import ( + delete_app, + get_app_status, + list_app_workflows, +) from mcp_agent.cli.cloud.commands.apps import list_apps from mcp_agent.cli.cloud.commands.workflow import get_workflow_status from mcp_agent.cli.exceptions import CLIError diff --git a/src/mcp_agent/server/app_server.py b/src/mcp_agent/server/app_server.py index c3b5a6bb8..3751cbe1a 100644 --- a/src/mcp_agent/server/app_server.py +++ b/src/mcp_agent/server/app_server.py @@ -132,6 +132,12 @@ def _resolve_workflows_and_context( def _resolve_workflow_registry(ctx: MCPContext) -> WorkflowRegistry | None: """Resolve the workflow registry regardless of startup mode.""" lifespan_ctx = getattr(ctx.request_context, "lifespan_context", None) + # Prefer the underlying app context's registry if available + if lifespan_ctx is not None and hasattr(lifespan_ctx, "context"): + ctx_inner = getattr(lifespan_ctx, "context", None) + if ctx_inner is not None and hasattr(ctx_inner, "workflow_registry"): + return ctx_inner.workflow_registry + # Fallback: top-level lifespan registry if present if lifespan_ctx is not None and hasattr(lifespan_ctx, "workflow_registry"): return lifespan_ctx.workflow_registry @@ -142,6 +148,43 @@ def _resolve_workflow_registry(ctx: MCPContext) -> WorkflowRegistry | None: return None +def _get_param_source_function_from_workflow(workflow_cls: Type["Workflow"]): + """Return the function to use for parameter schema for a workflow's run. 
+
+    For auto-generated workflows from @app.tool/@app.async_tool, prefer the original
+    function that defined the parameters if available; fall back to the class run.
+    """
+    return getattr(workflow_cls, "__mcp_agent_param_source_fn__", None) or getattr(
+        workflow_cls, "run"
+    )
+
+
+def _build_run_param_tool(workflow_cls: Type["Workflow"]) -> FastTool:
+    """Return a FastTool built from the proper parameter source, skipping 'self'."""
+    param_source = _get_param_source_function_from_workflow(workflow_cls)
+    import inspect as _inspect
+
+    if param_source is getattr(workflow_cls, "run"):
+
+        def _schema_fn_proxy(*args, **kwargs):
+            return None
+
+        sig = _inspect.signature(param_source)
+        params = list(sig.parameters.values())
+        if params and params[0].name == "self":
+            params = params[1:]
+        _schema_fn_proxy.__annotations__ = dict(
+            getattr(param_source, "__annotations__", {})
+        )
+        if "self" in _schema_fn_proxy.__annotations__:
+            _schema_fn_proxy.__annotations__.pop("self", None)
+        _schema_fn_proxy.__signature__ = _inspect.Signature(
+            parameters=params, return_annotation=sig.return_annotation
+        )
+        return FastTool.from_function(_schema_fn_proxy)
+    return FastTool.from_function(param_source)
+
+
 def create_mcp_server_for_app(app: MCPApp, **kwargs: Any) -> FastMCP:
     """
     Create an MCP server for a given MCPApp instance.
@@ -166,6 +209,8 @@ async def app_specific_lifespan(mcp: FastMCP) -> AsyncIterator[ServerContext]:
 
             # Register initial workflow tools when running with our managed lifespan
             create_workflow_tools(mcp, server_context)
+            # Register function-declared tools (from @app.tool/@app.async_tool)
+            create_declared_function_tools(mcp, server_context)
 
             try:
                 yield server_context
@@ -189,6 +234,8 @@ async def app_specific_lifespan(mcp: FastMCP) -> AsyncIterator[ServerContext]:
 
         # Register per-workflow tools
         create_workflow_tools(mcp, server_context)
+        # Register function-declared tools (from @app.tool/@app.async_tool)
+        create_declared_function_tools(mcp, server_context)
     else:
         mcp = FastMCP(
             name=app.name or "mcp_agent_server",
@@ -403,6 +450,11 @@ def create_workflow_tools(mcp: FastMCP, server_context: ServerContext):
     registered_workflow_tools = _get_registered_workflow_tools(mcp)
 
     for workflow_name, workflow_cls in server_context.workflows.items():
+        # Skip creating generic workflows-* tools for sync/async auto tools
+        if getattr(workflow_cls, "__mcp_agent_sync_tool__", False):
+            continue
+        if getattr(workflow_cls, "__mcp_agent_async_tool__", False):
+            continue
         if workflow_name not in registered_workflow_tools:
             create_workflow_specific_tools(mcp, workflow_name, workflow_cls)
             registered_workflow_tools.add(workflow_name)
@@ -410,12 +462,229 @@
     setattr(mcp, "_registered_workflow_tools", registered_workflow_tools)
+
+
+def _get_registered_function_tools(mcp: FastMCP) -> Set[str]:
+    return getattr(mcp, "_registered_function_tools", set())
+
+
+def _set_registered_function_tools(mcp: FastMCP, tools: Set[str]):
+    setattr(mcp, "_registered_function_tools", tools)
+
+
+def create_declared_function_tools(mcp: FastMCP, server_context: ServerContext):
+    """
+    Register tools declared via @app.tool/@app.async_tool on the attached app.
+    - @app.tool registers a synchronous tool with the same signature as the function
+      that runs the auto-generated workflow and waits for completion.
+    - @app.async_tool registers alias tools `<name>-async-run` and `<name>-get_status`
+      that proxy to the workflow run/status utilities.
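+
+    For example (illustrative), a declaration named "summarize" yields the tools
+    `summarize` and `summarize-get_status` in sync mode, or `summarize-async-run`
+    and `summarize-get_status` in async mode.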
+ """ + app = _get_attached_app(mcp) + if app is None: + # Fallbacks for tests or externally provided contexts + app = getattr(server_context, "app", None) + if app is None: + ctx = getattr(server_context, "context", None) + if ctx is not None: + app = getattr(ctx, "app", None) + if app is None: + return + + declared = getattr(app, "_declared_tools", []) or [] + if not declared: + return + + registered = _get_registered_function_tools(mcp) + + # Utility: build a wrapper function with the same signature and return annotation + import inspect + import asyncio + + async def _wait_for_completion( + ctx: MCPContext, run_id: str, timeout: float | None = None + ): + registry = _resolve_workflow_registry(ctx) + if not registry: + raise ToolError("Workflow registry not found for MCPApp Server.") + # Try to get the workflow and wait on its task if available + start = asyncio.get_event_loop().time() + # Ensure the workflow is registered locally to retrieve the task + try: + wf = await registry.get_workflow(run_id) + if wf is None and hasattr(registry, "register"): + # Best-effort: some registries need explicit register; try to find by status + # and skip if unavailable. This is a no-op for InMemory which registers at run_async. + pass + except Exception: + pass + while True: + wf = await registry.get_workflow(run_id) + if wf is not None: + task = getattr(wf, "_run_task", None) + if isinstance(task, asyncio.Task): + return await asyncio.wait_for(task, timeout=timeout) + # Fallback to polling the status + status = await wf.get_status() + if status.get("completed"): + return status.get("result") + if ( + timeout is not None + and (asyncio.get_event_loop().time() - start) > timeout + ): + raise ToolError("Timed out waiting for workflow completion") + await asyncio.sleep(0.1) + + for decl in declared: + name = decl["name"] + if name in registered: + continue + mode = decl["mode"] + workflow_name = decl["workflow_name"] + fn = decl.get("source_fn") + description = decl.get("description") + structured_output = decl.get("structured_output") + + if mode == "sync" and fn is not None: + sig = inspect.signature(fn) + return_ann = sig.return_annotation + + async def _wrapper(**kwargs): + # Context will be injected by FastMCP using the special annotation below + ctx: MCPContext = kwargs.pop( + "__context__" + ) # placeholder, reassigned below via signature name + # Start workflow and wait for completion + result_ids = await _workflow_run(ctx, workflow_name, kwargs) + run_id = result_ids["run_id"] + result = await _wait_for_completion(ctx, run_id) + # Unwrap WorkflowResult to match the original function's return type + try: + from mcp_agent.executor.workflow import WorkflowResult as _WFRes + except Exception: + _WFRes = None # type: ignore + if _WFRes is not None and isinstance(result, _WFRes): + return getattr(result, "value", None) + # If get_status returned dict/str, pass through; otherwise return model + return result + + # Attach introspection metadata to match the original function + ann = dict(getattr(fn, "__annotations__", {})) + + # Choose a context kwarg name unlikely to clash with user params + ctx_param_name = "ctx" + from mcp.server.fastmcp import Context as _Ctx + + ann[ctx_param_name] = _Ctx + ann["return"] = getattr(fn, "__annotations__", {}).get("return", return_ann) + _wrapper.__annotations__ = ann + _wrapper.__name__ = name + _wrapper.__doc__ = description or (fn.__doc__ or "") + + # Build a fake signature containing original params plus context kwarg + params = list(sig.parameters.values()) + 
+            ctx_param = inspect.Parameter(
+                ctx_param_name,
+                kind=inspect.Parameter.KEYWORD_ONLY,
+                annotation=_Ctx,
+            )
+            _wrapper.__signature__ = inspect.Signature(
+                parameters=params + [ctx_param], return_annotation=return_ann
+            )
+
+            # FastMCP expects the actual kwarg name for context; it detects it by annotation.
+            # We need to map the injected kwarg inside the wrapper body. Achieve this by
+            # creating a thin adapter that renames the injected context kwarg.
+            async def _adapter(__wrapper=_wrapper, **kw):
+                # Receive validated args plus injected context kwarg.
+                # (__wrapper is early-bound for the same late-binding reason as above.)
+                if ctx_param_name not in kw:
+                    raise ToolError("Context not provided")
+                # Rename to the placeholder expected by _wrapper
+                kw["__context__"] = kw.pop(ctx_param_name)
+                return await __wrapper(**kw)
+
+            # Copy the visible signature/annotations to adapter for correct schema
+            _adapter.__annotations__ = _wrapper.__annotations__
+            _adapter.__name__ = _wrapper.__name__
+            _adapter.__doc__ = _wrapper.__doc__
+            _adapter.__signature__ = _wrapper.__signature__
+
+            # Register the main tool with the same signature as original
+            mcp.add_tool(
+                _adapter,
+                name=name,
+                description=description or (fn.__doc__ or ""),
+                structured_output=structured_output,
+            )
+            registered.add(name)
+
+            # Also register a per-run status tool: <name>-get_status
+            status_tool_name = f"{name}-get_status"
+            if status_tool_name not in registered:
+
+                def _make_sync_status(workflow_name: str):
+                    # Factory so each status tool stays bound to its own workflow
+                    async def _sync_status(
+                        ctx: MCPContext, run_id: str
+                    ) -> Dict[str, Any]:
+                        return await _workflow_status(
+                            ctx, run_id=run_id, workflow_name=workflow_name
+                        )
+
+                    return _sync_status
+
+                mcp.tool(name=status_tool_name)(_make_sync_status(workflow_name))
+
+                registered.add(status_tool_name)
+
+        elif mode == "async":
+            # Create named aliases for async: <name>-async-run and <name>-get_status
+            run_tool_name = f"{name}-async-run"
+            status_tool_name = f"{name}-get_status"
+
+            if run_tool_name not in registered:
+
+                def _make_alias_run(workflow_name: str):
+                    async def _alias_run(
+                        ctx: MCPContext, run_parameters: Dict[str, Any] | None = None
+                    ) -> Dict[str, str]:
+                        return await _workflow_run(
+                            ctx, workflow_name, run_parameters or {}
+                        )
+
+                    return _alias_run
+
+                mcp.tool(name=run_tool_name)(_make_alias_run(workflow_name))
+
+                registered.add(run_tool_name)
+
+            if status_tool_name not in registered:
+
+                def _make_alias_status(workflow_name: str):
+                    async def _alias_status(
+                        ctx: MCPContext, run_id: str
+                    ) -> Dict[str, Any]:
+                        return await _workflow_status(
+                            ctx, run_id=run_id, workflow_name=workflow_name
+                        )
+
+                    return _alias_status
+
+                mcp.tool(name=status_tool_name)(_make_alias_status(workflow_name))
+
+                registered.add(status_tool_name)
+
+    _set_registered_function_tools(mcp, registered)
+
+
 def create_workflow_specific_tools(
     mcp: FastMCP, workflow_name: str, workflow_cls: Type["Workflow"]
 ):
     """Create specific tools for a given workflow."""
-
-    run_fn_tool = FastTool.from_function(workflow_cls.run)
+    # Build the run-tool schema from the original source function (or from `run`
+    # with 'self' stripped); _build_run_param_tool handles both cases.
+    run_fn_tool = _build_run_param_tool(workflow_cls)
 
     run_fn_tool_params = json.dumps(run_fn_tool.parameters, indent=2)
 
     @mcp.tool(
diff --git a/tests/cli/commands/test_app_delete.py b/tests/cli/commands/test_app_delete.py
index e2dc544cd..4d4dfa463 100644
--- a/tests/cli/commands/test_app_delete.py
+++ b/tests/cli/commands/test_app_delete.py
@@ -1,4 +1,5 @@
 """Tests for the configure command."""
+
 import datetime
 from unittest.mock import AsyncMock, MagicMock, patch
 
@@ -8,7 +9,8 @@
 from mcp_agent.cli.mcp_app.api_client import MCPApp, MCPAppConfiguration
 from mcp_agent.cli.mcp_app.mock_client import (
     MOCK_APP_CONFIG_ID,
-    MOCK_APP_ID, MockMCPAppClient,
+    MOCK_APP_ID,
+    MockMCPAppClient,
 )
 
 
@@ -72,17 +74,13 @@ def test_delete_app(patched_delete_app, mock_mcp_client):
         app_id_or_url=MOCK_APP_ID,
     )
 
-    patched_delete_app(
-        app_id_or_url=MOCK_APP_ID,
-        dry_run=False
-    )
+    patched_delete_app(app_id_or_url=MOCK_APP_ID, dry_run=False)
 
     mock_mcp_client.delete_app.assert_called_once_with(MOCK_APP_ID)
 
 
 def test_delete_app_config(patched_delete_app, mock_mcp_client):
     app_config = MCPAppConfiguration(
-        appConfigurationId=MOCK_APP_CONFIG_ID,
-        creatorId="creator"
+        appConfigurationId=MOCK_APP_CONFIG_ID, creatorId="creator"
     )
     mock_mcp_client.get_app_or_config = AsyncMock(return_value=app_config)
 
@@ -91,12 +89,10 @@ def test_delete_app_config(patched_delete_app, mock_mcp_client):
         app_id_or_url=MOCK_APP_ID,
     )
 
-    patched_delete_app(
-        app_id_or_url=MOCK_APP_ID,
-        dry_run=False
-    )
+    patched_delete_app(app_id_or_url=MOCK_APP_ID, dry_run=False)
 
     mock_mcp_client.delete_app_configuration.assert_called_once_with(MOCK_APP_CONFIG_ID)
 
+
 def test_missing_app_id(patched_delete_app):
     """Test with missing app_id."""
 
@@ -122,8 +118,8 @@ def test_missing_api_key(patched_delete_app):
 
     # Patch load_api_key_credentials to return None
     with patch(
-            "mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
-            return_value=None,
+        "mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
+        return_value=None,
     ):
         with pytest.raises(CLIError):
             patched_delete_app(
diff --git a/tests/cli/commands/test_app_status.py b/tests/cli/commands/test_app_status.py
index 70c570728..0d10f39da 100644
--- a/tests/cli/commands/test_app_status.py
+++ b/tests/cli/commands/test_app_status.py
@@ -1,4 +1,5 @@
 """Tests for the configure command."""
+
 import datetime
 from unittest.mock import AsyncMock, MagicMock, patch, Mock
 
@@ -10,7 +11,8 @@
 from mcp_agent.cli.mcp_app.api_client import MCPApp, MCPAppConfiguration, AppServerInfo
 from mcp_agent.cli.mcp_app.mock_client import (
     MOCK_APP_CONFIG_ID,
-    MOCK_APP_ID, MockMCPAppClient,
+    MOCK_APP_ID,
+    MockMCPAppClient,
 )
 
 
@@ -68,14 +70,14 @@ def test_status_app(patched_status_app, mock_mcp_client):
         creatorId="creatorId",
         createdAt=datetime.datetime.now(),
         updatedAt=datetime.datetime.now(),
-        appServerInfo=app_server_info
+        appServerInfo=app_server_info,
     )
     mock_mcp_client.get_app_or_config = AsyncMock(return_value=app)
 
     mock_mcp_print_server_details = Mock()
     with patch(
-            "mcp_agent.cli.cloud.commands.app.status.main.print_mcp_server_details",
-            side_effect=mock_mcp_print_server_details
+        "mcp_agent.cli.cloud.commands.app.status.main.print_mcp_server_details",
+        side_effect=mock_mcp_print_server_details,
     ) as mocked_function:
         mock_mcp_print_server_details.return_value = None
 
@@ -85,7 +87,9 @@ def test_status_app(patched_status_app, mock_mcp_client):
             api_key=settings.API_KEY,
         )
 
-        mocked_function.assert_called_once_with(server_url=server_url, api_key=settings.API_KEY)
+        mocked_function.assert_called_once_with(
+            server_url=server_url, api_key=settings.API_KEY
+        )
 
 
 def test_status_app_config(patched_status_app, mock_mcp_client):
@@ -97,14 +101,14 @@ def test_status_app_config(patched_status_app, mock_mcp_client):
     app_config = MCPAppConfiguration(
         appConfigurationId=MOCK_APP_CONFIG_ID,
         creatorId="creator",
-        appServerInfo=app_server_info
+        appServerInfo=app_server_info,
     )
     mock_mcp_client.get_app_or_config = AsyncMock(return_value=app_config)
 
     mock_mcp_print_server_details = Mock()
     with patch(
-            "mcp_agent.cli.cloud.commands.app.status.main.print_mcp_server_details",
-            side_effect=mock_mcp_print_server_details
+        "mcp_agent.cli.cloud.commands.app.status.main.print_mcp_server_details",
+        side_effect=mock_mcp_print_server_details,
     ) as mocked_function:
         mock_mcp_print_server_details.return_value = None
 
@@ -114,7 +118,10 @@ def test_status_app_config(patched_status_app, mock_mcp_client):
             api_key=settings.API_KEY,
         )
 
-        mocked_function.assert_called_once_with(server_url=server_url, api_key=settings.API_KEY)
+        mocked_function.assert_called_once_with(
+            server_url=server_url, api_key=settings.API_KEY
+        )
+
 
 def test_missing_app_id(patched_status_app):
     """Test with missing app_id."""
@@ -141,8 +148,8 @@ def test_missing_api_key(patched_status_app):
 
     # Patch load_api_key_credentials to return None
     with patch(
-            "mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
-            return_value=None,
+        "mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
+        return_value=None,
    ):
         with pytest.raises(CLIError):
             patched_status_app(
diff --git a/tests/cli/commands/test_app_workflows.py b/tests/cli/commands/test_app_workflows.py
index 5ad850b1b..15c578225 100644
--- a/tests/cli/commands/test_app_workflows.py
+++ b/tests/cli/commands/test_app_workflows.py
@@ -1,4 +1,5 @@
 """Tests for the configure command."""
+
 import datetime
 from unittest.mock import AsyncMock, MagicMock, patch, Mock
 
@@ -10,7 +11,8 @@
 from mcp_agent.cli.mcp_app.api_client import MCPApp, MCPAppConfiguration, AppServerInfo
 from mcp_agent.cli.mcp_app.mock_client import (
     MOCK_APP_CONFIG_ID,
-    MOCK_APP_ID, MockMCPAppClient,
+    MOCK_APP_ID,
+    MockMCPAppClient,
 )
 
 
@@ -68,14 +70,14 @@ def test_status_app(patched_workflows_app, mock_mcp_client):
         creatorId="creatorId",
         createdAt=datetime.datetime.now(),
         updatedAt=datetime.datetime.now(),
-        appServerInfo=app_server_info
+        appServerInfo=app_server_info,
     )
     mock_mcp_client.get_app_or_config = AsyncMock(return_value=app)
 
     mock_mcp_print_mcp_server_workflow_details = Mock()
     with patch(
-            "mcp_agent.cli.cloud.commands.app.workflows.main.print_mcp_server_workflow_details",
-            side_effect=mock_mcp_print_mcp_server_workflow_details
+        "mcp_agent.cli.cloud.commands.app.workflows.main.print_mcp_server_workflow_details",
+        side_effect=mock_mcp_print_mcp_server_workflow_details,
     ) as mocked_function:
         mock_mcp_print_mcp_server_workflow_details.return_value = None
 
@@ -85,7 +87,9 @@ def test_status_app(patched_workflows_app, mock_mcp_client):
             api_key=settings.API_KEY,
         )
 
-        mocked_function.assert_called_once_with(server_url=server_url, api_key=settings.API_KEY)
+        mocked_function.assert_called_once_with(
+            server_url=server_url, api_key=settings.API_KEY
+        )
 
 
 def test_status_app_config(patched_workflows_app, mock_mcp_client):
@@ -97,14 +101,14 @@ def test_status_app_config(patched_workflows_app, mock_mcp_client):
     app_config = MCPAppConfiguration(
         appConfigurationId=MOCK_APP_CONFIG_ID,
         creatorId="creator",
-        appServerInfo=app_server_info
+        appServerInfo=app_server_info,
     )
     mock_mcp_client.get_app_or_config = AsyncMock(return_value=app_config)
 
     mock_mcp_print_mcp_server_workflow_details = Mock()
     with patch(
-            "mcp_agent.cli.cloud.commands.app.workflows.main.print_mcp_server_workflow_details",
-            side_effect=mock_mcp_print_mcp_server_workflow_details
+        "mcp_agent.cli.cloud.commands.app.workflows.main.print_mcp_server_workflow_details",
+        side_effect=mock_mcp_print_mcp_server_workflow_details,
     ) as mocked_function:
         mock_mcp_print_mcp_server_workflow_details.return_value = None
 
@@ -114,7 +118,10 @@ def test_status_app_config(patched_workflows_app, mock_mcp_client):
             api_key=settings.API_KEY,
         )
 
-        mocked_function.assert_called_once_with(server_url=server_url, api_key=settings.API_KEY)
+        mocked_function.assert_called_once_with(
+            server_url=server_url, api_key=settings.API_KEY
+        )
+
 
 def test_missing_app_id(patched_workflows_app):
     """Test with missing app_id."""
@@ -141,8 +148,8 @@ def test_missing_api_key(patched_workflows_app):
 
     # Patch load_api_key_credentials to return None
     with patch(
-            "mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
-            return_value=None,
+        "mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
+        return_value=None,
     ):
         with pytest.raises(CLIError):
             patched_workflows_app(
diff --git a/tests/cli/commands/test_cli_secrets.py b/tests/cli/commands/test_cli_secrets.py
index 4e32bd7f4..6df6edf43 100644
--- a/tests/cli/commands/test_cli_secrets.py
+++ b/tests/cli/commands/test_cli_secrets.py
@@ -431,15 +431,12 @@ def test_cli_error_handling(mock_api_credentials):
     # Error message should mention the file doesn't exist
     combined_output = result.stderr + result.stdout
     # remove all lines, dashes, etc
-    ascii_text = re.sub(r'[^A-z0-9 .,-]+', ' ', combined_output)
+    ascii_text = re.sub(r"[^A-z0-9 .,-]+", " ", combined_output)
     # remove any remnants of colour codes
-    without_escape_codes = re.sub(r'\[\d+m', ' ', ascii_text)
+    without_escape_codes = re.sub(r"\[\d+m", " ", ascii_text)
     # normalize spaces and convert to lower case
-    clean_text = ' '.join(without_escape_codes.split()).lower()
-    assert (
-        "does not exist" in clean_text
-        or "no such file" in clean_text
-    )
+    clean_text = " ".join(without_escape_codes.split()).lower()
+    assert "does not exist" in clean_text or "no such file" in clean_text
 
     # Test with the secret value not having a tag
     cmd = [
@@ -464,7 +461,9 @@ def test_cli_error_handling(mock_api_credentials):
 
     # It should mention using the tags
     combined_output = result.stderr + result.stdout
-    clean_text = ' '.join(re.sub(r'[^\x00-\x7F]+', ' ', combined_output).split()).lower()
+    clean_text = " ".join(
+        re.sub(r"[^\x00-\x7F]+", " ", combined_output).split()
+    ).lower()
     assert (
         "secrets must be tagged with !developer_secret or !user_secret"
         in clean_text
diff --git a/tests/cli/commands/test_configure.py b/tests/cli/commands/test_configure.py
index 95c13a4f3..90b9be93c 100644
--- a/tests/cli/commands/test_configure.py
+++ b/tests/cli/commands/test_configure.py
@@ -7,7 +7,8 @@
 from mcp_agent.cli.exceptions import CLIError
 from mcp_agent.cli.mcp_app.mock_client import (
     MOCK_APP_CONFIG_ID,
-    MOCK_APP_ID, MockMCPAppClient,
+    MOCK_APP_ID,
+    MockMCPAppClient,
 )
 
 
diff --git a/tests/cli/commands/test_deploy_command.py b/tests/cli/commands/test_deploy_command.py
index ac1bbb208..2a28b3873 100644
--- a/tests/cli/commands/test_deploy_command.py
+++ b/tests/cli/commands/test_deploy_command.py
@@ -62,11 +62,11 @@ def test_deploy_command_help(runner):
     assert result.exit_code == 0
 
     # remove all lines, dashes, etc
-    ascii_text = re.sub(r'[^A-z0-9.,-]+', '', result.stdout)
+    ascii_text = re.sub(r"[^A-z0-9.,-]+", "", result.stdout)
     # remove any remnants of colour codes
-    without_escape_codes = re.sub(r'\[[0-9 ]+m', '', ascii_text)
+    without_escape_codes = re.sub(r"\[[0-9 ]+m", "", ascii_text)
     # normalize spaces and convert to lower case
-    clean_text = ' '.join(without_escape_codes.split()).lower()
+    clean_text = " ".join(without_escape_codes.split()).lower()
 
     # Expected options from the updated CLAUDE.md spec
     assert "--config-dir" in clean_text or "-c" in clean_text
@@ -131,7 +131,9 @@ async def mock_process_secrets(*args, **kwargs):
 def test_deploy_command_no_secrets(runner, temp_config_dir):
     """Test deploy command with --no-secrets flag when a secrets file DOES NOT exist."""
     # Run with --no-secrets flag and --dry-run to avoid real deployment
-    with patch("mcp_agent.cli.cloud.commands.deploy.main.wrangler_deploy") as mock_deploy:
+    with patch(
+        "mcp_agent.cli.cloud.commands.deploy.main.wrangler_deploy"
+    ) as mock_deploy:
         # Mock the wrangler deployment
         mock_deploy.return_value = None
 
@@ -162,7 +164,9 @@ def test_deploy_command_no_secrets_with_existing_secrets(runner, temp_config_dir):
     """Test deploy command with --no-secrets flag when a secrets file DOES exist."""
     # Run with --no-secrets flag and --dry-run to avoid real deployment
-    with patch("mcp_agent.cli.cloud.commands.deploy.main.wrangler_deploy") as mock_deploy:
+    with patch(
+        "mcp_agent.cli.cloud.commands.deploy.main.wrangler_deploy"
+    ) as mock_deploy:
         # Mock the wrangler deployment
         mock_deploy.return_value = None
 
@@ -292,7 +296,9 @@ def test_rollback_secrets_file(temp_config_dir):
         pre_deploy_secrets_content = f.read()
 
     # Call deploy_config with wrangler_deploy mocked
-    with patch("mcp_agent.cli.cloud.commands.deploy.main.wrangler_deploy") as mock_deploy:
+    with patch(
+        "mcp_agent.cli.cloud.commands.deploy.main.wrangler_deploy"
+    ) as mock_deploy:
         # Mock wrangler_deploy to prevent actual deployment
         mock_deploy.side_effect = Exception("Deployment failed")
 
diff --git a/tests/cli/utils/jwt_generator.py b/tests/cli/utils/jwt_generator.py
index 82c5c4e17..6202321e0 100644
--- a/tests/cli/utils/jwt_generator.py
+++ b/tests/cli/utils/jwt_generator.py
@@ -206,5 +206,6 @@ def generate_test_token():
         expiry_days=365,
     )
 
+
 if __name__ == "__main__":
     main()
diff --git a/tests/server/test_app_server_workflow_schema.py b/tests/server/test_app_server_workflow_schema.py
new file mode 100644
index 000000000..05f387127
--- /dev/null
+++ b/tests/server/test_app_server_workflow_schema.py
@@ -0,0 +1,58 @@
+import pytest
+from types import SimpleNamespace
+
+from mcp_agent.app import MCPApp
+from mcp_agent.executor.workflow import Workflow, WorkflowResult
+from mcp_agent.server.app_server import create_workflow_tools
+
+
+class _ToolRecorder:
+    def __init__(self):
+        self.decorated = []
+
+    def tool(self, *args, **kwargs):
+        name = kwargs.get("name", args[0] if args else None)
+
+        def _decorator(func):
+            self.decorated.append((name, func, kwargs))
+            return func
+
+        return _decorator
+
+
+@pytest.mark.asyncio
+async def test_workflow_run_schema_strips_self_and_uses_param_annotations():
+    app = MCPApp(name="schema_app")
+    await app.initialize()
+
+    @app.workflow
+    class MyWF(Workflow[str]):
+        """Doc for MyWF"""
+
+        @app.workflow_run
+        async def run(self, q: int, flag: bool = False) -> WorkflowResult[str]:
+            return WorkflowResult(value=f"{q}:{flag}")
+
+    mcp = _ToolRecorder()
+    server_context = SimpleNamespace(workflows=app.workflows, context=app.context)
+
+    # This should create per-workflow tools; run tool must be built from run signature
+    create_workflow_tools(mcp, server_context)
+
+    # Find the "workflows-MyWF-run" tool and inspect its parameters schema via FastMCP
+    names = [name for name, *_ in mcp.decorated]
+    assert "workflows-MyWF-run" in names
+
+    # We can't call FastTool.from_function here since the tool is already created inside create_workflow_tools,
+    # but we can at least ensure that the schema text embedded in the description JSON includes our parameters (q, flag).
+    # Description contains a pretty-printed JSON of parameters; locate and parse it
+    run_entry = next(
+        (entry for entry in mcp.decorated if entry[0] == "workflows-MyWF-run"), None
+    )
+    assert run_entry is not None
+    _, _, kwargs = run_entry
+    desc = kwargs.get("description", "")
+    # The description embeds the JSON schema; assert basic fields are referenced
+    assert "q" in desc
+    assert "flag" in desc
+    assert "self" not in desc
diff --git a/tests/server/test_tool_decorators.py b/tests/server/test_tool_decorators.py
new file mode 100644
index 000000000..f39143fad
--- /dev/null
+++ b/tests/server/test_tool_decorators.py
@@ -0,0 +1,173 @@
+import asyncio
+import pytest
+
+from mcp_agent.app import MCPApp
+from mcp_agent.server.app_server import (
+    create_workflow_tools,
+    create_declared_function_tools,
+    _workflow_run,
+    _workflow_status,
+)
+
+
+class _ToolRecorder:
+    """Helper to record tools registered via FastMCP-like interface."""
+
+    def __init__(self):
+        self.decorated_tools = []  # via mcp.tool decorator (workflow endpoints)
+        self.added_tools = []  # via mcp.add_tool (sync @app.tool)
+
+    def tool(self, *args, **kwargs):
+        name = kwargs.get("name", args[0] if args else None)
+
+        def _decorator(func):
+            self.decorated_tools.append((name, func))
+            return func
+
+        return _decorator
+
+    def add_tool(
+        self,
+        fn,
+        *,
+        name=None,
+        title=None,
+        description=None,
+        annotations=None,
+        structured_output=None,
+    ):
+        self.added_tools.append((name, fn, description, structured_output))
+
+
+def _make_ctx(server_context):
+    # Minimal fake MCPContext with request_context.lifespan_context
+    from types import SimpleNamespace
+
+    ctx = SimpleNamespace()
+    # Ensure a workflow registry is available for status waits
+    if not hasattr(server_context, "workflow_registry"):
+        from mcp_agent.executor.workflow_registry import InMemoryWorkflowRegistry
+
+        server_context.workflow_registry = InMemoryWorkflowRegistry()
+
+    req = SimpleNamespace(lifespan_context=server_context)
+    ctx.request_context = req
+    ctx.fastmcp = SimpleNamespace(_mcp_agent_app=None)
+    return ctx
+
+
+@pytest.mark.asyncio
+async def test_app_tool_registers_and_executes_sync_tool():
+    app = MCPApp(name="test_app_tool")
+    await app.initialize()
+
+    @app.tool(name="echo", description="Echo input")
+    async def echo(text: str) -> str:
+        return text + "!"
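+    # At this point the decorator has registered an AutoWorkflow_echo workflow on
+    # the app and queued a deferred tool declaration; MCP tools only appear once
+    # create_declared_function_tools runs below.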
+
+    # Prepare mock FastMCP and server context
+    mcp = _ToolRecorder()
+    server_context = type(
+        "SC", (), {"workflows": app.workflows, "context": app.context}
+    )()
+
+    # Register generated per-workflow tools and function-declared tools
+    create_workflow_tools(mcp, server_context)
+    create_declared_function_tools(mcp, server_context)
+
+    # Verify tool names: sync tool and its status tool
+    decorated_names = {name for name, _ in mcp.decorated_tools}
+    added_names = {name for name, *_ in mcp.added_tools}
+
+    # No workflows-* for sync tools; check echo and echo-get_status
+    assert "echo" in added_names  # synchronous tool
+    assert "echo-get_status" in decorated_names
+
+    # Execute the synchronous tool function and ensure it returns unwrapped value
+    # Find the registered sync tool function
+    sync_tool_fn = next(fn for name, fn, *_ in mcp.added_tools if name == "echo")
+    ctx = _make_ctx(server_context)
+    result = await sync_tool_fn(text="hi", ctx=ctx)
+    assert result == "hi!"  # unwrapped (not WorkflowResult)
+
+    # Also ensure the underlying workflow returned a WorkflowResult
+    # Start via workflow_run to get run_id, then wait for completion and inspect
+    run_info = await _workflow_run(ctx, "echo", {"text": "ok"})
+    run_id = run_info["run_id"]
+    # Poll status until completed (bounded wait)
+    for _ in range(200):
+        status = await _workflow_status(ctx, run_id, "echo")
+        if status.get("completed"):
+            break
+        await asyncio.sleep(0.01)
+    assert status.get("completed") is True
+    # The recorded result is a WorkflowResult model dump; check value field
+    result_payload = status.get("result")
+    if isinstance(result_payload, dict) and "value" in result_payload:
+        assert result_payload["value"] == "ok!"
+    else:
+        assert result_payload in ("ok!", {"result": "ok!"})
+
+
+@pytest.mark.asyncio
+async def test_app_async_tool_registers_aliases_and_workflow_tools():
+    app = MCPApp(name="test_app_async_tool")
+    await app.initialize()
+
+    @app.async_tool(name="long")
+    async def long_task(x: int) -> str:
+        return f"done:{x}"
+
+    mcp = _ToolRecorder()
+    server_context = type(
+        "SC", (), {"workflows": app.workflows, "context": app.context}
+    )()
+
+    create_workflow_tools(mcp, server_context)
+    create_declared_function_tools(mcp, server_context)
+
+    decorated_names = {name for name, _ in mcp.decorated_tools}
+
+    # async aliases only (we suppress workflows-* for async auto tools)
+    assert "long-async-run" in decorated_names
+    assert "long-get_status" in decorated_names
+
+
+@pytest.mark.asyncio
+async def test_auto_workflow_wraps_plain_return_in_workflowresult():
+    app = MCPApp(name="test_wrap")
+    await app.initialize()
+
+    @app.async_tool(name="wrapme")
+    async def wrapme(v: int) -> int:
+        # plain int, should be wrapped inside WorkflowResult internally
+        return v + 1
+
+    mcp = _ToolRecorder()
+    server_context = type(
+        "SC", (), {"workflows": app.workflows, "context": app.context}
+    )()
+    create_workflow_tools(mcp, server_context)
+    create_declared_function_tools(mcp, server_context)
+
+    ctx = _make_ctx(server_context)
+    run_info = await _workflow_run(ctx, "wrapme", {"v": 41})
+    run_id = run_info["run_id"]
+
+    # Inspect workflow's task result type by polling status for completion
+    for _ in range(100):
+        status = await _workflow_status(ctx, run_id, "wrapme")
+        if status.get("completed"):
+            break
+        await asyncio.sleep(0.01)
+    assert status.get("completed") is True
+
+    # Cross-check that the underlying run returned a WorkflowResult by re-running via registry path
+    # We can't import the internal task here; assert observable effects instead:
+    # the result equals the expected value and no error was recorded.
+    assert status.get("error") in (None, "")
+    # And the computed value was correct
+    result_payload = status.get("result")
+    if isinstance(result_payload, dict) and "value" in result_payload:
+        assert result_payload["value"] == 42
+    else:
+        assert result_payload in (42, {"result": 42})
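
For reference, a minimal usage sketch of the two decorators introduced in this diff. The app and tool names below are hypothetical; only the decorator API, `create_mcp_server_for_app`, and the resulting tool names come from the patch itself.

```python
from mcp_agent.app import MCPApp
from mcp_agent.server.app_server import create_mcp_server_for_app

app = MCPApp(name="example_app")


@app.tool(name="summarize")
async def summarize(text: str) -> str:
    """Synchronous tool: the MCP call blocks until the auto-generated workflow completes."""
    return text[:100]


@app.async_tool(name="research")
async def research(topic: str) -> str:
    """Async tool: started via `research-async-run`, polled via `research-get_status`."""
    return f"notes on {topic}"


# Serving the app registers four tools: `summarize`, `summarize-get_status`,
# `research-async-run`, and `research-get_status`.
mcp = create_mcp_server_for_app(app)
```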