Skip to content

refactor: Add connection pooling for HTTP clients#160

Open
chefsale wants to merge 18 commits intomasterfrom
bounty-fix/add-connection-pooling-for-http-clients-ml7qskyb
Open

refactor: Add connection pooling for HTTP clients#160
chefsale wants to merge 18 commits intomasterfrom
bounty-fix/add-connection-pooling-for-http-clients-ml7qskyb

Conversation

@chefsale
Copy link
Member

@chefsale chefsale commented Feb 4, 2026

Implement connection pooling for HTTP clients in auth.py to improve performance and reduce connection overhead.


Open in Cursor Open in Web


Note

Medium Risk
Touches HTTP/auth request plumbing and event-loop/session lifecycle, so regressions could manifest as stuck/failed requests or resource leaks despite being a largely internal refactor with tests.

Overview
Introduces a SessionManager in auth.py that provides a shared, pooled aiohttp.ClientSession (per-loop aware, thread-safe singleton, cookies disabled) plus helpers like get_shared_session() and run_with_shared_session_cleanup().

Updates AuthManager.authenticate() and AuthManager.refresh() to reuse the pooled session instead of creating a new ClientSession per request, and wires explicit shutdown cleanup into CLI entrypoints (remote.py, node_resolver.resolve_sync, and workflow runner) to avoid unclosed-session leaks. Adds unit coverage for session creation/reuse, loop changes, and cleanup behavior in test_auth_session_manager.py.

Written by Cursor Bugbot for commit ee9231f. This will update automatically on new commits. Configure here.

Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com>
@cursor
Copy link

cursor bot commented Feb 4, 2026

Cursor Agent can help with this pull request. Just @cursor in comments and I'll start working on changes in this branch.
Learn more about Cursor Agents

@chefsale chefsale changed the title Client connection pooling refactor: Add connection pooling for HTTP clients Feb 4, 2026
@chefsale chefsale marked this pull request as ready for review February 4, 2026 16:49
- Add asyncio.Lock to SessionManager.get_session() to prevent race conditions
  when multiple coroutines try to create sessions concurrently
- Add asyncio.Lock to SessionManager.close() for thread-safe cleanup
- Move _session from class variable to instance variable for proper isolation
- Add double-check pattern after acquiring lock for efficiency
- Fix test file to properly clean up sys.modules mocks using fixtures
- Add test for concurrent session creation (test_get_session_thread_safe)
- Update docstrings to document thread-safety and cleanup responsibilities

Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com>
- Add event loop tracking to detect when loop changes and recreate resources
- Use DummyCookieJar to prevent cookies from leaking between node authentications
- Add _get_current_loop_id() and _is_same_loop() helper methods
- Add _create_session() method that creates session with disabled cookies
- Update close() to handle loop changes gracefully (clear without closing)
- Add tests for loop change detection and cookie isolation

Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com>
Copy link

@meroreviewer meroreviewer bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 AI Code Reviewer

Reviewed by 3 agents | Quality score: 94% | Review time: 218.4s

🟡 3 warnings, 💡 2 suggestions, 📝 1 nitpicks. See inline comments.


🤖 Generated by AI Code Reviewer | Review ID: review-1b463572

# No running loop
return 0

def _is_same_loop(self) -> bool:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Race condition in _ensure_lock() defeats lock protection

Two coroutines calling _ensure_lock() concurrently when _lock is None can each create separate Lock instances, causing one to use a stale lock while another overwrites _lock, defeating the synchronization in get_session().

Suggested fix:

Use a threading.Lock for the initial lock creation, or use a module-level lock created at import time, or initialize _lock in __init__ and recreate only the session/connector on loop change.

