Skip to content

Conversation

saqadri
Copy link
Collaborator

@saqadri saqadri commented Sep 13, 2025

After many fixes, we see in prod logs that /internal/session/by-run/{run_id}/notify requests are finally 200 OK all the way through (gateway → app: 200 “uvicorn”, content-length 11), but the client still shows no logs/notifications.

  • No more 401s or 503 “session_not_available”; authentication works and the app route accepts the request.

Hypothesis from logs: the app is writing to a stale upstream ServerSession (from an earlier client connection). Locally this doesn’t reproduce due to single-process/stable connection.

Changes

  • Rebind mapping on client calls:

    • On workflows-get_status, workflows-resume, workflows-cancel, re-register the current ctx.session to the workflow’s run_id/execution_id. Keeps the mapping fresh after reconnects.
  • Best-effort fallback on internal routes:

    • If a notify/request/log arrives without a mapped session, use app.context.upstream_session as a fallback and register it for this execution_id. Also tries fallback if primary send raises.
  • Additional logging temporarily so we can diagnose what's going on

  • If a worker notify arrives before any rebinding, the fallback uses the latest upstream session captured by the app and registers it.

  • Detailed logs make it clear which session_id handled a message, when the map was missing, and when messages were dropped.

Summary by CodeRabbit

  • New Features

    • Best-effort session recovery when a session isn’t mapped.
    • Idempotency for notifications to deduplicate repeated events.
  • Improvements

    • More resilient request/notify handling with graceful fallbacks.
    • Automatic session binding during status, resume, and cancel operations.
    • Expanded debug/info logs for better observability of inbound/outbound flows.
  • Bug Fixes

    • Fewer “session not available” errors via fallback binding and recovery.
  • Chores

    • Example client now runs the main server entry point by default.

Copy link

coderabbitai bot commented Sep 13, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a client example change to run main.py. In the server, implements best-effort upstream session fallback when mapped sessions are missing, idempotency tracking for notify, extra logging, and session re-binding for workflow status/resume/cancel paths. Public APIs unchanged.

Changes

Cohort / File(s) Summary
Example client run target
examples/mcp_agent_server/asyncio/client.py
Changes local server startup args to invoke main.py instead of basic_agent_server.py; no other logic changes.
Server session fallback, idempotency, logging
src/mcp_agent/server/app_server.py
Adds best-effort fallback to app.context.upstream_session when mapped session is missing; registers/binds fallback session to run_id/execution_id; introduces idempotency tracking for notify; augments debug/info logging across request/notify/log paths; re-binds session on get_workflow_status/resume_workflow/cancel_workflow; preserves public signatures.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant AppServer
  participant Registry as SessionRegistry
  participant Upstream as UpstreamSession

  rect rgb(240,248,255)
  note over Client,AppServer: notify with idempotency and session fallback
  Client->>AppServer: notify(execution_id, idempotency_key, payload)
  AppServer->>AppServer: check idempotency set
  alt duplicate idempotency_key
    AppServer-->>Client: ack (idempotent=true)
  else first-seen
    AppServer->>Registry: lookup session by execution_id/run_id
    alt session found
      AppServer->>Upstream: forward notify(payload)
      Upstream-->>AppServer: ack
      AppServer-->>Client: ack
    else session missing
      AppServer->>AppServer: _get_fallback_upstream_session()
      alt fallback available
        AppServer->>Registry: _register_session(execution_id/run_id, fallback)
        AppServer->>Upstream: forward notify(payload)
        Upstream-->>AppServer: ack
        AppServer-->>Client: ack (fallback_used=true)
      else no fallback
        AppServer-->>Client: error 503 session_not_available
      end
    end
  end
  end
