Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions examples/mcp_agent_server/asyncio/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
from mcp_agent.workflows.parallel.parallel_llm import ParallelLLM
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.tracing.token_counter import TokenNode
from mcp_agent.human_input.handler import console_input_callback
from mcp_agent.elicitation.handler import console_elicitation_callback
from mcp_agent.mcp.gen_client import gen_client
from mcp_agent.config import MCPServerSettings

# Note: This is purely optional:
# if not provided, a default FastMCP server will be created by MCPApp using create_mcp_server_for_app()
Expand All @@ -36,6 +40,8 @@
name="basic_agent_server",
description="Basic agent server example",
mcp=mcp,
human_input_callback=console_input_callback, # enable approval prompts for local sampling
elicitation_callback=console_elicitation_callback, # enable console-driven elicitation
)


Expand Down Expand Up @@ -109,6 +115,84 @@ async def run(self, input: str) -> WorkflowResult[str]:
return WorkflowResult(value=result)


@app.tool(name="sampling_demo")
async def sampling_demo(topic: str, app_ctx: Optional[AppContext] = None) -> str:
"""
Demonstrate MCP sampling via a nested MCP server tool.

- In asyncio (no upstream client), this triggers local sampling with a human approval prompt.
- When an MCP client is connected, the sampling request is proxied upstream.
"""
_app = app_ctx.app if app_ctx else app

# Register a simple nested server that uses sampling in its get_haiku tool
nested_name = "nested_sampling"
nested_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__), "..", "shared", "nested_sampling_server.py"
)
)
_app.context.config.mcp.servers[nested_name] = MCPServerSettings(
name=nested_name,
command="uv",
args=["run", nested_path],
description="Nested server providing a haiku generator using sampling",
)

# Connect as an MCP client to the nested server and call its sampling tool
async with gen_client(
nested_name, _app.context.server_registry, context=_app.context
) as client:
result = await client.call_tool("get_haiku", {"topic": topic})

# Extract text content from CallToolResult
try:
if result.content and len(result.content) > 0:
return result.content[0].text or ""
except Exception:
pass
return ""


@app.tool(name="elicitation_demo")
async def elicitation_demo(
action: str = "proceed", app_ctx: Optional[AppContext] = None
) -> str:
"""
Demonstrate MCP elicitation via a nested MCP server tool.

- In asyncio (no upstream client), this triggers local elicitation handled by console.
- When an MCP client is connected, the elicitation request is proxied upstream.
"""
_app = app_ctx.app if app_ctx else app

nested_name = "nested_elicitation"
nested_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__), "..", "shared", "nested_elicitation_server.py"
)
)
_app.context.config.mcp.servers[nested_name] = MCPServerSettings(
name=nested_name,
command="uv",
args=["run", nested_path],
description="Nested server demonstrating elicitation",
)

async with gen_client(
nested_name, _app.context.server_registry, context=_app.context
) as client:
# The nested server will call context.session.elicit() internally
result = await client.call_tool("confirm_action", {"action": action})

try:
if result.content and len(result.content) > 0:
return result.content[0].text or ""
except Exception:
pass
return ""


