Merged
35 changes: 20 additions & 15 deletions src/mcp_agent/cli/cloud/commands/workflows/runs/main.py
@@ -71,7 +71,9 @@ async def _list_workflow_runs_async(
workflows = workflows_data
if status:
status_filter = _get_status_filter(status)
workflows = [w for w in workflows if _matches_status(w, status_filter)]
workflows = [
w for w in workflows if _matches_status(w, status_filter)
]

if limit:
workflows = workflows[:limit]
@@ -144,7 +146,7 @@ def _get_status_filter(status: str) -> str:

def _matches_status(workflow: dict, status_filter: str) -> bool:
"""Check if workflow matches the status filter.

Note: We use string-based matching instead of protobuf enum values because
the MCP tool response format returns status as strings, not enum objects.
This approach is more flexible and doesn't require maintaining sync with
@@ -158,9 +160,7 @@ def _matches_status(workflow: dict, status_filter: str) -> bool:

def _print_workflows_text(workflows, status_filter, server_id_or_url):
"""Print workflows in text format."""
console.print(
f"\n[bold blue]📊 Workflow Runs ({len(workflows)})[/bold blue]"
)
console.print(f"\n[bold blue]📊 Workflow Runs ({len(workflows)})[/bold blue]")

if not workflows:
print_info("No workflow runs found for this server.")
@@ -169,7 +169,7 @@ def _print_workflows_text(workflows, status_filter, server_id_or_url):
for i, workflow in enumerate(workflows):
if i > 0:
console.print()

if isinstance(workflow, dict):
workflow_id = workflow.get("workflow_id", "N/A")
name = workflow.get("name", "N/A")
@@ -184,27 +184,28 @@ def _print_workflows_text(workflows, status_filter, server_id_or_url):
run_id = getattr(workflow, "run_id", "N/A")
created_at = getattr(workflow, "created_at", "N/A")
principal_id = getattr(workflow, "principal_id", "N/A")

status_display = _get_status_display(execution_status)

if created_at and created_at != "N/A":
if hasattr(created_at, "strftime"):
created_display = created_at.strftime("%Y-%m-%d %H:%M:%S")
else:
try:
from datetime import datetime

dt = datetime.fromisoformat(str(created_at).replace("Z", "+00:00"))
created_display = dt.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
created_display = str(created_at)
else:
created_display = "N/A"

console.print(f"[bold cyan]{name or 'Unnamed'}[/bold cyan] {status_display}")
console.print(f" Workflow ID: {workflow_id}")
console.print(f" Run ID: {run_id}")
console.print(f" Created: {created_display}")

if principal_id and principal_id != "N/A":
console.print(f" Principal: {principal_id}")

@@ -228,23 +229,27 @@ def _workflow_to_dict(workflow):
"""Convert workflow dict to standardized dictionary format."""
if isinstance(workflow, dict):
return workflow

return {
"workflow_id": getattr(workflow, "workflow_id", None),
"run_id": getattr(workflow, "run_id", None),
"name": getattr(workflow, "name", None),
"created_at": getattr(workflow, "created_at", None).isoformat() if getattr(workflow, "created_at", None) else None,
"execution_status": getattr(workflow, "execution_status", None).value if getattr(workflow, "execution_status", None) else None,
"created_at": getattr(workflow, "created_at", None).isoformat()
if getattr(workflow, "created_at", None)
else None,
"execution_status": getattr(workflow, "execution_status", None).value
if getattr(workflow, "execution_status", None)
else None,
}


def _get_status_display(status):
"""Convert status to display string with emoji."""
if not status:
return "❓ Unknown"

status_str = str(status).lower()

if "running" in status_str:
return "[green]🟢 Running[/green]"
elif "completed" in status_str:
2 changes: 0 additions & 2 deletions src/mcp_agent/cli/mcp_app/api_client.py
@@ -63,7 +63,6 @@ class CanDoActionsResponse(BaseModel):
canDoActions: Optional[List[CanDoActionCheck]] = []



APP_ID_PREFIX = "app_"
APP_CONFIG_ID_PREFIX = "apcnf_"

@@ -467,7 +466,6 @@ async def list_app_configurations(
response = await self.post("/mcp_app/list_app_configurations", payload)
return ListAppConfigurationsResponse(**response.json())


async def delete_app(self, app_id: str) -> str:
"""Delete an MCP App via the API.

17 changes: 4 additions & 13 deletions src/mcp_agent/mcp/mcp_aggregator.py
@@ -237,28 +237,19 @@ async def close(self):
and self.context._mcp_connection_manager
== self._persistent_connection_manager
):
# Add timeout protection for the disconnect operation
# Close via manager's thread-aware close()
try:
await asyncio.wait_for(
self._persistent_connection_manager.disconnect_all(),
self._persistent_connection_manager.close(),
timeout=5.0,
)
except asyncio.TimeoutError:
logger.warning(
"Timeout during disconnect_all(), forcing shutdown"
"Timeout during connection manager close(), forcing shutdown"
)

# Avoid calling __aexit__ directly across threads; mark inactive
try:
if hasattr(
self._persistent_connection_manager,
"_tg_active",
):
self._persistent_connection_manager._tg_active = False
self._persistent_connection_manager._tg = None
except Exception as e:
logger.warning(
f"Error during connection manager state cleanup: {e}"
f"Error during connection manager close(): {e}"
)

# Clean up the connection manager from the context
176 changes: 148 additions & 28 deletions src/mcp_agent/mcp/mcp_connection_manager.py
@@ -3,6 +3,7 @@
"""

from datetime import timedelta
import asyncio

🛠️ Refactor suggestion

Catch AnyIO TimeoutError, not built-in TimeoutError

anyio.fail_after raises anyio.exceptions.TimeoutError. The current except TimeoutError won’t catch it.

-from anyio import Event, create_task_group, Lock
+from anyio import Event, create_task_group, Lock
+from anyio.exceptions import TimeoutError as AnyioTimeoutError
-                    try:
-                        with anyio.fail_after(5.0):
-                            await self._tg_closed_event.wait()
-                    except TimeoutError:
+                    try:
+                        with anyio.fail_after(5.0):
+                            await self._tg_closed_event.wait()
+                    except AnyioTimeoutError:
                         logger.warning(
                             "MCPConnectionManager: Timeout waiting for TaskGroup owner to close"
                         )
-                        try:
-                            with anyio.fail_after(5.0):
-                                await anyio.to_thread.run_sync(cfut.result)
-                        except TimeoutError:
+                        try:
+                            with anyio.fail_after(5.0):
+                                await anyio.to_thread.run_sync(cfut.result)
+                        except AnyioTimeoutError:
                             logger.warning(
                                 "MCPConnectionManager: Timeout during cross-thread shutdown/close"
                             )

Also applies to: 17-18, 292-297, 319-324

🤖 Prompt for AI Agents
In src/mcp_agent/mcp/mcp_connection_manager.py around lines 6, 17-18, 292-297,
and 319-324, the except clauses currently catch the built-in TimeoutError but
anyio.fail_after raises anyio.exceptions.TimeoutError; import anyio (or from
anyio import exceptions as anyio_exceptions) at the top and change the except
handlers to catch anyio.exceptions.TimeoutError (or the alias you import) so the
timeouts are actually caught; update all listed except blocks accordingly.

import threading
from typing import (
AsyncGenerator,
@@ -237,50 +238,172 @@ def __init__(
self._tg_active = False
# Track the thread this manager was created in to ensure TaskGroup cleanup
self._thread_id = threading.get_ident()
# Event loop where the TaskGroup lives
self._loop: asyncio.AbstractEventLoop | None = None
# Owner task + coordination events for safe TaskGroup lifecycle
self._tg_owner_task: asyncio.Task | None = None
self._owner_tg: TaskGroup | None = None
self._tg_ready_event: Event = Event()
self._tg_close_event: Event = Event()
self._tg_closed_event: Event = Event()
# Ensure a single close sequence at a time on the origin loop
self._close_lock = Lock()

async def __aenter__(self):
# We create a task group to manage all server lifecycle tasks
tg = create_task_group()
# Enter the task group context
await tg.__aenter__()
self._tg_active = True
self._tg = tg
# Start the TaskGroup owner task and wait until ready
await self._start_owner()
# Record the loop and thread where the TaskGroup is running
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self._loop = None
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Ensure clean shutdown of all connections before exiting."""
await self.close(exc_type, exc_val, exc_tb)
# Close the owner TaskGroup in the same task that entered it
if self._owner_tg is not None:
try:
await self._owner_tg.__aexit__(exc_type, exc_val, exc_tb)
except Exception as e:
logger.warning(
f"MCPConnectionManager: Error during owner TaskGroup cleanup: {e}"
)
finally:
self._owner_tg = None

async def close(self, exc_type=None, exc_val=None, exc_tb=None):
"""Close all connections and tear down the internal TaskGroup safely.

This is thread-aware: if called from a different thread than the one where the
TaskGroup was created, it will signal the owner task on the original loop to
perform cleanup and await completion without violating task affinity.
"""
try:
# First request all servers to shutdown
logger.debug("MCPConnectionManager: shutting down all server tasks...")
await self.disconnect_all()
current_thread = threading.get_ident()
if current_thread == self._thread_id:
# Same thread: perform shutdown inline with exclusive access
async with self._close_lock:
logger.debug(
"MCPConnectionManager: shutting down all server tasks..."
)
await self.disconnect_all()
await anyio.sleep(0.5)
if self._tg_active:
self._tg_close_event.set()
# Wait for owner to report TaskGroup closed with an anyio timeout
try:
with anyio.fail_after(5.0):
await self._tg_closed_event.wait()
except TimeoutError:
logger.warning(
"MCPConnectionManager: Timeout waiting for TaskGroup owner to close"
)
# Do not attempt to close the owner TaskGroup here; __aexit__ will handle it
else:
# Different thread – run entire shutdown on the original loop to avoid cross-thread Event.set
if self._loop is not None:

# Add a small delay to allow for clean shutdown of subprocess transports, etc.
await anyio.sleep(0.5)
async def _shutdown_and_close():
logger.debug(
"MCPConnectionManager: shutting down all server tasks (origin loop)..."
)
async with self._close_lock:
await self.disconnect_all()
await anyio.sleep(0.5)
if self._tg_active:
self._tg_close_event.set()
await self._tg_closed_event.wait()

# Then close the task group if it's active and we're in the same thread
if self._tg_active and self._tg:
current_thread = threading.get_ident()
if current_thread == self._thread_id:
try:
await self._tg.__aexit__(exc_type, exc_val, exc_tb)
cfut = asyncio.run_coroutine_threadsafe(
_shutdown_and_close(), self._loop
)
# Wait in a worker thread to avoid blocking non-asyncio contexts
try:
with anyio.fail_after(5.0):
await anyio.to_thread.run_sync(cfut.result)
except TimeoutError:
logger.warning(
"MCPConnectionManager: Timeout during cross-thread shutdown/close"
)
Comment on lines +322 to +332

Potential deadlock risk in cross-thread coordination

The current implementation creates a complex threading scenario that could lead to deadlocks. When asyncio.run_coroutine_threadsafe() schedules work on the original loop, and then anyio.to_thread.run_sync(cfut.result) blocks waiting for that work, there's a dependency chain across threads that can become problematic.

If the original event loop is already in the process of shutting down or becomes blocked, the future may never complete, leaving resources hanging despite the timeout. While the timeout provides some protection, it doesn't guarantee cleanup of the scheduled coroutine.

Consider alternatives:

  1. Use a simpler signaling mechanism between threads
  2. Implement a more robust cancellation pattern that ensures the coroutine is properly cancelled if the timeout occurs
  3. Add explicit cleanup code that runs after the timeout to ensure the future doesn't remain pending

This would improve shutdown reliability, especially in complex threading scenarios.

Suggested change
cfut = asyncio.run_coroutine_threadsafe(
_shutdown_and_close(), self._loop
)
# Wait in a worker thread to avoid blocking non-asyncio contexts
try:
with anyio.fail_after(5.0):
await anyio.to_thread.run_sync(cfut.result)
except TimeoutError:
logger.warning(
"MCPConnectionManager: Timeout during cross-thread shutdown/close"
)
cfut = asyncio.run_coroutine_threadsafe(
_shutdown_and_close(), self._loop
)
# Wait in a worker thread to avoid blocking non-asyncio contexts
try:
with anyio.fail_after(5.0):
await anyio.to_thread.run_sync(cfut.result)
except TimeoutError:
logger.warning(
"MCPConnectionManager: Timeout during cross-thread shutdown/close"
)
# Explicitly cancel the future to prevent the coroutine from
# continuing to run in the background after timeout
cfut.cancel()


try:
cfut.cancel()
except Exception:
pass
except Exception as e:
logger.warning(
f"MCPConnectionManager: Error during task group cleanup: {e}"
f"MCPConnectionManager: Error scheduling cross-thread shutdown: {e}"
)
else:
# Different thread – cannot safely cleanup anyio TaskGroup
logger.warning(
f"MCPConnectionManager: Task group cleanup called from different thread "
f"(created in {self._thread_id}, called from {current_thread}). Skipping cleanup."
"MCPConnectionManager: No event loop recorded for cleanup; skipping TaskGroup close"
)
# Always mark as inactive and drop reference
self._tg_active = False
self._tg = None
except AttributeError: # Handle missing `_exceptions`
pass
except Exception as e:
logger.warning(f"MCPConnectionManager: Error during shutdown: {e}")

async def _start_owner(self):
"""Start the TaskGroup owner task if not already running."""
# If an owner is active or TaskGroup is already active, nothing to do
if (self._tg_owner_task and not self._tg_owner_task.done()) or (
self._tg_active and self._tg is not None
):
return

There appears to be a potential race condition in the _start_owner() method. The check for an active owner task is not atomic with the subsequent operations. Between evaluating self._tg_owner_task and not self._tg_owner_task.done() and returning from the method, another thread could modify these variables. This could cause the method to incorrectly return early when it should actually start a new owner task, potentially resulting in a non-functional TaskGroup.

Consider using a lock or other synchronization mechanism to ensure this check-and-return operation is atomic across threads.
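A minimal sketch of that suggestion, assuming a new _start_lock attribute that does not exist in the PR: an asyncio.Lock serializes concurrent callers on the same event loop (it does not, by itself, guard access from other threads).

import asyncio


class OwnerStartSketch:
    """Illustrative only; attribute names mirror the PR, _start_lock is assumed."""

    def __init__(self) -> None:
        self._start_lock = asyncio.Lock()
        self._tg_owner_task: asyncio.Task | None = None
        self._tg_active = False
        self._tg = None

    async def _start_owner(self) -> None:
        async with self._start_lock:
            # Re-check under the lock so the check-and-start sequence is atomic
            # with respect to other coroutines on this loop.
            if (self._tg_owner_task and not self._tg_owner_task.done()) or (
                self._tg_active and self._tg is not None
            ):
                return
            # ... reset coordination events and start the owner task here ...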


# If previous owner exists but is done (possibly with error), log and restart
if self._tg_owner_task and self._tg_owner_task.done():
try:
exc = self._tg_owner_task.exception()
if exc:
logger.warning(
f"MCPConnectionManager: restarting owner after error: {exc}"
)
except Exception:
logger.warning(
"MCPConnectionManager: restarting owner after unknown state"
)
# Reset coordination events
self._tg_ready_event = Event()
self._tg_close_event = Event()
self._tg_closed_event = Event()

Potential Race Condition in Event Object Replacement

There's a synchronization concern in how the coordination events are being replaced. When creating new Event() instances:

self._tg_ready_event = Event()
self._tg_close_event = Event()
self._tg_closed_event = Event()

If other threads are currently waiting on any of these events (via await event.wait()), replacing the event objects will leave those threads waiting on stale event instances that will never be signaled. This could lead to deadlocks or indefinite hangs.

Consider implementing a synchronization mechanism to ensure safe event replacement, or restructure the code to avoid replacing events that might have waiters. One approach would be to reset existing events (if they have a .clear() method) rather than creating new instances.

Suggested change
# Reset coordination events
self._tg_ready_event = Event()
self._tg_close_event = Event()
self._tg_closed_event = Event()
# Reset coordination events
if self._tg_ready_event:
self._tg_ready_event.clear()
else:
self._tg_ready_event = Event()
if self._tg_close_event:
self._tg_close_event.clear()
else:
self._tg_close_event = Event()
if self._tg_closed_event:
self._tg_closed_event.clear()
else:
self._tg_closed_event = Event()


# Record loop and thread
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self._loop = None
self._thread_id = threading.get_ident()
# Create an owner TaskGroup and start the owner task within it
owner_tg = create_task_group()
await owner_tg.__aenter__()
self._owner_tg = owner_tg
owner_tg.start_soon(self._tg_owner)
# Wait until the TaskGroup is ready
await self._tg_ready_event.wait()

async def _tg_owner(self):
"""Own the TaskGroup lifecycle so __aexit__ runs in the same task it was entered."""
try:
async with create_task_group() as tg:
self._tg = tg
self._tg_active = True
# Signal that TaskGroup is ready
self._tg_ready_event.set()
# Wait for close request
await self._tg_close_event.wait()
except Exception as e:
logger.warning(f"MCPConnectionManager: Error in TaskGroup owner: {e}")
finally:
# Mark closed and clear references
self._tg_active = False
self._tg = None
# Signal that TaskGroup has been closed
try:
self._tg_closed_event.set()
except Exception as e:
logger.warning(f"Failed to set _tg_closed_event: {e}")
Comment on lines +405 to +408

The exception handling in the _tg_owner finally block could lead to deadlocks. If _tg_closed_event.set() fails, any threads waiting on this event will hang indefinitely while only a warning is logged. Consider making this signaling mechanism more robust by either:

  1. Using a more reliable synchronization primitive
  2. Implementing a retry mechanism with backoff
  3. Propagating the error to waiting threads so they can time out appropriately

This is particularly important since this event is used for cross-thread coordination during shutdown, where hanging threads can prevent clean application termination.

Suggested change
try:
self._tg_closed_event.set()
except Exception as e:
logger.warning(f"Failed to set _tg_closed_event: {e}")
max_retries = 3
retry_delay = 0.1 # seconds
for attempt in range(max_retries):
try:
self._tg_closed_event.set()
break
except Exception as e:
if attempt < max_retries - 1:
logger.warning(f"Failed to set _tg_closed_event (attempt {attempt+1}/{max_retries}): {e}. Retrying in {retry_delay}s")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
logger.error(f"Failed to set _tg_closed_event after {max_retries} attempts: {e}. Threads waiting on this event may hang.")



async def launch_server(
self,
server_name: str,
@@ -295,12 +418,9 @@ async def launch_server(
Connect to a server and return a RunningServer instance that will persist
until explicitly disconnected.
"""
# Create task group if it doesn't exist yet - make this method more resilient
# Ensure the TaskGroup owner is running - make this method more resilient
if not self._tg_active:
tg = create_task_group()
await tg.__aenter__()
self._tg_active = True
self._tg = tg
await self._start_owner()
logger.info(
f"MCPConnectionManager: Auto-created task group for server: {server_name}"
)
@@ -22,7 +22,7 @@
logger = get_logger(__name__)


class QualityRating(str, Enum):
class QualityRating(int, Enum):

Type Safety Consideration: Changing QualityRating base class from str to int may impact backward compatibility. This could affect code that expects string values during JSON serialization, string comparisons, or logging operations.

Before merging, it would be valuable to:

  1. Verify all consumers of this enum can handle integer values
  2. Check if any serialization/deserialization logic depends on string representation
  3. Consider adding a string conversion method if needed to maintain compatibility

If this is an intentional change to enforce numeric semantics, adding a brief comment explaining the rationale would help future maintainers understand the design decision.

Suggested change
class QualityRating(int, Enum):
class QualityRating(int, Enum):
"""
Quality rating enum using integer values for numeric comparisons.
Provides string conversion methods for backward compatibility.
"""


"""Enum for evaluation quality ratings"""

POOR = 0 # Major improvements needed