Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions examples/mcp_agent_server/asyncio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,40 @@ def make_session(read_stream: MemoryObjectReceiveStream,

The example client (`client.py`) demonstrates this end-to-end: it registers a logging callback and calls `set_logging_level("info")` so logs from the server appear in the client's console.

## Testing Specific Features

The client supports feature flags to exercise subsets of functionality. Available flags: `workflows`, `tools`, `sampling`, `elicitation`, `notifications`, or `all`.

Examples:

```
# Default (all features)
uv run client.py

# Only workflows
uv run client.py --features workflows

# Only tools
uv run client.py --features tools

# Sampling + elicitation demos
uv run client.py --features sampling elicitation

# Only notifications (server logs + other notifications)
uv run client.py --features notifications

# Increase server logging verbosity
uv run client.py --server-log-level debug

# Use custom FastMCP settings when launching the server
uv run client.py --custom-fastmcp-settings
```

Console output:

- Server logs appear as lines prefixed with `[SERVER LOG] ...`.
- Other server-originated notifications (e.g., `notifications/progress`, `notifications/resources/list_changed`) appear as `[SERVER NOTIFY] <method>: ...`.

## MCP Clients

Since the mcp-agent app is exposed as an MCP server, it can be used in any MCP client just
Expand Down
529 changes: 336 additions & 193 deletions examples/mcp_agent_server/asyncio/client.py

Large diffs are not rendered by default.

114 changes: 114 additions & 0 deletions examples/mcp_agent_server/asyncio/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
from mcp_agent.workflows.parallel.parallel_llm import ParallelLLM
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.tracing.token_counter import TokenNode
from mcp_agent.human_input.handler import console_input_callback
from mcp_agent.elicitation.handler import console_elicitation_callback
from mcp_agent.mcp.gen_client import gen_client
from mcp_agent.config import MCPServerSettings

# Note: This is purely optional:
# if not provided, a default FastMCP server will be created by MCPApp using create_mcp_server_for_app()
Expand All @@ -36,6 +40,8 @@
name="basic_agent_server",
description="Basic agent server example",
mcp=mcp,
human_input_callback=console_input_callback, # enable approval prompts for local sampling
elicitation_callback=console_elicitation_callback, # enable console-driven elicitation
)


Expand Down Expand Up @@ -109,6 +115,114 @@ async def run(self, input: str) -> WorkflowResult[str]:
return WorkflowResult(value=result)


@app.tool(name="sampling_demo")
async def sampling_demo(topic: str, app_ctx: Optional[AppContext] = None) -> str:
"""
Demonstrate MCP sampling via a nested MCP server tool.

- In asyncio (no upstream client), this triggers local sampling with a human approval prompt.
- When an MCP client is connected, the sampling request is proxied upstream.
"""
_app = app_ctx.app if app_ctx else app

# Register a simple nested server that uses sampling in its get_haiku tool
nested_name = "nested_sampling"
nested_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__), "..", "shared", "nested_sampling_server.py"
)
)
_app.context.config.mcp.servers[nested_name] = MCPServerSettings(
name=nested_name,
command="uv",
args=["run", nested_path],
description="Nested server providing a haiku generator using sampling",
)

# Connect as an MCP client to the nested server and call its sampling tool
async with gen_client(
nested_name, _app.context.server_registry, context=_app.context
) as client:
result = await client.call_tool("get_haiku", {"topic": topic})

# Extract text content from CallToolResult
try:
if result.content and len(result.content) > 0:
return result.content[0].text or ""
except Exception:
pass
return ""


@app.tool(name="elicitation_demo")
async def elicitation_demo(
action: str = "proceed", app_ctx: Optional[AppContext] = None
) -> str:
"""
Demonstrate MCP elicitation via a nested MCP server tool.

- In asyncio (no upstream client), this triggers local elicitation handled by console.
- When an MCP client is connected, the elicitation request is proxied upstream.
"""
_app = app_ctx.app if app_ctx else app

nested_name = "nested_elicitation"
nested_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__), "..", "shared", "nested_elicitation_server.py"
)
)
_app.context.config.mcp.servers[nested_name] = MCPServerSettings(
name=nested_name,
command="uv",
args=["run", nested_path],
description="Nested server demonstrating elicitation",
)