async def get_session(self) -> aiohttp.ClientSession:
"""Get or create the shared aiohttp.ClientSession.

This method is thread-safe and uses a lock to prevent race conditions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Resource leak when event loop changes - connections not closed

When the event loop changes, the old session and connector are abandoned without calling close(), potentially leaking TCP connections and file descriptors.

Suggested fix:

Log a warning when orphaning resources, or attempt cleanup using asyncio.run_coroutine_threadsafe() to the original loop if available, or document this as expected behavior for CLI tools.

self._session = None
if self._connector is not None and not self._connector.closed:
await self._connector.close()
self._connector = None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Singleton creation is not thread-safe

The get_shared_instance() method uses a check-then-create pattern without synchronization, which could create multiple instances if called concurrently from different threads.

Suggested fix:

Add a threading.Lock to protect the singleton creation, or document that the class should only be accessed from a single thread.

different node authentications, which could cause auth state leakage.
"""
return aiohttp.ClientSession(
connector=connector,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Orphaned resources may cause unclosed connection warnings

When the event loop changes, the old session and connector are set to None without being closed first, which can cause 'Unclosed client session' warnings and potential file descriptor leaks.

Suggested fix:

Consider storing orphaned resources in a list and attempting to close them in a try/except block, or document this as expected behavior for cross-loop scenarios.


# Store original modules to restore later
_original_modules = {}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Module mocking approach is fragile

Direct manipulation of sys.modules in _mock_modules() and _restore_modules() can cause test isolation issues and makes the test setup hard to maintain.

Suggested fix:

Consider using pytest-mock's `mocker.patch.dict(sys.modules, {...})` or restructuring the code to make it more testable without module-level mocking.

@@ -236,42 +471,47 @@ async def authenticate(
auth_endpoint = f"{normalized_url}{AUTH_TOKEN_ENDPOINT}"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Nit: Duplicated session acquisition pattern

The session acquisition logic (checking for injected session_manager vs. using global) is duplicated in both authenticate() and refresh() methods.

Suggested fix:

Extract to a private helper method like `async def _get_session(session_manager: Optional[SessionManager]) -> aiohttp.ClientSession`.

- Use threading.Lock for thread-safe singleton creation
- Use threading.Lock to protect asyncio.Lock creation (fixes race in _ensure_lock)
- Remove unsafe fast path in get_session() - always acquire lock
- Add atexit handler to warn about unclosed shared sessions
- Add ResourceWarning when event loop changes with unclosed session
- Extract _get_session() helper to reduce code duplication in auth methods
- Update tests for new lock attribute name (_async_lock)

Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com>
Copy link

@meroreviewer meroreviewer bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 AI Code Reviewer

Reviewed by 3 agents | Quality score: 100% | Review time: 309.9s

🔴 1 critical, 💡 4 suggestions, 📝 1 nitpicks. See inline comments.


🤖 Generated by AI Code Reviewer | Review ID: review-1f24f0e0

Thread-safe: uses a threading.Lock to protect creation.
"""
current_loop_id = self._get_current_loop_id()
with self._sync_lock:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Race condition: _get_async_lock() creates new locks without updating _loop_id

When the event loop changes, _get_async_lock() creates a new asyncio.Lock but doesn't update _loop_id, so subsequent callers will also create new locks, allowing multiple coroutines to hold different locks and enter the critical section simultaneously.

Suggested fix:

Update `self._loop_id = current_loop_id` inside the `if` block after creating the new lock.

def _is_same_loop(self) -> bool:
"""Check if we're running in the same event loop as when resources were created."""
if self._loop_id is None:
return False
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Dead code: _is_same_loop() method is unused

The _is_same_loop() method is defined but never called; the same logic is duplicated inline in get_session() and close() methods.

Suggested fix:

Either use this helper method in `get_session()` and `close()` to reduce duplication (DRY), or remove it entirely (YAGNI).

preventing cross-loop errors.

Returns:
The shared aiohttp.ClientSession instance with connection pooling.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Orphaned session resources when event loop changes

When the event loop changes, the old session and connector are set to None without cleanup, orphaning the underlying TCP connections until GC runs.

Suggested fix:

This is partially unavoidable since cross-loop operations aren't safe, but consider logging the orphaned resource count for observability.

Returns:
The global SessionManager instance (creates one if needed).
"""
if cls._instance is None:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Atexit handler warns but doesn't close session

The _cleanup_shared_session() atexit handler only emits a warning but doesn't attempt to close the session, leaving connections potentially leaked at process exit.

Suggested fix:

Consider attempting a synchronous close if an event loop is available, or document that callers must call `close_shared_session()` before exit.

lock = self._get_async_lock()

async with lock:
# Only close if we're in the same loop
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Context manager closes session which may surprise callers

The __aexit__ unconditionally calls close(), so using async with SessionManager() as context manager always closes the session; this differs from the shared-instance pattern where session persists.

Suggested fix:

Add a note in the class docstring clarifying that context-manager usage creates a short-lived session, while `get_shared_instance()` provides a long-lived one.

# Store original modules to restore later
_original_modules = {}


Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Nit: Module-level mocking at import time is fragile

Mocking sys.modules at import time and reloading modules can cause subtle test ordering issues and makes tests harder to maintain.

Suggested fix:

Consider isolating the `SessionManager` class or using dependency injection to make it more testable without module-level patching.

Copy link

@meroreviewer meroreviewer bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 AI Code Reviewer

Reviewed by 3 agents | Quality score: 100% | Review time: 253.1s

🟡 2 warnings, 💡 2 suggestions, 📝 2 nitpicks. See inline comments.


🤖 Generated by AI Code Reviewer | Review ID: review-cf0b2c5d

if self._loop_id is not None and self._loop_id != current_loop_id:
# Event loop changed - old resources are orphaned
# Log warning about orphaned resources
if self._session is not None and not self._session.closed:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Connector resource leak when session is closed externally

When self._session.closed is True but self._connector is still open, overwriting self._connector without closing it first leaks TCP connections.

Suggested fix:

Before creating a new connector, check if the existing connector is not closed and await its close(): `if self._connector is not None and not self._connector.closed: await self._connector.close()`


The session is automatically recreated if the event loop has changed,
preventing cross-loop errors.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Cross-loop resource orphaning silently leaks connections

When event loop changes, old session/connector references are cleared without closing, orphaning connections until GC; the warning helps but connections still leak.

Suggested fix:

Consider attempting a synchronous close or documenting that callers must explicitly close before switching event loops

self._session = None
self._connector = None

# Create session if needed
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Context manager closes session defeating pooling benefit

Using async with SessionManager() creates a new instance (not the singleton) and closes the session on exit, which defeats the purpose of connection pooling for that usage pattern.

Suggested fix:

Consider documenting that context manager usage is for one-off operations, or have `__aexit__` only close if not a shared instance.


# Store original modules to restore later
_original_modules = {}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Nit: Module mocking at import time is fragile

Directly manipulating sys.modules before importing the module under test is brittle and can break if module dependencies change or tests run in different orders.

Suggested fix:

Consider using `pytest-mock`'s `mocker.patch.dict(sys.modules, ...)` or restructuring the code to allow easier dependency injection for tests.


Returns:
The global SessionManager instance (creates one if needed).
"""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Nit: atexit handler limitation should suggest mitigation

The docstring explains the handler can only warn, but doesn't mention that integrators should use run_with_shared_session_cleanup() or similar to ensure proper cleanup.

Suggested fix:

Add a brief note pointing to `run_with_shared_session_cleanup()` as the recommended cleanup pattern.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

skip_auth=skip_auth,
)
)
return loop.run_until_complete(coro)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shared coroutine reused after consumption in fallback path

