Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions src/mcp_agent/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ def __init__(
self._model_selector = model_selector

self._workflows: Dict[str, Type["Workflow"]] = {} # id to workflow class
# Deferred tool declarations to register with MCP server when available
# Each entry: {
# "name": str,
# "mode": "sync" | "async",
# "workflow_name": str,
# "workflow_cls": Type[Workflow],
# "tool_wrapper": Callable | None,
# "structured_output": bool | None,
# "description": str | None,
# }
self._declared_tools: list[dict[str, Any]] = []

self._logger = None
self._context: Optional[Context] = None
Expand Down Expand Up @@ -512,6 +523,146 @@ async def wrapper(*args, **kwargs):

return wrapper

def _create_workflow_from_function(
self,
fn: Callable[..., Any],
*,
workflow_name: str,
description: str | None = None,
mark_sync_tool: bool = False,
) -> Type:
"""
Create a Workflow subclass dynamically from a plain function.

The generated workflow class will:
- Have `run` implemented to call the provided function
- Be decorated with engine-specific run decorators via workflow_run
- Expose the original function for parameter schema generation
"""

import asyncio as _asyncio
from mcp_agent.executor.workflow import Workflow as _Workflow

async def _invoke_target(*args, **kwargs):
# Support both async and sync callables
res = fn(*args, **kwargs)
if _asyncio.iscoroutine(res):
res = await res

# Ensure WorkflowResult return type
try:
from mcp_agent.executor.workflow import (
WorkflowResult as _WorkflowResult,
)
except Exception:
_WorkflowResult = None # type: ignore[assignment]

if _WorkflowResult is not None and not isinstance(res, _WorkflowResult):
return _WorkflowResult(value=res)
return res

async def _run(self, *args, **kwargs): # type: ignore[no-redef]
return await _invoke_target(*args, **kwargs)

# Decorate run with engine-specific decorator
decorated_run = self.workflow_run(_run)

# Build the Workflow subclass dynamically
cls_dict: Dict[str, Any] = {
"__doc__": description or (fn.__doc__ or ""),
"run": decorated_run,
"__mcp_agent_param_source_fn__": fn,
}
if mark_sync_tool:
cls_dict["__mcp_agent_sync_tool__"] = True
else:
cls_dict["__mcp_agent_async_tool__"] = True

auto_cls = type(f"AutoWorkflow_{workflow_name}", (_Workflow,), cls_dict)

# Register with app (and apply engine-specific workflow decorator)
self.workflow(auto_cls, workflow_id=workflow_name)
return auto_cls

Comment on lines +526 to +586
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Bug: return undecorated class and invalid auto class names when tool name has hyphens.

  • type() requires an identifier; names like "my-tool" will fail.
  • _create_workflow_from_function returns auto_cls before engine-specific decoration; downstream may use the wrong class.

Apply this diff:

         import asyncio as _asyncio
+        import re as _re
         from mcp_agent.executor.workflow import Workflow as _Workflow
@@
-        auto_cls = type(f"AutoWorkflow_{workflow_name}", (_Workflow,), cls_dict)
+        _safe_suffix = _re.sub(r"\W|^(?=\d)", "_", workflow_name)
+        auto_cls = type(f"AutoWorkflow_{_safe_suffix}", (_Workflow,), cls_dict)
 
-        # Register with app (and apply engine-specific workflow decorator)
-        self.workflow(auto_cls, workflow_id=workflow_name)
-        return auto_cls
+        # Register with app (and apply engine-specific workflow decorator)
+        decorated_cls = self.workflow(auto_cls, workflow_id=workflow_name)
+        return decorated_cls
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _create_workflow_from_function(
self,
fn: Callable[..., Any],
*,
workflow_name: str,
description: str | None = None,
mark_sync_tool: bool = False,
) -> Type:
"""
Create a Workflow subclass dynamically from a plain function.
The generated workflow class will:
- Have `run` implemented to call the provided function
- Be decorated with engine-specific run decorators via workflow_run
- Expose the original function for parameter schema generation
"""
import asyncio as _asyncio
from mcp_agent.executor.workflow import Workflow as _Workflow
async def _invoke_target(*args, **kwargs):
# Support both async and sync callables
res = fn(*args, **kwargs)
if _asyncio.iscoroutine(res):
res = await res
# Ensure WorkflowResult return type
try:
from mcp_agent.executor.workflow import (
WorkflowResult as _WorkflowResult,
)
except Exception:
_WorkflowResult = None # type: ignore[assignment]
if _WorkflowResult is not None and not isinstance(res, _WorkflowResult):
return _WorkflowResult(value=res)
return res
async def _run(self, *args, **kwargs): # type: ignore[no-redef]
return await _invoke_target(*args, **kwargs)
# Decorate run with engine-specific decorator
decorated_run = self.workflow_run(_run)
# Build the Workflow subclass dynamically
cls_dict: Dict[str, Any] = {
"__doc__": description or (fn.__doc__ or ""),
"run": decorated_run,
"__mcp_agent_param_source_fn__": fn,
}
if mark_sync_tool:
cls_dict["__mcp_agent_sync_tool__"] = True
else:
cls_dict["__mcp_agent_async_tool__"] = True
auto_cls = type(f"AutoWorkflow_{workflow_name}", (_Workflow,), cls_dict)
# Register with app (and apply engine-specific workflow decorator)
self.workflow(auto_cls, workflow_id=workflow_name)
return auto_cls
import asyncio as _asyncio
import re as _re
from mcp_agent.executor.workflow import Workflow as _Workflow
async def _invoke_target(*args, **kwargs):
# Support both async and sync callables
res = fn(*args, **kwargs)
if _asyncio.iscoroutine(res):
res = await res
# Ensure WorkflowResult return type
try:
from mcp_agent.executor.workflow import (
WorkflowResult as _WorkflowResult,
)
except Exception:
_WorkflowResult = None # type: ignore[assignment]
if _WorkflowResult is not None and not isinstance(res, _WorkflowResult):
return _WorkflowResult(value=res)
return res
async def _run(self, *args, **kwargs): # type: ignore[no-redef]
return await _invoke_target(*args, **kwargs)
# Decorate run with engine-specific decorator
decorated_run = self.workflow_run(_run)
# Build the Workflow subclass dynamically
cls_dict: Dict[str, Any] = {
"__doc__": description or (fn.__doc__ or ""),
"run": decorated_run,
"__mcp_agent_param_source_fn__": fn,
}
if mark_sync_tool:
cls_dict["__mcp_agent_sync_tool__"] = True
else:
cls_dict["__mcp_agent_async_tool__"] = True
# Sanitize workflow_name into a valid Python identifier suffix
_safe_suffix = _re.sub(r"\W|^(?=\d)", "_", workflow_name)
auto_cls = type(f"AutoWorkflow_{_safe_suffix}", (_Workflow,), cls_dict)
# Register with app (and apply engine-specific workflow decorator)
decorated_cls = self.workflow(auto_cls, workflow_id=workflow_name)
return decorated_cls

def tool(
self,
name: str | None = None,
*,
description: str | None = None,
structured_output: bool | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""
Decorator to declare a synchronous MCP tool that runs via an auto-generated
Workflow and waits for completion before returning.

Also registers an async Workflow under the same name so that run/get_status
endpoints are available.
"""

def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
tool_name = name or fn.__name__
# Construct the workflow from function
workflow_cls = self._create_workflow_from_function(
fn,
workflow_name=tool_name,
description=description,
mark_sync_tool=True,
)

# Defer tool registration until the MCP server is created
self._declared_tools.append(
{
"name": tool_name,
"mode": "sync",
"workflow_name": tool_name,
"workflow_cls": workflow_cls,
"source_fn": fn,
"structured_output": structured_output,
"description": description or (fn.__doc__ or ""),
}
)

return fn

return decorator

def async_tool(
self,
name: str | None = None,
*,
description: str | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""
Decorator to declare an asynchronous MCP tool.

Creates a Workflow class from the function and registers it so that
the standard per-workflow tools (run/get_status) are exposed by the server.
"""

def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
workflow_name = name or fn.__name__
workflow_cls = self._create_workflow_from_function(
fn,
workflow_name=workflow_name,
description=description,
mark_sync_tool=False,
)
# Defer alias tool registration for run/get_status
self._declared_tools.append(
{
"name": workflow_name,
"mode": "async",
"workflow_name": workflow_name,
"workflow_cls": workflow_cls,
"source_fn": fn,
"structured_output": None,
"description": description or (fn.__doc__ or ""),
}
)
return fn

return decorator

def workflow_task(
self,
name: str | None = None,
Expand Down
6 changes: 5 additions & 1 deletion src/mcp_agent/cli/cloud/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
from typer.core import TyperGroup

from mcp_agent.cli.cloud.commands import configure_app, deploy_config, login
from mcp_agent.cli.cloud.commands.app import delete_app, get_app_status, list_app_workflows
from mcp_agent.cli.cloud.commands.app import (
delete_app,
get_app_status,
list_app_workflows,
)
from mcp_agent.cli.cloud.commands.apps import list_apps
from mcp_agent.cli.cloud.commands.workflow import get_workflow_status
from mcp_agent.cli.exceptions import CLIError
Expand Down
Loading
Loading