@app.tool
async def grade_story(story: str, app_ctx: Optional[AppContext] = None) -> str:
"""
Expand Down
31 changes: 31 additions & 0 deletions examples/mcp_agent_server/shared/nested_elicitation_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from pydantic import BaseModel
from mcp.server.fastmcp import FastMCP
from mcp.server.elicitation import elicit_with_validation, AcceptedElicitation

mcp = FastMCP("Nested Elicitation Server")


class Confirmation(BaseModel):
confirm: bool


@mcp.tool()
async def confirm_action(action: str) -> str:
"""Ask the user to confirm an action via elicitation."""
ctx = mcp.get_context()
res = await elicit_with_validation(
ctx.session,
message=f"Do you want to {action}?",
schema=Confirmation,
)
if isinstance(res, AcceptedElicitation) and res.data.confirm:
return f"Action '{action}' confirmed by user"
return f"Action '{action}' declined by user"


def main():
mcp.run()


if __name__ == "__main__":
main()
40 changes: 40 additions & 0 deletions examples/mcp_agent_server/shared/nested_sampling_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from mcp.server.fastmcp import FastMCP
from mcp.types import ModelPreferences, ModelHint, SamplingMessage, TextContent

mcp = FastMCP("Nested Sampling Server")


@mcp.tool()
async def get_haiku(topic: str) -> str:
"""Use MCP sampling to generate a haiku about the given topic."""
result = await mcp.get_context().session.create_message(
messages=[
SamplingMessage(
role="user",
content=TextContent(
type="text", text=f"Generate a quirky haiku about {topic}."
),
)
],
system_prompt="You are a poet.",
max_tokens=100,
temperature=0.7,
model_preferences=ModelPreferences(
hints=[ModelHint(name="gpt-4o-mini")],
costPriority=0.1,
speedPriority=0.8,
intelligencePriority=0.1,
),
)

if isinstance(result.content, TextContent):
return result.content.text
return "Haiku generation failed"


def main():
mcp.run()


if __name__ == "__main__":
main()
144 changes: 143 additions & 1 deletion examples/mcp_agent_server/temporal/basic_agent_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@
from mcp_agent.server.app_server import create_mcp_server_for_app
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.human_input.handler import console_input_callback
from mcp_agent.elicitation.handler import console_elicitation_callback
from mcp_agent.mcp.gen_client import gen_client
from mcp_agent.config import MCPServerSettings
from mcp.types import SamplingMessage, TextContent, ModelPreferences, ModelHint

# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create a single FastMCPApp instance (which extends MCPApp)
app = MCPApp(name="basic_agent_server", description="Basic agent server example")
app = MCPApp(
name="basic_agent_server",
description="Basic agent server example",
human_input_callback=console_input_callback, # for local sampling approval
elicitation_callback=console_elicitation_callback, # for local elicitation
)


@app.workflow
Expand Down Expand Up @@ -156,6 +166,138 @@ async def run(
return WorkflowResult(value=result)


@app.workflow_task(name="call_nested_sampling")
async def call_nested_sampling(topic: str, app_ctx: Context) -> str:
"""Activity: call a nested MCP server tool that uses sampling."""
nested_name = "nested_sampling"
nested_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__), "..", "shared", "nested_sampling_server.py"
)
)
app_ctx.config.mcp.servers[nested_name] = MCPServerSettings(
name=nested_name,
command="uv",
args=["run", nested_path],
description="Nested server providing a haiku generator using sampling",
)

async with gen_client(
nested_name, app_ctx.server_registry, context=app_ctx
) as client:
result = await client.call_tool("get_haiku", {"topic": topic})
try:
if result.content and len(result.content) > 0:
return result.content[0].text or ""
except Exception:
pass
return ""


@app.workflow_task(name="call_nested_elicitation")
async def call_nested_elicitation(action: str, app_ctx: Context) -> str:
"""Activity: call a nested MCP server tool that triggers elicitation."""
nested_name = "nested_elicitation"
nested_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__), "..", "shared", "nested_elicitation_server.py"
)
)
app_ctx.config.mcp.servers[nested_name] = MCPServerSettings(
name=nested_name,
command="uv",
args=["run", nested_path],
description="Nested server demonstrating elicitation",
)

async with gen_client(
nested_name, app_ctx.server_registry, context=app_ctx
) as client:
result = await client.call_tool("confirm_action", {"action": action})
try:
if result.content and len(result.content) > 0:
return result.content[0].text or ""
except Exception:
pass
return ""


@app.workflow
class SamplingWorkflow(Workflow[str]):
"""Temporal workflow that triggers an MCP sampling request via a nested server."""

@app.workflow_run
async def run(self, input: str = "space exploration") -> WorkflowResult[str]:
logger.info(
"Running SamplingWorkflow; sampling should be proxied via activities"
)
# 1) Direct workflow sampling via SessionProxy (will schedule mcp_relay_request activity)
direct = await app.context.upstream_session.create_message(
messages=[
SamplingMessage(
role="user",
content=TextContent(
type="text", text=f"Write a haiku about {input}."
),
)
],
system_prompt="You are a poet.",
max_tokens=80,
model_preferences=ModelPreferences(
hints=[ModelHint(name="gpt-4o-mini")],
costPriority=0.1,
speedPriority=0.8,
intelligencePriority=0.1,
),
)
try:
direct_text = (
direct.content.text if isinstance(direct.content, TextContent) else ""
)
except Exception:
direct_text = ""
app.logger.info(f"Direct sampling result: {direct_text}")

# 2) Nested server sampling executed as an activity
result = await app.context.executor.execute(
call_nested_sampling, {"topic": input, "app_ctx": app.context}
)
# Log and return
app.logger.info(f"Nested sampling result: {result}")
return WorkflowResult(value=f"direct={direct_text}\nnested={result}")


@app.workflow
class ElicitationWorkflow(Workflow[str]):
"""Temporal workflow that triggers elicitation via direct session and nested server."""

@app.workflow_run
async def run(self, input: str = "proceed") -> WorkflowResult[str]:
logger.info(
"Running ElicitationWorkflow; should proxy to /request elicitation/create"
)

# 1) Direct elicitation via SessionProxy (schedules mcp_relay_request)
schema = {
"type": "object",
"properties": {"confirm": {"type": "boolean"}},
"required": ["confirm"],
}
direct = await app.context.upstream_session.elicit(
message=f"Do you want to {input}?",
requestedSchema=schema,
)
direct_text = f"accepted={getattr(direct, 'action', '')}"

# 2) Nested elicitation via activity
nested = await app.context.executor.execute(
call_nested_elicitation, {"action": input, "app_ctx": app.context}
)

app.logger.info(f"Elicitation results: direct={direct_text} nested={nested}")
return WorkflowResult(value=f"direct={direct_text}\nnested={nested}")


async def main():
async with app.run() as agent_app:
# Log registered workflows and agent configurations
Expand Down
6 changes: 4 additions & 2 deletions src/mcp_agent/cli/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ def run_async(coro):
raise


def load_user_app(script_path: Path | None, settings_override: Optional[Settings] = None) -> MCPApp:
def load_user_app(
script_path: Path | None, settings_override: Optional[Settings] = None
) -> MCPApp:
"""Import a user script and return an MCPApp instance.

Resolution order within module globals:
1) variable named 'app' that is MCPApp
2) callable 'create_app' or 'get_app' that returns MCPApp
3) first MCPApp instance found in globals

Args:
script_path: Path to the Python script containing the MCPApp
settings_override: Optional settings to override the app's configuration
Expand Down
Loading
Loading