1 change: 0 additions & 1 deletion examples/mcp_agent_server/asyncio/basic_agent_server.py
@@ -188,7 +188,6 @@ async def grade_story(story: str, app_ctx: Optional[AppContext] = None) -> str:
async def grade_story_async(story: str, app_ctx: Optional[AppContext] = None) -> str:
"""
Async variant of grade_story that starts a workflow run and returns IDs.

Args:
story: The student's short story to grade
app_ctx: Optional MCPApp context for accessing app resources and logging
8 changes: 8 additions & 0 deletions examples/mcp_agent_server/temporal/basic_agent_server.py
@@ -59,12 +59,20 @@ async def run(
context = app.context
context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])

# Use of the app.logger will forward logs back to the mcp client
app_logger = app.logger

app_logger.info("Starting finder agent")
async with finder_agent:
finder_llm = await finder_agent.attach_llm(OpenAIAugmentedLLM)

result = await finder_llm.generate_str(
message=input,
)

# forwards the log to the caller
app_logger.info(f"Finder agent completed with result {result}")
# print to the console (for when running locally)
print(f"Agent result: {result}")
return WorkflowResult(value=result)

83 changes: 70 additions & 13 deletions examples/mcp_agent_server/temporal/client.py
@@ -1,12 +1,18 @@
import asyncio
import json
import time
from mcp.types import CallToolResult
import argparse
from mcp_agent.app import MCPApp
from mcp_agent.config import MCPServerSettings
from mcp_agent.executor.workflow import WorkflowExecution
from mcp_agent.mcp.gen_client import gen_client

from datetime import timedelta
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp import ClientSession
from mcp_agent.mcp.mcp_agent_client_session import MCPAgentClientSession
from mcp.types import CallToolResult, LoggingMessageNotificationParams

try:
from exceptiongroup import ExceptionGroup as _ExceptionGroup # Python 3.10 backport
except Exception: # pragma: no cover
@@ -18,28 +24,68 @@


async def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--server-log-level",
type=str,
default=None,
help="Set server logging level (debug, info, notice, warning, error, critical, alert, emergency)",
)
args = parser.parse_args()
# Create MCPApp to get the server registry
app = MCPApp(name="workflow_mcp_client")
async with app.run() as client_app:
logger = client_app.logger
context = client_app.context

# Connect to the workflow server
logger.info("Connecting to workflow server...")

# Override the server configuration to point to our local script
context.server_registry.registry["basic_agent_server"] = MCPServerSettings(
name="basic_agent_server",
description="Local workflow server running the basic agent example",
transport="sse",
url="http://0.0.0.0:8000/sse",
)

# Connect to the workflow server
try:
logger.info("Connecting to workflow server...")

# Override the server configuration to point to our local script
context.server_registry.registry["basic_agent_server"] = MCPServerSettings(
name="basic_agent_server",
description="Local workflow server running the basic agent example",
transport="sse",
url="http://0.0.0.0:8000/sse",
)

Comment on lines +45 to +52
⚠️ Potential issue

Fix client URL: 0.0.0.0 is a bind address, not a connect target.

Use localhost/127.0.0.1; current value will often fail to connect.

Apply:

             context.server_registry.registry["basic_agent_server"] = MCPServerSettings(
                 name="basic_agent_server",
                 description="Local workflow server running the basic agent example",
                 transport="sse",
-                url="http://0.0.0.0:8000/sse",
+                url="http://127.0.0.1:8000/sse",
             )
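As background for this fix (not part of the PR): 0.0.0.0 is the wildcard address a server binds to in order to listen on every interface, while a client must dial a concrete address such as the loopback. A minimal socket-level sketch of that distinction, using an arbitrary port:

import socket

# Server side: bind to the wildcard address so it listens on all interfaces.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 8000))
server.listen(1)

# Client side: connect to a concrete address (loopback here), not the wildcard.
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", 8000))

client.close()
server.close()

The same reasoning applies to the SSE URL above: the server can keep binding to 0.0.0.0, but the client-side MCPServerSettings url should point at 127.0.0.1 or localhost.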

# Connect to the workflow server
# Define a logging callback to receive server-side log notifications
async def on_server_log(params: LoggingMessageNotificationParams) -> None:
# Pretty-print server logs locally for demonstration
level = params.level.upper()
name = params.logger or "server"
# params.data can be any JSON-serializable data
print(f"[SERVER LOG] [{level}] [{name}] {params.data}")

# Provide a client session factory that installs our logging callback
def make_session(
read_stream: MemoryObjectReceiveStream,
write_stream: MemoryObjectSendStream,
read_timeout_seconds: timedelta | None,
) -> ClientSession:
return MCPAgentClientSession(
read_stream=read_stream,
write_stream=write_stream,
read_timeout_seconds=read_timeout_seconds,
logging_callback=on_server_log,
)

# Connect to the workflow server
async with gen_client(
"basic_agent_server", context.server_registry
"basic_agent_server",
context.server_registry,
client_session_factory=make_session,
) as server:
# Ask server to send logs at the requested level (default info)
level = (args.server_log_level or "info").lower()
print(f"[client] Setting server logging level to: {level}")
try:
await server.set_logging_level(level)
except Exception:
# Older servers may not support logging capability
print("[client] Server does not support logging/setLevel")
# Call the BasicAgentWorkflow
run_result = await server.call_tool(
"workflows-BasicAgentWorkflow-run",
@@ -56,6 +102,17 @@ async def main():
f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
)

get_status_result = await server.call_tool(
"workflows-BasicAgentWorkflow-get_status",
arguments={"run_id": run_id},
)

execution = WorkflowExecution(**json.loads(run_result.content[0].text))
run_id = execution.run_id
logger.info(
f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
)
Comment on lines +105 to +114
There appears to be a duplicate code block at lines 105-114 that repeats the same workflow execution processing. The code retrieves get_status_result, creates a WorkflowExecution object, and logs the same message twice in succession. This duplication likely resulted from a copy-paste error during development.

The second block (starting at line 115) already handles the workflow status polling correctly, so the first duplicate block should be removed to prevent:

  • Unnecessary API calls
  • Variable reassignment that has no effect
  • Redundant logging

Removing lines 105-114 would resolve this issue while preserving the intended functionality.

Comment on lines +105 to +114
⚠️ Potential issue

Duplicate parsing/log line; likely copy/paste error.

You re-parse run_result and log “Started …” a second time right after get_status. Remove the duplicate block.

-                get_status_result = await server.call_tool(
-                    "workflows-BasicAgentWorkflow-get_status",
-                    arguments={"run_id": run_id},
-                )
-
-                execution = WorkflowExecution(**json.loads(run_result.content[0].text))
-                run_id = execution.run_id
-                logger.info(
-                    f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
-                )
+                get_status_result = await server.call_tool(
+                    "workflows-BasicAgentWorkflow-get_status",
+                    arguments={"run_id": run_id},
+                )
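For context (not part of the PR), the de-duplicated flow is: start the run once, parse the run ID once, then poll get_status until the workflow finishes. A minimal sketch of that polling step, reusing the tool name from this client; the JSON "status" field and its terminal values are assumptions rather than a documented contract:

import asyncio
import json

async def wait_for_completion(server, run_id: str, interval_seconds: float = 2.0) -> dict:
    # Poll the workflow's get_status tool until it reports a terminal state.
    while True:
        get_status_result = await server.call_tool(
            "workflows-BasicAgentWorkflow-get_status",
            arguments={"run_id": run_id},
        )
        status = json.loads(get_status_result.content[0].text)
        if status.get("status") in ("completed", "failed", "cancelled"):
            return status
        await asyncio.sleep(interval_seconds)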


# Wait for the workflow to complete
while True:
get_status_result = await server.call_tool(
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "mcp-agent"
version = "0.1.16"
version = "0.1.17"
description = "Build effective agents with Model Context Protocol (MCP) using simple, composable patterns."
readme = "README.md"
license = { file = "LICENSE" }
16 changes: 16 additions & 0 deletions src/mcp_agent/app.py
@@ -15,6 +15,7 @@
from mcp_agent.executor.signal_registry import SignalRegistry
from mcp_agent.logging.event_progress import ProgressAction
from mcp_agent.logging.logger import get_logger
from mcp_agent.logging.logger import set_default_bound_context
from mcp_agent.executor.decorator_registry import (
DecoratorRegistry,
register_asyncio_decorators,
@@ -195,12 +196,14 @@ def logger(self):
try:
if self._context is not None:
self._logger._bound_context = self._context # type: ignore[attr-defined]

except Exception:
pass
else:
# Update the logger's bound context in case upstream_session was set after logger creation
if self._context and hasattr(self._logger, "_bound_context"):
self._logger._bound_context = self._context

return self._logger

async def initialize(self):
@@ -231,6 +234,12 @@ async def initialize(self):
# Store a reference to this app instance in the context for easier access
self._context.app = self

# Provide a safe default bound context for loggers created after init without explicit context
try:
set_default_bound_context(self._context)
except Exception:
pass

# Auto-load subagents if enabled in settings
try:
subagents = self._config.agents
@@ -840,6 +849,13 @@ def decorator(target: Callable[..., R]) -> Callable[..., R]:
)

if task_defn:
# prevent trying to decorate an already decorated function
if hasattr(target, "__temporal_activity_definition"):
self.logger.debug(
f"target {name} has __temporal_activity_definition"
)
return target # Already decorated with @activity

if isinstance(target, MethodType):
self_ref = target.__self__

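As an aside on the guard added above: Temporal's activity decorator leaves a __temporal_activity_definition attribute on the wrapped callable, which is what makes double-decoration detectable. A generic sketch of the same idempotency pattern, with an illustrative marker name rather than Temporal's:

import functools

_MARKER = "__example_already_decorated__"  # illustrative attribute, not Temporal's real one

def decorate_once(func):
    # Return the function unchanged if a previous pass already decorated it.
    if hasattr(func, _MARKER):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    setattr(wrapper, _MARKER, True)
    return wrapper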
1 change: 0 additions & 1 deletion src/mcp_agent/cli/cloud/commands/auth/whoami/main.py
@@ -1,6 +1,5 @@
"""MCP Agent Cloud whoami command implementation."""


from rich.console import Console
from rich.panel import Panel
from rich.table import Table
2 changes: 1 addition & 1 deletion src/mcp_agent/cli/cloud/commands/logger/__init__.py
@@ -6,4 +6,4 @@

from .tail.main import tail_logs

__all__ = ["tail_logs"]
__all__ = ["tail_logs"]
2 changes: 1 addition & 1 deletion src/mcp_agent/cli/cloud/commands/logger/configure/__init__.py
@@ -2,4 +2,4 @@

from .main import configure_logger

__all__ = ["configure_logger"]
__all__ = ["configure_logger"]
76 changes: 46 additions & 30 deletions src/mcp_agent/cli/cloud/commands/logger/configure/main.py
@@ -32,10 +32,10 @@ def configure_logger(
),
) -> None:
"""Configure OTEL endpoint and headers for log collection.

This command allows you to configure the OpenTelemetry endpoint and headers
that will be used for collecting logs from your deployed MCP apps.

Examples:
mcp-agent cloud logger configure https://otel.example.com:4318/v1/logs
mcp-agent cloud logger configure https://otel.example.com --headers "Authorization=Bearer token,X-Custom=value"
Expand All @@ -44,17 +44,19 @@ def configure_logger(
if not endpoint and not test:
console.print("[red]Error: Must specify endpoint or use --test[/red]")
raise typer.Exit(1)

config_path = _find_config_file()

if test:
if config_path and config_path.exists():
config = _load_config(config_path)
otel_config = config.get("otel", {})
endpoint = otel_config.get("endpoint")
headers_dict = otel_config.get("headers", {})
else:
console.print("[yellow]No configuration file found. Use --endpoint to set up OTEL configuration.[/yellow]")
console.print(
"[yellow]No configuration file found. Use --endpoint to set up OTEL configuration.[/yellow]"
)
raise typer.Exit(1)
else:
headers_dict = {}
@@ -64,54 +66,68 @@
key, value = header_pair.strip().split("=", 1)
headers_dict[key.strip()] = value.strip()
except ValueError:
console.print("[red]Error: Headers must be in format 'key=value,key2=value2'[/red]")
console.print(
"[red]Error: Headers must be in format 'key=value,key2=value2'[/red]"
)
raise typer.Exit(1)

if endpoint:
console.print(f"[blue]Testing connection to {endpoint}...[/blue]")

try:
with httpx.Client(timeout=10.0) as client:
response = client.get(
endpoint.replace("/v1/logs", "/health") if "/v1/logs" in endpoint else f"{endpoint}/health",
headers=headers_dict
endpoint.replace("/v1/logs", "/health")
if "/v1/logs" in endpoint
else f"{endpoint}/health",
headers=headers_dict,
)

if response.status_code in [200, 404]: # 404 is fine, means endpoint exists

if response.status_code in [
200,
404,
]: # 404 is fine, means endpoint exists
console.print("[green]✓ Connection successful[/green]")
else:
console.print(f"[yellow]⚠ Got status {response.status_code}, but endpoint is reachable[/yellow]")

console.print(
f"[yellow]⚠ Got status {response.status_code}, but endpoint is reachable[/yellow]"
)

except httpx.RequestError as e:
console.print(f"[red]✗ Connection failed: {e}[/red]")
if not test:
console.print("[yellow]Configuration will be saved anyway. Check your endpoint URL and network connection.[/yellow]")

console.print(
"[yellow]Configuration will be saved anyway. Check your endpoint URL and network connection.[/yellow]"
)

if not test:
if not config_path:
config_path = Path.cwd() / "mcp_agent.config.yaml"

config = _load_config(config_path) if config_path.exists() else {}

if "otel" not in config:
config["otel"] = {}

config["otel"]["endpoint"] = endpoint
config["otel"]["headers"] = headers_dict

try:
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, "w") as f:
yaml.dump(config, f, default_flow_style=False, sort_keys=False)

console.print(Panel(
f"[green]✓ OTEL configuration saved to {config_path}[/green]\n\n"
f"Endpoint: {endpoint}\n"
f"Headers: {len(headers_dict)} configured" + (f" ({', '.join(headers_dict.keys())})" if headers_dict else ""),
title="Configuration Saved",
border_style="green"
))


console.print(
Panel(
f"[green]✓ OTEL configuration saved to {config_path}[/green]\n\n"
f"Endpoint: {endpoint}\n"
f"Headers: {len(headers_dict)} configured"
+ (f" ({', '.join(headers_dict.keys())})" if headers_dict else ""),
title="Configuration Saved",
border_style="green",
)
)

except Exception as e:
console.print(f"[red]Error saving configuration: {e}[/red]")
raise typer.Exit(1)
@@ -134,4 +150,4 @@ def _load_config(config_path: Path) -> dict:
with open(config_path, "r") as f:
return yaml.safe_load(f) or {}
except Exception as e:
raise CLIError(f"Failed to load config from {config_path}: {e}")
raise CLIError(f"Failed to load config from {config_path}: {e}")
2 changes: 1 addition & 1 deletion src/mcp_agent/cli/cloud/commands/logger/tail/__init__.py
@@ -2,4 +2,4 @@

from .main import tail_logs

__all__ = ["tail_logs"]
__all__ = ["tail_logs"]