Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/mcp_agent_server/asyncio/basic_agent_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ async def grade_story(story: str, app_ctx: Optional[AppContext] = None) -> str:
async def grade_story_async(story: str, app_ctx: Optional[AppContext] = None) -> str:
"""
Async variant of grade_story that starts a workflow run and returns IDs.

Args:
story: The student's short story to grade
app_ctx: Optional MCPApp context for accessing app resources and logging
Expand Down
8 changes: 8 additions & 0 deletions examples/mcp_agent_server/temporal/basic_agent_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,20 @@ async def run(
context = app.context
context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])

# Use of the app.logger will forward logs back to the mcp client
app_logger = app.logger

app_logger.info("Starting finder agent")
async with finder_agent:
finder_llm = await finder_agent.attach_llm(OpenAIAugmentedLLM)

result = await finder_llm.generate_str(
message=input,
)

# forwards the log to the caller
app_logger.info(f"Finder agent completed with result {result}")
# print to the console (for when running locally)
print(f"Agent result: {result}")
return WorkflowResult(value=result)

Expand Down
66 changes: 53 additions & 13 deletions examples/mcp_agent_server/temporal/client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import asyncio
import json
import time
from mcp.types import CallToolResult
from mcp_agent.app import MCPApp
from mcp_agent.config import MCPServerSettings
from mcp_agent.executor.workflow import WorkflowExecution
from mcp_agent.mcp.gen_client import gen_client

from datetime import timedelta
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp import ClientSession
from mcp_agent.mcp.mcp_agent_client_session import MCPAgentClientSession
from mcp.types import CallToolResult, LoggingMessageNotificationParams

try:
from exceptiongroup import ExceptionGroup as _ExceptionGroup # Python 3.10 backport
except Exception: # pragma: no cover
Expand All @@ -24,21 +29,45 @@ async def main():
logger = client_app.logger
context = client_app.context

# Connect to the workflow server
logger.info("Connecting to workflow server...")

# Override the server configuration to point to our local script
context.server_registry.registry["basic_agent_server"] = MCPServerSettings(
name="basic_agent_server",
description="Local workflow server running the basic agent example",
transport="sse",
url="http://0.0.0.0:8000/sse",
)

# Connect to the workflow server
try:
logger.info("Connecting to workflow server...")

# Override the server configuration to point to our local script
context.server_registry.registry["basic_agent_server"] = MCPServerSettings(
name="basic_agent_server",
description="Local workflow server running the basic agent example",
transport="sse",
url="http://0.0.0.0:8000/sse",
)

Comment on lines +45 to +52
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix client URL: 0.0.0.0 is a bind address, not a connect target.

Use localhost/127.0.0.1; current value will often fail to connect.

Apply:

             context.server_registry.registry["basic_agent_server"] = MCPServerSettings(
                 name="basic_agent_server",
                 description="Local workflow server running the basic agent example",
                 transport="sse",
-                url="http://0.0.0.0:8000/sse",
+                url="http://127.0.0.1:8000/sse",
             )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Override the server configuration to point to our local script
context.server_registry.registry["basic_agent_server"] = MCPServerSettings(
name="basic_agent_server",
description="Local workflow server running the basic agent example",
transport="sse",
url="http://0.0.0.0:8000/sse",
)
# Override the server configuration to point to our local script
context.server_registry.registry["basic_agent_server"] = MCPServerSettings(
name="basic_agent_server",
description="Local workflow server running the basic agent example",
transport="sse",
url="http://127.0.0.1:8000/sse",
)
🤖 Prompt for AI Agents
In examples/mcp_agent_server/temporal/client.py around lines 36 to 43, the
client URL is using the bind address "0.0.0.0" which cannot be used as a connect
target; change the URL to a loopback address such as "http://127.0.0.1:8000/sse"
(or "http://localhost:8000/sse") so the client can successfully connect to the
local server, replacing the existing url value only.

# Connect to the workflow server
# Define a logging callback to receive server-side log notifications
async def on_server_log(params: LoggingMessageNotificationParams) -> None:
# Pretty-print server logs locally for demonstration
level = params.level.upper()
name = params.logger or "server"
# params.data can be any JSON-serializable data
print(f"[SERVER LOG] [{level}] [{name}] {params.data}")

# Provide a client session factory that installs our logging callback
def make_session(
read_stream: MemoryObjectReceiveStream,
write_stream: MemoryObjectSendStream,
read_timeout_seconds: timedelta | None,
) -> ClientSession:
return MCPAgentClientSession(
read_stream=read_stream,
write_stream=write_stream,
read_timeout_seconds=read_timeout_seconds,
logging_callback=on_server_log,
)

# Connect to the workflow server
async with gen_client(
"basic_agent_server", context.server_registry
"basic_agent_server",
context.server_registry,
client_session_factory=make_session,
) as server:
# Call the BasicAgentWorkflow
run_result = await server.call_tool(
Expand All @@ -56,6 +85,17 @@ async def main():
f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
)

get_status_result = await server.call_tool(
"workflows-BasicAgentWorkflow-get_status",
arguments={"run_id": run_id},
)

execution = WorkflowExecution(**json.loads(run_result.content[0].text))
run_id = execution.run_id
logger.info(
f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
)
Comment on lines +105 to +114
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There appears to be a duplicate code block at lines 105-114 that repeats the same workflow execution processing. The code retrieves get_status_result, creates a WorkflowExecution object, and logs the same message twice in succession. This duplication likely resulted from a copy-paste error during development.

The second block (starting at line 115) already handles the workflow status polling correctly, so the first duplicate block should be removed to prevent:

  • Unnecessary API calls
  • Variable reassignment that has no effect
  • Redundant logging

Removing lines 105-114 would resolve this issue while preserving the intended functionality.

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines +105 to +114
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Duplicate parsing/log line; likely copy/paste error.

You re-parse run_result and log “Started …” a second time right after get_status. Remove the duplicate block.

-                get_status_result = await server.call_tool(
-                    "workflows-BasicAgentWorkflow-get_status",
-                    arguments={"run_id": run_id},
-                )
-
-                execution = WorkflowExecution(**json.loads(run_result.content[0].text))
-                run_id = execution.run_id
-                logger.info(
-                    f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
-                )
+                get_status_result = await server.call_tool(
+                    "workflows-BasicAgentWorkflow-get_status",
+                    arguments={"run_id": run_id},
+                )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
get_status_result = await server.call_tool(
"workflows-BasicAgentWorkflow-get_status",
arguments={"run_id": run_id},
)
execution = WorkflowExecution(**json.loads(run_result.content[0].text))
run_id = execution.run_id
logger.info(
f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
)
get_status_result = await server.call_tool(
"workflows-BasicAgentWorkflow-get_status",
arguments={"run_id": run_id},
)
🤖 Prompt for AI Agents
In examples/mcp_agent_server/temporal/client.py around lines 105 to 114, there
is a duplicate block that re-parses run_result into execution and logs "Started
BasicAgentWorkflow..." immediately after calling get_status; remove that
duplicated parsing/logging block (delete the re-instantiation of
WorkflowExecution from run_result and the subsequent logger.info) and, if you
need workflow status information, parse and/or log from get_status_result
instead so the original start log remains only once.


# Wait for the workflow to complete
while True:
get_status_result = await server.call_tool(
Expand Down
9 changes: 9 additions & 0 deletions src/mcp_agent/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,14 @@ def logger(self):
try:
if self._context is not None:
self._logger._bound_context = self._context # type: ignore[attr-defined]

except Exception:
pass
else:
# Update the logger's bound context in case upstream_session was set after logger creation
if self._context and hasattr(self._logger, "_bound_context"):
self._logger._bound_context = self._context

return self._logger

async def initialize(self):
Expand Down Expand Up @@ -840,6 +842,13 @@ def decorator(target: Callable[..., R]) -> Callable[..., R]:
)

if task_defn:
# prevent trying to decorate an already decorated function
if hasattr(target, "__temporal_activity_definition"):
self.logger.debug(
f"target {name} has __temporal_activity_definition"
)
return target # Already decorated with @activity

if isinstance(target, MethodType):
self_ref = target.__self__

Expand Down
8 changes: 7 additions & 1 deletion src/mcp_agent/cli/cloud/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
from rich.panel import Panel
from typer.core import TyperGroup

from mcp_agent.cli.cloud.commands import configure_app, deploy_config, login, logout, whoami
from mcp_agent.cli.cloud.commands import (
configure_app,
deploy_config,
login,
logout,
whoami,
)
from mcp_agent.cli.cloud.commands.app import (
delete_app,
get_app_status,
Expand Down
4 changes: 4 additions & 0 deletions src/mcp_agent/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ class Context(BaseModel):
# Token counting and cost tracking
token_counter: Optional[TokenCounter] = None

# Dynamic gateway configuration (per-run overrides via Temporal memo)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might make more sense to be done in TemporalSettings instead of the root context

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rholinshead I think you're thinking of the config, not context. This does need to be a runtime value, though possibly the value could be loaded from a config in the future. IMO this is fine as is.

gateway_url: str | None = None
gateway_token: str | None = None

model_config = ConfigDict(
extra="allow",
arbitrary_types_allowed=True, # Tell Pydantic to defer type evaluation
Expand Down
17 changes: 15 additions & 2 deletions src/mcp_agent/executor/temporal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@

from mcp_agent.config import TemporalSettings
from mcp_agent.executor.executor import Executor, ExecutorConfig, R

from mcp_agent.executor.temporal.workflow_signal import TemporalSignalHandler
from mcp_agent.executor.workflow_signal import SignalHandler
from mcp_agent.logging.logger import get_logger
from mcp_agent.utils.common import unwrap
from mcp_agent.executor.temporal.system_activities import SystemActivities
from mcp_agent.executor.temporal.interceptor import ContextPropagationInterceptor

if TYPE_CHECKING:
from mcp_agent.app import MCPApp
Expand Down Expand Up @@ -263,9 +266,9 @@ async def ensure_client(self):
api_key=self.config.api_key,
tls=self.config.tls,
data_converter=pydantic_data_converter,
interceptors=[TracingInterceptor()]
interceptors=[TracingInterceptor(), ContextPropagationInterceptor()]
if self.context.tracing_enabled
else [],
else [ContextPropagationInterceptor()],
rpc_metadata=self.config.rpc_metadata or {},
)

Expand All @@ -278,6 +281,7 @@ async def start_workflow(
wait_for_result: bool = False,
workflow_id: str | None = None,
task_queue: str | None = None,
workflow_memo: Dict[str, Any] | None = None,
**kwargs: Any,
) -> WorkflowHandle:
"""
Expand Down Expand Up @@ -369,6 +373,7 @@ async def start_workflow(
task_queue=task_queue,
id_reuse_policy=id_reuse_policy,
rpc_metadata=self.config.rpc_metadata or {},
memo=workflow_memo or {},
)
else:
handle: WorkflowHandle = await self.client.start_workflow(
Expand All @@ -377,6 +382,7 @@ async def start_workflow(
task_queue=task_queue,
id_reuse_policy=id_reuse_policy,
rpc_metadata=self.config.rpc_metadata or {},
memo=workflow_memo or {},
)

# Wait for the result if requested
Expand Down Expand Up @@ -497,6 +503,13 @@ async def create_temporal_worker_for_app(app: "MCPApp"):
# Collect activities from the global registry
activity_registry = running_app.context.task_registry

# Register system activities (logging, human input proxy, generic relays)
sys_acts = SystemActivities(context=running_app.context)
app.workflow_task(name="mcp_forward_log")(sys_acts.forward_log)
app.workflow_task(name="mcp_request_user_input")(sys_acts.request_user_input)
app.workflow_task(name="mcp_relay_notify")(sys_acts.relay_notify)
app.workflow_task(name="mcp_relay_request")(sys_acts.relay_request)

for name in activity_registry.list_activities():
activities.append(activity_registry.get_activity(name))

Expand Down
Loading
Loading