Medium Severity

The coro coroutine object is created once and shared between two branches in the try/except RuntimeError block. If a running loop exists, the try branch submits coro to a thread via asyncio.run(coro), which consumes the coroutine. If that execution raises a RuntimeError (a common exception type), future.result() propagates it to the except RuntimeError handler, which then attempts loop.run_until_complete(coro) on the already-exhausted coroutine. This fails with "cannot reuse already awaited coroutine," masking the original error. The old code avoided this by creating a fresh coroutine in each branch.

Fix in Cursor Fix in Web

Copy link

@meroreviewer meroreviewer bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 AI Code Reviewer

Reviewed by 3 agents | Quality score: 90% | Review time: 320.4s

🟡 3 warnings, 💡 2 suggestions, 📝 2 nitpicks. See inline comments.


🤖 Generated by AI Code Reviewer | Review ID: review-cd3c9866


def run_async(coro):
"""Run an async function in a new event loop."""
"""Run an async function and clean up shared auth sessions."""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 run_async will fail if called from async context

If get_running_loop() succeeds (meaning there's already a running loop), calling run_until_complete() on that loop will raise 'This event loop is already running'; while unlikely for CLI entry points, this is a latent bug.

Suggested fix:

Add a guard: if a loop is already running, either raise an explicit error or use `asyncio.run_coroutine_threadsafe()` to schedule on the running loop.

lock = self._get_async_lock()
async with lock:
# Check if we need to recreate due to event loop change
if (
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Resource leak when event loop changes

When the event loop changes, the old session and connector are dereferenced without being closed, orphaning those resources since they can't be closed from a different loop.

Suggested fix:

This is a known limitation with asyncio resources across loops; consider documenting this more prominently or tracking orphaned resources for debugging.

sys.modules[mod_name] = _original_modules[mod_name]
elif mod_name in sys.modules:
del sys.modules[mod_name]

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Unit tests mock aiohttp but don't verify actual connection pooling

Tests verify mock interactions but don't confirm that TCPConnector actually pools connections or that sessions are reused across requests. Integration tests would increase confidence in the pooling behavior.

Suggested fix:

Consider adding an integration test that makes multiple requests and verifies connection reuse via connector statistics or response timing.


def run_async(coro):
"""Run an async function in a new event loop."""
"""Run an async function and clean up shared auth sessions."""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Semantic change from get_event_loop to get_running_loop may cause unexpected behavior

Changing from get_event_loop() to get_running_loop() alters behavior: when an event loop is set but not running, the old code reused it while new code creates a fresh loop. If get_running_loop() succeeds (active loop exists), run_until_complete() will raise RuntimeError since you cannot nest event loop runs.

Suggested fix:

Either document this intentional semantic change, or handle the 'running loop' case explicitly with a clearer error message explaining that nested async contexts aren't supported.

self._pool_connections_per_host = pool_connections_per_host
self._keepalive_timeout = keepalive_timeout
self._connector: Optional[aiohttp.TCPConnector] = None
self._session: Optional[aiohttp.ClientSession] = None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Loop ID of 0 when no running loop could cause subtle bugs

_get_current_loop_id() returns 0 when there's no running loop; if async methods are somehow invoked from contexts where this happens, the comparison logic in get_session() and close() may behave unexpectedly.

Suggested fix:

Return `None` instead of `0` for no-running-loop case and handle `None` explicitly in comparisons to make the intent clearer.


# All results should be the same session
for result in results:
assert result == mock_session
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Nit: Thread-safety test doesn't test actual threading

test_get_session_thread_safe uses asyncio.gather which runs concurrent coroutines in the same thread, not actual multi-threaded access; the test name is misleading.

Suggested fix:

Either rename to `test_get_session_concurrent_coroutines` or add a true multi-threaded test using `concurrent.futures.ThreadPoolExecutor`.


Returns:
The global SessionManager instance (creates one if needed).
"""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Nit: Race condition when reading session state outside lock

After releasing _instance_lock, the code accesses instance._session which could change concurrently; however, this only affects a warning message at exit so impact is minimal.

Suggested fix:

Move the `instance._session` check inside the `with SessionManager._instance_lock:` block, or accept the benign race since it only affects a warning.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

skip_auth=skip_auth,
)
)
return loop.run_until_complete(coro)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shared coroutine reused after consumption in error path

Medium Severity

The resolve_sync method creates a single coro coroutine object and uses it in both branches of a try/except RuntimeError. If asyncio.get_running_loop() succeeds and asyncio.run(coro) in the thread pool executes but raises a RuntimeError (from any cause within self.resolve()), future.result() re-raises it. The except RuntimeError catches it, and then loop.run_until_complete(coro) is called on the already-consumed coroutine, which fails with "cannot reuse already awaited coroutine". The old code created separate coroutine objects in each branch, avoiding this issue.

Fix in Cursor Fix in Web

Copy link

@meroreviewer meroreviewer bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 AI Code Reviewer

Reviewed by 3 agents | Quality score: 87% | Review time: 381.1s

💡 3 suggestions, 📝 2 nitpicks. See inline comments.


🤖 Generated by AI Code Reviewer | Review ID: review-87eace97

# Double-check after acquiring lock
if cls._instance is None:
cls._instance = cls()
return cls._instance
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Race window in close_shared_instance allows duplicate instances

Setting _instance = None before await instance.close() completes allows get_shared_instance() to create a new instance while the old one is still closing, potentially exceeding expected connection limits.

Suggested fix:

Consider keeping `_instance` set until close completes, or use a closing flag to prevent new instance creation during shutdown.

Important:
- For connection pooling benefits, use Option 1 (get_shared_session()).
- Context manager (Option 2) defeats pooling - use for isolated requests only.
- Do NOT use get_shared_instance() as a context manager - it will close
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Context manager implementation contradicts pooling goal

Implementing __aenter__/__aexit__ that closes the session on exit is confusing since the docstring explicitly warns this 'defeats pooling'; consider removing the context manager protocol or not closing on exit.

Suggested fix:

Either remove the context manager methods or have `__aexit__` be a no-op with clear docs that callers must explicitly close.

async def get_session(self) -> aiohttp.ClientSession:
"""Get or create the shared aiohttp.ClientSession.

This method is thread-safe and uses a lock to prevent race conditions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Nit: get_session acquires lock even when session exists and is valid

The lock is always acquired in get_session() even when the session is valid and loop hasn't changed; for high-frequency calls, this adds unnecessary contention.

Suggested fix:

Consider a double-check pattern: first check without lock, then acquire lock only if session needs creation/recreation.

@@ -0,0 +1,411 @@
"""Unit tests for SessionManager connection pooling."""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Nit: Aggressive module mocking may hide integration issues

Mocking many modules at import time (calimero_client_py, ed25519, etc.) makes tests fragile to import order changes and doesn't verify real integration.

Suggested fix:

Consider using pytest fixtures with more targeted patching, or add a separate integration test that imports the real modules.

instance = None
with cls._instance_lock:
if cls._instance is not None:
instance = cls._instance
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Atexit handler cannot actually clean up resources

The _cleanup_shared_session atexit handler only warns but cannot run async cleanup; this leaves the promise of cleanup unfulfilled and may confuse maintainers.

Suggested fix:

Document in the module docstring that callers MUST call `close_shared_session()` before exit, or attempt sync cleanup via `asyncio.run()` in the atexit handler.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

skip_auth=skip_auth,
)
)
return loop.run_until_complete(coro)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coroutine object reused across two execution branches

