Skip to content

Commit cdc2cf0

Browse files
committed
Only join executors on async
1 parent 5a1e7cb commit cdc2cf0

File tree

7 files changed

+14
-16
lines changed

7 files changed

+14
-16
lines changed

pymongo/asynchronous/mongo_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1559,14 +1559,14 @@ async def close(self) -> None:
15591559
# Stop the periodic task thread and then send pending killCursor
15601560
# requests before closing the topology.
15611561
self._kill_cursors_executor.close()
1562+
if not _IS_SYNC:
1563+
await self._kill_cursors_executor.join()
15621564
await self._process_kill_cursors()
15631565
await self._topology.close()
15641566
if self._encrypter:
15651567
# TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened.
15661568
await self._encrypter.close()
15671569
self._closed = True
1568-
# Yield to the asyncio event loop so all executor tasks properly exit after cancellation
1569-
await asyncio.sleep(0)
15701570

15711571
if not _IS_SYNC:
15721572
# Add support for contextlib.aclosing.

pymongo/asynchronous/monitor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ def gc_safe_close(self) -> None:
191191

192192
async def close(self) -> None:
193193
self.gc_safe_close()
194+
if not _IS_SYNC:
195+
await self._executor.join()
194196
await self._rtt_monitor.close()
195197
# Increment the generation and maybe close the socket. If the executor
196198
# thread has the socket checked out, it will be closed when checked in.
@@ -458,6 +460,8 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool
458460

459461
async def close(self) -> None:
460462
self.gc_safe_close()
463+
if not _IS_SYNC:
464+
await self._executor.join()
461465
# Increment the generation and maybe close the socket. If the executor
462466
# thread has the socket checked out, it will be closed when checked in.
463467
await self._pool.reset()

pymongo/periodic_executor.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,12 @@ def close(self, dummy: Any = None) -> None:
7575
callback; see monitor.py.
7676
"""
7777
self._stopped = True
78-
if self._task:
78+
if self._task is not None:
7979
self._task.cancel()
8080

8181
async def join(self, timeout: Optional[int] = None) -> None:
8282
if self._task is not None:
83-
try:
84-
await asyncio.wait_for(self._task, timeout=timeout) # type-ignore: [arg-type]
85-
except asyncio.TimeoutError:
86-
# Task timed out
87-
pass
88-
except asyncio.exceptions.CancelledError:
89-
# Task was already finished, or not yet started.
90-
raise
83+
await asyncio.wait([self._task], timeout=timeout) # type-ignore: [arg-type]
9184

9285
def wake(self) -> None:
9386
"""Execute the target function soon."""

pymongo/synchronous/mongo_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1553,13 +1553,14 @@ def close(self) -> None:
15531553
# Stop the periodic task thread and then send pending killCursor
15541554
# requests before closing the topology.
15551555
self._kill_cursors_executor.close()
1556+
if not _IS_SYNC:
1557+
self._kill_cursors_executor.join()
15561558
self._process_kill_cursors()
15571559
self._topology.close()
15581560
if self._encrypter:
15591561
# TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened.
15601562
self._encrypter.close()
15611563
self._closed = True
1562-
# Yield to the asyncio event loop so all executor tasks properly exit after cancellation
15631564

15641565
if not _IS_SYNC:
15651566
# Add support for contextlib.closing.

pymongo/synchronous/monitor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ def gc_safe_close(self) -> None:
191191

192192
def close(self) -> None:
193193
self.gc_safe_close()
194+
if not _IS_SYNC:
195+
self._executor.join()
194196
self._rtt_monitor.close()
195197
# Increment the generation and maybe close the socket. If the executor
196198
# thread has the socket checked out, it will be closed when checked in.
@@ -458,6 +460,8 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool
458460

459461
def close(self) -> None:
460462
self.gc_safe_close()
463+
if not _IS_SYNC:
464+
self._executor.join()
461465
# Increment the generation and maybe close the socket. If the executor
462466
# thread has the socket checked out, it will be closed when checked in.
463467
self._pool.reset()

test/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,8 +1136,6 @@ class IntegrationTest(PyMongoTestCase):
11361136

11371137
@client_context.require_connection
11381138
def setUp(self) -> None:
1139-
if not _IS_SYNC:
1140-
reset_client_context()
11411139
if client_context.load_balancer and not getattr(self, "RUN_ON_LOAD_BALANCER", False):
11421140
raise SkipTest("this test does not support load balancers")
11431141
if client_context.serverless and not getattr(self, "RUN_ON_SERVERLESS", False):

test/asynchronous/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,8 +1154,6 @@ class AsyncIntegrationTest(AsyncPyMongoTestCase):
11541154

11551155
@async_client_context.require_connection
11561156
async def asyncSetUp(self) -> None:
1157-
if not _IS_SYNC:
1158-
await reset_client_context()
11591157
if async_client_context.load_balancer and not getattr(self, "RUN_ON_LOAD_BALANCER", False):
11601158
raise SkipTest("this test does not support load balancers")
11611159
if async_client_context.serverless and not getattr(self, "RUN_ON_SERVERLESS", False):

0 commit comments

Comments
 (0)