Skip to content

Commit b2c278e

Browse files
authored
refactor: improve MCP tool execution handling with concurrent futures (#3854)
- Enhanced the MCP tool execution in both synchronous and asynchronous contexts by utilizing for better event loop management. - Updated error handling to provide clearer messages for connection issues and task cancellations. - Added tests to validate MCP tool execution in both sync and async scenarios, ensuring robust functionality across different contexts.
1 parent f6aed97 commit b2c278e

File tree

3 files changed

+97
-15
lines changed

3 files changed

+97
-15
lines changed

lib/crewai/src/crewai/agent/core.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -860,19 +860,29 @@ async def _setup_client_and_list_tools() -> list[dict[str, Any]]:
860860

861861
try:
862862
try:
863-
tools_list = asyncio.run(_setup_client_and_list_tools())
864-
except RuntimeError as e:
865-
error_msg = str(e).lower()
866-
if "cancel scope" in error_msg or "task" in error_msg:
863+
asyncio.get_running_loop()
864+
import concurrent.futures
865+
866+
with concurrent.futures.ThreadPoolExecutor() as executor:
867+
future = executor.submit(
868+
asyncio.run, _setup_client_and_list_tools()
869+
)
870+
tools_list = future.result()
871+
except RuntimeError:
872+
try:
873+
tools_list = asyncio.run(_setup_client_and_list_tools())
874+
except RuntimeError as e:
875+
error_msg = str(e).lower()
876+
if "cancel scope" in error_msg or "task" in error_msg:
877+
raise ConnectionError(
878+
"MCP connection failed due to event loop cleanup issues. "
879+
"This may be due to authentication errors or server unavailability."
880+
) from e
881+
except asyncio.CancelledError as e:
867882
raise ConnectionError(
868-
"MCP connection failed due to event loop cleanup issues. "
869-
"This may be due to authentication errors or server unavailability."
883+
"MCP connection was cancelled. This may indicate an authentication "
884+
"error or server unavailability."
870885
) from e
871-
except asyncio.CancelledError as e:
872-
raise ConnectionError(
873-
"MCP connection was cancelled. This may indicate an authentication "
874-
"error or server unavailability."
875-
) from e
876886

877887
if mcp_config.tool_filter:
878888
filtered_tools = []

lib/crewai/src/crewai/tools/mcp_native_tool.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,17 @@ def _run(self, **kwargs) -> str:
8686
Result from the MCP tool execution.
8787
"""
8888
try:
89-
# Always use asyncio.run() to create a fresh event loop
90-
# This ensures the async context managers work correctly
91-
return asyncio.run(self._run_async(**kwargs))
89+
try:
90+
asyncio.get_running_loop()
91+
92+
import concurrent.futures
93+
94+
with concurrent.futures.ThreadPoolExecutor() as executor:
95+
coro = self._run_async(**kwargs)
96+
future = executor.submit(asyncio.run, coro)
97+
return future.result()
98+
except RuntimeError:
99+
return asyncio.run(self._run_async(**kwargs))
92100

93101
except Exception as e:
94102
raise RuntimeError(

lib/crewai/tests/mcp/test_mcp_config.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from unittest.mock import AsyncMock, patch
1+
import asyncio
2+
from unittest.mock import AsyncMock, MagicMock, patch
23

34
import pytest
45
from crewai.agent.core import Agent
@@ -134,3 +135,66 @@ def test_agent_with_sse_mcp_config(mock_tool_definitions):
134135
transport = call_args.kwargs["transport"]
135136
assert transport.url == "https://api.example.com/mcp/sse"
136137
assert transport.headers == {"Authorization": "Bearer test_token"}
138+
139+
140+
def test_mcp_tool_execution_in_sync_context(mock_tool_definitions):
141+
"""Test MCPNativeTool execution in synchronous context (normal crew execution)."""
142+
http_config = MCPServerHTTP(url="https://api.example.com/mcp")
143+
144+
with patch("crewai.agent.core.MCPClient") as mock_client_class:
145+
mock_client = AsyncMock()
146+
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
147+
mock_client.connected = False
148+
mock_client.connect = AsyncMock()
149+
mock_client.disconnect = AsyncMock()
150+
mock_client.call_tool = AsyncMock(return_value="test result")
151+
mock_client_class.return_value = mock_client
152+
153+
agent = Agent(
154+
role="Test Agent",
155+
goal="Test goal",
156+
backstory="Test backstory",
157+
mcps=[http_config],
158+
)
159+
160+
tools = agent.get_mcp_tools([http_config])
161+
assert len(tools) == 2
162+
163+
164+
tool = tools[0]
165+
result = tool.run(query="test query")
166+
167+
assert result == "test result"
168+
mock_client.call_tool.assert_called()
169+
170+
171+
@pytest.mark.asyncio
172+
async def test_mcp_tool_execution_in_async_context(mock_tool_definitions):
173+
"""Test MCPNativeTool execution in async context (e.g., from a Flow)."""
174+
http_config = MCPServerHTTP(url="https://api.example.com/mcp")
175+
176+
with patch("crewai.agent.core.MCPClient") as mock_client_class:
177+
mock_client = AsyncMock()
178+
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
179+
mock_client.connected = False
180+
mock_client.connect = AsyncMock()
181+
mock_client.disconnect = AsyncMock()
182+
mock_client.call_tool = AsyncMock(return_value="test result")
183+
mock_client_class.return_value = mock_client
184+
185+
agent = Agent(
186+
role="Test Agent",
187+
goal="Test goal",
188+
backstory="Test backstory",
189+
mcps=[http_config],
190+
)
191+
192+
tools = agent.get_mcp_tools([http_config])
193+
assert len(tools) == 2
194+
195+
196+
tool = tools[0]
197+
result = tool.run(query="test query")
198+
199+
assert result == "test result"
200+
mock_client.call_tool.assert_called()

0 commit comments

Comments
 (0)