Loading
sequenceDiagram
  autonumber
  participant Client
  participant AppServer
  participant Registry
  participant Upstream

  note over Client,AppServer: request with best-effort fallback
  Client->>AppServer: request(execution_id/run_id, data)
  AppServer->>Registry: lookup session
  alt found
    AppServer->>Upstream: forward request
    Upstream-->>AppServer: response
    AppServer-->>Client: response
  else missing
    AppServer->>AppServer: _get_fallback_upstream_session()
    alt fallback available
      AppServer->>Registry: _register_session(...)
      AppServer->>Upstream: forward request
      Upstream-->>AppServer: response
      AppServer-->>Client: response (fallback_used=true)
    else none
      AppServer-->>Client: error 503 session_not_available
    end
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~55 minutes

Possibly related PRs

  • Fix: Fix Up Workflow Commands #466 — Adds server-side fallback session binding and session propagation for workflow control and idempotency, touching complementary control-flow paths in app_server.py.

Suggested reviewers

  • roman-van-der-krogt
  • rholinshead

Poem

A rabbit taps logs with a whisker’s grace,
Binds lost sessions back into place.
Idempotent hops, no duplicate trails,
Fallback burrows catch wayward emails.
Upstream and client—now hand in paw—hop on their merry rails. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 38.46% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "Fix upstream session binding in app server" accurately and concisely summarizes the primary change in the PR—addressing upstream session binding and related fallback/rebind behavior in the app server. It is a short, single sentence that maps directly to the PR objectives (rebind mapping, fallback upstream_session, enhanced logging) without extraneous details. This phrasing is clear and specific enough for a teammate scanning the history to understand the main intent.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/server_to_client_notifications

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Comment on lines +452 to +456
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic error: Using execution_id as run_id parameter. The function signature is _register_session(run_id, execution_id, session), but here execution_id is passed for both parameters. This will create incorrect mappings in _RUN_EXECUTION_ID_REGISTRY where execution_id maps to itself instead of run_id mapping to execution_id, breaking the session lookup logic.

Suggested change
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
await _register_session(
run_id=run_id,
execution_id=execution_id,
session=latest_session,
)

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines +574 to +578
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic error: Using execution_id as run_id parameter in _register_session call. This creates incorrect registry mappings and will break session resolution for subsequent requests that rely on proper run_id to execution_id mapping.

Suggested change
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
await _register_session(
run_id=run_id,
execution_id=execution_id,
session=latest_session,
)

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines +620 to +624
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic error: Using execution_id as run_id parameter in _register_session call. Same incorrect mapping issue as previous instances.

Suggested change
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
await _register_session(
run_id=run_id,
execution_id=execution_id,
session=latest_session,
)

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines +654 to +658
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic error: Using execution_id as run_id parameter in _register_session call. Same incorrect mapping issue as previous instances.

Suggested change
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
await _register_session(
run_id=run_id,
execution_id=execution_id,
session=latest_session,
)

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines +910 to +914
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic error: Using execution_id as run_id parameter in _register_session call. Same incorrect mapping issue as previous instances.

Suggested change
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
await _register_session(
run_id=run_id,
execution_id=execution_id,
session=latest_session,
)

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/mcp_agent/server/app_server.py (2)

48-56: Do not register fallback with run_id=execution_id — corrupts mapping and breaks cleanup.

In the fallback branches you call _register_session(run_id=execution_id, execution_id=execution_id, ...). If run_id ≠ execution_id (common when you separate temporal run_id vs internal execution_id), this pollutes _RUN_EXECUTION_ID_REGISTRY and prevents proper lookups and cleanup in _unregister_session(run_id). Fix by:

  • Allowing run_id to be optional in _register_session and only updating the run→exec map when you actually know run_id.
  • Looking up run_id by execution_id when possible.
  • Clearing idempotency state on unregister.

Apply these diffs:

