Skip to content

Commit cf97450

Browse files
praisonai-triage-agent[bot]MervinPraisonCopilot
authored
fix: concurrency safety gaps in Core SDK (fixes #1458) (#1459)
* fix: concurrency safety gaps in Core SDK - Replace unsafe asyncio.Semaphore private attribute manipulation with proper sync-to-async bridge in concurrency.py - Fix ThreadPoolExecutor resource leak by using reusable agent-level executor in tool_execution.py - Add thread lock protection for unprotected plugin state in plugins/__init__.py Fixes #1458 Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com> * fix: avoid acquire_sync deadlock under running event loop Agent-Logs-Url: https://github.com/MervinPraison/PraisonAI/sessions/c9e7c913-a80d-474a-bd5d-731c129ad614 Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com> * test: enforce fail-fast acquire_sync behavior in async contexts Agent-Logs-Url: https://github.com/MervinPraison/PraisonAI/sessions/c9e7c913-a80d-474a-bd5d-731c129ad614 Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com> --------- Co-authored-by: praisonai-triage-agent[bot] <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com>
1 parent 9147e55 commit cf97450

File tree

4 files changed

+58
-40
lines changed

4 files changed

+58
-40
lines changed

src/praisonai-agents/praisonaiagents/agent/concurrency.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -86,26 +86,27 @@ async def acquire(self, agent_name: str) -> None:
8686
def acquire_sync(self, agent_name: str) -> None:
8787
"""Synchronous acquire — for non-async code paths.
8888
89-
Note: This creates/reuses an event loop internally.
9089
Prefer async acquire() when possible.
90+
If called while an event loop is already running in the current thread,
91+
this method raises RuntimeError to avoid deadlock.
9192
"""
9293
sem = self._get_semaphore(agent_name)
9394
if sem is None:
9495
return
9596
try:
9697
asyncio.get_running_loop()
97-
# If we're in an async context, we can't block
98-
# Just try_acquire or no-op with warning
99-
if not sem._value > 0:
100-
logger.warning(
101-
f"Sync acquire for '{agent_name}' while async loop running and semaphore full. "
102-
f"Consider using async acquire() instead."
103-
)
104-
# Decrement manually for sync context
105-
sem._value = max(0, sem._value - 1)
10698
except RuntimeError:
107-
# No running loop — safe to use asyncio.run
108-
asyncio.get_event_loop().run_until_complete(sem.acquire())
99+
# No running loop — safe to create one
100+
loop = asyncio.new_event_loop()
101+
try:
102+
loop.run_until_complete(sem.acquire())
103+
finally:
104+
loop.close()
105+
else:
106+
raise RuntimeError(
107+
f"acquire_sync('{agent_name}') cannot be called with a running event loop; "
108+
"use async acquire() in async contexts."
109+
)
109110

110111
def release(self, agent_name: str) -> None:
111112
"""Release concurrency slot for agent. No-op if unlimited."""

src/praisonai-agents/praisonaiagents/agent/tool_execution.py

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -202,25 +202,19 @@ def execute_with_context():
202202
with with_injection_context(state):
203203
return self._execute_tool_impl(function_name, arguments)
204204

205-
# Use explicit executor lifecycle to actually bound execution time
206-
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
205+
# Use reusable executor to prevent resource leaks
206+
if not hasattr(self, '_tool_executor'):
207+
self._tool_executor = concurrent.futures.ThreadPoolExecutor(
208+
max_workers=2, thread_name_prefix=f"tool-{self.name}"
209+
)
210+
211+
future = self._tool_executor.submit(ctx.run, execute_with_context)
207212
try:
208-
future = executor.submit(ctx.run, execute_with_context)
209-
try:
210-
result = future.result(timeout=tool_timeout)
211-
except concurrent.futures.TimeoutError:
212-
# Cancel and shutdown immediately to avoid blocking
213-
future.cancel()
214-
executor.shutdown(wait=False, cancel_futures=True)
215-
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
216-
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
217-
else:
218-
# Normal completion - shutdown gracefully
219-
executor.shutdown(wait=False)
220-
finally:
221-
# Ensure executor is always cleaned up
222-
if not executor._shutdown:
223-
executor.shutdown(wait=False)
213+
result = future.result(timeout=tool_timeout)
214+
except concurrent.futures.TimeoutError:
215+
future.cancel()
216+
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
217+
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
224218
else:
225219
with with_injection_context(state):
226220
result = self._execute_tool_impl(function_name, arguments)

src/praisonai-agents/praisonaiagents/plugins/__init__.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ def my_plugin_func(hook_type, *args, **kwargs):
7878
# ============================================================================
7979

8080
# Global state for plugin system (lazy initialized)
81+
import threading
82+
83+
_plugins_lock = threading.Lock()
8184
_plugins_enabled: bool = False
8285
_enabled_plugin_names: list = None # None = all, list = specific
8386

@@ -108,8 +111,9 @@ def enable(plugins: list = None) -> None:
108111
"""
109112
global _plugins_enabled, _enabled_plugin_names
110113

111-
_plugins_enabled = True
112-
_enabled_plugin_names = plugins # None = all, list = specific
114+
with _plugins_lock:
115+
_plugins_enabled = True
116+
_enabled_plugin_names = plugins # None = all, list = specific
113117

114118
# Get plugin manager and auto-discover
115119
from .manager import get_plugin_manager
@@ -119,10 +123,14 @@ def enable(plugins: list = None) -> None:
119123
manager.auto_discover_plugins()
120124
manager.discover_entry_points()
121125

126+
# Snapshot the names under lock to avoid TOCTOU
127+
with _plugins_lock:
128+
target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None
129+
122130
# Enable specific plugins or all
123-
if plugins is not None:
131+
if target_plugins is not None:
124132
# Enable only specified plugins
125-
for name in plugins:
133+
for name in target_plugins:
126134
manager.enable(name)
127135
else:
128136
# Enable all discovered plugins
@@ -158,8 +166,9 @@ def disable(plugins: list = None) -> None:
158166
manager.disable(name)
159167
else:
160168
# Disable all plugins
161-
_plugins_enabled = False
162-
_enabled_plugin_names = None
169+
with _plugins_lock:
170+
_plugins_enabled = False
171+
_enabled_plugin_names = None
163172
for plugin_info in manager.list_plugins():
164173
manager.disable(plugin_info.get("name", ""))
165174

@@ -225,10 +234,9 @@ def is_enabled(name: str = None) -> bool:
225234
Returns:
226235
True if enabled, False otherwise.
227236
"""
228-
global _plugins_enabled
229-
230-
if name is None:
231-
return _plugins_enabled
237+
with _plugins_lock:
238+
if name is None:
239+
return _plugins_enabled
232240

233241
from .manager import get_plugin_manager
234242
manager = get_plugin_manager()

src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,18 @@ def test_sync_acquire_release(self):
106106
# Sync acquire should work
107107
reg.acquire_sync("sync_agent")
108108
reg.release("sync_agent")
109+
110+
@pytest.mark.asyncio
111+
async def test_sync_acquire_running_loop_noop(self):
112+
"""Sync acquire in async context should fail fast without changing state."""
113+
from praisonaiagents.agent.concurrency import ConcurrencyRegistry
114+
reg = ConcurrencyRegistry()
115+
reg.set_limit("loop_agent", 1)
116+
await reg.acquire("loop_agent")
117+
with pytest.raises(RuntimeError, match="running event loop"):
118+
reg.acquire_sync("loop_agent")
119+
with pytest.raises(asyncio.TimeoutError):
120+
await asyncio.wait_for(reg.acquire("loop_agent"), timeout=0.05)
121+
reg.release("loop_agent")
122+
await asyncio.wait_for(reg.acquire("loop_agent"), timeout=0.05)
123+
reg.release("loop_agent")

0 commit comments

Comments
 (0)