From 0f6cd07dbd517f199c63e323dc2ad0ac10b639a8 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 1 Apr 2025 10:12:06 -0400 Subject: [PATCH 1/2] PYTHON-5219 - Avoid awaiting coroutines when holding locks --- pymongo/asynchronous/pool.py | 23 ++++++++++++++++------- pymongo/asynchronous/topology.py | 6 +++++- pymongo/synchronous/pool.py | 23 ++++++++++++++++------- pymongo/synchronous/topology.py | 6 +++++- 4 files changed, 42 insertions(+), 16 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 6ebdb5cb20..ddf31e7139 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -931,13 +931,15 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: return if self.opts.max_idle_time_seconds is not None: + close_conns = [] async with self.lock: while ( self.conns and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds ): - conn = self.conns.pop() - await conn.close_conn(ConnectionClosedReason.IDLE) + close_conns.append(self.conns.pop()) + for conn in close_conns: + await conn.close_conn(ConnectionClosedReason.IDLE) while True: async with self.size_cond: @@ -957,14 +959,18 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: self._pending += 1 incremented = True conn = await self.connect() + close_conn = False async with self.lock: # Close connection and return if the pool was reset during # socket creation or while acquiring the pool lock. if self.gen.get_overall() != reference_generation: - await conn.close_conn(ConnectionClosedReason.STALE) - return - self.conns.appendleft(conn) - self.active_contexts.discard(conn.cancel_context) + close_conn = True + if not close_conn: + self.conns.appendleft(conn) + self.active_contexts.discard(conn.cancel_context) + if close_conn: + await conn.close_conn(ConnectionClosedReason.STALE) + return finally: if incremented: # Notify after adding the socket to the pool. @@ -1343,17 +1349,20 @@ async def checkin(self, conn: AsyncConnection) -> None: error=ConnectionClosedReason.ERROR, ) else: + close_conn = False async with self.lock: # Hold the lock to ensure this section does not race with # Pool.reset(). if self.stale_generation(conn.generation, conn.service_id): - await conn.close_conn(ConnectionClosedReason.STALE) + close_conn = True else: conn.update_last_checkin_time() conn.update_is_writable(bool(self.is_writable)) self.conns.appendleft(conn) # Notify any threads waiting to create a connection. self._max_connecting_cond.notify() + if close_conn: + await conn.close_conn(ConnectionClosedReason.STALE) async with self.size_cond: if txn: diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index d83ceca55b..2e2a910ddf 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -240,16 +240,20 @@ async def open(self) -> None: "https://dochub.mongodb.org/core/pymongo-fork-deadlock", **kwargs, ) + close_servers = [] async with self._lock: # Close servers and clear the pools. for server in self._servers.values(): - await server.close() + close_servers.append(server) if not _IS_SYNC: self._monitor_tasks.append(server._monitor) # Reset the session pool to avoid duplicate sessions in # the child process. self._session_pool.reset() + for server in close_servers: + await server.close() + async with self._lock: await self._ensure_opened() diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 6a302e2728..2fcf602057 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -927,13 +927,15 @@ def remove_stale_sockets(self, reference_generation: int) -> None: return if self.opts.max_idle_time_seconds is not None: + close_conns = [] with self.lock: while ( self.conns and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds ): - conn = self.conns.pop() - conn.close_conn(ConnectionClosedReason.IDLE) + close_conns.append(self.conns.pop()) + for conn in close_conns: + conn.close_conn(ConnectionClosedReason.IDLE) while True: with self.size_cond: @@ -953,14 +955,18 @@ def remove_stale_sockets(self, reference_generation: int) -> None: self._pending += 1 incremented = True conn = self.connect() + close_conn = False with self.lock: # Close connection and return if the pool was reset during # socket creation or while acquiring the pool lock. if self.gen.get_overall() != reference_generation: - conn.close_conn(ConnectionClosedReason.STALE) - return - self.conns.appendleft(conn) - self.active_contexts.discard(conn.cancel_context) + close_conn = True + if not close_conn: + self.conns.appendleft(conn) + self.active_contexts.discard(conn.cancel_context) + if close_conn: + conn.close_conn(ConnectionClosedReason.STALE) + return finally: if incremented: # Notify after adding the socket to the pool. @@ -1339,17 +1345,20 @@ def checkin(self, conn: Connection) -> None: error=ConnectionClosedReason.ERROR, ) else: + close_conn = False with self.lock: # Hold the lock to ensure this section does not race with # Pool.reset(). if self.stale_generation(conn.generation, conn.service_id): - conn.close_conn(ConnectionClosedReason.STALE) + close_conn = True else: conn.update_last_checkin_time() conn.update_is_writable(bool(self.is_writable)) self.conns.appendleft(conn) # Notify any threads waiting to create a connection. self._max_connecting_cond.notify() + if close_conn: + conn.close_conn(ConnectionClosedReason.STALE) with self.size_cond: if txn: diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index bf9011830d..e226983f00 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -240,16 +240,20 @@ def open(self) -> None: "https://dochub.mongodb.org/core/pymongo-fork-deadlock", **kwargs, ) + close_servers = [] with self._lock: # Close servers and clear the pools. for server in self._servers.values(): - server.close() + close_servers.append(server) if not _IS_SYNC: self._monitor_tasks.append(server._monitor) # Reset the session pool to avoid duplicate sessions in # the child process. self._session_pool.reset() + for server in close_servers: + server.close() + with self._lock: self._ensure_opened() From d5e7290d916c1aeea214875a28dd666a2dc5dc25 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 2 Apr 2025 14:44:29 -0400 Subject: [PATCH 2/2] Revert topology changes --- pymongo/asynchronous/topology.py | 8 +------- pymongo/synchronous/topology.py | 8 +------- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 8425510f6b..b315cc33b7 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -240,20 +240,14 @@ async def open(self) -> None: "https://dochub.mongodb.org/core/pymongo-fork-deadlock", **kwargs, ) - close_servers = [] async with self._lock: # Close servers and clear the pools. for server in self._servers.values(): - close_servers.append(server) - if not _IS_SYNC: - self._monitor_tasks.append(server._monitor) + await server.close() # Reset the session pool to avoid duplicate sessions in # the child process. self._session_pool.reset() - for server in close_servers: - await server.close() - async with self._lock: await self._ensure_opened() diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index f79e463eb0..7df475b4c8 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -240,20 +240,14 @@ def open(self) -> None: "https://dochub.mongodb.org/core/pymongo-fork-deadlock", **kwargs, ) - close_servers = [] with self._lock: # Close servers and clear the pools. for server in self._servers.values(): - close_servers.append(server) - if not _IS_SYNC: - self._monitor_tasks.append(server._monitor) + server.close() # Reset the session pool to avoid duplicate sessions in # the child process. self._session_pool.reset() - for server in close_servers: - server.close() - with self._lock: self._ensure_opened()