@@
-async def _register_session(run_id: str, execution_id: str, session: Any) -> None:
-    async with _RUN_SESSION_LOCK:
-        _RUN_SESSION_REGISTRY[execution_id] = session
-        _RUN_EXECUTION_ID_REGISTRY[run_id] = execution_id
-        try:
-            logger.debug(
-                f"Registered upstream session for run_id={run_id}, execution_id={execution_id}, session_id={id(session)}"
-            )
-        except Exception:
-            pass
+async def _register_session(*, execution_id: str, session: Any, run_id: str | None = None) -> None:
+    async with _RUN_SESSION_LOCK:
+        _RUN_SESSION_REGISTRY[execution_id] = session
+        if run_id:
+            _RUN_EXECUTION_ID_REGISTRY[run_id] = execution_id
+    try:
+        logger.debug(
+            f"Registered upstream session for execution_id={execution_id}, run_id={run_id or '<unknown>'}, session_id={id(session)}"
+        )
+    except Exception:
+        pass
@@
 async def _unregister_session(run_id: str) -> None:
     async with _RUN_SESSION_LOCK:
         execution_id = _RUN_EXECUTION_ID_REGISTRY.pop(run_id, None)
         if execution_id:
             _RUN_SESSION_REGISTRY.pop(execution_id, None)
-            try:
-                logger.debug(
-                    f"Unregistered upstream session mapping for run_id={run_id}, execution_id={execution_id}"
-                )
-            except Exception:
-                pass
+    # Also clear any idempotency keys recorded for this execution
+    if execution_id:
+        async with _IDEMPOTENCY_KEYS_LOCK:
+            _IDEMPOTENCY_KEYS_SEEN.pop(execution_id, None)
+    try:
+        logger.debug(
+            f"Unregistered upstream session mapping for run_id={run_id}, execution_id={execution_id}"
+        )
+    except Exception:
+        pass

Add helper (place near other registry helpers):

async def _lookup_run_id_by_execution_id(execution_id: str) -> str | None:
    async with _RUN_SESSION_LOCK:
        for rid, eid in _RUN_EXECUTION_ID_REGISTRY.items():
            if eid == execution_id:
                return rid
    return None

Update fallback call sites to avoid forging the run→exec mapping:

@@ def _relay_notify(request: Request):
-                        await _register_session(
-                            run_id=execution_id,
-                            execution_id=execution_id,
-                            session=fallback,
-                        )
+                        run_id_for_exec = await _lookup_run_id_by_execution_id(execution_id)
+                        await _register_session(
+                            execution_id=execution_id,
+                            session=fallback,
+                            run_id=run_id_for_exec,
+                        )
@@ def _relay_request(request: Request):
-                        await _register_session(
-                            run_id=execution_id,
-                            execution_id=execution_id,
-                            session=fallback,
-                        )
+                        run_id_for_exec = await _lookup_run_id_by_execution_id(execution_id)
+                        await _register_session(
+                            execution_id=execution_id,
+                            session=fallback,
+                            run_id=run_id_for_exec,
+                        )
@@ def _internal_workflows_log(request: Request):
-                        await _register_session(
-                            run_id=execution_id,
-                            execution_id=execution_id,
-                            session=fallback,
-                        )
+                        run_id_for_exec = await _lookup_run_id_by_execution_id(execution_id)
+                        await _register_session(
+                            execution_id=execution_id,
+                            session=fallback,
+                            run_id=run_id_for_exec,
+                        )

Also applies to: 60-71, 407-416, 551-559, 676-684


360-394: Fail startup or emit a high-severity startup ERROR when MCP_GATEWAY_TOKEN is unset — internal routes are currently unauthenticated.

app_server only enforces the gateway shared-secret when MCP_GATEWAY_TOKEN is present; client code only sends X-MCP-Gateway-Token when a token exists. Evidence: src/mcp_agent/server/app_server.py (gw_token checks at ~lines 376–380, 652–656, 724–728; env fallback at ~1646–1650), src/mcp_agent/mcp/client_proxy.py (adds header when token present at ~lines 52, 92, 129, 162), tests set the token at tests/server/test_app_server_memo.py:95.

Action (pick one):

  • Required for production: fail-fast — refuse to start in non-dev environments if MCP_GATEWAY_TOKEN is not set.
  • If failing is unacceptable: emit a clear, high-severity startup ERROR + documentation calling out the risk, and restrict internal endpoints (e.g., bind to localhost or add an explicit allowlist) until a token is configured.
🧹 Nitpick comments (3)
examples/mcp_agent_server/asyncio/client.py (1)

