Merged
35 changes: 20 additions & 15 deletions src/mcp_agent/cli/cloud/commands/workflows/runs/main.py
@@ -71,7 +71,9 @@ async def _list_workflow_runs_async(
workflows = workflows_data
if status:
status_filter = _get_status_filter(status)
workflows = [w for w in workflows if _matches_status(w, status_filter)]
workflows = [
w for w in workflows if _matches_status(w, status_filter)
]

if limit:
workflows = workflows[:limit]
@@ -144,7 +146,7 @@ def _get_status_filter(status: str) -> str:

def _matches_status(workflow: dict, status_filter: str) -> bool:
"""Check if workflow matches the status filter.

Note: We use string-based matching instead of protobuf enum values because
the MCP tool response format returns status as strings, not enum objects.
This approach is more flexible and doesn't require maintaining sync with
@@ -158,9 +160,7 @@ def _matches_status(workflow: dict, status_filter: str) -> bool:

def _print_workflows_text(workflows, status_filter, server_id_or_url):
"""Print workflows in text format."""
console.print(
f"\n[bold blue]📊 Workflow Runs ({len(workflows)})[/bold blue]"
)
console.print(f"\n[bold blue]📊 Workflow Runs ({len(workflows)})[/bold blue]")

if not workflows:
print_info("No workflow runs found for this server.")
@@ -169,7 +169,7 @@ def _print_workflows_text(workflows, status_filter, server_id_or_url):
for i, workflow in enumerate(workflows):
if i > 0:
console.print()

if isinstance(workflow, dict):
workflow_id = workflow.get("workflow_id", "N/A")
name = workflow.get("name", "N/A")
@@ -184,27 +184,28 @@ def _print_workflows_text(workflows, status_filter, server_id_or_url):
run_id = getattr(workflow, "run_id", "N/A")
created_at = getattr(workflow, "created_at", "N/A")
principal_id = getattr(workflow, "principal_id", "N/A")

status_display = _get_status_display(execution_status)

if created_at and created_at != "N/A":
if hasattr(created_at, "strftime"):
created_display = created_at.strftime("%Y-%m-%d %H:%M:%S")
else:
try:
from datetime import datetime

dt = datetime.fromisoformat(str(created_at).replace("Z", "+00:00"))
created_display = dt.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
created_display = str(created_at)
else:
created_display = "N/A"

console.print(f"[bold cyan]{name or 'Unnamed'}[/bold cyan] {status_display}")
console.print(f" Workflow ID: {workflow_id}")
console.print(f" Run ID: {run_id}")
console.print(f" Created: {created_display}")

if principal_id and principal_id != "N/A":
console.print(f" Principal: {principal_id}")

@@ -228,23 +229,27 @@ def _workflow_to_dict(workflow):
"""Convert workflow dict to standardized dictionary format."""
if isinstance(workflow, dict):
return workflow

return {
"workflow_id": getattr(workflow, "workflow_id", None),
"run_id": getattr(workflow, "run_id", None),
"name": getattr(workflow, "name", None),
"created_at": getattr(workflow, "created_at", None).isoformat() if getattr(workflow, "created_at", None) else None,
"execution_status": getattr(workflow, "execution_status", None).value if getattr(workflow, "execution_status", None) else None,
"created_at": getattr(workflow, "created_at", None).isoformat()
if getattr(workflow, "created_at", None)
else None,
"execution_status": getattr(workflow, "execution_status", None).value
if getattr(workflow, "execution_status", None)
else None,
}


def _get_status_display(status):
"""Convert status to display string with emoji."""
if not status:
return "❓ Unknown"

status_str = str(status).lower()

if "running" in status_str:
return "[green]🟢 Running[/green]"
elif "completed" in status_str:
2 changes: 0 additions & 2 deletions src/mcp_agent/cli/mcp_app/api_client.py
@@ -63,7 +63,6 @@ class CanDoActionsResponse(BaseModel):
canDoActions: Optional[List[CanDoActionCheck]] = []



APP_ID_PREFIX = "app_"
APP_CONFIG_ID_PREFIX = "apcnf_"

@@ -467,7 +466,6 @@ async def list_app_configurations(
response = await self.post("/mcp_app/list_app_configurations", payload)
return ListAppConfigurationsResponse(**response.json())


async def delete_app(self, app_id: str) -> str:
"""Delete an MCP App via the API.

17 changes: 4 additions & 13 deletions src/mcp_agent/mcp/mcp_aggregator.py
@@ -237,28 +237,19 @@ async def close(self):
and self.context._mcp_connection_manager
== self._persistent_connection_manager
):
# Add timeout protection for the disconnect operation
# Close via manager's thread-aware close()
try:
await asyncio.wait_for(
self._persistent_connection_manager.disconnect_all(),
self._persistent_connection_manager.close(),
timeout=5.0,
)
except asyncio.TimeoutError:
logger.warning(
"Timeout during disconnect_all(), forcing shutdown"
"Timeout during connection manager close(), forcing shutdown"
)

# Avoid calling __aexit__ directly across threads; mark inactive
try:
if hasattr(
self._persistent_connection_manager,
"_tg_active",
):
self._persistent_connection_manager._tg_active = False
self._persistent_connection_manager._tg = None
except Exception as e:
logger.warning(
f"Error during connection manager state cleanup: {e}"
f"Error during connection manager close(): {e}"
)

# Clean up the connection manager from the context
151 changes: 123 additions & 28 deletions src/mcp_agent/mcp/mcp_connection_manager.py
@@ -3,6 +3,7 @@
"""

from datetime import timedelta
import asyncio

🛠️ Refactor suggestion

Catch AnyIO TimeoutError, not built-in TimeoutError

anyio.fail_after raises anyio.exceptions.TimeoutError. The current except TimeoutError won’t catch it.

-from anyio import Event, create_task_group, Lock
+from anyio import Event, create_task_group, Lock
+from anyio.exceptions import TimeoutError as AnyioTimeoutError
-                    try:
-                        with anyio.fail_after(5.0):
-                            await self._tg_closed_event.wait()
-                    except TimeoutError:
+                    try:
+                        with anyio.fail_after(5.0):
+                            await self._tg_closed_event.wait()
+                    except AnyioTimeoutError:
                         logger.warning(
                             "MCPConnectionManager: Timeout waiting for TaskGroup owner to close"
                         )
-                        try:
-                            with anyio.fail_after(5.0):
-                                await anyio.to_thread.run_sync(cfut.result)
-                        except TimeoutError:
+                        try:
+                            with anyio.fail_after(5.0):
+                                await anyio.to_thread.run_sync(cfut.result)
+                        except AnyioTimeoutError:
                             logger.warning(
                                 "MCPConnectionManager: Timeout during cross-thread shutdown/close"
                             )

Also applies to: 17-18, 292-297, 319-324

🤖 Prompt for AI Agents
In src/mcp_agent/mcp/mcp_connection_manager.py around lines 6, 17-18, 292-297,
and 319-324, the except clauses currently catch the built-in TimeoutError but
anyio.fail_after raises anyio.exceptions.TimeoutError; import anyio (or from
anyio import exceptions as anyio_exceptions) at the top and change the except
handlers to catch anyio.exceptions.TimeoutError (or the alias you import) so the
timeouts are actually caught; update all listed except blocks accordingly.

import threading
from typing import (
AsyncGenerator,
@@ -237,50 +237,147 @@ def __init__(
self._tg_active = False
# Track the thread this manager was created in to ensure TaskGroup cleanup
self._thread_id = threading.get_ident()
# Event loop where the TaskGroup lives
self._loop: asyncio.AbstractEventLoop | None = None
# Owner task + coordination events for safe TaskGroup lifecycle
self._tg_owner_task: asyncio.Task | None = None
self._owner_tg: TaskGroup | None = None
self._tg_ready_event: Event = Event()
self._tg_close_event: Event = Event()
self._tg_closed_event: Event = Event()

async def __aenter__(self):
# We create a task group to manage all server lifecycle tasks
tg = create_task_group()
# Enter the task group context
await tg.__aenter__()
self._tg_active = True
self._tg = tg
# Start the TaskGroup owner task and wait until ready
await self._start_owner()
# Record the loop and thread where the TaskGroup is running
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self._loop = None
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Ensure clean shutdown of all connections before exiting."""
await self.close(exc_type, exc_val, exc_tb)
# Close the owner TaskGroup in the same task that entered it
if self._owner_tg is not None:
try:
await self._owner_tg.__aexit__(exc_type, exc_val, exc_tb)
except Exception as e:
logger.warning(
f"MCPConnectionManager: Error during owner TaskGroup cleanup: {e}"
)
finally:
self._owner_tg = None

async def close(self, exc_type=None, exc_val=None, exc_tb=None):
"""Close all connections and tear down the internal TaskGroup safely.

This is thread-aware: if called from a different thread than the one where the
TaskGroup was created, it will signal the owner task on the original loop to
perform cleanup and await completion without violating task affinity.
"""
try:
# First request all servers to shutdown
logger.debug("MCPConnectionManager: shutting down all server tasks...")
await self.disconnect_all()
current_thread = threading.get_ident()
if current_thread == self._thread_id:
# Same thread: perform shutdown inline
logger.debug("MCPConnectionManager: shutting down all server tasks...")
await self.disconnect_all()
await anyio.sleep(0.5)
if self._tg_active:
self._tg_close_event.set()
# Wait for owner to report TaskGroup closed with an anyio timeout
try:
with anyio.fail_after(5.0):
await self._tg_closed_event.wait()
except TimeoutError:
logger.warning(
"MCPConnectionManager: Timeout waiting for TaskGroup owner to close"
)

Potential Race Condition in Thread-Aware Cleanup

The current implementation has a race condition risk between the thread identity check and the subsequent async operations. Since await points allow task switching, another thread could call close() after the thread check passes but before the TaskGroup cleanup completes.

Consider restructuring to make the critical section more atomic:

if current_thread == self._thread_id:
    # Capture state atomically before any awaits
    is_active = self._tg_active
    
    # Then perform async operations
    logger.debug("MCPConnectionManager: shutting down all server tasks...")
    await self.disconnect_all()
    await anyio.sleep(0.5)
    
    # Use the captured state for the conditional
    if is_active:
        self._tg_close_event.set()
        # ...

Alternatively, consider using a lock mechanism to ensure exclusive access during the entire cleanup sequence.

Suggested change
if current_thread == self._thread_id:
# Same thread: perform shutdown inline
logger.debug("MCPConnectionManager: shutting down all server tasks...")
await self.disconnect_all()
await anyio.sleep(0.5)
if self._tg_active:
self._tg_close_event.set()
# Wait for owner to report TaskGroup closed with an anyio timeout
try:
with anyio.fail_after(5.0):
await self._tg_closed_event.wait()
except TimeoutError:
logger.warning(
"MCPConnectionManager: Timeout waiting for TaskGroup owner to close"
)
if current_thread == self._thread_id:
# Same thread: perform shutdown inline
# Capture state atomically before any awaits
is_active = self._tg_active
logger.debug("MCPConnectionManager: shutting down all server tasks...")
await self.disconnect_all()
await anyio.sleep(0.5)
# Use the captured state for the conditional
if is_active:
self._tg_close_event.set()
# Wait for owner to report TaskGroup closed with an anyio timeout
try:
with anyio.fail_after(5.0):
await self._tg_closed_event.wait()
except TimeoutError:
logger.warning(
"MCPConnectionManager: Timeout waiting for TaskGroup owner to close"
)

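For the lock-based alternative floated at the end of the comment above, here is a minimal sketch of a lock-guarded close(). The class name, the _close_lock attribute, and the simplified body are assumptions for illustration, not code from this PR.

import threading

import anyio
from anyio import Event, Lock


class ManagerSketch:
    """Lock-guarded close(); mirrors names from the PR but is illustrative only."""

    def __init__(self) -> None:
        self._thread_id = threading.get_ident()
        self._tg_active = True
        self._close_lock = Lock()          # hypothetical addition, not in the PR
        self._tg_close_event = Event()
        self._tg_closed_event = Event()

    async def disconnect_all(self) -> None:
        await anyio.sleep(0)               # stand-in for the real per-server shutdown

    async def close(self) -> None:
        # Hold the lock across the whole sequence so a concurrent close() cannot
        # interleave between the _tg_active check and the TaskGroup teardown.
        async with self._close_lock:
            if not self._tg_active:
                return
            await self.disconnect_all()
            self._tg_close_event.set()
            with anyio.move_on_after(5.0):
                await self._tg_closed_event.wait()
            self._tg_active = False


async def main() -> None:
    mgr = ManagerSketch()
    mgr._tg_closed_event.set()             # pretend the owner task already exited
    await mgr.close()


if __name__ == "__main__":
    anyio.run(main)

Serializing the whole sequence behind one lock removes the check-then-act gap without changing the owner-task protocol.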

# Do not attempt to close the owner TaskGroup here; __aexit__ will handle it
else:
# Different thread – run entire shutdown on the original loop to avoid cross-thread Event.set
if self._loop is not None:

# Add a small delay to allow for clean shutdown of subprocess transports, etc.
await anyio.sleep(0.5)
async def _shutdown_and_close():
logger.debug(
"MCPConnectionManager: shutting down all server tasks (origin loop)..."
)
await self.disconnect_all()
await anyio.sleep(0.5)
if self._tg_active:
self._tg_close_event.set()
await self._tg_closed_event.wait()

# Then close the task group if it's active and we're in the same thread
if self._tg_active and self._tg:
current_thread = threading.get_ident()
if current_thread == self._thread_id:
try:
await self._tg.__aexit__(exc_type, exc_val, exc_tb)
cfut = asyncio.run_coroutine_threadsafe(
_shutdown_and_close(), self._loop
)
# Wait in a worker thread to avoid blocking non-asyncio contexts
try:
with anyio.fail_after(5.0):
await anyio.to_thread.run_sync(cfut.result)
except TimeoutError:
logger.warning(
"MCPConnectionManager: Timeout during cross-thread shutdown/close"
)
Comment on lines +322 to +332

Potential deadlock risk in cross-thread coordination

The current implementation creates a complex threading scenario that could lead to deadlocks. When asyncio.run_coroutine_threadsafe() schedules work on the original loop, and then anyio.to_thread.run_sync(cfut.result) blocks waiting for that work, there's a dependency chain across threads that can become problematic.

If the original event loop is already in the process of shutting down or becomes blocked, the future may never complete, leaving resources hanging despite the timeout. While the timeout provides some protection, it doesn't guarantee cleanup of the scheduled coroutine.

Consider alternatives:

  1. Use a simpler signaling mechanism between threads
  2. Implement a more robust cancellation pattern that ensures the coroutine is properly cancelled if the timeout occurs
  3. Add explicit cleanup code that runs after the timeout to ensure the future doesn't remain pending

This would improve shutdown reliability, especially in complex threading scenarios.

Suggested change
cfut = asyncio.run_coroutine_threadsafe(
_shutdown_and_close(), self._loop
)
# Wait in a worker thread to avoid blocking non-asyncio contexts
try:
with anyio.fail_after(5.0):
await anyio.to_thread.run_sync(cfut.result)
except TimeoutError:
logger.warning(
"MCPConnectionManager: Timeout during cross-thread shutdown/close"
)
cfut = asyncio.run_coroutine_threadsafe(
_shutdown_and_close(), self._loop
)
# Wait in a worker thread to avoid blocking non-asyncio contexts
try:
with anyio.fail_after(5.0):
await anyio.to_thread.run_sync(cfut.result)
except TimeoutError:
logger.warning(
"MCPConnectionManager: Timeout during cross-thread shutdown/close"
)
# Explicitly cancel the future to prevent the coroutine from
# continuing to run in the background after timeout
cfut.cancel()


except Exception as e:
logger.warning(
f"MCPConnectionManager: Error during task group cleanup: {e}"
f"MCPConnectionManager: Error scheduling cross-thread shutdown: {e}"
)
else:
# Different thread – cannot safely cleanup anyio TaskGroup
logger.warning(
f"MCPConnectionManager: Task group cleanup called from different thread "
f"(created in {self._thread_id}, called from {current_thread}). Skipping cleanup."
"MCPConnectionManager: No event loop recorded for cleanup; skipping TaskGroup close"
)
# Always mark as inactive and drop reference
self._tg_active = False
self._tg = None
except AttributeError: # Handle missing `_exceptions`
pass
except Exception as e:
logger.warning(f"MCPConnectionManager: Error during shutdown: {e}")

async def _start_owner(self):
"""Start the TaskGroup owner task if not already running."""
if self._tg_owner_task and not self._tg_owner_task.done():
return

The task lifecycle check has a potential issue: checking if self._tg_owner_task and not self._tg_owner_task.done() only verifies that an existing task hasn't completed, but doesn't distinguish between successful completion and failure. A task that failed with an exception would have done() return True, causing this check to miss it, but the TaskGroup might not be properly initialized.

Consider enhancing this check to also verify the TaskGroup's state or to explicitly handle tasks that completed with exceptions. Perhaps something like:

if (self._tg_owner_task and not self._tg_owner_task.done()) or (self._tg_active and self._tg is not None):
    return

Or alternatively, add explicit exception handling to restart the TaskGroup if the previous one failed.


# Reset coordination events
self._tg_ready_event = Event()
self._tg_close_event = Event()
self._tg_closed_event = Event()

Potential Race Condition in Event Object Replacement

There's a synchronization concern in how the coordination events are being replaced. When creating new Event() instances:

self._tg_ready_event = Event()
self._tg_close_event = Event()
self._tg_closed_event = Event()

If other threads are currently waiting on any of these events (via await event.wait()), replacing the event objects will leave those threads waiting on stale event instances that will never be signaled. This could lead to deadlocks or indefinite hangs.

Consider implementing a synchronization mechanism to ensure safe event replacement, or restructure the code to avoid replacing events that might have waiters. One approach would be to reset existing events (if they have a .clear() method) rather than creating new instances.

Suggested change
# Reset coordination events
self._tg_ready_event = Event()
self._tg_close_event = Event()
self._tg_closed_event = Event()
# Reset coordination events
if self._tg_ready_event:
self._tg_ready_event.clear()
else:
self._tg_ready_event = Event()
if self._tg_close_event:
self._tg_close_event.clear()
else:
self._tg_close_event = Event()
if self._tg_closed_event:
self._tg_closed_event.clear()
else:
self._tg_closed_event = Event()

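Taking the two notes above together, a hypothetical variant of _start_owner could surface a failed previous owner and avoid swapping coordination events while they may still have waiters. It assumes the owner is tracked as an asyncio.Task and relies on anyio's Event exposing is_set() but no clear(); none of this is code from the PR.

import asyncio
import logging

import anyio
from anyio import Event

logger = logging.getLogger(__name__)


class OwnerSketch:
    """Hypothetical _start_owner handling: restart a failed owner, swap events safely."""

    def __init__(self) -> None:
        self._tg_owner_task: asyncio.Task | None = None  # assumes an asyncio.Task owner
        self._tg_ready_event = Event()
        self._tg_close_event = Event()
        self._tg_closed_event = Event()

    async def _start_owner(self) -> None:
        if self._tg_owner_task and not self._tg_owner_task.done():
            return  # healthy owner already running

        # Surface a failed previous owner instead of silently reusing its state.
        if self._tg_owner_task and self._tg_owner_task.done():
            try:
                exc = self._tg_owner_task.exception()
            except asyncio.CancelledError:
                exc = None
            if exc is not None:
                logger.warning("Previous TaskGroup owner failed (%r); restarting", exc)

        # anyio's Event has no clear(): create fresh events only after the previous
        # owner has reported closure, so no waiter is stranded on stale instances.
        if self._tg_close_event.is_set() and not self._tg_closed_event.is_set():
            with anyio.move_on_after(5.0):
                await self._tg_closed_event.wait()

        self._tg_ready_event = Event()
        self._tg_close_event = Event()
        self._tg_closed_event = Event()
        # ...then record the loop/thread and launch the owner task as the PR does...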

# Record loop and thread
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self._loop = None
self._thread_id = threading.get_ident()
# Create an owner TaskGroup and start the owner task within it
owner_tg = create_task_group()
await owner_tg.__aenter__()
self._owner_tg = owner_tg
owner_tg.start_soon(self._tg_owner)
# Wait until the TaskGroup is ready
await self._tg_ready_event.wait()

async def _tg_owner(self):
"""Own the TaskGroup lifecycle so __aexit__ runs in the same task it was entered."""
try:
async with create_task_group() as tg:
self._tg = tg
self._tg_active = True
# Signal that TaskGroup is ready
self._tg_ready_event.set()
# Wait for close request
await self._tg_close_event.wait()
except Exception as e:
logger.warning(f"MCPConnectionManager: Error in TaskGroup owner: {e}")
finally:
# Mark closed and clear references
self._tg_active = False
self._tg = None
# Signal that TaskGroup has been closed
try:
self._tg_closed_event.set()
except Exception:
pass

The silent exception handling here could create a deadlock risk. If _tg_closed_event.set() fails, any code waiting on _tg_closed_event.wait() will hang indefinitely, potentially causing the entire shutdown process to stall.

Consider either:

  1. Logging the exception to help with debugging: logger.warning(f"Failed to set _tg_closed_event: {e}")
  2. Or implementing a fallback mechanism to ensure waiting tasks can proceed even if the event can't be set

This is particularly important since preventing shutdown hangs is a core goal of this PR.

Suggested change
try:
self._tg_closed_event.set()
except Exception:
pass
try:
self._tg_closed_event.set()
except Exception as e:
logger.warning(f"Failed to set _tg_closed_event: {e}")



async def launch_server(
self,
server_name: str,
@@ -295,12 +393,9 @@ async def launch_server(
Connect to a server and return a RunningServer instance that will persist
until explicitly disconnected.
"""
# Create task group if it doesn't exist yet - make this method more resilient
# Ensure the TaskGroup owner is running - make this method more resilient
if not self._tg_active:
tg = create_task_group()
await tg.__aenter__()
self._tg_active = True
self._tg = tg
await self._start_owner()
logger.info(
f"MCPConnectionManager: Auto-created task group for server: {server_name}"
)
@@ -22,7 +22,7 @@
logger = get_logger(__name__)


class QualityRating(str, Enum):
class QualityRating(int, Enum):

Type Safety Consideration: Changing QualityRating base class from str to int may impact backward compatibility. This could affect code that expects string values during JSON serialization, string comparisons, or logging operations.

Before merging, it would be valuable to:

  1. Verify all consumers of this enum can handle integer values
  2. Check if any serialization/deserialization logic depends on string representation
  3. Consider adding a string conversion method if needed to maintain compatibility

If this is an intentional change to enforce numeric semantics, adding a brief comment explaining the rationale would help future maintainers understand the design decision.

Suggested change
class QualityRating(int, Enum):
class QualityRating(int, Enum):
"""
Quality rating enum using integer values for numeric comparisons.
Provides string conversion methods for backward compatibility.
"""


"""Enum for evaluation quality ratings"""

POOR = 0 # Major improvements needed
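Following up on the compatibility note above, a minimal sketch of how the integer-backed enum could keep a string-facing surface. The member names beyond POOR and both helper methods are illustrative assumptions, not part of this change.

from enum import Enum


class QualityRating(int, Enum):
    """Integer ratings for numeric comparison; string helpers shown for compatibility."""

    POOR = 0        # values beyond POOR are placeholders for illustration
    FAIR = 1
    GOOD = 2
    EXCELLENT = 3

    def __str__(self) -> str:
        # Keep the old string-like appearance in logs and serialized payloads.
        return self.name

    @classmethod
    def from_str(cls, value: str) -> "QualityRating":
        # Accept legacy string payloads such as "GOOD", "good", or "2".
        if value.isdigit():
            return cls(int(value))
        return cls[value.upper()]

Callers that previously compared against strings could route through from_str(), while numeric comparisons use the int values directly.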