Medium Severity

The coro coroutine object is created once but referenced in both the try branch (asyncio.run(coro) via thread pool) and the except RuntimeError branch (loop.run_until_complete(coro)). A Python coroutine can only be awaited once. If the first branch begins executing coro and a RuntimeError propagates (e.g., from future.result() re-raising an error from inside asyncio.run), the except clause catches it and tries to run the already-consumed coroutine, which fails. The old code avoided this by creating a fresh self.resolve(...) coroutine independently in each branch.

Fix in Cursor Fix in Web

Copy link

@meroreviewer meroreviewer bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 AI Code Reviewer

Reviewed by 3 agents | Quality score: 100% | Review time: 445.3s

🟡 2 warnings, 💡 4 suggestions. See inline comments.


🤖 Generated by AI Code Reviewer | Review ID: review-0dd419d0

@@ -39,13 +40,14 @@


def run_async(coro):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 run_async crashes if called from async context

If get_running_loop() succeeds (running inside an async context), the code proceeds to call run_until_complete() on an already-running loop, which raises RuntimeError: This event loop is already running.

Suggested fix:

Add explicit handling when a running loop is detected: either raise an error explaining this is a sync-only API, or use `asyncio.run_coroutine_threadsafe()` to safely dispatch to the running loop.


The session is automatically recreated if the event loop has changed,
preventing cross-loop errors.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Session and connector leaked when event loop changes

