Skip to content

Commit f2d4b1c

Browse files
committed
PYTHON-4860 - Async client should use asyncio.Lock and asyncio.Condition
1 parent ee18313 commit f2d4b1c

36 files changed

+635
-834
lines changed

pymongo/asynchronous/cursor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
)
4646
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS, _QUERY_OPTIONS, CursorType, _Hint, _Sort
4747
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
48-
from pymongo.lock import _ALock, _create_lock
48+
from pymongo.lock import _async_create_lock
4949
from pymongo.message import (
5050
_CursorAddress,
5151
_GetMore,
@@ -77,7 +77,7 @@ class _ConnectionManager:
7777
def __init__(self, conn: AsyncConnection, more_to_come: bool):
7878
self.conn: Optional[AsyncConnection] = conn
7979
self.more_to_come = more_to_come
80-
self._alock = _ALock(_create_lock())
80+
self._lock = _async_create_lock()
8181

8282
def update_exhaust(self, more_to_come: bool) -> None:
8383
self.more_to_come = more_to_come

pymongo/asynchronous/mongo_client.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@
8282
WaitQueueTimeoutError,
8383
WriteConcernError,
8484
)
85-
from pymongo.lock import _HAS_REGISTER_AT_FORK, _ALock, _create_lock, _release_locks
85+
from pymongo.lock import (
86+
_HAS_REGISTER_AT_FORK,
87+
_async_create_lock,
88+
_release_locks,
89+
)
8690
from pymongo.logger import _CLIENT_LOGGER, _log_or_warn
8791
from pymongo.message import _CursorAddress, _GetMore, _Query
8892
from pymongo.monitoring import ConnectionClosedReason
@@ -842,7 +846,7 @@ def __init__(
842846
self._options = options = ClientOptions(username, password, dbase, opts, _IS_SYNC)
843847

844848
self._default_database_name = dbase
845-
self._lock = _ALock(_create_lock())
849+
self._lock = _async_create_lock()
846850
self._kill_cursors_queue: list = []
847851

848852
self._event_listeners = options.pool_options._event_listeners
@@ -1728,7 +1732,7 @@ async def _run_operation(
17281732
address=address,
17291733
)
17301734

1731-
async with operation.conn_mgr._alock:
1735+
async with operation.conn_mgr._lock:
17321736
async with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type]
17331737
err_handler.contribute_socket(operation.conn_mgr.conn)
17341738
return await server.run_operation(
@@ -1976,7 +1980,7 @@ async def _close_cursor_now(
19761980

19771981
try:
19781982
if conn_mgr:
1979-
async with conn_mgr._alock:
1983+
async with conn_mgr._lock:
19801984
# Cursor is pinned to LB outside of a transaction.
19811985
assert address is not None
19821986
assert conn_mgr.conn is not None

pymongo/asynchronous/pool.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@
6565
_CertificateError,
6666
)
6767
from pymongo.hello import Hello, HelloCompat
68-
from pymongo.lock import _ACondition, _ALock, _create_lock
68+
from pymongo.lock import (
69+
_async_cond_wait,
70+
_async_create_condition,
71+
_async_create_lock,
72+
)
6973
from pymongo.logger import (
7074
_CONNECTION_LOGGER,
7175
_ConnectionStatusMessage,
@@ -208,11 +212,6 @@ def _raise_connection_failure(
208212
raise AutoReconnect(msg) from error
209213

210214

211-
async def _cond_wait(condition: _ACondition, deadline: Optional[float]) -> bool:
212-
timeout = deadline - time.monotonic() if deadline else None
213-
return await condition.wait(timeout)
214-
215-
216215
def _get_timeout_details(options: PoolOptions) -> dict[str, float]:
217216
details = {}
218217
timeout = _csot.get_timeout()
@@ -992,8 +991,9 @@ def __init__(
992991
# from the right side.
993992
self.conns: collections.deque = collections.deque()
994993
self.active_contexts: set[_CancellationContext] = set()
995-
_lock = _create_lock()
996-
self.lock = _ALock(_lock)
994+
self.lock = _async_create_lock()
995+
self.size_cond = _async_create_condition(self.lock, threading.Condition)
996+
self._max_connecting_cond = _async_create_condition(self.lock, threading.Condition)
997997
self.active_sockets = 0
998998
# Monotonically increasing connection ID required for CMAP Events.
999999
self.next_connection_id = 1
@@ -1019,15 +1019,13 @@ def __init__(
10191019
# The first portion of the wait queue.
10201020
# Enforces: maxPoolSize
10211021
# Also used for: clearing the wait queue
1022-
self.size_cond = _ACondition(threading.Condition(_lock))
10231022
self.requests = 0
10241023
self.max_pool_size = self.opts.max_pool_size
10251024
if not self.max_pool_size:
10261025
self.max_pool_size = float("inf")
10271026
# The second portion of the wait queue.
10281027
# Enforces: maxConnecting
10291028
# Also used for: clearing the wait queue
1030-
self._max_connecting_cond = _ACondition(threading.Condition(_lock))
10311029
self._max_connecting = self.opts.max_connecting
10321030
self._pending = 0
10331031
self._client_id = client_id
@@ -1456,7 +1454,8 @@ async def _get_conn(
14561454
async with self.size_cond:
14571455
self._raise_if_not_ready(checkout_started_time, emit_event=True)
14581456
while not (self.requests < self.max_pool_size):
1459-
if not await _cond_wait(self.size_cond, deadline):
1457+
timeout = deadline - time.monotonic() if deadline else None
1458+
if not await _async_cond_wait(self.size_cond, timeout):
14601459
# Timed out, notify the next thread to ensure a
14611460
# timeout doesn't consume the condition.
14621461
if self.requests < self.max_pool_size:
@@ -1479,7 +1478,8 @@ async def _get_conn(
14791478
async with self._max_connecting_cond:
14801479
self._raise_if_not_ready(checkout_started_time, emit_event=False)
14811480
while not (self.conns or self._pending < self._max_connecting):
1482-
if not await _cond_wait(self._max_connecting_cond, deadline):
1481+
timeout = deadline - time.monotonic() if deadline else None
1482+
if not await _async_cond_wait(self._max_connecting_cond, timeout):
14831483
# Timed out, notify the next thread to ensure a
14841484
# timeout doesn't consume the condition.
14851485
if self.conns or self._pending < self._max_connecting:

pymongo/asynchronous/topology.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@
4343
WriteError,
4444
)
4545
from pymongo.hello import Hello
46-
from pymongo.lock import _ACondition, _ALock, _create_lock
46+
from pymongo.lock import (
47+
_async_cond_wait,
48+
_async_create_condition,
49+
_async_create_lock,
50+
)
4751
from pymongo.logger import (
4852
_SDAM_LOGGER,
4953
_SERVER_SELECTION_LOGGER,
@@ -169,9 +173,8 @@ def __init__(self, topology_settings: TopologySettings):
169173
self._seed_addresses = list(topology_description.server_descriptions())
170174
self._opened = False
171175
self._closed = False
172-
_lock = _create_lock()
173-
self._lock = _ALock(_lock)
174-
self._condition = _ACondition(self._settings.condition_class(_lock))
176+
self._lock = _async_create_lock()
177+
self._condition = _async_create_condition(self._lock, self._settings.condition_class)
175178
self._servers: dict[_Address, Server] = {}
176179
self._pid: Optional[int] = None
177180
self._max_cluster_time: Optional[ClusterTime] = None
@@ -353,7 +356,7 @@ async def _select_servers_loop(
353356
# change, or for a timeout. We won't miss any changes that
354357
# came after our most recent apply_selector call, since we've
355358
# held the lock until now.
356-
await self._condition.wait(common.MIN_HEARTBEAT_INTERVAL)
359+
await _async_cond_wait(self._condition, common.MIN_HEARTBEAT_INTERVAL)
357360
self._description.check_compatible()
358361
now = time.monotonic()
359362
server_descriptions = self._description.apply_selector(
@@ -653,7 +656,7 @@ async def request_check_all(self, wait_time: int = 5) -> None:
653656
"""Wake all monitors, wait for at least one to check its server."""
654657
async with self._lock:
655658
self._request_check_all()
656-
await self._condition.wait(wait_time)
659+
await _async_cond_wait(self._condition, wait_time)
657660

658661
def data_bearing_servers(self) -> list[ServerDescription]:
659662
"""Return a list of all data-bearing servers.

0 commit comments

Comments
 (0)