async with gen_client(
nested_name, _app.context.server_registry, context=_app.context
) as client:
result = await client.call_tool("confirm_action", {"action": action})
try:
if result.content and len(result.content) > 0:
return result.content[0].text or ""
except Exception:
pass
return ""


@app.tool(name="notify_resources")
async def notify_resources(app_ctx: Optional[AppContext] = None) -> str:
"""Trigger a non-logging resource list changed notification."""
_app = app_ctx.app if app_ctx else app
upstream = getattr(_app.context, "upstream_session", None)
if upstream is None:
_app.logger.warning("No upstream session to notify")
return "no-upstream"
await upstream.send_resource_list_changed()
_app.logger.info("Sent notifications/resources/list_changed")
return "ok"


@app.tool(name="notify_progress")
async def notify_progress(
progress: float = 0.5,
message: str | None = "Asyncio progress demo",
app_ctx: Optional[AppContext] = None,
) -> str:
"""Trigger a non-logging progress notification."""
_app = app_ctx.app if app_ctx else app
upstream = getattr(_app.context, "upstream_session", None)
if upstream is None:
_app.logger.warning("No upstream session to notify")
return "no-upstream"
await upstream.send_progress_notification(
progress_token="asyncio-demo", progress=progress, message=message
)
_app.logger.info("Sent notifications/progress")
return "ok"


@app.tool
async def grade_story(story: str, app_ctx: Optional[AppContext] = None) -> str:
"""
Expand Down
31 changes: 31 additions & 0 deletions examples/mcp_agent_server/shared/nested_elicitation_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from pydantic import BaseModel
from mcp.server.fastmcp import FastMCP
from mcp.server.elicitation import elicit_with_validation, AcceptedElicitation

mcp = FastMCP("Nested Elicitation Server")


class Confirmation(BaseModel):
confirm: bool


@mcp.tool()
async def confirm_action(action: str) -> str:
"""Ask the user to confirm an action via elicitation."""
ctx = mcp.get_context()
res = await elicit_with_validation(
ctx.session,
message=f"Do you want to {action}?",
schema=Confirmation,
)
if isinstance(res, AcceptedElicitation) and res.data.confirm:
return f"Action '{action}' confirmed by user"
return f"Action '{action}' declined by user"


def main():
mcp.run()


if __name__ == "__main__":
main()
40 changes: 40 additions & 0 deletions examples/mcp_agent_server/shared/nested_sampling_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from mcp.server.fastmcp import FastMCP
from mcp.types import ModelPreferences, ModelHint, SamplingMessage, TextContent

mcp = FastMCP("Nested Sampling Server")


@mcp.tool()
async def get_haiku(topic: str) -> str:
"""Use MCP sampling to generate a haiku about the given topic."""
result = await mcp.get_context().session.create_message(
messages=[
SamplingMessage(
role="user",
content=TextContent(
type="text", text=f"Generate a quirky haiku about {topic}."
),
)
],
system_prompt="You are a poet.",
max_tokens=100,
temperature=0.7,
model_preferences=ModelPreferences(
hints=[ModelHint(name="gpt-4o-mini")],
costPriority=0.1,
speedPriority=0.8,
intelligencePriority=0.1,
),
)

if isinstance(result.content, TextContent):
return result.content.text
return "Haiku generation failed"


def main():
mcp.run()


if __name__ == "__main__":
main()
31 changes: 31 additions & 0 deletions examples/mcp_agent_server/temporal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,37 @@ To run this example, you'll need to:
uv run client.py
```

### Testing Specific Features

The Temporal client supports feature flags to exercise subsets of functionality. Available flags: `workflows`, `tools`, `sampling`, `elicitation`, `notifications`, or `all`.

Examples:

```bash
# Default (all features)
uv run client.py

# Only workflows
uv run client.py --features workflows

# Only tools
uv run client.py --features tools

# Sampling + elicitation workflows
uv run client.py --features sampling elicitation

# Only notifications-related workflow
uv run client.py --features notifications

# Increase server logging verbosity seen by the client
uv run client.py --server-log-level debug
```

Console output:

- Server logs appear as lines prefixed with `[SERVER LOG] ...`.
- Other server-originated notifications (e.g., `notifications/progress`, `notifications/resources/list_changed`) appear as `[SERVER NOTIFY] <method>: ...`.

## Advanced Features with Temporal

### Workflow Signals
Expand Down
Loading
Loading