When the event loop changes, the old session and connector are set to None without being closed, causing resource leaks (file descriptors, connections). The warning helps but doesn't prevent the leak.

Suggested fix:

Consider tracking orphaned sessions and attempting cleanup from the original loop via `call_soon_threadsafe`, or document that callers must explicitly close before loop changes.


Thread-safe: uses a threading.Lock to protect creation.
When the event loop changes, creates a new lock for the new loop.
"""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Race condition when async lock is recreated on loop change

When the event loop changes, _get_async_lock() creates a new lock. If concurrent coroutines from both old and new loops access the SessionManager, they use different locks, allowing races on _session and _connector.

Suggested fix:

Use a single threading.Lock for all critical sections instead of event-loop-specific async locks, or document that loop changes must not occur concurrently with session access.


async with lock:
# Only close if we're in the same loop
if (
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Context manager on SessionManager enables singleton misuse

Implementing __aenter__/__aexit__ that calls close() allows users to accidentally close the shared singleton via async with SessionManager.get_shared_instance(), defeating pooling for other callers despite the docstring warning.

Suggested fix:

Consider removing context manager support from SessionManager or raising an error in `__aexit__` if `self is SessionManager._instance`.

get_shared_session,
run_with_shared_session_cleanup,
)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Nit: Test fixture leaks sessions by only nulling references

The cleanup_session_manager fixture sets _instance = None without closing any open session, potentially leaking connections across test runs.

Suggested fix:

Call `await SessionManager.close_shared_instance()` in an async cleanup, or at minimum call `instance.close()` if `_instance` had an unclosed session before resetting.

"""Test get_session creates a new session when none exists."""
manager = SessionManager()

mock_connector = MagicMock()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Tests coupled to internal method names

Tests patch private methods like _create_connector and _create_session, which couples tests to implementation details and will break if those methods are renamed or refactored.

Suggested fix:

Consider testing through the public API by patching `aiohttp.TCPConnector` and `aiohttp.ClientSession` at the module level instead.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

skip_auth=skip_auth,
)
)
return loop.run_until_complete(coro)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coroutine reuse after consumption causes crash on error

Medium Severity

The coro object is created once and shared across both the try and except RuntimeError branches. When called from an async context (which happens — e.g., from async def execute in step classes), get_running_loop() succeeds and coro is consumed by asyncio.run in the thread pool. If future.result() then raises a RuntimeError (from the resolve logic itself), the except block catches it and attempts loop.run_until_complete(coro) on the already-consumed coroutine, resulting in RuntimeError: cannot reuse already awaited coroutine. The old code avoided this by creating separate coroutine instances in each branch.

Fix in Cursor Fix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants