Merged
249 changes: 129 additions & 120 deletions examples/mcp_agent_server/asyncio/basic_agent_server.py
@@ -13,6 +13,8 @@
import logging
from typing import Dict, Any

from mcp.server.fastmcp import FastMCP

from mcp_agent.app import MCPApp
from mcp_agent.server.app_server import create_mcp_server_for_app
from mcp_agent.agents.agent import Agent
@@ -28,8 +30,16 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create a single FastMCPApp instance (which extends MCPApp)
app = MCPApp(name="basic_agent_server", description="Basic agent server example")
# Note: Creating and passing a FastMCP instance is purely optional:
# if not provided, a default FastMCP server will be created by MCPApp using create_mcp_server_for_app()
mcp = FastMCP(name="basic_agent_server", description="My basic agent server example.")

# Define the MCPApp instance
app = MCPApp(
name="basic_agent_server",
description="Basic agent server example",
mcp=mcp,
)


@app.workflow
@@ -169,6 +179,123 @@ async def run(self, input: str) -> WorkflowResult[str]:
return WorkflowResult(value=result)


# Add custom tool to get token usage for a workflow
@mcp.tool(
name="get_token_usage",
structured_output=True,
description="""
Get detailed token usage information for a specific workflow run.

This provides a comprehensive breakdown of token usage including:
- Total tokens used across all LLM calls within the workflow
- Breakdown by model provider and specific models
- Hierarchical usage tree showing usage at each level (workflow -> agent -> llm)
- Total cost estimate based on model pricing

Args:
workflow_id: Optional workflow ID (if multiple workflows have the same name)
run_id: Optional ID of the workflow run to get token usage for
workflow_name: Optional name of the workflow (used as fallback)

Returns:
Detailed token usage information for the specific workflow run
""",
)
async def get_workflow_token_usage(
workflow_id: str | None = None,
run_id: str | None = None,
workflow_name: str | None = None,
) -> Dict[str, Any]:
"""Get token usage information for a specific workflow run."""
context = app.context

if not context.token_counter:
return {
"error": "Token counter not available",
"message": "Token tracking is not enabled for this application",
}

# Find the specific workflow node
workflow_node = await context.token_counter.get_workflow_node(
name=workflow_name, workflow_id=workflow_id, run_id=run_id
)

if not workflow_node:
return {
"error": "Workflow not found",
"message": f"Could not find workflow with run_id='{run_id}'",
}

# Get the aggregated usage for this workflow
workflow_usage = workflow_node.aggregate_usage()

# Calculate cost for this workflow
workflow_cost = context.token_counter._calculate_node_cost(workflow_node)

# Build the response
result = {
"workflow": {
"name": workflow_node.name,
"run_id": workflow_node.metadata.get("run_id"),
"workflow_id": workflow_node.metadata.get("workflow_id"),
},
"usage": {
"input_tokens": workflow_usage.input_tokens,
"output_tokens": workflow_usage.output_tokens,
"total_tokens": workflow_usage.total_tokens,
},
"cost": round(workflow_cost, 4),
"model_breakdown": {},
"usage_tree": workflow_node.to_dict(),
}

# Get model breakdown for this workflow
model_usage = {}

def collect_model_usage(node: TokenNode):
"""Recursively collect model usage from a node tree"""
if node.usage.model_name:
model_name = node.usage.model_name
provider = node.usage.model_info.provider if node.usage.model_info else None

# Use tuple as key to handle same model from different providers
model_key = (model_name, provider)

if model_key not in model_usage:
model_usage[model_key] = {
"model_name": model_name,
"provider": provider,
"input_tokens": 0,
"output_tokens": 0,
"total_tokens": 0,
}

model_usage[model_key]["input_tokens"] += node.usage.input_tokens
model_usage[model_key]["output_tokens"] += node.usage.output_tokens
model_usage[model_key]["total_tokens"] += node.usage.total_tokens

for child in node.children:
collect_model_usage(child)

collect_model_usage(workflow_node)

# Calculate costs for each model and format for output
for (model_name, provider), usage in model_usage.items():
cost = context.token_counter.calculate_cost(
model_name, usage["input_tokens"], usage["output_tokens"], provider
)

# Create display key with provider info if available
display_key = f"{model_name} ({provider})" if provider else model_name

result["model_breakdown"][display_key] = {
**usage,
"cost": round(cost, 4),
}

return result


async def main():
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -202,124 +329,6 @@ async def main():
mcp_server = create_mcp_server_for_app(agent_app, **(fast_mcp_settings or {}))
logger.info(f"MCP Server settings: {mcp_server.settings}")

# Add custom tool to get token usage for a workflow
@mcp_server.tool(
name="get_token_usage",
structured_output=True,
description="""
Get detailed token usage information for a specific workflow run.

This provides a comprehensive breakdown of token usage including:
- Total tokens used across all LLM calls within the workflow
- Breakdown by model provider and specific models
- Hierarchical usage tree showing usage at each level (workflow -> agent -> llm)
- Total cost estimate based on model pricing

Args:
workflow_id: Optional workflow ID (if multiple workflows have the same name)
run_id: Optional ID of the workflow run to get token usage for
workflow_name: Optional name of the workflow (used as fallback)

Returns:
Detailed token usage information for the specific workflow run
""",
)
async def get_workflow_token_usage(
workflow_id: str | None = None,
run_id: str | None = None,
workflow_name: str | None = None,
) -> Dict[str, Any]:
"""Get token usage information for a specific workflow run."""
if not context.token_counter:
return {
"error": "Token counter not available",
"message": "Token tracking is not enabled for this application",
}

# Find the specific workflow node
workflow_node = await context.token_counter.get_workflow_node(
name=workflow_name, workflow_id=workflow_id, run_id=run_id
)

if not workflow_node:
return {
"error": "Workflow not found",
"message": f"Could not find workflow with run_id='{run_id}'",
}

# Get the aggregated usage for this workflow
workflow_usage = workflow_node.aggregate_usage()

# Calculate cost for this workflow
workflow_cost = context.token_counter._calculate_node_cost(workflow_node)

# Build the response
result = {
"workflow": {
"name": workflow_node.name,
"run_id": workflow_node.metadata.get("run_id"),
"workflow_id": workflow_node.metadata.get("workflow_id"),
},
"usage": {
"input_tokens": workflow_usage.input_tokens,
"output_tokens": workflow_usage.output_tokens,
"total_tokens": workflow_usage.total_tokens,
},
"cost": round(workflow_cost, 4),
"model_breakdown": {},
"usage_tree": workflow_node.to_dict(),
}

# Get model breakdown for this workflow
model_usage = {}

def collect_model_usage(node: TokenNode):
"""Recursively collect model usage from a node tree"""
if node.usage.model_name:
model_name = node.usage.model_name
provider = (
node.usage.model_info.provider
if node.usage.model_info
else None
)

# Use tuple as key to handle same model from different providers
model_key = (model_name, provider)

if model_key not in model_usage:
model_usage[model_key] = {
"model_name": model_name,
"provider": provider,
"input_tokens": 0,
"output_tokens": 0,
"total_tokens": 0,
}

model_usage[model_key]["input_tokens"] += node.usage.input_tokens
model_usage[model_key]["output_tokens"] += node.usage.output_tokens
model_usage[model_key]["total_tokens"] += node.usage.total_tokens

for child in node.children:
collect_model_usage(child)

collect_model_usage(workflow_node)

# Calculate costs for each model and format for output
for (model_name, provider), usage in model_usage.items():
cost = context.token_counter.calculate_cost(
model_name, usage["input_tokens"], usage["output_tokens"], provider
)

# Create display key with provider info if available
display_key = f"{model_name} ({provider})" if provider else model_name

result["model_breakdown"][display_key] = {
**usage,
"cost": round(cost, 4),
}

return result

# Run the server
await mcp_server.run_stdio_async()

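For illustration only, here is a minimal client-side sketch of calling the get_token_usage tool registered above, using the MCP Python SDK client over stdio. The server command, script path, and run ID are placeholders rather than part of this PR, and the server invocation may need transport flags depending on how the example is started.

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def check_usage(run_id: str) -> None:
    # Launch the example server over stdio (path is hypothetical; adjust to your checkout).
    params = StdioServerParameters(
        command="python",
        args=["examples/mcp_agent_server/asyncio/basic_agent_server.py"],
    )
    async with stdio_client(params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            # run_id would come from a previously started workflow run.
            result = await session.call_tool("get_token_usage", {"run_id": run_id})
            print(result.content)


asyncio.run(check_usage("some-run-id"))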
16 changes: 11 additions & 5 deletions src/mcp_agent/app.py
@@ -7,6 +7,7 @@
from contextlib import asynccontextmanager

from mcp import ServerSession
from mcp.server.fastmcp import FastMCP
from mcp_agent.core.context import Context, initialize_context, cleanup_context
from mcp_agent.config import Settings, get_settings
from mcp_agent.executor.signal_registry import SignalRegistry
@@ -57,12 +58,13 @@ def __init__(
self,
name: str = "mcp_application",
description: str | None = None,
settings: Optional[Settings] | str = None,
human_input_callback: Optional[HumanInputCallback] = None,
elicitation_callback: Optional[ElicitationCallback] = None,
signal_notification: Optional[SignalWaitCallback] = None,
settings: Settings | str | None = None,
mcp: FastMCP | None = None,
human_input_callback: HumanInputCallback | None = None,
elicitation_callback: ElicitationCallback | None = None,
signal_notification: SignalWaitCallback | None = None,
upstream_session: Optional["ServerSession"] = None,
model_selector: ModelSelector = None,
model_selector: ModelSelector | None = None,
):
"""
Initialize the application with a name and optional settings.
@@ -72,13 +74,17 @@ def __init__(
provide a detailed description, since it will be used as the server's description.
settings: Application configuration - If unspecified, the settings are loaded from mcp_agent.config.yaml.
If this is a string, it is treated as the path to the config file to load.
mcp: MCP server instance to use for the application to expose agents and workflows as tools.
If not provided, a default FastMCP server will be created by create_mcp_server_for_app().
If provided, the MCPApp will add tools to the provided server instance.
human_input_callback: Callback for handling human input
signal_notification: Callback for getting notified on workflow signals/events.
upstream_session: Upstream session if the MCPApp is running as a server to an MCP client.
initialize_model_selector: Initializes the built-in ModelSelector to help with model selection. Defaults to False.
"""
self.name = name
self.description = description or "MCP Agent Application"
self.mcp = mcp

# We use these to initialize the context in initialize()
if settings is None:
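As a quick illustration of the two construction modes the new mcp parameter enables (the names below are placeholders, not taken from this PR):

from mcp.server.fastmcp import FastMCP
from mcp_agent.app import MCPApp

# Default mode: no server is passed; create_mcp_server_for_app() builds one later.
app_default = MCPApp(name="my_app", description="Uses the default FastMCP server")

# Custom mode: MCPApp adds its workflow tools to the provided server, so tools
# registered directly on `server` are exposed alongside them.
server = FastMCP(name="my_app")
app_custom = MCPApp(name="my_app", description="Uses a provided server", mcp=server)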
5 changes: 5 additions & 0 deletions src/mcp_agent/core/context.py
@@ -10,6 +10,7 @@
from pydantic import BaseModel, ConfigDict

from mcp import ServerSession
from mcp.server.fastmcp import FastMCP

from opentelemetry import trace

@@ -88,6 +89,10 @@ class Context(BaseModel):
arbitrary_types_allowed=True, # Tell Pydantic to defer type evaluation
)

@property
def mcp(self) -> FastMCP | None:
return self.app.mcp if self.app else None
Comment on lines +92 to +94
💡 Verification agent

🧩 Analysis chain

Consider adding a type annotation consistency check.

The property implementation is correct. However, ensure the mcp attribute is explicitly annotated in the MCPApp class definition so that app.mcp is known to be of type FastMCP | None.

Let me verify that the app attribute type is properly defined and that MCPApp has the mcp attribute:


🏁 Script executed:

#!/bin/bash
# Check MCPApp class definition for mcp attribute
ast-grep --pattern 'class MCPApp {
  $$$
  def __init__($$$) {
    $$$
  }
  $$$
}'

# Also check for mcp attribute definition
rg -A 3 'class MCPApp' src/mcp_agent/app.py | head -20

Length of output: 242


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Print context.py around the referenced lines with numbers
echo "== src/mcp_agent/core/context.py =="
nl -ba src/mcp_agent/core/context.py | sed -n '1,240p' || true

echo
echo "== src/mcp_agent/app.py =="
nl -ba src/mcp_agent/app.py | sed -n '1,240p' || true

echo
echo "== Search for 'mcp' mentions across repo =="
rg -n "mcp" || true

echo
echo "== Search for 'class MCPApp' with context =="
rg -n --hidden -n "class MCPApp" -S || true

Length of output: 264217


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== src/mcp_agent/core/context.py (around mcp property) =="
awk 'NR>=1 && NR<=220 { printf("%4d: %s\n", NR, $0) }' src/mcp_agent/core/context.py || true

echo
echo "== src/mcp_agent/app.py (around class and __init__) =="
awk 'NR>=1 && NR<=260 { printf("%4d: %s\n", NR, $0) }' src/mcp_agent/app.py || true

echo
echo "== grep for 'self.mcp' and 'mcp:' annotations in app.py =="
rg -n --hidden --glob '!**/node_modules/**' "self.mcp|mcp: FastMCP" src/mcp_agent/app.py || true

Length of output: 20622


Add explicit class-level type annotation for MCPApp.mcp

Verified: Context.app is Optional["MCPApp"] and Context.mcp returns self.app.mcp if present. MCPApp.__init__ accepts mcp: FastMCP | None and assigns self.mcp = mcp, but MCPApp does not declare a class-level annotation for the attribute. Add an explicit annotation so static type-checkers reliably know app.mcp is FastMCP | None.

Files to update:

  • src/mcp_agent/core/context.py — Context.app: Optional["MCPApp"] = None (OK; keeps using app.mcp)
  • src/mcp_agent/app.py — add class-level annotation for mcp

Suggested small patch (add inside the MCPApp class, e.g. after the docstring):

class MCPApp:
    """
    Main application class ...
    """
    mcp: FastMCP | None = None

    def __init__(...
🤖 Prompt for AI Agents
In src/mcp_agent/core/context.py around lines 92-94, Context.app is
Optional["MCPApp"] and Context.mcp returns self.app.mcp if present; ensure
static type-checkers recognize MCPApp.mcp by adding an explicit class-level
annotation in src/mcp_agent/app.py inside the MCPApp class (e.g., add "mcp:
FastMCP | None = None" immediately after the class docstring) so that app.mcp is
typed as FastMCP | None.



async def configure_otel(
config: "Settings", session_id: str | None = None
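Finally, a small sketch of how the new Context.mcp property might be exercised. This is assumed usage rather than code from the diff, and it presumes a valid mcp-agent configuration is available so that app.run() can initialize the context.

import asyncio

from mcp.server.fastmcp import FastMCP
from mcp_agent.app import MCPApp

mcp = FastMCP(name="my_app")
app = MCPApp(name="my_app", description="Context.mcp illustration", mcp=mcp)


async def main() -> None:
    async with app.run():
        # The property simply delegates to the app, so this is the same instance
        # that was passed to MCPApp above (or None if no server was provided).
        assert app.context.mcp is mcp


asyncio.run(main())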