diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 18644cf7de..a67cc5f3c8 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -931,13 +931,15 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: return if self.opts.max_idle_time_seconds is not None: + close_conns = [] async with self.lock: while ( self.conns and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds ): - conn = self.conns.pop() - await conn.close_conn(ConnectionClosedReason.IDLE) + close_conns.append(self.conns.pop()) + for conn in close_conns: + await conn.close_conn(ConnectionClosedReason.IDLE) while True: async with self.size_cond: @@ -957,14 +959,18 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: self._pending += 1 incremented = True conn = await self.connect() + close_conn = False async with self.lock: # Close connection and return if the pool was reset during # socket creation or while acquiring the pool lock. if self.gen.get_overall() != reference_generation: - await conn.close_conn(ConnectionClosedReason.STALE) - return - self.conns.appendleft(conn) - self.active_contexts.discard(conn.cancel_context) + close_conn = True + if not close_conn: + self.conns.appendleft(conn) + self.active_contexts.discard(conn.cancel_context) + if close_conn: + await conn.close_conn(ConnectionClosedReason.STALE) + return finally: if incremented: # Notify after adding the socket to the pool. @@ -1343,17 +1349,20 @@ async def checkin(self, conn: AsyncConnection) -> None: error=ConnectionClosedReason.ERROR, ) else: + close_conn = False async with self.lock: # Hold the lock to ensure this section does not race with # Pool.reset(). if self.stale_generation(conn.generation, conn.service_id): - await conn.close_conn(ConnectionClosedReason.STALE) + close_conn = True else: conn.update_last_checkin_time() conn.update_is_writable(bool(self.is_writable)) self.conns.appendleft(conn) # Notify any threads waiting to create a connection. self._max_connecting_cond.notify() + if close_conn: + await conn.close_conn(ConnectionClosedReason.STALE) async with self.size_cond: if txn: diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 9de069af7e..b315cc33b7 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -244,8 +244,6 @@ async def open(self) -> None: # Close servers and clear the pools. for server in self._servers.values(): await server.close() - if not _IS_SYNC: - self._monitor_tasks.append(server._monitor) # Reset the session pool to avoid duplicate sessions in # the child process. self._session_pool.reset() diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 1151776b94..224834af31 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -927,13 +927,15 @@ def remove_stale_sockets(self, reference_generation: int) -> None: return if self.opts.max_idle_time_seconds is not None: + close_conns = [] with self.lock: while ( self.conns and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds ): - conn = self.conns.pop() - conn.close_conn(ConnectionClosedReason.IDLE) + close_conns.append(self.conns.pop()) + for conn in close_conns: + conn.close_conn(ConnectionClosedReason.IDLE) while True: with self.size_cond: @@ -953,14 +955,18 @@ def remove_stale_sockets(self, reference_generation: int) -> None: self._pending += 1 incremented = True conn = self.connect() + close_conn = False with self.lock: # Close connection and return if the pool was reset during # socket creation or while acquiring the pool lock. if self.gen.get_overall() != reference_generation: - conn.close_conn(ConnectionClosedReason.STALE) - return - self.conns.appendleft(conn) - self.active_contexts.discard(conn.cancel_context) + close_conn = True + if not close_conn: + self.conns.appendleft(conn) + self.active_contexts.discard(conn.cancel_context) + if close_conn: + conn.close_conn(ConnectionClosedReason.STALE) + return finally: if incremented: # Notify after adding the socket to the pool. @@ -1339,17 +1345,20 @@ def checkin(self, conn: Connection) -> None: error=ConnectionClosedReason.ERROR, ) else: + close_conn = False with self.lock: # Hold the lock to ensure this section does not race with # Pool.reset(). if self.stale_generation(conn.generation, conn.service_id): - conn.close_conn(ConnectionClosedReason.STALE) + close_conn = True else: conn.update_last_checkin_time() conn.update_is_writable(bool(self.is_writable)) self.conns.appendleft(conn) # Notify any threads waiting to create a connection. self._max_connecting_cond.notify() + if close_conn: + conn.close_conn(ConnectionClosedReason.STALE) with self.size_cond: if txn: diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index bccc8a2eb7..7df475b4c8 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -244,8 +244,6 @@ def open(self) -> None: # Close servers and clear the pools. for server in self._servers.values(): server.close() - if not _IS_SYNC: - self._monitor_tasks.append(server._monitor) # Reset the session pool to avoid duplicate sessions in # the child process. self._session_pool.reset()