Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions src/mcp_agent/executor/temporal/system_activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ async def forward_log(
message: str,
data: Dict[str, Any] | None = None,
) -> bool:
registry = self.context.server_registry
gateway_url = getattr(self.context, "gateway_url", None)
gateway_token = getattr(self.context, "gateway_token", None)
return await log_via_proxy(
registry,
execution_id=execution_id,
level=level,
namespace=namespace,
Expand All @@ -47,11 +45,9 @@ async def request_user_input(
signal_name: str = "human_input",
) -> Dict[str, Any]:
# Reuse proxy ask API; returns {result} or {error}
registry = self.context.server_registry
gateway_url = getattr(self.context, "gateway_url", None)
gateway_token = getattr(self.context, "gateway_token", None)
return await ask_via_proxy(
registry,
execution_id=execution_id,
prompt=prompt,
metadata={
Expand All @@ -67,11 +63,9 @@ async def request_user_input(
async def relay_notify(
self, execution_id: str, method: str, params: Dict[str, Any] | None = None
) -> bool:
registry = self.context.server_registry
gateway_url = getattr(self.context, "gateway_url", None)
gateway_token = getattr(self.context, "gateway_token", None)
return await notify_via_proxy(
registry,
execution_id=execution_id,
method=method,
params=params or {},
Expand All @@ -83,11 +77,9 @@ async def relay_notify(
async def relay_request(
self, execution_id: str, method: str, params: Dict[str, Any] | None = None
) -> Dict[str, Any]:
registry = self.context.server_registry
gateway_url = getattr(self.context, "gateway_url", None)
gateway_token = getattr(self.context, "gateway_token", None)
return await request_via_proxy(
registry,
execution_id=execution_id,
method=method,
params=params or {},
Expand Down
110 changes: 55 additions & 55 deletions src/mcp_agent/mcp/client_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,53 @@
import os
import httpx

from mcp_agent.mcp.mcp_server_registry import ServerRegistry
from urllib.parse import quote


def _resolve_gateway_url(
server_registry: Optional[ServerRegistry] = None,
server_name: Optional[str] = None,
*,
gateway_url: Optional[str] = None,
context_gateway_url: Optional[str] = None,
) -> str:
"""Resolve the base URL for the MCP gateway.

Precedence:
1) Explicit override (gateway_url parameter)
2) Context-provided URL (context_gateway_url)
3) Environment variable MCP_GATEWAY_URL
4) Fallback to http://127.0.0.1:8000 (dev default)
"""
# Highest precedence: explicit override
if gateway_url:
return gateway_url.rstrip("/")

# Next: context-provided URL (e.g., from Temporal workflow memo)
if context_gateway_url:
return context_gateway_url.rstrip("/")

# Next: environment variable
env_url = os.environ.get("MCP_GATEWAY_URL")
if env_url:
return env_url.rstrip("/")

# Next: a registry entry (if provided)
if server_registry and server_name:
cfg = server_registry.get_server_config(server_name)
if cfg and getattr(cfg, "url", None):
return cfg.url.rstrip("/")

# Fallback: default local server
return "http://127.0.0.1:8000"


async def log_via_proxy(
server_registry: Optional[ServerRegistry],
execution_id: str,
level: str,
namespace: str,
message: str,
data: Dict[str, Any] | None = None,
*,
server_name: Optional[str] = None,
gateway_url: Optional[str] = None,
gateway_token: Optional[str] = None,
) -> bool:
base = _resolve_gateway_url(server_registry, server_name, gateway_url)
async def log_via_proxy(*args, **kwargs) -> bool:
"""Backward-compatible wrapper.

Accepts either:
- legacy: (server_registry, execution_id, level, namespace, message, data=None, *, gateway_url=None, gateway_token=None)
- new: (execution_id, level, namespace, message, data=None, *, gateway_url=None, gateway_token=None)
"""
if args and (args[0] is None or not isinstance(args[0], str)):
args = args[1:]
execution_id, level, namespace, message, *rest = args
data = rest[0] if rest else None
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")
base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
url = f"{base}/internal/workflows/log"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Honor context_gateway_url in wrappers

Wrappers ignore a passed context-provided URL (e.g., from workflow memo). Plumb it through to match your stated precedence.

-    gateway_url = kwargs.get("gateway_url")
-    gateway_token = kwargs.get("gateway_token")
-    base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
+    gateway_url = kwargs.get("gateway_url")
+    gateway_token = kwargs.get("gateway_token")
+    context_gateway_url = kwargs.get("context_gateway_url")
+    base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=context_gateway_url)
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/client_proxy.py around lines 39 to 53, the wrapper ignores
a context-provided gateway URL; read context_gateway_url from kwargs (e.g.,
kwargs.get("context_gateway_url")) and pass it into _resolve_gateway_url instead
of always passing None so the resolver can apply the intended precedence between
explicit gateway_url and context-provided URL; ensure the wrapper does not drop
the context value when shifting legacy args and that any callers passing
context_gateway_url via kwargs are honored.

headers: Dict[str, str] = {}
tok = gateway_token or os.environ.get("MCP_GATEWAY_TOKEN")
Expand Down Expand Up @@ -74,17 +80,15 @@ async def log_via_proxy(
return bool(resp.get("ok", True))


async def ask_via_proxy(
server_registry: Optional[ServerRegistry],
execution_id: str,
prompt: str,
metadata: Dict[str, Any] | None = None,
*,
server_name: Optional[str] = None,
gateway_url: Optional[str] = None,
gateway_token: Optional[str] = None,
) -> Dict[str, Any]:
base = _resolve_gateway_url(server_registry, server_name, gateway_url)
async def ask_via_proxy(*args, **kwargs) -> Dict[str, Any]:
# legacy: (server_registry, execution_id, prompt, metadata=None, ...)
if args and (args[0] is None or not isinstance(args[0], str)):
args = args[1:]
execution_id, prompt, *rest = args
metadata = rest[0] if rest else None
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")
base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
url = f"{base}/internal/human/prompts"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Apply the same context URL and timeout fixes here

Mirror the changes from log_via_proxy: pass context_gateway_url and harden timeout parsing.

-    gateway_url = kwargs.get("gateway_url")
-    gateway_token = kwargs.get("gateway_token")
-    base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
+    gateway_url = kwargs.get("gateway_url")
+    gateway_token = kwargs.get("gateway_token")
+    context_gateway_url = kwargs.get("context_gateway_url")
+    base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=context_gateway_url)
@@
-    timeout = float(os.environ.get("MCP_GATEWAY_TIMEOUT", "10"))
+    try:
+        timeout = float(os.environ.get("MCP_GATEWAY_TIMEOUT", "10"))
+    except ValueError:
+        timeout = 10.0
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def ask_via_proxy(*args, **kwargs) -> Dict[str, Any]:
# legacy: (server_registry, execution_id, prompt, metadata=None, ...)
if args and (args[0] is None or not isinstance(args[0], str)):
args = args[1:]
execution_id, prompt, *rest = args
metadata = rest[0] if rest else None
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")
base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
url = f"{base}/internal/human/prompts"
async def ask_via_proxy(*args, **kwargs) -> Dict[str, Any]:
# legacy: (server_registry, execution_id, prompt, metadata=None, ...)
if args and (args[0] is None or not isinstance(args[0], str)):
args = args[1:]
execution_id, prompt, *rest = args
metadata = rest[0] if rest else None
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")
context_gateway_url = kwargs.get("context_gateway_url")
base = _resolve_gateway_url(
gateway_url=gateway_url,
context_gateway_url=context_gateway_url
)
url = f"{base}/internal/human/prompts"
try:
timeout = float(os.environ.get("MCP_GATEWAY_TIMEOUT", "10"))
except ValueError:
timeout = 10.0
# …rest of implementation…

headers: Dict[str, str] = {}
tok = gateway_token or os.environ.get("MCP_GATEWAY_TOKEN")
Expand Down Expand Up @@ -112,17 +116,15 @@ async def ask_via_proxy(
return {"error": "invalid_response"}


async def notify_via_proxy(
server_registry: Optional[ServerRegistry],
execution_id: str,
method: str,
params: Dict[str, Any] | None = None,
*,
server_name: Optional[str] = None,
gateway_url: Optional[str] = None,
gateway_token: Optional[str] = None,
) -> bool:
base = _resolve_gateway_url(server_registry, server_name, gateway_url)
async def notify_via_proxy(*args, **kwargs) -> bool:
# legacy: (server_registry, execution_id, method, params=None, ...)
if args and (args[0] is None or not isinstance(args[0], str)):
args = args[1:]
execution_id, method, *rest = args
params = rest[0] if rest else None
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")
base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Apply context URL in notify wrapper

-    gateway_url = kwargs.get("gateway_url")
-    gateway_token = kwargs.get("gateway_token")
-    base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
+    gateway_url = kwargs.get("gateway_url")
+    gateway_token = kwargs.get("gateway_token")
+    context_gateway_url = kwargs.get("context_gateway_url")
+    base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=context_gateway_url)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def notify_via_proxy(*args, **kwargs) -> bool:
# legacy: (server_registry, execution_id, method, params=None, ...)
if args and (args[0] is None or not isinstance(args[0], str)):
args = args[1:]
execution_id, method, *rest = args
params = rest[0] if rest else None
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")
base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
async def notify_via_proxy(*args, **kwargs) -> bool:
# legacy: (server_registry, execution_id, method, params=None, ...)
if args and (args[0] is None or not isinstance(args[0], str)):
args = args[1:]
execution_id, method, *rest = args
params = rest[0] if rest else None
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")
context_gateway_url = kwargs.get("context_gateway_url")
base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=context_gateway_url)
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/client_proxy.py around lines 119 to 127, the
notify_via_proxy wrapper always passes context_gateway_url=None into
_resolve_gateway_url, so any provided context URL in kwargs is ignored; update
the wrapper to read context_gateway_url = kwargs.get("context_gateway_url") (or
similarly named key used by callers) and pass that value into
_resolve_gateway_url instead of None, preserving existing
gateway_url/gateway_token handling so resolution uses the provided context when
present.

url = f"{base}/internal/session/by-run/{quote(execution_id, safe='')}/notify"
headers: Dict[str, str] = {}
tok = gateway_token or os.environ.get("MCP_GATEWAY_TOKEN")
Expand All @@ -146,17 +148,15 @@ async def notify_via_proxy(
return bool(resp.get("ok", True))


async def request_via_proxy(
server_registry: Optional[ServerRegistry],
execution_id: str,
method: str,
params: Dict[str, Any] | None = None,
*,
server_name: Optional[str] = None,
gateway_url: Optional[str] = None,
gateway_token: Optional[str] = None,
) -> Dict[str, Any]:
base = _resolve_gateway_url(server_registry, server_name, gateway_url)
async def request_via_proxy(*args, **kwargs) -> Dict[str, Any]:
# legacy: (server_registry, execution_id, method, params=None, ...)
if args and (args[0] is None or not isinstance(args[0], str)):
args = args[1:]
execution_id, method, *rest = args
params = rest[0] if rest else None
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")
base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Apply context URL in request wrapper

-    gateway_url = kwargs.get("gateway_url")
-    gateway_token = kwargs.get("gateway_token")
-    base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
+    gateway_url = kwargs.get("gateway_url")
+    gateway_token = kwargs.get("gateway_token")
+    context_gateway_url = kwargs.get("context_gateway_url")
+    base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=context_gateway_url)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def request_via_proxy(*args, **kwargs) -> Dict[str, Any]:
# legacy: (server_registry, execution_id, method, params=None, ...)
if args and (args[0] is None or not isinstance(args[0], str)):
args = args[1:]
execution_id, method, *rest = args
params = rest[0] if rest else None
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")
base = _resolve_gateway_url(gateway_url=gateway_url, context_gateway_url=None)
async def request_via_proxy(*args, **kwargs) -> Dict[str, Any]:
# legacy: (server_registry, execution_id, method, params=None, ...)
if args and (args[0] is None or not isinstance(args[0], str)):
args = args[1:]
execution_id, method, *rest = args
params = rest[0] if rest else None
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")
context_gateway_url = kwargs.get("context_gateway_url")
base = _resolve_gateway_url(
gateway_url=gateway_url,
context_gateway_url=context_gateway_url
)
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/client_proxy.py around lines 151 to 159, the request
wrapper currently calls _resolve_gateway_url with context_gateway_url=None so
any supplied context URL is ignored; update the call to pass the context URL
from the wrapper (e.g., read kwargs.get("context_gateway_url") or the
appropriate context variable) into context_gateway_url so the resolved base
reflects the provided context; ensure you fall back to None only if no context
is supplied.

url = f"{base}/internal/session/by-run/{quote(execution_id, safe='')}/request"
headers: Dict[str, str] = {}
tok = gateway_token or os.environ.get("MCP_GATEWAY_TOKEN")
Expand Down
74 changes: 46 additions & 28 deletions src/mcp_agent/server/app_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1367,38 +1367,56 @@ async def _workflow_run(
# Build memo for Temporal runs if gateway info is available
workflow_memo = None
try:
# Prefer explicit kwargs, else infer from request headers/environment
# FastMCP keeps raw request under ctx.request_context.request if available
# Prefer explicit kwargs, else infer from request context/headers
gateway_url = kwargs.get("gateway_url")
gateway_token = kwargs.get("gateway_token")

if gateway_url is None:
try:
req = getattr(ctx.request_context, "request", None)
if req is not None:
# Custom header if present
h = req.headers
gateway_url = (
h.get("X-MCP-Gateway-URL")
or h.get("X-Forwarded-Url")
or h.get("X-Forwarded-Proto")
)
# Best-effort reconstruction if only proto/host provided
if gateway_url is None:
proto = h.get("X-Forwarded-Proto") or "http"
host = h.get("X-Forwarded-Host") or h.get("Host")
if host:
gateway_url = f"{proto}://{host}"
except Exception:
pass
req = getattr(ctx.request_context, "request", None)
if req is not None:
h = req.headers
# Highest precedence: caller-provided full base URL
header_url = h.get("X-MCP-Gateway-URL") or h.get("X-Forwarded-Url")
if gateway_url is None and header_url:
gateway_url = header_url

# Token may be provided by the gateway/proxy
if gateway_token is None:
gateway_token = h.get("X-MCP-Gateway-Token")

# Prefer explicit reconstruction from X-Forwarded-* if present
if gateway_url is None and (h.get("X-Forwarded-Host") or h.get("Host")):
proto = h.get("X-Forwarded-Proto") or "http"
host = h.get("X-Forwarded-Host") or h.get("Host")
prefix = h.get("X-Forwarded-Prefix") or ""
if prefix and not prefix.startswith("/"):
prefix = "/" + prefix
if host:
gateway_url = f"{proto}://{host}{prefix}"

# Fallback to request's base_url which already includes scheme/host and any mount prefix
if gateway_url is None:
try:
if getattr(req, "base_url", None):
base_url = str(req.base_url).rstrip("/")
if base_url and base_url.lower() != "none":
gateway_url = base_url
except Exception:
gateway_url = None

if gateway_token is None:
try:
req = getattr(ctx.request_context, "request", None)
if req is not None:
gateway_token = req.headers.get("X-MCP-Gateway-Token")
except Exception:
pass
# Final fallback: environment variables (useful if proxies don't set headers)
try:
import os as _os

if gateway_url is None:
env_url = _os.environ.get("MCP_GATEWAY_URL")
if env_url:
gateway_url = env_url
if gateway_token is None:
env_tok = _os.environ.get("MCP_GATEWAY_TOKEN")
if env_tok:
gateway_token = env_tok
except Exception:
pass

if gateway_url or gateway_token:
workflow_memo = {
Expand Down
12 changes: 6 additions & 6 deletions tests/executor/temporal/test_execution_id_and_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,19 @@ async def post(self, url, json=None, headers=None):
client_proxy.httpx, "AsyncClient", lambda timeout: Client(rcodes)
)

ok = await client_proxy.log_via_proxy(None, "run", "info", "ns", "msg")
ok = await client_proxy.log_via_proxy("run", "info", "ns", "msg")
assert ok is True
ok = await client_proxy.log_via_proxy(None, "run", "info", "ns", "msg")
ok = await client_proxy.log_via_proxy("run", "info", "ns", "msg")
assert ok is False

# notify ok, then error
ok = await client_proxy.notify_via_proxy(None, "run", "m", {})
ok = await client_proxy.notify_via_proxy("run", "m", {})
assert ok is True
ok = await client_proxy.notify_via_proxy(None, "run", "m", {})
ok = await client_proxy.notify_via_proxy("run", "m", {})
assert ok is False

# request ok, then error
res = await client_proxy.request_via_proxy(None, "run", "m", {})
res = await client_proxy.request_via_proxy("run", "m", {})
assert isinstance(res, dict) and res.get("ok", True) in (True,)
res = await client_proxy.request_via_proxy(None, "run", "m", {})
res = await client_proxy.request_via_proxy("run", "m", {})
assert isinstance(res, dict) and "error" in res
Loading
Loading