Skip to content

Commit ee18313

Browse files
authored
PYTHON-4725 Async client should use tasks for SDAM instead of threads (#1896)
1 parent c2338d8 commit ee18313

24 files changed

+228
-314
lines changed

pymongo/asynchronous/mongo_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@
5959

6060
from bson.codec_options import DEFAULT_CODEC_OPTIONS, CodecOptions, TypeRegistry
6161
from bson.timestamp import Timestamp
62-
from pymongo import _csot, common, helpers_shared, uri_parser
63-
from pymongo.asynchronous import client_session, database, periodic_executor
62+
from pymongo import _csot, common, helpers_shared, periodic_executor, uri_parser
63+
from pymongo.asynchronous import client_session, database
6464
from pymongo.asynchronous.change_stream import AsyncChangeStream, AsyncClusterChangeStream
6565
from pymongo.asynchronous.client_bulk import _AsyncClientBulk
6666
from pymongo.asynchronous.client_session import _EmptyServerSession
@@ -908,7 +908,7 @@ async def target() -> bool:
908908
await AsyncMongoClient._process_periodic_tasks(client)
909909
return True
910910

911-
executor = periodic_executor.PeriodicExecutor(
911+
executor = periodic_executor.AsyncPeriodicExecutor(
912912
interval=common.KILL_CURSOR_FREQUENCY,
913913
min_interval=common.MIN_HEARTBEAT_INTERVAL,
914914
target=target,

pymongo/asynchronous/monitor.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@
2222
import weakref
2323
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
2424

25-
from pymongo import common
25+
from pymongo import common, periodic_executor
2626
from pymongo._csot import MovingMinimum
27-
from pymongo.asynchronous import periodic_executor
28-
from pymongo.asynchronous.periodic_executor import _shutdown_executors
2927
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
3028
from pymongo.hello import Hello
3129
from pymongo.lock import _create_lock
3230
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
31+
from pymongo.periodic_executor import _shutdown_executors
3332
from pymongo.pool_options import _is_faas
3433
from pymongo.read_preferences import MovingAverage
3534
from pymongo.server_description import ServerDescription
@@ -76,7 +75,7 @@ async def target() -> bool:
7675
await monitor._run() # type:ignore[attr-defined]
7776
return True
7877

79-
executor = periodic_executor.PeriodicExecutor(
78+
executor = periodic_executor.AsyncPeriodicExecutor(
8079
interval=interval, min_interval=min_interval, target=target, name=name
8180
)
8281

@@ -112,9 +111,9 @@ async def close(self) -> None:
112111
"""
113112
self.gc_safe_close()
114113

115-
def join(self, timeout: Optional[int] = None) -> None:
114+
async def join(self, timeout: Optional[int] = None) -> None:
116115
"""Wait for the monitor to stop."""
117-
self._executor.join(timeout)
116+
await self._executor.join(timeout)
118117

119118
def request_check(self) -> None:
120119
"""If the monitor is sleeping, wake it soon."""
@@ -139,7 +138,7 @@ def __init__(
139138
"""
140139
super().__init__(
141140
topology,
142-
"pymongo_server_monitor_thread",
141+
"pymongo_server_monitor_task",
143142
topology_settings.heartbeat_frequency,
144143
common.MIN_HEARTBEAT_INTERVAL,
145144
)
@@ -250,7 +249,7 @@ async def _check_server(self) -> ServerDescription:
250249
except (OperationFailure, NotPrimaryError) as exc:
251250
# Update max cluster time even when hello fails.
252251
details = cast(Mapping[str, Any], exc.details)
253-
self._topology.receive_cluster_time(details.get("$clusterTime"))
252+
await self._topology.receive_cluster_time(details.get("$clusterTime"))
254253
raise
255254
except ReferenceError:
256255
raise
@@ -434,7 +433,7 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool
434433
"""
435434
super().__init__(
436435
topology,
437-
"pymongo_server_rtt_thread",
436+
"pymongo_server_rtt_task",
438437
topology_settings.heartbeat_frequency,
439438
common.MIN_HEARTBEAT_INTERVAL,
440439
)
@@ -531,4 +530,5 @@ def _shutdown_resources() -> None:
531530
shutdown()
532531

533532

534-
atexit.register(_shutdown_resources)
533+
if _IS_SYNC:
534+
atexit.register(_shutdown_resources)

pymongo/asynchronous/periodic_executor.py

Lines changed: 0 additions & 219 deletions
This file was deleted.

pymongo/asynchronous/topology.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@
2727
from pathlib import Path
2828
from typing import TYPE_CHECKING, Any, Callable, Mapping, Optional, cast
2929

30-
from pymongo import _csot, common, helpers_shared
31-
from pymongo.asynchronous import periodic_executor
30+
from pymongo import _csot, common, helpers_shared, periodic_executor
3231
from pymongo.asynchronous.client_session import _ServerSession, _ServerSessionPool
3332
from pymongo.asynchronous.monitor import SrvMonitor
3433
from pymongo.asynchronous.pool import Pool
@@ -185,7 +184,7 @@ def __init__(self, topology_settings: TopologySettings):
185184
async def target() -> bool:
186185
return process_events_queue(weak)
187186

188-
executor = periodic_executor.PeriodicExecutor(
187+
executor = periodic_executor.AsyncPeriodicExecutor(
189188
interval=common.EVENTS_QUEUE_FREQUENCY,
190189
min_interval=common.MIN_HEARTBEAT_INTERVAL,
191190
target=target,
@@ -742,7 +741,7 @@ async def close(self) -> None:
742741
if self._publish_server or self._publish_tp:
743742
# Make sure the events executor thread is fully closed before publishing the remaining events
744743
self.__events_executor.close()
745-
self.__events_executor.join(1)
744+
await self.__events_executor.join(1)
746745
process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type]
747746

748747
@property

0 commit comments

Comments
 (0)