50-55: Name/description drift (nit).

Registry name is "basic_agent_server" and description mentions the basic agent, but entrypoint is now main.py. Consider aligning these labels to avoid confusion when multiple servers are registered.

src/mcp_agent/server/app_server.py (2)

395-403: Idempotency key set can grow unbounded; at least clear per-execution on unregister.

The proposed _unregister_session change above removes keys for execution_id. Consider adding a small TTL/LRU per execution_id if keys are high-cardinality.

I can add a simple TTL-based ring buffer for _IDEMPOTENCY_KEYS_SEEN if desired.


52-57: Logging inside the critical section (minor).

The debug logs are inside the lock. Move logging after releasing the lock to reduce contention on high-throughput paths.

Also applies to: 66-71, 75-85

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4502f71 and e2f0848.

📒 Files selected for processing (2)
  • examples/mcp_agent_server/asyncio/client.py (1 hunks)
  • src/mcp_agent/server/app_server.py (14 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-28T15:07:10.015Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:10.015Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.

Applied to files:

  • src/mcp_agent/server/app_server.py
🧬 Code graph analysis (1)
src/mcp_agent/server/app_server.py (4)
src/mcp_agent/app.py (3)
  • logger (193-210)
  • MCPApp (42-965)
  • context (149-154)
src/mcp_agent/executor/temporal/session_proxy.py (5)
  • notify (77-96)
  • notify (303-304)
  • send_log_message (158-193)
  • request (98-113)
  • request (306-309)
src/mcp_agent/logging/listeners.py (1)
  • send_log_message (20-26)
tests/logging/test_upstream_logging.py (1)
  • send_log_message (15-23)
🔇 Additional comments (4)
examples/mcp_agent_server/asyncio/client.py (1)

44-55: Ensure uv can resolve main.py — set explicit path or cwd

main.py exists at examples/mcp_agent_server/asyncio/main.py but the registry registers args ["run","main.py"]. Either change to ["run","examples/mcp_agent_server/asyncio/main.py"] or set MCPServerSettings(..., cwd="examples/mcp_agent_server/asyncio").

src/mcp_agent/server/app_server.py (3)

962-971: Rebind on get_status is a solid mitigation.

Re-registering ctx.session to the run’s execution mapping opportunistically should eliminate stale-session drops after reconnects.


1007-1015: Rebind on resume/cancel looks good.

Keeps the run→session mapping fresh on control-path calls.

Also applies to: 1072-1080


347-359: No change required — no session health flag to check and fallback is already best-effort.

Context.upstream_session is a ServerSession (src/mcp_agent/core/context.py); SessionProxy (src/mcp_agent/executor/temporal/session_proxy.py) does not expose is_closed/closed/is_connected, and app_server handlers already wrap calls in try/except and treat fallback/closed sessions as best‑effort, so keep the helper as-is.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/mcp_agent/server/app_server.py (1)

881-930: Human prompts path: prefer mapped session first; only fallback to latest if unmapped or mapped fails.

Reorder to mapped→fallback and fix binding as above to prevent routing prompts to an unrelated client session.

♻️ Duplicate comments (4)
src/mcp_agent/server/app_server.py (4)

452-456: Critical: incorrect run→execution binding (run_id mistakenly set to execution_id).

Passing run_id=execution_id corrupts _RUN_EXECUTION_ID_REGISTRY and breaks later lookups by run_id. Use the actual run_id if known, or only bind by execution when run_id is unknown. This issue appears at all the listed callsites.

Apply this minimal fix pattern (example for notify path), and mirror it at the other callsites:

-                        await _register_session(
-                            run_id=execution_id,
-                            execution_id=execution_id,
-                            session=latest_session,
-                        )
+                        # Best-effort: resolve run_id; if unknown, only bind by execution
+                        run_id_resolved = await _find_run_id_for_execution(execution_id)  # type: ignore[name-defined]
+                        if run_id_resolved:
+                            await _register_session(
+                                run_id=run_id_resolved,
+                                execution_id=execution_id,
+                                session=latest_session,
+                            )
+                        else:
+                            await _register_session_for_execution(  # type: ignore[name-defined]
+                                execution_id=execution_id,
+                                session=latest_session,
+                            )

Add these helpers near the other registry helpers:

+async def _register_session_for_execution(execution_id: str, session: Any) -> None:
+    async with _RUN_SESSION_LOCK:
+        _RUN_SESSION_REGISTRY[execution_id] = session
+    try:
+        logger.debug(
+            f"Registered upstream session for execution_id={execution_id}, session_id={id(session)}"
+        )
+    except Exception:
+        pass
+
+async def _find_run_id_for_execution(execution_id: str) -> str | None:
+    async with _RUN_SESSION_LOCK:
+        for run_id, exec_id in _RUN_EXECUTION_ID_REGISTRY.items():
+            if exec_id == execution_id:
+                return run_id
+    return None

Also applies to: 574-584, 598-604, 620-626, 637-643, 654-660, 803-807, 910-914


563-664: Request path: same ordering and binding issues as notify.

Try mapped session first; if unavailable or it errors, fallback to latest. Fix the same _register_session misuse here.

Apply analogous changes as in notify, for both the generic rpc.request path and the typed send_request fallbacks. Ensure the post-delivery binding uses run_id_resolved or *_for_execution(...) as shown above.

-            # Prefer latest upstream session first
+            # Prefer mapped session first, then fallback to latest on miss/failure

Also replace all occurrences of:

await _register_session(run_id=execution_id, execution_id=execution_id, session=latest_session)

with the resolution pattern using _find_run_id_for_execution(...) or _register_session_for_execution(...).

Also applies to: 669-677, 679-749


786-818: Log path: prefer mapped session first and correct binding.

Mirror the notify/request fixes here to avoid sending logs to the wrong client, and correct the _register_session misuse.

Also applies to: 819-827, 831-847


452-456: Fix incorrect _register_session parameter: replace run_id=execution_id with the actual run_id

Multiple calls are passing execution_id into the run_id parameter, breaking the run→execution mapping. Replace run_id=execution_id with the correct run id (e.g. run_id=run_id or run_id=execution.run_id) at:

  • src/mcp_agent/server/app_server.py:452-456
  • src/mcp_agent/server/app_server.py:574-578
  • src/mcp_agent/server/app_server.py:598-603
  • src/mcp_agent/server/app_server.py:620-624
  • src/mcp_agent/server/app_server.py:637-641
  • src/mcp_agent/server/app_server.py:654-658
  • src/mcp_agent/server/app_server.py:803-807
  • src/mcp_agent/server/app_server.py:910-914

Existing _register_session calls at ~1135, ~1179, ~1244 and ~1844 already use the correct run_id and should be left as-is.

🧹 Nitpick comments (5)
src/mcp_agent/server/app_server.py (5)

1131-1139: Guard rebind to avoid writing run_id→run_id when execution_id is unknown.

Today, if _RUN_EXECUTION_ID_REGISTRY lacks run_id, you bind (run_id→run_id) and store session under execution_id=run_id, which can be misleading. Suggest guarding and only rebinding when exec_id exists.

-            if sess and run_id:
-                exec_id = _RUN_EXECUTION_ID_REGISTRY.get(run_id, run_id)
-                await _register_session(
-                    run_id=run_id, execution_id=exec_id, session=sess
-                )
+            if sess and run_id:
+                exec_id = _RUN_EXECUTION_ID_REGISTRY.get(run_id)
+                if exec_id:
+                    await _register_session(run_id=run_id, execution_id=exec_id, session=sess)

Also applies to: 1175-1183, 1241-1248


60-70: Optional: clean up idempotency keys (and pending prompts) on unregister.

To prevent unbounded growth, clear _IDEMPOTENCY_KEYS_SEEN[execution_id] and any _PENDING_PROMPTS entries for that execution on terminal states.

         if execution_id:
             _RUN_SESSION_REGISTRY.pop(execution_id, None)
+            _IDEMPOTENCY_KEYS_SEEN.pop(execution_id, None)
+            # Best-effort purge of pending prompts for this execution
+            try:
+                async with _PENDING_PROMPTS_LOCK:
+                    for k in [k for k, v in _PENDING_PROMPTS.items() if v.get("execution_id") == execution_id]:
+                        _PENDING_PROMPTS.pop(k, None)
+            except Exception:
+                pass

360-365: Route name/param mismatch: “by-run” path uses {execution_id}.

Either rename the path to by-execution or change the param to {run_id} for clarity. If changing the path risks breakage, consider registering both routes to keep compatibility.


347-359: Helper looks good. Consider caching the attribute lookup on hot paths.

Micro-nit: store app_obj = _get_attached_app(mcp_server) once per request path segment if this becomes hot; current cost is tiny though.


396-403: Idempotency handling LGTM; consider TTL/LRU later.

Works as-is; optional TTL-based eviction can be added if high churn is expected.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e2f0848 and d26a68b.

📒 Files selected for processing (1)
  • src/mcp_agent/server/app_server.py (15 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-28T15:07:10.015Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:10.015Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.

Applied to files:

  • src/mcp_agent/server/app_server.py
🧬 Code graph analysis (1)
src/mcp_agent/server/app_server.py (2)
src/mcp_agent/app.py (2)
  • logger (193-210)
  • context (149-154)
src/mcp_agent/executor/temporal/session_proxy.py (7)
  • send_log_message (158-193)
  • send_progress_notification (195-213)
  • notify (77-96)
  • notify (303-304)
  • request (98-113)
  • request (306-309)
  • send_request (135-156)
🔇 Additional comments (5)
src/mcp_agent/server/app_server.py (5)

52-58: Logging additions LGTM.

Non-intrusive, wrapped in try/except, and helpful for tracing.

Also applies to: 65-71, 75-85


981-999: Dynamic logging level handler registration LGTM.

Best-effort guards are appropriate here.


1842-1856: Initial run binding is correct.

Uses run_id and execution_id appropriately.


1891-1901: Nice: cleanup mapping on terminal states.

Complements the proposed idempotency/prompt cleanup.


1-344: General: thoughtful context/session propagation.

Good use of ctx.session capture and propagation via app.context.upstream_session.

Comment on lines +404 to +467
# Prefer latest upstream session first
latest_session = _get_fallback_upstream_session()
tried_latest = False
if latest_session is not None:
tried_latest = True
try:
if method == "notifications/message":
level = str(params.get("level", "info"))
data = params.get("data")
logger_name = params.get("logger")
related_request_id = params.get("related_request_id")
await latest_session.send_log_message( # type: ignore[attr-defined]
level=level, # type: ignore[arg-type]
data=data,
logger=logger_name,
related_request_id=related_request_id,
)
logger.debug(
f"[notify] delivered via latest session_id={id(latest_session)} (message)"
)
elif method == "notifications/progress":
progress_token = params.get("progressToken")
progress = params.get("progress")
total = params.get("total")
message = params.get("message")
await latest_session.send_progress_notification( # type: ignore[attr-defined]
progress_token=progress_token,
progress=progress,
total=total,
message=message,
)
logger.debug(
f"[notify] delivered via latest session_id={id(latest_session)} (progress)"
)
else:
rpc = getattr(latest_session, "rpc", None)
if rpc and hasattr(rpc, "notify"):
await rpc.notify(method, params)
logger.debug(
f"[notify] delivered via latest session_id={id(latest_session)} (generic '{method}')"
)
else:
return JSONResponse(
{"ok": False, "error": f"unsupported method: {method}"},
status_code=400,
)
# Successful with latest → bind mapping for consistency
try:
await _register_session(
run_id=execution_id,
execution_id=execution_id,
session=latest_session,
)
logger.info(
f"[notify] rebound mapping to latest session_id={id(latest_session)} for execution_id={execution_id}"
)
except Exception:
pass
return JSONResponse({"ok": True})
except Exception as e_latest:
logger.warning(
f"[notify] latest session delivery failed for execution_id={execution_id}: {e_latest}"
)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Route logic order should prefer mapped session, then fallback; also attempt fallback on send failure.

Current code uses latest upstream first and drops on error, which can misroute to the wrong client and contradicts the stated fallback behavior.

Apply this restructuring (preserves method-specific handling and adds proper fallback):

-            # Prefer latest upstream session first
-            latest_session = _get_fallback_upstream_session()
-            tried_latest = False
-            if latest_session is not None:
-                tried_latest = True
-                try:
-                    ...
-                    # Successful with latest → bind mapping for consistency
-                    try:
-                        await _register_session(
-                            run_id=execution_id,
-                            execution_id=execution_id,
-                            session=latest_session,
-                        )
-                        logger.info(
-                            f"[notify] rebound mapping to latest session_id={id(latest_session)} for execution_id={execution_id}"
-                        )
-                    except Exception:
-                        pass
-                    return JSONResponse({"ok": True})
-                except Exception as e_latest:
-                    logger.warning(
-                        f"[notify] latest session delivery failed for execution_id={execution_id}: {e_latest}"
-                    )
-
-            # Fallback to mapped session
-            mapped_session = await _get_session(execution_id)
-            if not mapped_session:
-                logger.warning(
-                    f"[notify] session_not_available for execution_id={execution_id} (tried_latest={tried_latest})"
-                )
-                return JSONResponse(
-                    {"ok": False, "error": "session_not_available"}, status_code=503
-                )
+            # 1) Try mapped session first
+            mapped_session = await _get_session(execution_id)
+            latest_session = _get_fallback_upstream_session()
+            if not mapped_session and not latest_session:
+                logger.warning(f"[notify] session_not_available for execution_id={execution_id}")
+                return JSONResponse({"ok": False, "error": "session_not_available"}, status_code=503)
             try:
                 if method == "notifications/message":
                     level = str(params.get("level", "info"))
                     data = params.get("data")
                     logger_name = params.get("logger")
                     related_request_id = params.get("related_request_id")
-                    await mapped_session.send_log_message(  # type: ignore[attr-defined]
+                    await (mapped_session or latest_session).send_log_message(  # type: ignore[attr-defined]
                         level=level,  # type: ignore[arg-type]
                         data=data,
                         logger=logger_name,
                         related_request_id=related_request_id,
                     )
-                    logger.debug(
-                        f"[notify] delivered via mapped session_id={id(mapped_session)} (message)"
-                    )
+                    logger.debug(f"[notify] delivered via "
+                                 f"{'mapped' if mapped_session else 'latest'} session_id="
+                                 f"{id(mapped_session or latest_session)} (message)")
                 elif method == "notifications/progress":
                     progress_token = params.get("progressToken")
                     progress = params.get("progress")
                     total = params.get("total")
                     message = params.get("message")
-                    await mapped_session.send_progress_notification(  # type: ignore[attr-defined]
+                    await (mapped_session or latest_session).send_progress_notification(  # type: ignore[attr-defined]
                         progress_token=progress_token,
                         progress=progress,
                         total=total,
                         message=message,
                     )
-                    logger.debug(
-                        f"[notify] delivered via mapped session_id={id(mapped_session)} (progress)"
-                    )
+                    logger.debug(f"[notify] delivered via "
+                                 f"{'mapped' if mapped_session else 'latest'} session_id="
+                                 f"{id(mapped_session or latest_session)} (progress)")
                 else:
-                    rpc = getattr(mapped_session, "rpc", None)
+                    rpc = getattr((mapped_session or latest_session), "rpc", None)
                     if rpc and hasattr(rpc, "notify"):
                         await rpc.notify(method, params)
-                        logger.debug(
-                            f"[notify] delivered via mapped session_id={id(mapped_session)} (generic '{method}')"
-                        )
+                        logger.debug(f"[notify] delivered via "
+                                     f"{'mapped' if mapped_session else 'latest'} session_id="
+                                     f"{id(mapped_session or latest_session)} (generic '{method}')")
                     else:
                         return JSONResponse(
                             {"ok": False, "error": f"unsupported method: {method}"},
                             status_code=400,
                         )
-                return JSONResponse({"ok": True})
+                # Post-delivery: (re)bind mapping for consistency
+                try:
+                    chosen = mapped_session or latest_session
+                    if chosen is latest_session:
+                        # We only know execution_id here; bind by execution and, if possible, by run
+                        await _register_session_for_execution(execution_id, chosen)  # type: ignore[name-defined]
+                        run_id_resolved = await _find_run_id_for_execution(execution_id)  # type: ignore[name-defined]
+                        if run_id_resolved:
+                            await _register_session(run_id=run_id_resolved, execution_id=execution_id, session=chosen)
+                except Exception:
+                    pass
+                return JSONResponse({"ok": True})
             except Exception as e_mapped:
-                # Best-effort for notifications
-                if isinstance(method, str) and method.startswith("notifications/"):
-                    logger.warning(
-                        f"[notify] dropped notification for execution_id={execution_id}: {e_mapped}"
-                    )
-                    return JSONResponse({"ok": True, "dropped": True})
-                logger.error(
-                    f"[notify] error forwarding for execution_id={execution_id}: {e_mapped}"
-                )
-                return JSONResponse({"ok": False, "error": str(e_mapped)}, status_code=500)
+                # 2) If mapped failed, best-effort fallback to latest (preserve method semantics)
+                if mapped_session and latest_session:
+                    try:
+                        if method == "notifications/message":
+                            await latest_session.send_log_message(  # type: ignore[attr-defined]
+                                level=str(params.get("level", "info")),  # type: ignore[arg-type]
+                                data=params.get("data"),
+                                logger=params.get("logger"),
+                                related_request_id=params.get("related_request_id"),
+                            )
+                        elif method == "notifications/progress":
+                            await latest_session.send_progress_notification(  # type: ignore[attr-defined]
+                                progress_token=params.get("progressToken"),
+                                progress=params.get("progress"),
+                                total=params.get("total"),
+                                message=params.get("message"),
+                            )
+                        else:
+                            rpc_fb = getattr(latest_session, "rpc", None)
+                            if rpc_fb and hasattr(rpc_fb, "notify"):
+                                await rpc_fb.notify(method, params)
+                            else:
+                                raise RuntimeError("fallback session lacks notify capability")
+                        await _register_session_for_execution(execution_id, latest_session)  # type: ignore[name-defined]
+                        return JSONResponse({"ok": True, "fallback": True})
+                    except Exception:
+                        pass
+                # Best-effort: don't fail the worker on notification drops
+                if isinstance(method, str) and method.startswith("notifications/"):
+                    logger.warning(f"[notify] dropped notification for execution_id={execution_id}: {e_mapped}")
+                    return JSONResponse({"ok": True, "dropped": True})
+                logger.error(f"[notify] error forwarding for execution_id={execution_id}: {e_mapped}")
+                return JSONResponse({"ok": False, "error": str(e_mapped)}, status_code=500)

Also applies to: 468-477, 478-531

🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 404-467, the handler
currently prefers the latest upstream session first and returns on error, but it
should prefer the mapped session for execution_id and only use the fallback
latest session if no mapped session exists or if delivery to the mapped session
fails; on any successful delivery (mapped or fallback) rebind the mapping to
that session. Change the control flow to: 1) look up the mapped session for
execution_id and attempt the same method-specific sends (notifications/message,
notifications/progress, or generic rpc.notify) to that mapped session; 2) if
there is no mapped session or the send raises an exception, retrieve the latest
upstream session and attempt the same send there; 3) if latest also fails return
the same error response; 4) on a successful send (either mapped or latest) call
_register_session to bind execution_id to the successful session and return
JSONResponse({"ok": True}); preserve existing logging, unsupported-method 400
response, and the try/except around _register_session; apply the same
restructuring pattern to the analogous blocks at lines 468-477 and 478-531.

@saqadri saqadri merged commit df7803d into main Sep 13, 2025
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant