
Conversation

@saqadri saqadri (Collaborator) commented Sep 9, 2025

This has to do with how we shut down tasks, particularly in mcp_connection_manager.
One example that hung at the end was examples/workflows/workflow_evaluator_optimizer; it now runs all the way through.

Summary by CodeRabbit

  • New Features
    • Added explicit connection close control for safer cross-thread shutdowns.
  • Bug Fixes
    • Reduced hangs during shutdown with improved timeout handling and clearer error messages.
    • Relaxed app ID validation to accept a broader range of IDs.
    • Enforced numeric semantics for quality ratings for more consistent behavior.
  • Refactor
    • Reworked connection lifecycle management for greater stability; no user-facing behavior changes.
  • Tests
    • Added concurrency test to validate cross-thread shutdown behavior.
  • Style
    • Minor formatting and docstring cleanups.


coderabbitai bot commented Sep 9, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Reworks MCPConnectionManager to own a TaskGroup and support safe cross-thread shutdown via a new close() API; updates MCPAggregator to call the new close path; changes QualityRating to an int Enum; removes an APP_ID_PREFIX constant; adds a concurrency test; minor CLI formatting edits.
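
Below is a minimal sketch of the cross-thread close pattern this describes, with illustrative names rather than the project's actual code; it assumes the manager records its owner event loop and thread when it starts up:

```python
import asyncio
import threading


class ConnectionManagerSketch:
    """Illustrative only: records its owner loop/thread and allows close() from another thread."""

    async def start(self) -> None:
        # Record the loop and thread that own the connection tasks.
        self._loop = asyncio.get_running_loop()
        self._thread_id = threading.get_ident()

    async def _shutdown_and_close(self) -> None:
        # The real manager signals its TaskGroup owner here and waits for it to exit.
        await asyncio.sleep(0)

    async def close(self) -> None:
        if threading.get_ident() == self._thread_id:
            # Same thread/loop: shut down inline.
            await self._shutdown_and_close()
            return
        # Different thread: schedule shutdown on the owner loop and wait with a bound.
        cfut = asyncio.run_coroutine_threadsafe(self._shutdown_and_close(), self._loop)
        try:
            await asyncio.wait_for(asyncio.wrap_future(cfut), timeout=5.0)
        except asyncio.TimeoutError:
            cfut.cancel()  # give up after the bounded wait instead of hanging at shutdown
```

The aggregator side can then bound the whole teardown with something like `await asyncio.wait_for(manager.close(), timeout=5.0)`, as the summary below notes.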

Changes

Cohort / File(s) Summary
Connection manager lifecycle
src/mcp_agent/mcp/mcp_connection_manager.py
Adds TaskGroup owner lifecycle: _start_owner, _tg_owner; records owner loop/thread; adds close(...) with cross-thread shutdown via asyncio.run_coroutine_threadsafe, bounded timeouts, and enhanced logging; updates __aenter__/__aexit__ and launch_server to ensure owner.
Aggregator shutdown flow
src/mcp_agent/mcp/mcp_aggregator.py
Replaces disconnect_all() shutdown with awaiting connection_manager.close(...) wrapped in asyncio.wait_for(5s); removes manual private-thread toggles and updates log/error messages.
Concurrency test
tests/mcp/test_connection_manager_concurrency.py
Adds test exercising concurrent close() calls from the event loop and a separate thread; includes DummyServerRegistry and asserts _tg cleared and _tg_active false after shutdown.
API client cleanup
src/mcp_agent/cli/mcp_app/api_client.py
Removes module-level constant APP_ID_PREFIX = "app_".
Evaluator enum type change
src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py
Changes QualityRating base from str, Enum to int, Enum (members/values unchanged: POOR=0, FAIR=1, GOOD=2, EXCELLENT=3).
CLI formatting
src/mcp_agent/cli/cloud/commands/workflows/runs/main.py
Non-functional formatting/whitespace adjustments (reflowed comprehensions, print/output formatting, minor docstring spacing).

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Aggregator as MCPAggregator
    participant Manager as MCPConnectionManager
    participant Owner as Owner TaskGroup (original loop)
    participant Thread as Other Thread

    Aggregator->>Manager: __aenter__ (ensure owner)
    Manager->>Owner: _start_owner / start _tg_owner
    Owner-->>Manager: ready

    Thread->>Manager: close() (cross-thread)
    alt cross-thread path
        Manager->>Owner: run_coroutine_threadsafe(_shutdown_and_close)
        Owner-->>Manager: close signalled / shutdown runs (or timeout)
    else same-thread path
        Manager->>Owner: inline shutdown (await and set closed)
    end

    Aggregator->>Manager: close() during overall shutdown
    Manager->>Owner: await close (with timeout)
    Owner-->>Manager: cleared _tg and _tg_active = False
    Manager-->>Aggregator: closed

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • rholinshead

Pre-merge checks (2 passed, 1 warning)

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 61.90%, which is below the required threshold of 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Title Check (✅ Passed): The title succinctly captures the primary change by stating that the pull request resolves a shutdown hang, directly reflecting the core modification to the mcp_connection_manager without extraneous details. It is concise, specific, and aligned with the PR’s objective of fixing the hang that occurred at shutdown.
  • Description Check (✅ Passed): The description directly relates to the changeset by explaining that the pull request modifies task shutdown behavior—especially within mcp_connection_manager—to prevent a hang and verifies it using the workflow_evaluator_optimizer example. It clearly ties to the code changes and the reported issue.

Poem

"I hopped through code with careful paws,
I tended TaskGroups without a pause.
Cross-thread close done neat and clear,
No hang, no frost — the shutdown's here.
🐇✨"


📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2507545 and ac7f904.

📒 Files selected for processing (1)
  • src/mcp_agent/mcp/mcp_connection_manager.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/mcp_agent/mcp/mcp_connection_manager.py


@saqadri saqadri requested a review from alienzach September 9, 2025 20:44
Comment on lines 283 to 297
if current_thread == self._thread_id:
    # Same thread: perform shutdown inline
    logger.debug("MCPConnectionManager: shutting down all server tasks...")
    await self.disconnect_all()
    await anyio.sleep(0.5)
    if self._tg_active:
        self._tg_close_event.set()
        # Wait for owner to report TaskGroup closed with an anyio timeout
        try:
            with anyio.fail_after(5.0):
                await self._tg_closed_event.wait()
        except TimeoutError:
            logger.warning(
                "MCPConnectionManager: Timeout waiting for TaskGroup owner to close"
            )

Potential Race Condition in Thread-Aware Cleanup

The current implementation has a race condition risk between the thread identity check and the subsequent async operations. Since await points allow task switching, another thread could call close() after the thread check passes but before the TaskGroup cleanup completes.

Consider restructuring to make the critical section more atomic:

if current_thread == self._thread_id:
    # Capture state atomically before any awaits
    is_active = self._tg_active
    
    # Then perform async operations
    logger.debug("MCPConnectionManager: shutting down all server tasks...")
    await self.disconnect_all()
    await anyio.sleep(0.5)
    
    # Use the captured state for the conditional
    if is_active:
        self._tg_close_event.set()
        # ...

Alternatively, consider using a lock mechanism to ensure exclusive access during the entire cleanup sequence.

Suggested change
if current_thread == self._thread_id:
    # Same thread: perform shutdown inline
    # Capture state atomically before any awaits
    is_active = self._tg_active
    logger.debug("MCPConnectionManager: shutting down all server tasks...")
    await self.disconnect_all()
    await anyio.sleep(0.5)
    # Use the captured state for the conditional
    if is_active:
        self._tg_close_event.set()
        # Wait for owner to report TaskGroup closed with an anyio timeout
        try:
            with anyio.fail_after(5.0):
                await self._tg_closed_event.wait()
        except TimeoutError:
            logger.warning(
                "MCPConnectionManager: Timeout waiting for TaskGroup owner to close"
            )

Spotted by Diamond


Comment on lines +314 to +324
cfut = asyncio.run_coroutine_threadsafe(
    _shutdown_and_close(), self._loop
)
# Wait in a worker thread to avoid blocking non-asyncio contexts
try:
    with anyio.fail_after(5.0):
        await anyio.to_thread.run_sync(cfut.result)
except TimeoutError:
    logger.warning(
        "MCPConnectionManager: Timeout during cross-thread shutdown/close"
    )

Potential deadlock risk in cross-thread coordination

The current implementation creates a complex threading scenario that could lead to deadlocks. When asyncio.run_coroutine_threadsafe() schedules work on the original loop, and then anyio.to_thread.run_sync(cfut.result) blocks waiting for that work, there's a dependency chain across threads that can become problematic.

If the original event loop is already in the process of shutting down or becomes blocked, the future may never complete, leaving resources hanging despite the timeout. While the timeout provides some protection, it doesn't guarantee cleanup of the scheduled coroutine.

Consider alternatives:

  1. Use a simpler signaling mechanism between threads
  2. Implement a more robust cancellation pattern that ensures the coroutine is properly cancelled if the timeout occurs
  3. Add explicit cleanup code that runs after the timeout to ensure the future doesn't remain pending

This would improve shutdown reliability, especially in complex threading scenarios.

Suggested change
cfut = asyncio.run_coroutine_threadsafe(
    _shutdown_and_close(), self._loop
)
# Wait in a worker thread to avoid blocking non-asyncio contexts
try:
    with anyio.fail_after(5.0):
        await anyio.to_thread.run_sync(cfut.result)
except TimeoutError:
    logger.warning(
        "MCPConnectionManager: Timeout during cross-thread shutdown/close"
    )
    # Explicitly cancel the future to prevent the coroutine from
    # continuing to run in the background after timeout
    cfut.cancel()

Spotted by Diamond


Comment on lines 340 to 341
if self._tg_owner_task and not self._tg_owner_task.done():
    return

The task lifecycle check has a potential issue: checking if self._tg_owner_task and not self._tg_owner_task.done() only verifies that an existing task hasn't completed, but doesn't distinguish between successful completion and failure. A task that failed with an exception would have done() return True, causing this check to miss it, but the TaskGroup might not be properly initialized.

Consider enhancing this check to also verify the TaskGroup's state or to explicitly handle tasks that completed with exceptions. Perhaps something like:

if (self._tg_owner_task and not self._tg_owner_task.done()) or (self._tg_active and self._tg is not None):
    return

Or alternatively, add explicit exception handling to restart the TaskGroup if the previous one failed.

Spotted by Diamond


Comment on lines 377 to 380
try:
    self._tg_closed_event.set()
except Exception:
    pass

The silent exception handling here could create a deadlock risk. If _tg_closed_event.set() fails, any code waiting on _tg_closed_event.wait() will hang indefinitely, potentially causing the entire shutdown process to stall.

Consider either:

  1. Logging the exception to help with debugging: logger.warning(f"Failed to set _tg_closed_event: {e}")
  2. Or implementing a fallback mechanism to ensure waiting tasks can proceed even if the event can't be set

This is particularly important since preventing shutdown hangs is a core goal of this PR.

Suggested change
try:
    self._tg_closed_event.set()
except Exception as e:
    logger.warning(f"Failed to set _tg_closed_event: {e}")

Spotted by Diamond




-class QualityRating(str, Enum):
+class QualityRating(int, Enum):

Type Safety Consideration: Changing QualityRating base class from str to int may impact backward compatibility. This could affect code that expects string values during JSON serialization, string comparisons, or logging operations.

Before merging, it would be valuable to:

  1. Verify all consumers of this enum can handle integer values
  2. Check if any serialization/deserialization logic depends on string representation
  3. Consider adding a string conversion method if needed to maintain compatibility

If this is an intentional change to enforce numeric semantics, adding a brief comment explaining the rationale would help future maintainers understand the design decision.

Suggested change
class QualityRating(int, Enum):
    """
    Quality rating enum using integer values for numeric comparisons.
    Provides string conversion methods for backward compatibility.
    """

Spotted by Diamond


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (7)
src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py (1)

25-31: Enum now based on int: ensure prompts/telemetry don’t emit Enum objects

Switching to int semantics is good for comparisons. However, places that render the rating should use .name (and optionally .value) to avoid strings like "QualityRating.GOOD" leaking into prompts/trace. Suggested tweaks:

  • In _build_refinement_prompt (Lines 473-476), render rating as {feedback.rating.name} (and optionally ({feedback.rating.value})).
  • In span events (e.g., Lines 248-257, 270-275), record "rating_name" and "rating_value" instead of the Enum object.

Apply diffs in the respective blocks:

-        Quality Rating: {feedback.rating}
+        Quality Rating: {feedback.rating.name} ({feedback.rating.value})
-                            "rating": evaluation_result.rating,
+                            "rating_name": evaluation_result.rating.name,
+                            "rating_value": int(evaluation_result.rating),
-                        {
-                            "rating": best_rating,
+                        {
+                            "rating_name": best_rating.name,
+                            "rating_value": int(best_rating),
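
For a standalone illustration of the int-based enum and the `.name`/`.value` rendering recommended above (a sketch, not the project's module):

```python
from enum import Enum


class QualityRating(int, Enum):
    POOR = 0
    FAIR = 1
    GOOD = 2
    EXCELLENT = 3


rating = QualityRating.GOOD
assert rating >= QualityRating.FAIR  # numeric comparisons work directly on the members
print(f"Quality Rating: {rating.name} ({rating.value})")  # Quality Rating: GOOD (2)
# Formatting the member itself (f"{rating}") varies across Python versions,
# which is why rendering .name/.value explicitly is the safer choice.
```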
src/mcp_agent/cli/cloud/commands/workflows/runs/main.py (1)

251-268: Replace non-standard Rich color “orange”

Rich doesn’t consistently recognize “[orange]”. Use a supported color (e.g., yellow/yellow3/orange1). Also keep “purple” if desired, though “magenta” is the canonical name.

-    elif "timeout" in status_str or "timed_out" in status_str:
-        return "[orange]⏰ Timed Out[/orange]"
+    elif "timeout" in status_str or "timed_out" in status_str:
+        return "[yellow3]⏰ Timed Out[/yellow3]"
src/mcp_agent/mcp/mcp_connection_manager.py (3)

360-381: Track owner task and clear on exit

Optional, but helps diagnostics and future guards.

     async def _tg_owner(self):
         """Own the TaskGroup lifecycle so __aexit__ runs in the same task it was entered."""
         try:
+            # Track the owner task for observability/guards
+            self._tg_owner_task = asyncio.current_task()
             async with create_task_group() as tg:
                 self._tg = tg
                 self._tg_active = True
                 # Signal that TaskGroup is ready
                 self._tg_ready_event.set()
                 # Wait for close request
                 await self._tg_close_event.wait()
         except Exception as e:
             logger.warning(f"MCPConnectionManager: Error in TaskGroup owner: {e}")
         finally:
             # Mark closed and clear references
             self._tg_active = False
             self._tg = None
             # Signal that TaskGroup has been closed
             try:
                 self._tg_closed_event.set()
             except Exception:
                 pass
+            finally:
+                self._tg_owner_task = None

274-333: Consider closing owner TaskGroup inside close() when not context-managed

If callers use MCPConnectionManager without “async with …”, _owner_tg stays entered, potentially leaking a TaskGroup even after shutdown. Optionally close it here (same-thread or via the origin loop) after _tg_closed_event.

     async def close(self, exc_type=None, exc_val=None, exc_tb=None):
@@
-                # Do not attempt to close the owner TaskGroup here; __aexit__ will handle it
+                # If not used as a context manager, proactively close the owner TaskGroup
+                if self._owner_tg is not None:
+                    try:
+                        await self._owner_tg.__aexit__(exc_type, exc_val, exc_tb)
+                    except Exception as e:
+                        logger.warning(
+                            f"MCPConnectionManager: Error during owner TaskGroup close(): {e}"
+                        )
+                    finally:
+                        self._owner_tg = None
@@
                 if self._loop is not None:
@@
                             await self._tg_closed_event.wait()
 
                     try:
                         cfut = asyncio.run_coroutine_threadsafe(
                             _shutdown_and_close(), self._loop
                         )
                         # Wait in a worker thread to avoid blocking non-asyncio contexts
                         try:
                             with anyio.fail_after(5.0):
                                 await anyio.to_thread.run_sync(cfut.result)
                         except AnyioTimeoutError:
                             logger.warning(
                                 "MCPConnectionManager: Timeout during cross-thread shutdown/close"
                             )
+                        # Also close owner TaskGroup from the origin loop if present
+                        if self._owner_tg is not None:
+                            def _owner_close():
+                                return self._owner_tg.__aexit__(exc_type, exc_val, exc_tb)
+                            cfut2 = asyncio.run_coroutine_threadsafe(_owner_close(), self._loop)
+                            try:
+                                with anyio.fail_after(5.0):
+                                    await anyio.to_thread.run_sync(cfut2.result)
+                            except AnyioTimeoutError:
+                                logger.warning(
+                                    "MCPConnectionManager: Timeout during owner TaskGroup close()"
+                                )
+                            finally:
+                                self._owner_tg = None

251-258: Minor: duplicate loop capture

__aenter__ records self._loop, and _start_owner() also records it. Not harmful, but one capture would suffice.

src/mcp_agent/mcp/mcp_aggregator.py (2)

246-253: Log text is misleading about “forcing shutdown”

On timeout, no additional “forcing” occurs. Either adjust the message (as in the diff above) or actually escalate with a best-effort fallback. Since we now invoke __aexit__, the clarified message is sufficient.


242-245: Consider a slightly longer or configurable timeout and shielding

Manager’s internal waits already use ~5s windows; giving a bit more headroom or making this configurable can avoid spurious timeouts during heavy teardown. You already used asyncio.shield in the previous diff; if you don’t adopt that, at least bump to ~7.5–10s or read from config.

-                                    await asyncio.wait_for(
-                                        self._persistent_connection_manager.close(),
-                                        timeout=5.0,
-                                    )
+                                    await asyncio.wait_for(
+                                        self._persistent_connection_manager.close(),
+                                        timeout=7.5,
+                                    )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ae27182 and f91c881.

📒 Files selected for processing (5)
  • src/mcp_agent/cli/cloud/commands/workflows/runs/main.py (6 hunks)
  • src/mcp_agent/cli/mcp_app/api_client.py (0 hunks)
  • src/mcp_agent/mcp/mcp_aggregator.py (1 hunks)
  • src/mcp_agent/mcp/mcp_connection_manager.py (3 hunks)
  • src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py (1 hunks)
💤 Files with no reviewable changes (1)
  • src/mcp_agent/cli/mcp_app/api_client.py
🧰 Additional context used
🧬 Code graph analysis (3)
src/mcp_agent/mcp/mcp_aggregator.py (1)
src/mcp_agent/mcp/mcp_connection_manager.py (1)
  • close (274-336)
src/mcp_agent/cli/cloud/commands/workflows/runs/main.py (1)
src/mcp_agent/executor/workflow.py (1)
  • run_id (133-138)
src/mcp_agent/mcp/mcp_connection_manager.py (1)
src/mcp_agent/mcp/mcp_aggregator.py (1)
  • close (198-273)
🔇 Additional comments (7)
src/mcp_agent/cli/cloud/commands/workflows/runs/main.py (2)

74-76: LGTM: clearer multi-line filter expression

Readability improved; semantics unchanged.


233-243: Robust dict conversion for objects

Handling .isoformat() and .value when present is correct and preserves JSON-friendliness.
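
A small illustration of the duck-typed conversion pattern described here (a hypothetical helper, not the project's code):

```python
from datetime import datetime
from enum import Enum
from typing import Any


def to_json_friendly(value: Any) -> Any:
    """Convert a value to a JSON-friendly form, duck-typing on .isoformat()/.value."""
    if hasattr(value, "isoformat"):  # datetimes and dates
        return value.isoformat()
    if hasattr(value, "value"):  # enums and similar wrappers
        return value.value
    return value


class Status(Enum):
    RUNNING = "running"


print(to_json_friendly(datetime(2025, 9, 9)))  # 2025-09-09T00:00:00
print(to_json_friendly(Status.RUNNING))        # running
print(to_json_friendly(42))                    # 42
```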

src/mcp_agent/mcp/mcp_connection_manager.py (4)

569-596: Disconnect-all semantics are fine

Signaling shutdown outside the lock, brief delay, and logging are appropriate for graceful teardown.


382-490: Launch flow resilience looks good

Auto-starting the owner TaskGroup when absent prevents hangs when called outside a context.


165-223: Lifecycle task error handling is solid

Initialization, shutdown wait, and error reporting (including ExceptionGroup) are handled correctly.


491-536: Health-check and re-launch path is reasonable

Unhealthy servers are recycled; initialization failure surfaces a clear exception.

src/mcp_agent/mcp/mcp_aggregator.py (1)

235-259: I’m inspecting both the aggregator’s enter/close logic and the connection manager’s shutdown to confirm whether close() already finalizes the TaskGroup via its own __aexit__. Stand by.

"""

from datetime import timedelta
import asyncio

🛠️ Refactor suggestion

Catch AnyIO TimeoutError, not built-in TimeoutError

anyio.fail_after raises anyio.exceptions.TimeoutError. The current except TimeoutError won’t catch it.

-from anyio import Event, create_task_group, Lock
+from anyio import Event, create_task_group, Lock
+from anyio.exceptions import TimeoutError as AnyioTimeoutError
-                    try:
-                        with anyio.fail_after(5.0):
-                            await self._tg_closed_event.wait()
-                    except TimeoutError:
+                    try:
+                        with anyio.fail_after(5.0):
+                            await self._tg_closed_event.wait()
+                    except AnyioTimeoutError:
                         logger.warning(
                             "MCPConnectionManager: Timeout waiting for TaskGroup owner to close"
                         )
-                        try:
-                            with anyio.fail_after(5.0):
-                                await anyio.to_thread.run_sync(cfut.result)
-                        except TimeoutError:
+                        try:
+                            with anyio.fail_after(5.0):
+                                await anyio.to_thread.run_sync(cfut.result)
+                        except AnyioTimeoutError:
                             logger.warning(
                                 "MCPConnectionManager: Timeout during cross-thread shutdown/close"
                             )

Also applies to: 17-18, 292-297, 319-324

🤖 Prompt for AI Agents
In src/mcp_agent/mcp/mcp_connection_manager.py around lines 6, 17-18, 292-297,
and 319-324, the except clauses currently catch the built-in TimeoutError but
anyio.fail_after raises anyio.exceptions.TimeoutError; import anyio (or from
anyio import exceptions as anyio_exceptions) at the top and change the except
handlers to catch anyio.exceptions.TimeoutError (or the alias you import) so the
timeouts are actually caught; update all listed except blocks accordingly.

Comment on lines 351 to 354
if (self._tg_owner_task and not self._tg_owner_task.done()) or (
    self._tg_active and self._tg is not None
):
    return

There appears to be a potential race condition in the _start_owner() method. The check for an active owner task is not atomic with the subsequent operations. Between evaluating self._tg_owner_task and not self._tg_owner_task.done() and returning from the method, another thread could modify these variables. This could cause the method to incorrectly return early when it should actually start a new owner task, potentially resulting in a non-functional TaskGroup.

Consider using a lock or other synchronization mechanism to ensure this check-and-return operation is atomic across threads.

Spotted by Diamond

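A minimal sketch of the lock-guarded startup suggested here, assuming an anyio Lock attribute (illustrative names only, not the project's code):

```python
import anyio
from anyio import Event, Lock


class OwnerStartupSketch:
    """Illustrative only: serialize the owner check-and-start behind a lock."""

    def __init__(self) -> None:
        self._owner_lock = Lock()  # guards startup across concurrent callers
        self._tg_active = False
        self._tg_ready_event = Event()

    async def _owner(self) -> None:
        # Stand-in for the TaskGroup owner: mark ready and return.
        self._tg_active = True
        self._tg_ready_event.set()

    async def _start_owner(self) -> None:
        async with self._owner_lock:
            # The check-and-start sequence is atomic with respect to other callers.
            if self._tg_active:
                return
            async with anyio.create_task_group() as tg:
                tg.start_soon(self._owner)
                await self._tg_ready_event.wait()


async def main() -> None:
    sketch = OwnerStartupSketch()
    async with anyio.create_task_group() as tg:
        for _ in range(3):
            tg.start_soon(sketch._start_owner)  # concurrent starts; only one proceeds


anyio.run(main)
```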

Comment on lines 367 to 370
# Reset coordination events
self._tg_ready_event = Event()
self._tg_close_event = Event()
self._tg_closed_event = Event()

Potential Race Condition in Event Object Replacement

There's a synchronization concern in how the coordination events are being replaced. When creating new Event() instances:

self._tg_ready_event = Event()
self._tg_close_event = Event()
self._tg_closed_event = Event()

If other threads are currently waiting on any of these events (via await event.wait()), replacing the event objects will leave those threads waiting on stale event instances that will never be signaled. This could lead to deadlocks or indefinite hangs.

Consider implementing a synchronization mechanism to ensure safe event replacement, or restructure the code to avoid replacing events that might have waiters. One approach would be to reset existing events (if they have a .clear() method) rather than creating new instances.

Suggested change
# Reset coordination events
if self._tg_ready_event:
    self._tg_ready_event.clear()
else:
    self._tg_ready_event = Event()
if self._tg_close_event:
    self._tg_close_event.clear()
else:
    self._tg_close_event = Event()
if self._tg_closed_event:
    self._tg_closed_event.clear()
else:
    self._tg_closed_event = Event()

Spotted by Diamond


Comment on lines +402 to +405
try:
    self._tg_closed_event.set()
except Exception as e:
    logger.warning(f"Failed to set _tg_closed_event: {e}")

The exception handling in the _tg_owner finally block could lead to deadlocks. If _tg_closed_event.set() fails, any threads waiting on this event will hang indefinitely while only a warning is logged. Consider making this signaling mechanism more robust by either:

  1. Using a more reliable synchronization primitive
  2. Implementing a retry mechanism with backoff
  3. Propagating the error to waiting threads so they can time out appropriately

This is particularly important since this event is used for cross-thread coordination during shutdown, where hanging threads can prevent clean application termination.

Suggested change
max_retries = 3
retry_delay = 0.1  # seconds
for attempt in range(max_retries):
    try:
        self._tg_closed_event.set()
        break
    except Exception as e:
        if attempt < max_retries - 1:
            logger.warning(f"Failed to set _tg_closed_event (attempt {attempt+1}/{max_retries}): {e}. Retrying in {retry_delay}s")
            time.sleep(retry_delay)
            retry_delay *= 2  # Exponential backoff
        else:
            logger.error(f"Failed to set _tg_closed_event after {max_retries} attempts: {e}. Threads waiting on this event may hang.")

Spotted by Diamond


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/mcp_agent/mcp/mcp_connection_manager.py (1)

175-189: Bug: session-id callback set before session is created

You check isinstance(server_conn.session, MCPAgentClientSession) before calling create_session, so the callback is never set. Move the callback wiring after create_session(...).

-            # If the transport provides a session ID callback (streamable_http does),
-            # store it in the server connection
-            if (
-                len(extras) > 0
-                and callable(extras[0])
-                and isinstance(server_conn.session, MCPAgentClientSession)
-            ):
-                server_conn.session.set_session_id_callback(extras[0])
-
-            # Build a session
-            server_conn.create_session(read_stream, write_stream)
-
-            async with server_conn.session:
+            # Build a session
+            server_conn.create_session(read_stream, write_stream)
+
+            # If the transport provides a session ID callback (streamable_http does),
+            # store it in the server connection
+            if len(extras) > 0 and callable(extras[0]) and isinstance(
+                server_conn.session, MCPAgentClientSession
+            ):
+                server_conn.session.set_session_id_callback(extras[0])
+
+            async with server_conn.session:
♻️ Duplicate comments (2)
src/mcp_agent/mcp/mcp_connection_manager.py (2)

16-18: Timeouts aren’t caught: handle AnyIO’s TimeoutError, not built-in

anyio.fail_after raises anyio.exceptions.TimeoutError. Your except TimeoutError: won’t fire.

 import anyio
-from anyio import Event, create_task_group, Lock
+from anyio import Event, create_task_group, Lock
+from anyio.exceptions import TimeoutError as AnyioTimeoutError
@@
-                        try:
-                            with anyio.fail_after(5.0):
-                                await self._tg_closed_event.wait()
-                        except TimeoutError:
+                        try:
+                            with anyio.fail_after(5.0):
+                                await self._tg_closed_event.wait()
+                        except AnyioTimeoutError:
                             logger.warning(
                                 "MCPConnectionManager: Timeout waiting for TaskGroup owner to close"
                             )
@@
-                        try:
-                            with anyio.fail_after(5.0):
-                                await anyio.to_thread.run_sync(cfut.result)
-                        except TimeoutError:
+                        try:
+                            with anyio.fail_after(5.0):
+                                await anyio.to_thread.run_sync(cfut.result)
+                        except AnyioTimeoutError:
                             logger.warning(
                                 "MCPConnectionManager: Timeout during cross-thread shutdown/close"
                             )
                             try:
                                 cfut.cancel()
                             except Exception:
                                 pass

Also applies to: 297-303, 325-335


241-251: Serialize owner startup; remove unused _tg_owner_task to prevent races

concurrent calls to _start_owner() can reset Events and create multiple owner groups. Guard with a dedicated lock and rely on _owner_tg/_tg_active instead of the never-assigned _tg_owner_task.

         # Owner task + coordination events for safe TaskGroup lifecycle
-        self._tg_owner_task: asyncio.Task | None = None
         self._owner_tg: TaskGroup | None = None
         self._tg_ready_event: Event = Event()
         self._tg_close_event: Event = Event()
         self._tg_closed_event: Event = Event()
         # Ensure a single close sequence at a time on the origin loop
         self._close_lock = Lock()
+        # Ensure only one owner startup runs at a time
+        self._owner_lock = Lock()
@@
-    async def _start_owner(self):
+    async def _start_owner(self):
         """Start the TaskGroup owner task if not already running."""
-        # If an owner is active or TaskGroup is already active, nothing to do
-        if (self._tg_owner_task and not self._tg_owner_task.done()) or (
-            self._tg_active and self._tg is not None
-        ):
-            return
-        # If previous owner exists but is done (possibly with error), log and restart
-        if self._tg_owner_task and self._tg_owner_task.done():
-            try:
-                exc = self._tg_owner_task.exception()
-                if exc:
-                    logger.warning(
-                        f"MCPConnectionManager: restarting owner after error: {exc}"
-                    )
-            except Exception:
-                logger.warning(
-                    "MCPConnectionManager: restarting owner after unknown state"
-                )
-        # Reset coordination events
-        self._tg_ready_event = Event()
-        self._tg_close_event = Event()
-        self._tg_closed_event = Event()
-        # Record loop and thread
-        try:
-            self._loop = asyncio.get_running_loop()
-        except RuntimeError:
-            self._loop = None
-        self._thread_id = threading.get_ident()
-        # Create an owner TaskGroup and start the owner task within it
-        owner_tg = create_task_group()
-        await owner_tg.__aenter__()
-        self._owner_tg = owner_tg
-        owner_tg.start_soon(self._tg_owner)
-        # Wait until the TaskGroup is ready
-        await self._tg_ready_event.wait()
+        async with self._owner_lock:
+            # If TG already active, or owner exists, wait for readiness and return
+            if self._tg_active and self._tg is not None:
+                return
+            if self._owner_tg is not None:
+                await self._tg_ready_event.wait()
+                return
+
+            # Reset coordination events
+            self._tg_ready_event = Event()
+            self._tg_close_event = Event()
+            self._tg_closed_event = Event()
+
+            # Record loop and thread
+            try:
+                self._loop = asyncio.get_running_loop()
+            except RuntimeError:
+                self._loop = None
+            self._thread_id = threading.get_ident()
+
+            # Create an owner TaskGroup and start the owner task within it
+            owner_tg = create_task_group()
+            await owner_tg.__aenter__()
+            self._owner_tg = owner_tg
+            owner_tg.start_soon(self._tg_owner)
+
+            # Wait until the TaskGroup is ready
+            await self._tg_ready_event.wait()

Also applies to: 348-384

🧹 Nitpick comments (2)
src/mcp_agent/mcp/mcp_connection_manager.py (1)

343-347: Don’t swallow AttributeError in close()

This masks real bugs and complicates debugging of shutdown issues (the very thing this PR addresses).

-        except AttributeError:  # Handle missing `_exceptions`
-            pass
         except Exception as e:
             logger.warning(f"MCPConnectionManager: Error during shutdown: {e}")
tests/mcp/test_connection_manager_concurrency.py (1)

18-20: Prefer async with for context management

Use the context manager to guarantee cleanup even if assertions fail.

-    mgr = MCPConnectionManager(server_registry=DummyServerRegistry())
-    await mgr.__aenter__()
+    async with MCPConnectionManager(server_registry=DummyServerRegistry()) as mgr:
@@
-    # Now exit context to close the owner TaskGroup on the origin loop
-    await mgr.__aexit__(None, None, None)
-
-    # Verify TaskGroup cleared
-    assert getattr(mgr, "_tg", None) is None
-    assert getattr(mgr, "_tg_active", False) is False
+    # Verify TaskGroup cleared
+    assert getattr(mgr, "_tg", None) is None
+    assert getattr(mgr, "_tg_active", False) is False

Also applies to: 55-59

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f91c881 and 2507545.

📒 Files selected for processing (2)
  • src/mcp_agent/mcp/mcp_connection_manager.py (3 hunks)
  • tests/mcp/test_connection_manager_concurrency.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/mcp/test_connection_manager_concurrency.py (1)
src/mcp_agent/mcp/mcp_connection_manager.py (2)
  • MCPConnectionManager (224-621)
  • close (276-346)
src/mcp_agent/mcp/mcp_connection_manager.py (1)
src/mcp_agent/mcp/mcp_aggregator.py (1)
  • close (198-273)
🔇 Additional comments (1)
tests/mcp/test_connection_manager_concurrency.py (1)

16-16: Confirm anyio pytest marker style matches your plugin version

Some setups use @pytest.mark.anyio (optionally with backend="asyncio"). Verify your plugin accepts the positional form.

Would you like me to adjust to the canonical form after confirming your pytest-anyio version?
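
For reference, a rough sketch of the kind of cross-thread close test described in this PR; the registry stub, marker form, and attribute names are assumptions based on the summaries above, not the actual test file:

```python
import asyncio
import threading

import pytest

from mcp_agent.mcp.mcp_connection_manager import MCPConnectionManager


class DummyServerRegistry:
    """Bare-minimum stand-in; the real test's stub may carry more attributes."""
    registry = {}


@pytest.mark.anyio
async def test_concurrent_close_from_loop_and_thread():
    mgr = MCPConnectionManager(server_registry=DummyServerRegistry())
    await mgr.__aenter__()

    # Close from a separate thread (with its own event loop), exercising the
    # run_coroutine_threadsafe path, while also closing from this loop.
    thread = threading.Thread(target=lambda: asyncio.run(mgr.close()))
    thread.start()
    await mgr.close()

    # Join the thread without blocking this loop, so the cross-thread close
    # can still schedule work onto it if needed.
    await asyncio.to_thread(thread.join)

    await mgr.__aexit__(None, None, None)

    assert getattr(mgr, "_tg", None) is None
    assert getattr(mgr, "_tg_active", False) is False
```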

@saqadri saqadri merged commit 1c876c7 into main Sep 9, 2025
8 checks passed
andrew-lastmile added a commit that referenced this pull request Sep 11, 2025
* Temporarily exclude CLI from test coverage (#429)

### TL;DR

Exclude CLI code from test coverage metrics for now. Will add tests when we're done sprinting 10000 mph 

![Added via Giphy](https://media4.giphy.com/media/v1.Y2lkPWM5NDg3NzQzOTNudmtpNXcyazNnZWo2enIzem5neXR2a3l0cGx5aWFlbDB6ZTA1dyZlcD12MV9naWZzX3NlYXJjaCZjdD1n/sRKg9r2YWeCTG5JTTo/giphy.gif)

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **Tests**
  * Adjusted test coverage collection to exclude non-critical CLI components, resulting in more accurate coverage metrics for core functionality.

* **Chores**
  * Updated coverage reporting configuration to align with the new exclusion rules, ensuring consistent results across local and CI runs.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

* Add workflow commands to CLI (#424)

### TL;DR

Added workflow management commands to the MCP Agent CLI, including describe, suspend, resume, and cancel operations.

### What changed?

- Added four new workflow management commands:
    - `describe_workflow`: Shows detailed information about a workflow execution
    - `suspend_workflow`: Pauses a running workflow execution
    - `resume_workflow`: Resumes a previously suspended workflow
    - `cancel_workflow`: Permanently stops a workflow execution
- Implemented corresponding API client methods in `WorkflowAPIClient`:
    - `suspend_workflow`
    - `resume_workflow`
    - `cancel_workflow`
- Updated the CLI structure to expose these commands under `mcp-agent cloud workflows`
- Added an alias for `describe_workflow` as `status` for backward compatibility

### How to test?

Test the new workflow commands with a running workflow:

```
# Get workflow details
mcp-agent cloud workflows describe run_abc123
mcp-agent cloud workflows status run_abc123  # alias

# Suspend a workflow
mcp-agent cloud workflows suspend run_abc123

# Resume a workflow (with optional payload)
mcp-agent cloud workflows resume run_abc123
mcp-agent cloud workflows resume run_abc123 --payload '{"data": "value"}'

# Cancel a workflow (with optional reason)
mcp-agent cloud workflows cancel run_abc123
mcp-agent cloud workflows cancel run_abc123 --reason "User requested cancellation"
```

### Why make this change?

These commands provide essential workflow lifecycle management capabilities to users, allowing them to monitor and control workflow executions through the CLI. The ability to suspend, resume, and cancel workflows gives users more control over long-running operations and helps manage resources more efficiently.

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- New Features
  - Introduced “workflows” CLI group with commands: describe (alias: status), resume, suspend, and cancel.
  - Describe supports text, JSON, and YAML output; all commands work with server ID or URL and include improved error messages.

- Refactor
  - Renamed CLI group from “workflow” to “workflows” and reorganized command registrations.
  - Consolidated internal utility imports (no behavior change).

- Chores
  - Updated module descriptions.
  - Removed legacy workflow status package/exports in favor of the new workflows commands.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

* add servers workflow subcommand (#428)

# Add servers workflows subcommand

This PR adds a new `workflows` subcommand to the `mcp-agent cloud servers` command that allows users to list workflows associated with a specific server. The command supports:

- Filtering by workflow status
- Limiting the number of results
- Multiple output formats (text, JSON, YAML)
- Accepting server IDs, app config IDs, or server URLs as input

Examples:
```
mcp-agent cloud servers workflows app_abc123
mcp-agent cloud servers workflows https://server.example.com --status running
mcp-agent cloud servers workflows apcnf_xyz789 --limit 10 --format json
```

The PR also cleans up the examples in the existing workflow commands and adds the necessary API client support for listing workflows.

* add workflow list and runs (#430)

### TL;DR

Reorganized workflow commands

`mcp-agent cloud workflows runs`
`mcp-agent cloud workflows list`
`mcp-agent cloud server workflows` (alias of workflows list)

### What changed?

- Moved `list_workflows_for_server` from the servers module to the workflows module as `list_workflow_runs`
- Added new workflow commands: `list_workflows` and `list_workflow_runs`
- Updated CLI command structure to make workflows commands more intuitive
- Applied consistent code formatting with black across all server and workflow related files

### How to test?

Test the new and reorganized workflow commands:

```bash
# List available workflow definitions
mcp-agent cloud workflows list app_abc123

# List workflow runs (previously under servers workflows)
mcp-agent cloud workflows runs app_abc123

# Test with different output formats
mcp-agent cloud workflows list app_abc123 --format json
mcp-agent cloud workflows runs app_abc123 --format yaml

# Verify existing commands still work
mcp-agent cloud servers list
mcp-agent cloud workflows describe app_abc123 run_xyz789
```

* [ez] Move deploy command to cloud namespace (#431)

### TL;DR

Added `cloud deploy` command as an alias for the existing `deploy` command.

* First pass at implementing the mcp-agent CLI (#409)

* Initial scaffolding

* initial CLI

* checkpoint

* checkpoint 2

* various updates to cli

* fix lint and format

* fix: should load secrets.yaml template instead when running init cli command

* fix: prevent None values in either mcp-agent secrets and config yaml files from overwriting one another when merging both

* fix: when running config check, use get_settings() instead of Settings() to ensure settings are loaded.

* fix: handle None values for servers in MCPSettings so it defaults to empty dict and update secrets.yaml template so it does not overwrite mcp servers in config

* Inform users to save and close editor to continue when running config edit command

* fix: Update openai, anthropic and azure regex for keys cli command

* Sort model list by provider and model name

* Add filtering support for models list cli command

* disable untested commands

* lint, format, gen_schema

* get rid of accidental otlp exporter changes from another branch

* get rid of accidental commit from other branch

---------

Co-authored-by: StreetLamb <[email protected]>

* Docs MVP (#436)

* Initial scaffolding

* initial CLI

* checkpoint

* checkpoint 2

* various updates to cli

* fix lint and format

* fix: should load secrets.yaml template instead when running init cli command

* fix: prevent None values in either mcp-agent secrets and config yaml files from overwriting one another when merging both

* fix: when running config check, use get_settings() instead of Settings() to ensure settings are loaded.

* fix: handle None values for servers in MCPSettings so it defaults to empty dict and update secrets.yaml template so it does not overwrite mcp servers in config

* Inform users to save and close editor to continue when running config edit command

* fix: Update openai, anthropic and azure regex for keys cli command

* Sort model list by provider and model name

* Add filtering support for models list cli command

* disable untested commands

* Fixes to docs

* Updating the main.py and !developer_secrets for secrets

* updating python entry files to main.py

* Fix tracer.py

---------

Co-authored-by: StreetLamb <[email protected]>
Co-authored-by: Andrew Hoh <[email protected]>

* fix: max complete token for openai gen structured (#438)

* Fix regression in CLI ("cloud cloud")

* docs fixes

* Fix top-level cli cloud commands (deploy, login, etc)

* Add eager tool validation to ensure json serializability of input params/result types

* More docs updates

* Refactor workflow runs list to use MCP tool calls (#439)

### TL;DR

Refactored the workflow runs listing command to use MCP tool calls instead of direct API client calls.

### What changed?

- Replaced the direct API client approach with MCP tool calls to retrieve workflow runs
- Added a new `_list_workflow_runs_async` function that uses the MCP App and gen_client to communicate with the server
- Improved status filtering and display logic to work with both object and dictionary response formats
- Enhanced error handling and formatting of workflow run information
- Updated the workflow data processing to handle different response formats more robustly
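
For orientation, a hedged sketch of the general shape of a tool-call based listing using the low-level MCP Python client; the tool name and argument keys are placeholders, not the ones used by `_list_workflow_runs_async`:

```python
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def list_workflow_runs(server_url: str, status: str | None = None, limit: int = 50):
    """Call a (hypothetical) workflow-runs listing tool on an MCP server."""
    async with streamablehttp_client(server_url) as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            args = {"limit": limit}
            if status:
                args["status"] = status
            # "workflows-runs-list" is a placeholder tool name for illustration.
            result = await session.call_tool("workflows-runs-list", arguments=args)
            return result.content


if __name__ == "__main__":
    print(asyncio.run(list_workflow_runs("https://server.example.com/mcp")))
```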

### How to test?

```bash
# List workflow runs from a server
mcp-agent cloud workflows runs <server_id_or_url>

# Filter by status
mcp-agent cloud workflows runs <server_id_or_url> --status running

# Limit results
mcp-agent cloud workflows runs <server_id_or_url> --limit 10

# Change output format
mcp-agent cloud workflows runs <server_id_or_url> --format json
```

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- New Features
  - Add status filtering for workflow runs, with common aliases (e.g., timeout → timed_out).
  - Add an optional limit to constrain the number of results.
  - Allow server selection via direct URL or config-based server ID.

- Refactor
  - Update text output: columns now show Workflow ID, Name, Status, Run ID, Created; Principal removed.
  - Improve date formatting and consistent JSON/YAML/Text rendering.

- Bug Fixes
  - Clearer error messages and safer handling when server info is missing or no data is returned.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

* Update workflows commands UI to be more consistant with the rest of the CLI (#432)

### TL;DR

Improved CLI workflow command output formatting with better visual indicators and consistent styling.

### How to test?

```
mcp-agent cloud workflows cancel <run-id>
mcp-agent cloud workflows describe <run-id>
mcp-agent cloud workflows resume <run-id>
```

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **Style**
  * Cancel workflow: added a blank line before the status and changed the success icon to 🚫 (yellow).
  * Describe workflow: replaced panel UI with a clean, header-based text layout (“🔍 Workflow Details”), showing name with colorized status and fields for Workflow ID, Run ID, and Created. Updated status indicators with emojis and colors; timestamp is now plain text on its own line.
  * Resume workflow: success message now applies consistent coloring to the entire line for improved readability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

* Feature: Update Workflow Tool Calls to Work with workflow_id (#435)

* Support for workflow_id and run_id

* Update temporal workflow registry

* tests

* Update LLMS.txt

* Fix config

* Return bool for cancel result

* Validate ids provided

* Fix cancel workflow id

* Fix workflows-resume response

* Add workflow-name specific resume and cancel tools

* Fix return type

* Fix examples

* Remove redundant workflows-{name}-tool tool calls

* Add _workflow_status back

* Use registry helper

* Changes from review

* Add back evaluator_optimizer enum fix

* Fix a hang that can happen at shutdown (#440)

* Fix a shutdown hang

* Fix tests

* fix taskgroup closed in a different context than when it was started in error

* some PR feedback fixes

* PR feedback

* Fix random failures of server aggregator not found for agent in temporal (#441)

* Fix a shutdown hang

* Fix tests

* fix taskgroup closed in a different context than when it was started in error

* some PR feedback fixes

* Fix random failures of server aggregator not found for agent in temporal environment

* Bump pyproject version

* Fix gateway URL resolution (#443)

* Fix gateway URL resolution

Removed incorrect dependence on ServerRegistry for gateway URLs; the gateway is not an MCP server.
App server (src/mcp_agent/server/app_server.py) builds workflow memo with:
- gateway_url precedence: X-MCP-Gateway-URL or X-Forwarded-Url → reconstruct X-Forwarded-Proto/Host/Prefix → request.base_url → MCP_GATEWAY_URL env.
- gateway_token precedence: X-MCP-Gateway-Token → MCP_GATEWAY_TOKEN env.
Worker-side (SystemActivities/SessionProxy) uses memo.gateway_url and gateway_token; falls back to worker env.
Client proxy helpers (src/mcp_agent/mcp/client_proxy.py):
- _resolve_gateway_url: explicit param → context → env → local default.
- Updated public signatures to drop server_registry parameter.
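
An illustrative reduction of that precedence (hypothetical helper; the header names follow the commit message, everything else is assumed):

```python
import os


def resolve_gateway_url(headers: dict[str, str], request_base_url: str | None = None) -> str | None:
    """Resolve the gateway URL using the precedence described above."""
    # 1. Explicit gateway headers
    url = headers.get("X-MCP-Gateway-URL") or headers.get("X-Forwarded-Url")
    if url:
        return url
    # 2. Reconstruct from forwarding headers
    proto = headers.get("X-Forwarded-Proto")
    host = headers.get("X-Forwarded-Host")
    prefix = headers.get("X-Forwarded-Prefix", "")
    if proto and host:
        return f"{proto}://{host}{prefix}"
    # 3. The request's own base URL
    if request_base_url:
        return request_base_url
    # 4. Environment fallback
    return os.environ.get("MCP_GATEWAY_URL")
```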

* Cloud/deployable temporal example (#395)

* Move workflows to workflows.py file

* Fix router example

* Add remaining dependencies

* Update orchestrator to @app.async_tool example

* Changes from review

* Fix interactive_workflow to be runnable via tool

* Fix resume tool params

* Fix: Use helpful typer and invoke for root cli commands (#444)

* Use helpful typer and invoke for root cli commands

* Fix lint

* Fix enum check (#445)

* Fix/swap relative mcp agent dependency on deploy (#446)

* Update wrangler wrapper to handle requirements.txt processing

* Fix backup handling

* pass api key to workflow (#447)

* pass api key to workflow

* guard against settings not existing

---------

Co-authored-by: John Corbett <[email protected]>
Co-authored-by: Sarmad Qadri <[email protected]>
Co-authored-by: StreetLamb <[email protected]>
Co-authored-by: Yi <[email protected]>
Co-authored-by: Ryan Holinshead <[email protected]>
Co-authored-by: roman-van-der-krogt <[email protected]>
