From 19f14a9975ffe1541e91e25f3e3a01b4402358a1 Mon Sep 17 00:00:00 2001 From: Iris Date: Thu, 3 Oct 2024 18:37:07 -0700 Subject: [PATCH 01/25] create AsyncPeriodicExecutor --- pymongo/asynchronous/mongo_client.py | 2 +- pymongo/asynchronous/monitor.py | 20 ++-- pymongo/asynchronous/periodic_executor.py | 119 ++++++++++++++++++---- pymongo/asynchronous/settings.py | 5 +- pymongo/asynchronous/topology.py | 21 ++-- pymongo/synchronous/monitor.py | 10 +- pymongo/synchronous/periodic_executor.py | 111 ++++++++++++++++---- pymongo/synchronous/settings.py | 5 +- pymongo/synchronous/topology.py | 17 ++-- test/test_collection.py | 5 +- 10 files changed, 250 insertions(+), 65 deletions(-) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 814c604562..005cbb6a23 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -909,7 +909,7 @@ async def target() -> bool: await AsyncMongoClient._process_periodic_tasks(client) return True - executor = periodic_executor.PeriodicExecutor( + executor = periodic_executor.AsyncPeriodicExecutor( interval=common.KILL_CURSOR_FREQUENCY, min_interval=common.MIN_HEARTBEAT_INTERVAL, target=target, diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index f9e912b084..a61c98cdba 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -16,6 +16,7 @@ from __future__ import annotations +import asyncio import atexit import logging import time @@ -76,7 +77,7 @@ async def target() -> bool: await monitor._run() # type:ignore[attr-defined] return True - executor = periodic_executor.PeriodicExecutor( + executor = periodic_executor.AsyncPeriodicExecutor( interval=interval, min_interval=min_interval, target=target, name=name ) @@ -112,9 +113,9 @@ async def close(self) -> None: """ self.gc_safe_close() - def join(self, timeout: Optional[int] = None) -> None: + async def join(self, timeout: Optional[int] = None) -> None: """Wait for the monitor to stop.""" - self._executor.join(timeout) + await self._executor.join(timeout) def request_check(self) -> None: """If the monitor is sleeping, wake it soon.""" @@ -521,14 +522,21 @@ def _shutdown_monitors() -> None: monitor = None -def _shutdown_resources() -> None: +async def _shutdown_resources() -> None: # _shutdown_monitors/_shutdown_executors may already be GC'd at shutdown. shutdown = _shutdown_monitors if shutdown: # type:ignore[truthy-function] shutdown() shutdown = _shutdown_executors if shutdown: # type:ignore[truthy-function] - shutdown() + await shutdown() + + +def _run_shutdown_resources(): + if _IS_SYNC: + _shutdown_resources() + else: + asyncio.run(_shutdown_resources()) -atexit.register(_shutdown_resources) +atexit.register(_run_shutdown_resources) diff --git a/pymongo/asynchronous/periodic_executor.py b/pymongo/asynchronous/periodic_executor.py index f3d2fddba3..7f3da7df9a 100644 --- a/pymongo/asynchronous/periodic_executor.py +++ b/pymongo/asynchronous/periodic_executor.py @@ -23,7 +23,7 @@ import weakref from typing import Any, Optional -from pymongo.lock import _ALock, _create_lock +from pymongo.lock import _create_lock _IS_SYNC = False @@ -59,24 +59,11 @@ def __init__( self._name = name self._skip_sleep = False self._thread_will_exit = False - self._lock = _ALock(_create_lock()) + self._lock = _create_lock() def __repr__(self) -> str: return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" - def _run_async(self) -> None: - # The default asyncio loop implementation on Windows - # has issues with sharing sockets across loops (https://github.com/python/cpython/issues/122240) - # We explicitly use a different loop implementation here to prevent that issue - if sys.platform == "win32": - loop = asyncio.SelectorEventLoop() - try: - loop.run_until_complete(self._run()) # type: ignore[func-returns-value] - finally: - loop.close() - else: - asyncio.run(self._run()) # type: ignore[func-returns-value] - def open(self) -> None: """Start. Multiple calls have no effect. @@ -104,10 +91,7 @@ def open(self) -> None: pass if not started: - if _IS_SYNC: - thread = threading.Thread(target=self._run, name=self._name) - else: - thread = threading.Thread(target=self._run_async, name=self._name) + thread = threading.Thread(target=self._run, name=self._name) thread.daemon = True self._thread = weakref.proxy(thread) _register_executor(self) @@ -179,6 +163,99 @@ async def _run(self) -> None: self._event = False +class AsyncPeriodicExecutor: + def __init__( + self, + interval: float, + min_interval: float, + target: Any, + name: Optional[str] = None, + ): + """Run a target function periodically on a background thread. + + If the target's return value is false, the executor stops. + + :param interval: Seconds between calls to `target`. + :param min_interval: Minimum seconds between calls if `wake` is + called very often. + :param target: A function. + :param name: A name to give the underlying thread. + """ + # threading.Event and its internal condition variable are expensive + # in Python 2, see PYTHON-983. Use a boolean to know when to wake. + # The executor's design is constrained by several Python issues, see + # "periodic_executor.rst" in this repository. + self._event = False + self._interval = interval + self._min_interval = min_interval + self._target = target + self._stopped = False + self._task: Optional[asyncio.Task] = None + self._name = name + self._skip_sleep = False + + def __repr__(self) -> str: + return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" + + def open(self) -> None: + """Start. Multiple calls have no effect.""" + self._stopped = False + started = self._task and not self._task.done() + + if not started: + self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) + + def close(self, dummy: Any = None) -> None: + """Stop. To restart, call open(). + + The dummy parameter allows an executor's close method to be a weakref + callback; see monitor.py. + """ + self._stopped = True + + async def join(self, timeout: Optional[int] = None) -> None: + if self._task is not None: + try: + await asyncio.wait_for(self._task, timeout=timeout) + except asyncio.TimeoutError: + # Task timed out + pass + except asyncio.exceptions.CancelledError: + # Task was already finished, or not yet started. + pass + + def wake(self) -> None: + """Execute the target function soon.""" + self._event = True + + def update_interval(self, new_interval: int) -> None: + self._interval = new_interval + + def skip_sleep(self) -> None: + self._skip_sleep = True + + async def _run(self) -> None: + while not self._stopped: + try: + if not await self._target(): + self._stopped = True + break + except BaseException: + self._stopped = True + raise + + if self._skip_sleep: + self._skip_sleep = False + else: + deadline = time.monotonic() + self._interval + while not self._stopped and time.monotonic() < deadline: + await asyncio.sleep(self._min_interval) + if self._event: + break # Early wake. + + self._event = False + + # _EXECUTORS has a weakref to each running PeriodicExecutor. Once started, # an executor is kept alive by a strong reference from its thread and perhaps # from other objects. When the thread dies and all other referrers are freed, @@ -197,7 +274,7 @@ def _on_executor_deleted(ref: weakref.ReferenceType[PeriodicExecutor]) -> None: _EXECUTORS.remove(ref) -def _shutdown_executors() -> None: +async def _shutdown_executors() -> None: if _EXECUTORS is None: return @@ -214,6 +291,6 @@ def _shutdown_executors() -> None: for ref in executors: executor = ref() if executor: - executor.join(1) + await executor.join(1) executor = None diff --git a/pymongo/asynchronous/settings.py b/pymongo/asynchronous/settings.py index 1103e1bd18..ec24587d9a 100644 --- a/pymongo/asynchronous/settings.py +++ b/pymongo/asynchronous/settings.py @@ -15,6 +15,7 @@ """Represent MongoClient's configuration.""" from __future__ import annotations +import asyncio import threading import traceback from typing import Any, Collection, Optional, Type, Union @@ -67,7 +68,7 @@ def __init__( self._pool_class: Type[Pool] = pool_class or pool.Pool self._pool_options: PoolOptions = pool_options or PoolOptions() self._monitor_class: Type[monitor.Monitor] = monitor_class or monitor.Monitor - self._condition_class: Type[threading.Condition] = condition_class or threading.Condition + self._condition_class: Type[asyncio.Condition] = condition_class or asyncio.Condition self._local_threshold_ms = local_threshold_ms self._server_selection_timeout = server_selection_timeout self._server_selector = server_selector @@ -106,7 +107,7 @@ def monitor_class(self) -> Type[monitor.Monitor]: return self._monitor_class @property - def condition_class(self) -> Type[threading.Condition]: + def condition_class(self) -> Type[asyncio.Condition]: return self._condition_class @property diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 82af4257ba..90628c0f78 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -16,6 +16,7 @@ from __future__ import annotations +import asyncio import logging import os import queue @@ -44,7 +45,6 @@ WriteError, ) from pymongo.hello import Hello -from pymongo.lock import _ACondition, _ALock, _create_lock from pymongo.logger import ( _SDAM_LOGGER, _SERVER_SELECTION_LOGGER, @@ -170,9 +170,8 @@ def __init__(self, topology_settings: TopologySettings): self._seed_addresses = list(topology_description.server_descriptions()) self._opened = False self._closed = False - _lock = _create_lock() - self._lock = _ALock(_lock) - self._condition = _ACondition(self._settings.condition_class(_lock)) + self._lock = asyncio.Lock() + self._condition = self._settings.condition_class(self._lock) # type: ignore[arg-type] self._servers: dict[_Address, Server] = {} self._pid: Optional[int] = None self._max_cluster_time: Optional[ClusterTime] = None @@ -185,7 +184,7 @@ def __init__(self, topology_settings: TopologySettings): async def target() -> bool: return process_events_queue(weak) - executor = periodic_executor.PeriodicExecutor( + executor = periodic_executor.AsyncPeriodicExecutor( interval=common.EVENTS_QUEUE_FREQUENCY, min_interval=common.MIN_HEARTBEAT_INTERVAL, target=target, @@ -354,7 +353,10 @@ async def _select_servers_loop( # change, or for a timeout. We won't miss any changes that # came after our most recent apply_selector call, since we've # held the lock until now. - await self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) + try: + await asyncio.wait_for(self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL) + except asyncio.TimeoutError: + pass self._description.check_compatible() now = time.monotonic() server_descriptions = self._description.apply_selector( @@ -654,7 +656,10 @@ async def request_check_all(self, wait_time: int = 5) -> None: """Wake all monitors, wait for at least one to check its server.""" async with self._lock: self._request_check_all() - await self._condition.wait(wait_time) + try: + await asyncio.wait_for(self._condition.wait(), wait_time) + except TimeoutError: + pass def data_bearing_servers(self) -> list[ServerDescription]: """Return a list of all data-bearing servers. @@ -742,7 +747,7 @@ async def close(self) -> None: if self._publish_server or self._publish_tp: # Make sure the events executor thread is fully closed before publishing the remaining events self.__events_executor.close() - self.__events_executor.join(1) + await self.__events_executor.join(1) process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type] @property diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index 3f9bb2ea75..dd5a051c16 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -16,6 +16,7 @@ from __future__ import annotations +import asyncio import atexit import logging import time @@ -531,4 +532,11 @@ def _shutdown_resources() -> None: shutdown() -atexit.register(_shutdown_resources) +def _run_shutdown_resources(): + if _IS_SYNC: + _shutdown_resources() + else: + asyncio.run(_shutdown_resources()) + + +atexit.register(_run_shutdown_resources) diff --git a/pymongo/synchronous/periodic_executor.py b/pymongo/synchronous/periodic_executor.py index 525268b14b..0031e3db90 100644 --- a/pymongo/synchronous/periodic_executor.py +++ b/pymongo/synchronous/periodic_executor.py @@ -64,19 +64,6 @@ def __init__( def __repr__(self) -> str: return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" - def _run_async(self) -> None: - # The default asyncio loop implementation on Windows - # has issues with sharing sockets across loops (https://github.com/python/cpython/issues/122240) - # We explicitly use a different loop implementation here to prevent that issue - if sys.platform == "win32": - loop = asyncio.SelectorEventLoop() - try: - loop.run_until_complete(self._run()) # type: ignore[func-returns-value] - finally: - loop.close() - else: - asyncio.run(self._run()) # type: ignore[func-returns-value] - def open(self) -> None: """Start. Multiple calls have no effect. @@ -104,10 +91,7 @@ def open(self) -> None: pass if not started: - if _IS_SYNC: - thread = threading.Thread(target=self._run, name=self._name) - else: - thread = threading.Thread(target=self._run_async, name=self._name) + thread = threading.Thread(target=self._run, name=self._name) thread.daemon = True self._thread = weakref.proxy(thread) _register_executor(self) @@ -179,6 +163,99 @@ def _run(self) -> None: self._event = False +class PeriodicExecutor: + def __init__( + self, + interval: float, + min_interval: float, + target: Any, + name: Optional[str] = None, + ): + """Run a target function periodically on a background thread. + + If the target's return value is false, the executor stops. + + :param interval: Seconds between calls to `target`. + :param min_interval: Minimum seconds between calls if `wake` is + called very often. + :param target: A function. + :param name: A name to give the underlying thread. + """ + # threading.Event and its internal condition variable are expensive + # in Python 2, see PYTHON-983. Use a boolean to know when to wake. + # The executor's design is constrained by several Python issues, see + # "periodic_executor.rst" in this repository. + self._event = False + self._interval = interval + self._min_interval = min_interval + self._target = target + self._stopped = False + self._task: Optional[asyncio.Task] = None + self._name = name + self._skip_sleep = False + + def __repr__(self) -> str: + return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" + + def open(self) -> None: + """Start. Multiple calls have no effect.""" + self._stopped = False + started = self._task and not self._task.done() + + if not started: + self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) + + def close(self, dummy: Any = None) -> None: + """Stop. To restart, call open(). + + The dummy parameter allows an executor's close method to be a weakref + callback; see monitor.py. + """ + self._stopped = True + + def join(self, timeout: Optional[int] = None) -> None: + if self._task is not None: + try: + asyncio.wait_for(self._task, timeout=timeout) + except asyncio.TimeoutError: + # Task timed out + pass + except asyncio.exceptions.CancelledError: + # Task was already finished, or not yet started. + pass + + def wake(self) -> None: + """Execute the target function soon.""" + self._event = True + + def update_interval(self, new_interval: int) -> None: + self._interval = new_interval + + def skip_sleep(self) -> None: + self._skip_sleep = True + + def _run(self) -> None: + while not self._stopped: + try: + if not self._target(): + self._stopped = True + break + except BaseException: + self._stopped = True + raise + + if self._skip_sleep: + self._skip_sleep = False + else: + deadline = time.monotonic() + self._interval + while not self._stopped and time.monotonic() < deadline: + time.sleep(self._min_interval) + if self._event: + break # Early wake. + + self._event = False + + # _EXECUTORS has a weakref to each running PeriodicExecutor. Once started, # an executor is kept alive by a strong reference from its thread and perhaps # from other objects. When the thread dies and all other referrers are freed, diff --git a/pymongo/synchronous/settings.py b/pymongo/synchronous/settings.py index 040776713f..24ef533410 100644 --- a/pymongo/synchronous/settings.py +++ b/pymongo/synchronous/settings.py @@ -15,6 +15,7 @@ """Represent MongoClient's configuration.""" from __future__ import annotations +import asyncio import threading import traceback from typing import Any, Collection, Optional, Type, Union @@ -67,7 +68,7 @@ def __init__( self._pool_class: Type[Pool] = pool_class or pool.Pool self._pool_options: PoolOptions = pool_options or PoolOptions() self._monitor_class: Type[monitor.Monitor] = monitor_class or monitor.Monitor - self._condition_class: Type[threading.Condition] = condition_class or threading.Condition + self._condition_class: Type[asyncio.Condition] = condition_class or asyncio.Condition self._local_threshold_ms = local_threshold_ms self._server_selection_timeout = server_selection_timeout self._server_selector = server_selector @@ -106,7 +107,7 @@ def monitor_class(self) -> Type[monitor.Monitor]: return self._monitor_class @property - def condition_class(self) -> Type[threading.Condition]: + def condition_class(self) -> Type[asyncio.Condition]: return self._condition_class @property diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index a350c1702e..dfcf0e4211 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -16,6 +16,7 @@ from __future__ import annotations +import asyncio import logging import os import queue @@ -39,7 +40,6 @@ WriteError, ) from pymongo.hello import Hello -from pymongo.lock import _create_lock, _Lock from pymongo.logger import ( _SDAM_LOGGER, _SERVER_SELECTION_LOGGER, @@ -170,9 +170,8 @@ def __init__(self, topology_settings: TopologySettings): self._seed_addresses = list(topology_description.server_descriptions()) self._opened = False self._closed = False - _lock = _create_lock() - self._lock = _Lock(_lock) - self._condition = self._settings.condition_class(_lock) + self._lock = asyncio.Lock() + self._condition = self._settings.condition_class(self._lock) # type: ignore[arg-type] self._servers: dict[_Address, Server] = {} self._pid: Optional[int] = None self._max_cluster_time: Optional[ClusterTime] = None @@ -354,7 +353,10 @@ def _select_servers_loop( # change, or for a timeout. We won't miss any changes that # came after our most recent apply_selector call, since we've # held the lock until now. - self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) + try: + asyncio.wait_for(self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL) + except asyncio.TimeoutError: + pass self._description.check_compatible() now = time.monotonic() server_descriptions = self._description.apply_selector( @@ -652,7 +654,10 @@ def request_check_all(self, wait_time: int = 5) -> None: """Wake all monitors, wait for at least one to check its server.""" with self._lock: self._request_check_all() - self._condition.wait(wait_time) + try: + asyncio.wait_for(self._condition.wait(), wait_time) + except TimeoutError: + pass def data_bearing_servers(self) -> list[ServerDescription]: """Return a list of all data-bearing servers. diff --git a/test/test_collection.py b/test/test_collection.py index a2c3b0b0b6..f2f01ac686 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -1009,7 +1009,10 @@ def test_replace_bypass_document_validation(self): db.test.insert_one({"y": 1}, bypass_document_validation=True) db_w0.test.replace_one({"y": 1}, {"x": 1}, bypass_document_validation=True) - wait_until(lambda: db_w0.test.find_one({"x": 1}), "find w:0 replaced document") + def predicate(): + return db_w0.test.find_one({"x": 1}) + + wait_until(predicate, "find w:0 replaced document") def test_update_bypass_document_validation(self): db = self.db From 59c073331369eafdc600af9e4e06661df4bbbfc1 Mon Sep 17 00:00:00 2001 From: Iris Date: Thu, 3 Oct 2024 18:37:16 -0700 Subject: [PATCH 02/25] fix tests --- test/asynchronous/test_client.py | 61 ++++++++++++++------------ test/asynchronous/test_collection.py | 8 ++-- test/asynchronous/test_cursor.py | 8 ++-- test/asynchronous/test_session.py | 1 - test/asynchronous/test_transactions.py | 6 +-- test/utils.py | 5 ++- tools/synchro.py | 1 + 7 files changed, 51 insertions(+), 39 deletions(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index 2052d1cd7f..d14f3282ce 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -73,7 +73,7 @@ is_greenthread_patched, lazy_client_trial, one, - wait_until, + async_wait_until, ) import bson @@ -693,8 +693,8 @@ async def test_max_idle_time_reaper_removes_stale_minPoolSize(self): # When the reaper runs at the same time as the get_socket, two # connections could be created and checked into the pool. self.assertGreaterEqual(len(server._pool.conns), 1) - wait_until(lambda: conn not in server._pool.conns, "remove stale socket") - wait_until(lambda: len(server._pool.conns) >= 1, "replace stale socket") + await async_wait_until(lambda: conn not in server._pool.conns, "remove stale socket") + await async_wait_until(lambda: len(server._pool.conns) >= 1, "replace stale socket") async def test_max_idle_time_reaper_does_not_exceed_maxPoolSize(self): with client_knobs(kill_cursor_frequency=0.1): @@ -710,8 +710,8 @@ async def test_max_idle_time_reaper_does_not_exceed_maxPoolSize(self): # When the reaper runs at the same time as the get_socket, # maxPoolSize=1 should prevent two connections from being created. self.assertEqual(1, len(server._pool.conns)) - wait_until(lambda: conn not in server._pool.conns, "remove stale socket") - wait_until(lambda: len(server._pool.conns) == 1, "replace stale socket") + await async_wait_until(lambda: conn not in server._pool.conns, "remove stale socket") + await async_wait_until(lambda: len(server._pool.conns) == 1, "replace stale socket") async def test_max_idle_time_reaper_removes_stale(self): with client_knobs(kill_cursor_frequency=0.1): @@ -727,7 +727,7 @@ async def test_max_idle_time_reaper_removes_stale(self): async with server._pool.checkout() as conn_two: pass self.assertIs(conn_one, conn_two) - wait_until( + await async_wait_until( lambda: len(server._pool.conns) == 0, "stale socket reaped and new one NOT added to the pool", ) @@ -745,7 +745,7 @@ async def test_min_pool_size(self): server = await (await client._get_topology()).select_server( readable_server_selector, _Op.TEST ) - wait_until( + await async_wait_until( lambda: len(server._pool.conns) == 10, "pool initialized with 10 connections", ) @@ -753,7 +753,7 @@ async def test_min_pool_size(self): # Assert that if a socket is closed, a new one takes its place async with server._pool.checkout() as conn: conn.close_conn(None) - wait_until( + await async_wait_until( lambda: len(server._pool.conns) == 10, "a closed socket gets replaced from the pool", ) @@ -941,8 +941,8 @@ async def test_repr(self): async with eval(the_repr) as client_two: self.assertEqual(client_two, client) - def test_getters(self): - wait_until(lambda: async_client_context.nodes == self.client.nodes, "find all nodes") + async def test_getters(self): + await async_wait_until(lambda: async_client_context.nodes == self.client.nodes, "find all nodes") async def test_list_databases(self): cmd_docs = (await self.client.admin.command("listDatabases"))["databases"] @@ -1067,14 +1067,21 @@ async def test_uri_connect_option(self): self.assertFalse(client._topology._opened) # Ensure kill cursors thread has not been started. - kc_thread = client._kill_cursors_executor._thread - self.assertFalse(kc_thread and kc_thread.is_alive()) - + if _IS_SYNC: + kc_thread = client._kill_cursors_executor._thread + self.assertFalse(kc_thread and kc_thread.is_alive()) + else: + kc_task = client._kill_cursors_executor._task + self.assertFalse(kc_task and not kc_task.done()) # Using the client should open topology and start the thread. await client.admin.command("ping") self.assertTrue(client._topology._opened) - kc_thread = client._kill_cursors_executor._thread - self.assertTrue(kc_thread and kc_thread.is_alive()) + if _IS_SYNC: + kc_thread = client._kill_cursors_executor._thread + self.assertTrue(kc_thread and kc_thread.is_alive()) + else: + kc_task = client._kill_cursors_executor._task + self.assertTrue(kc_task and not kc_task.done()) async def test_close_does_not_open_servers(self): client = await self.async_rs_client(connect=False) @@ -1610,7 +1617,7 @@ def init(self, *args): await async_client_context.port, ) await self.async_single_client(uri, event_listeners=[listener]) - wait_until( + await async_wait_until( lambda: len(listener.results) >= 2, "record two ServerHeartbeatStartedEvents" ) @@ -1768,16 +1775,16 @@ async def test_background_connections_do_not_hold_locks(self): pool = await async_get_pool(client) original_connect = pool.connect - def stall_connect(*args, **kwargs): - time.sleep(2) - return original_connect(*args, **kwargs) + async def stall_connect(*args, **kwargs): + await asyncio.sleep(2) + return await original_connect(*args, **kwargs) pool.connect = stall_connect # Un-patch Pool.connect to break the cyclic reference. self.addCleanup(delattr, pool, "connect") # Wait for the background thread to start creating connections - wait_until(lambda: len(pool.conns) > 1, "start creating connections") + await async_wait_until(lambda: len(pool.conns) > 1, "start creating connections") # Assert that application operations do not block. for _ in range(10): @@ -1860,7 +1867,7 @@ async def test_process_periodic_tasks(self): await client.close() # Add cursor to kill cursors queue del cursor - wait_until( + await async_wait_until( lambda: client._kill_cursors_queue, "waited for cursor to be added to queue", ) @@ -2218,7 +2225,7 @@ async def test_exhaust_getmore_network_error(self): await cursor.to_list() self.assertTrue(conn.closed) - wait_until( + await async_wait_until( lambda: len(client._kill_cursors_queue) == 0, "waited for all killCursor requests to complete", ) @@ -2389,7 +2396,7 @@ async def test_discover_primary(self): ) self.addAsyncCleanup(c.close) - wait_until(lambda: len(c.nodes) == 3, "connect") + await async_wait_until(lambda: len(c.nodes) == 3, "connect") self.assertEqual(await c.address, ("a", 1)) # Fail over. @@ -2416,7 +2423,7 @@ async def test_reconnect(self): ) self.addAsyncCleanup(c.close) - wait_until(lambda: len(c.nodes) == 3, "connect") + await async_wait_until(lambda: len(c.nodes) == 3, "connect") # Total failure. c.kill_host("a:1") @@ -2458,7 +2465,7 @@ async def _test_network_error(self, operation_callback): c.set_wire_version_range("a:1", 2, 6) c.set_wire_version_range("b:2", 2, 7) await (await c._get_topology()).select_servers(writable_server_selector, _Op.TEST) - wait_until(lambda: len(c.nodes) == 2, "connect") + await async_wait_until(lambda: len(c.nodes) == 2, "connect") c.kill_host("a:1") @@ -2530,7 +2537,7 @@ async def test_rs_client_does_not_maintain_pool_to_arbiters(self): ) self.addAsyncCleanup(c.close) - wait_until(lambda: len(c.nodes) == 3, "connect") + await async_wait_until(lambda: len(c.nodes) == 3, "connect") self.assertEqual(await c.address, ("a", 1)) self.assertEqual(await c.arbiters, {("c", 3)}) # Assert that we create 2 and only 2 pooled connections. @@ -2560,7 +2567,7 @@ async def test_direct_client_maintains_pool_to_arbiter(self): ) self.addAsyncCleanup(c.close) - wait_until(lambda: len(c.nodes) == 1, "connect") + await async_wait_until(lambda: len(c.nodes) == 1, "connect") self.assertEqual(await c.address, ("c", 3)) # Assert that we create 1 pooled connection. listener.wait_for_event(monitoring.ConnectionReadyEvent, 1) diff --git a/test/asynchronous/test_collection.py b/test/asynchronous/test_collection.py index 612090b69f..470425f4ce 100644 --- a/test/asynchronous/test_collection.py +++ b/test/asynchronous/test_collection.py @@ -39,7 +39,6 @@ async_get_pool, async_is_mongos, async_wait_until, - wait_until, ) from bson import encode @@ -1022,7 +1021,10 @@ async def test_replace_bypass_document_validation(self): await db.test.insert_one({"y": 1}, bypass_document_validation=True) await db_w0.test.replace_one({"y": 1}, {"x": 1}, bypass_document_validation=True) - await async_wait_until(lambda: db_w0.test.find_one({"x": 1}), "find w:0 replaced document") + async def predicate(): + return await db_w0.test.find_one({"x": 1}) + + await async_wait_until(predicate, "find w:0 replaced document") async def test_update_bypass_document_validation(self): db = self.db @@ -1870,7 +1872,7 @@ async def test_exhaust(self): await cur.close() cur = None # Wait until the background thread returns the socket. - wait_until(lambda: pool.active_sockets == 0, "return socket") + await async_wait_until(lambda: pool.active_sockets == 0, "return socket") # The socket should be discarded. self.assertEqual(0, len(pool.conns)) diff --git a/test/asynchronous/test_cursor.py b/test/asynchronous/test_cursor.py index e79ad00641..5d640ba305 100644 --- a/test/asynchronous/test_cursor.py +++ b/test/asynchronous/test_cursor.py @@ -34,9 +34,9 @@ AllowListEventListener, EventListener, OvertCommandListener, + async_wait_until, delay, ignore_deprecations, - wait_until, ) from bson import decode_all @@ -1324,8 +1324,8 @@ async def test_timeout_kills_cursor_asynchronously(self): with self.assertRaises(ExecutionTimeout): await cursor.next() - def assertCursorKilled(): - wait_until( + async def assertCursorKilled(): + await async_wait_until( lambda: len(listener.succeeded_events), "find successful killCursors command", ) @@ -1335,7 +1335,7 @@ def assertCursorKilled(): self.assertEqual(1, len(listener.succeeded_events)) self.assertEqual("killCursors", listener.succeeded_events[0].command_name) - assertCursorKilled() + await assertCursorKilled() listener.reset() cursor = await coll.aggregate([], batchSize=1) diff --git a/test/asynchronous/test_session.py b/test/asynchronous/test_session.py index d264b5ecb0..c1dac6f56d 100644 --- a/test/asynchronous/test_session.py +++ b/test/asynchronous/test_session.py @@ -37,7 +37,6 @@ EventListener, ExceptionCatchingThread, async_wait_until, - wait_until, ) from bson import DBRef diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py index b5d0686417..229046e79b 100644 --- a/test/asynchronous/test_transactions.py +++ b/test/asynchronous/test_transactions.py @@ -26,7 +26,7 @@ from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest from test.utils import ( OvertCommandListener, - wait_until, + async_wait_until, ) from typing import List @@ -162,7 +162,7 @@ async def test_unpin_for_next_transaction(self): client = await self.async_rs_client( async_client_context.mongos_seeds(), localThresholdMS=1000 ) - wait_until(lambda: len(client.nodes) > 1, "discover both mongoses") + await async_wait_until(lambda: len(client.nodes) > 1, "discover both mongoses") coll = client.test.test # Create the collection. await coll.insert_one({}) @@ -191,7 +191,7 @@ async def test_unpin_for_non_transaction_operation(self): client = await self.async_rs_client( async_client_context.mongos_seeds(), localThresholdMS=1000 ) - wait_until(lambda: len(client.nodes) > 1, "discover both mongoses") + await async_wait_until(lambda: len(client.nodes) > 1, "discover both mongoses") coll = client.test.test # Create the collection. await coll.insert_one({}) diff --git a/test/utils.py b/test/utils.py index 9c78cff3ad..4d7dc58378 100644 --- a/test/utils.py +++ b/test/utils.py @@ -789,7 +789,10 @@ async def async_wait_until(predicate, success_description, timeout=10): start = time.time() interval = min(float(timeout) / 100, 0.1) while True: - retval = await predicate() + if iscoroutinefunction(predicate): + retval = await predicate() + else: + retval = predicate() if retval: return retval diff --git a/tools/synchro.py b/tools/synchro.py index 585fc5fefd..f0a9e91d1b 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -108,6 +108,7 @@ "async_set_fail_point": "set_fail_point", "async_ensure_all_connected": "ensure_all_connected", "async_repl_set_step_down": "repl_set_step_down", + "AsyncPeriodicExecutor": "PeriodicExecutor", } docstring_replacements: dict[tuple[str, str], str] = { From 3f5dda7bce1676a0402df6ef7f1886afde641eb7 Mon Sep 17 00:00:00 2001 From: Iris Date: Fri, 4 Oct 2024 12:24:26 -0700 Subject: [PATCH 03/25] undo lock changes --- pymongo/asynchronous/periodic_executor.py | 148 +++++++++++----------- pymongo/asynchronous/topology.py | 3 +- pymongo/synchronous/periodic_executor.py | 140 ++++++++++---------- pymongo/synchronous/topology.py | 3 +- test/asynchronous/test_client.py | 5 +- test/test_client.py | 17 ++- 6 files changed, 163 insertions(+), 153 deletions(-) diff --git a/pymongo/asynchronous/periodic_executor.py b/pymongo/asynchronous/periodic_executor.py index 7f3da7df9a..122e2ea643 100644 --- a/pymongo/asynchronous/periodic_executor.py +++ b/pymongo/asynchronous/periodic_executor.py @@ -28,7 +28,7 @@ _IS_SYNC = False -class PeriodicExecutor: +class AsyncPeriodicExecutor: def __init__( self, interval: float, @@ -55,55 +55,20 @@ def __init__( self._min_interval = min_interval self._target = target self._stopped = False - self._thread: Optional[threading.Thread] = None + self._task: Optional[asyncio.Task] = None self._name = name self._skip_sleep = False - self._thread_will_exit = False - self._lock = _create_lock() def __repr__(self) -> str: return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" def open(self) -> None: - """Start. Multiple calls have no effect. - - Not safe to call from multiple threads at once. - """ - with self._lock: - if self._thread_will_exit: - # If the background thread has read self._stopped as True - # there is a chance that it has not yet exited. The call to - # join should not block indefinitely because there is no - # other work done outside the while loop in self._run. - try: - assert self._thread is not None - self._thread.join() - except ReferenceError: - # Thread terminated. - pass - self._thread_will_exit = False - self._stopped = False - started: Any = False - try: - started = self._thread and self._thread.is_alive() - except ReferenceError: - # Thread terminated. - pass + """Start. Multiple calls have no effect.""" + self._stopped = False + started = self._task and not self._task.done() if not started: - thread = threading.Thread(target=self._run, name=self._name) - thread.daemon = True - self._thread = weakref.proxy(thread) - _register_executor(self) - # Mitigation to RuntimeError firing when thread starts on shutdown - # https://github.com/python/cpython/issues/114570 - try: - thread.start() - except RuntimeError as e: - if "interpreter shutdown" in str(e) or sys.is_finalizing(): - self._thread = None - return - raise + self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) def close(self, dummy: Any = None) -> None: """Stop. To restart, call open(). @@ -113,12 +78,15 @@ def close(self, dummy: Any = None) -> None: """ self._stopped = True - def join(self, timeout: Optional[int] = None) -> None: - if self._thread is not None: + async def join(self, timeout: Optional[int] = None) -> None: + if self._task is not None: try: - self._thread.join(timeout) - except (ReferenceError, RuntimeError): - # Thread already terminated, or not yet started. + await asyncio.wait_for(self._task, timeout=timeout) + except asyncio.TimeoutError: + # Task timed out + pass + except asyncio.exceptions.CancelledError: + # Task was already finished, or not yet started. pass def wake(self) -> None: @@ -131,24 +99,14 @@ def update_interval(self, new_interval: int) -> None: def skip_sleep(self) -> None: self._skip_sleep = True - async def _should_stop(self) -> bool: - async with self._lock: - if self._stopped: - self._thread_will_exit = True - return True - return False - async def _run(self) -> None: - while not await self._should_stop(): + while not self._stopped: try: if not await self._target(): self._stopped = True break except BaseException: - async with self._lock: - self._stopped = True - self._thread_will_exit = True - + self._stopped = True raise if self._skip_sleep: @@ -163,7 +121,7 @@ async def _run(self) -> None: self._event = False -class AsyncPeriodicExecutor: +class PeriodicExecutor: def __init__( self, interval: float, @@ -190,20 +148,55 @@ def __init__( self._min_interval = min_interval self._target = target self._stopped = False - self._task: Optional[asyncio.Task] = None + self._thread: Optional[threading.Thread] = None self._name = name self._skip_sleep = False + self._thread_will_exit = False + self._lock = _create_lock() def __repr__(self) -> str: return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" def open(self) -> None: - """Start. Multiple calls have no effect.""" - self._stopped = False - started = self._task and not self._task.done() + """Start. Multiple calls have no effect. + + Not safe to call from multiple threads at once. + """ + with self._lock: + if self._thread_will_exit: + # If the background thread has read self._stopped as True + # there is a chance that it has not yet exited. The call to + # join should not block indefinitely because there is no + # other work done outside the while loop in self._run. + try: + assert self._thread is not None + self._thread.join() + except ReferenceError: + # Thread terminated. + pass + self._thread_will_exit = False + self._stopped = False + started: Any = False + try: + started = self._thread and self._thread.is_alive() + except ReferenceError: + # Thread terminated. + pass if not started: - self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) + thread = threading.Thread(target=self._run, name=self._name) + thread.daemon = True + self._thread = weakref.proxy(thread) + _register_executor(self) + # Mitigation to RuntimeError firing when thread starts on shutdown + # https://github.com/python/cpython/issues/114570 + try: + thread.start() + except RuntimeError as e: + if "interpreter shutdown" in str(e) or sys.is_finalizing(): + self._thread = None + return + raise def close(self, dummy: Any = None) -> None: """Stop. To restart, call open(). @@ -213,15 +206,12 @@ def close(self, dummy: Any = None) -> None: """ self._stopped = True - async def join(self, timeout: Optional[int] = None) -> None: - if self._task is not None: + def join(self, timeout: Optional[int] = None) -> None: + if self._thread is not None: try: - await asyncio.wait_for(self._task, timeout=timeout) - except asyncio.TimeoutError: - # Task timed out - pass - except asyncio.exceptions.CancelledError: - # Task was already finished, or not yet started. + self._thread.join(timeout) + except (ReferenceError, RuntimeError): + # Thread already terminated, or not yet started. pass def wake(self) -> None: @@ -234,14 +224,24 @@ def update_interval(self, new_interval: int) -> None: def skip_sleep(self) -> None: self._skip_sleep = True + async def _should_stop(self) -> bool: + async with self._lock: + if self._stopped: + self._thread_will_exit = True + return True + return False + async def _run(self) -> None: - while not self._stopped: + while not await self._should_stop(): try: if not await self._target(): self._stopped = True break except BaseException: - self._stopped = True + async with self._lock: + self._stopped = True + self._thread_will_exit = True + raise if self._skip_sleep: diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 90628c0f78..334ac166cd 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -45,6 +45,7 @@ WriteError, ) from pymongo.hello import Hello +from pymongo.lock import _create_lock from pymongo.logger import ( _SDAM_LOGGER, _SERVER_SELECTION_LOGGER, @@ -170,7 +171,7 @@ def __init__(self, topology_settings: TopologySettings): self._seed_addresses = list(topology_description.server_descriptions()) self._opened = False self._closed = False - self._lock = asyncio.Lock() + self._lock = _create_lock() self._condition = self._settings.condition_class(self._lock) # type: ignore[arg-type] self._servers: dict[_Address, Server] = {} self._pid: Optional[int] = None diff --git a/pymongo/synchronous/periodic_executor.py b/pymongo/synchronous/periodic_executor.py index 0031e3db90..bff6972a35 100644 --- a/pymongo/synchronous/periodic_executor.py +++ b/pymongo/synchronous/periodic_executor.py @@ -55,55 +55,20 @@ def __init__( self._min_interval = min_interval self._target = target self._stopped = False - self._thread: Optional[threading.Thread] = None + self._task: Optional[asyncio.Task] = None self._name = name self._skip_sleep = False - self._thread_will_exit = False - self._lock = _create_lock() def __repr__(self) -> str: return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" def open(self) -> None: - """Start. Multiple calls have no effect. - - Not safe to call from multiple threads at once. - """ - with self._lock: - if self._thread_will_exit: - # If the background thread has read self._stopped as True - # there is a chance that it has not yet exited. The call to - # join should not block indefinitely because there is no - # other work done outside the while loop in self._run. - try: - assert self._thread is not None - self._thread.join() - except ReferenceError: - # Thread terminated. - pass - self._thread_will_exit = False - self._stopped = False - started: Any = False - try: - started = self._thread and self._thread.is_alive() - except ReferenceError: - # Thread terminated. - pass + """Start. Multiple calls have no effect.""" + self._stopped = False + started = self._task and not self._task.done() if not started: - thread = threading.Thread(target=self._run, name=self._name) - thread.daemon = True - self._thread = weakref.proxy(thread) - _register_executor(self) - # Mitigation to RuntimeError firing when thread starts on shutdown - # https://github.com/python/cpython/issues/114570 - try: - thread.start() - except RuntimeError as e: - if "interpreter shutdown" in str(e) or sys.is_finalizing(): - self._thread = None - return - raise + self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) def close(self, dummy: Any = None) -> None: """Stop. To restart, call open(). @@ -114,11 +79,14 @@ def close(self, dummy: Any = None) -> None: self._stopped = True def join(self, timeout: Optional[int] = None) -> None: - if self._thread is not None: + if self._task is not None: try: - self._thread.join(timeout) - except (ReferenceError, RuntimeError): - # Thread already terminated, or not yet started. + asyncio.wait_for(self._task, timeout=timeout) + except asyncio.TimeoutError: + # Task timed out + pass + except asyncio.exceptions.CancelledError: + # Task was already finished, or not yet started. pass def wake(self) -> None: @@ -131,24 +99,14 @@ def update_interval(self, new_interval: int) -> None: def skip_sleep(self) -> None: self._skip_sleep = True - def _should_stop(self) -> bool: - with self._lock: - if self._stopped: - self._thread_will_exit = True - return True - return False - def _run(self) -> None: - while not self._should_stop(): + while not self._stopped: try: if not self._target(): self._stopped = True break except BaseException: - with self._lock: - self._stopped = True - self._thread_will_exit = True - + self._stopped = True raise if self._skip_sleep: @@ -190,20 +148,55 @@ def __init__( self._min_interval = min_interval self._target = target self._stopped = False - self._task: Optional[asyncio.Task] = None + self._thread: Optional[threading.Thread] = None self._name = name self._skip_sleep = False + self._thread_will_exit = False + self._lock = _create_lock() def __repr__(self) -> str: return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" def open(self) -> None: - """Start. Multiple calls have no effect.""" - self._stopped = False - started = self._task and not self._task.done() + """Start. Multiple calls have no effect. + + Not safe to call from multiple threads at once. + """ + with self._lock: + if self._thread_will_exit: + # If the background thread has read self._stopped as True + # there is a chance that it has not yet exited. The call to + # join should not block indefinitely because there is no + # other work done outside the while loop in self._run. + try: + assert self._thread is not None + self._thread.join() + except ReferenceError: + # Thread terminated. + pass + self._thread_will_exit = False + self._stopped = False + started: Any = False + try: + started = self._thread and self._thread.is_alive() + except ReferenceError: + # Thread terminated. + pass if not started: - self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) + thread = threading.Thread(target=self._run, name=self._name) + thread.daemon = True + self._thread = weakref.proxy(thread) + _register_executor(self) + # Mitigation to RuntimeError firing when thread starts on shutdown + # https://github.com/python/cpython/issues/114570 + try: + thread.start() + except RuntimeError as e: + if "interpreter shutdown" in str(e) or sys.is_finalizing(): + self._thread = None + return + raise def close(self, dummy: Any = None) -> None: """Stop. To restart, call open(). @@ -214,14 +207,11 @@ def close(self, dummy: Any = None) -> None: self._stopped = True def join(self, timeout: Optional[int] = None) -> None: - if self._task is not None: + if self._thread is not None: try: - asyncio.wait_for(self._task, timeout=timeout) - except asyncio.TimeoutError: - # Task timed out - pass - except asyncio.exceptions.CancelledError: - # Task was already finished, or not yet started. + self._thread.join(timeout) + except (ReferenceError, RuntimeError): + # Thread already terminated, or not yet started. pass def wake(self) -> None: @@ -234,14 +224,24 @@ def update_interval(self, new_interval: int) -> None: def skip_sleep(self) -> None: self._skip_sleep = True + def _should_stop(self) -> bool: + with self._lock: + if self._stopped: + self._thread_will_exit = True + return True + return False + def _run(self) -> None: - while not self._stopped: + while not self._should_stop(): try: if not self._target(): self._stopped = True break except BaseException: - self._stopped = True + with self._lock: + self._stopped = True + self._thread_will_exit = True + raise if self._skip_sleep: diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index dfcf0e4211..1cc56cb160 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -40,6 +40,7 @@ WriteError, ) from pymongo.hello import Hello +from pymongo.lock import _create_lock from pymongo.logger import ( _SDAM_LOGGER, _SERVER_SELECTION_LOGGER, @@ -170,7 +171,7 @@ def __init__(self, topology_settings: TopologySettings): self._seed_addresses = list(topology_description.server_descriptions()) self._opened = False self._closed = False - self._lock = asyncio.Lock() + self._lock = _create_lock() self._condition = self._settings.condition_class(self._lock) # type: ignore[arg-type] self._servers: dict[_Address, Server] = {} self._pid: Optional[int] = None diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index d14f3282ce..3f72eddaad 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -73,7 +73,6 @@ is_greenthread_patched, lazy_client_trial, one, - async_wait_until, ) import bson @@ -942,7 +941,9 @@ async def test_repr(self): self.assertEqual(client_two, client) async def test_getters(self): - await async_wait_until(lambda: async_client_context.nodes == self.client.nodes, "find all nodes") + await async_wait_until( + lambda: async_client_context.nodes == self.client.nodes, "find all nodes" + ) async def test_list_databases(self): cmd_docs = (await self.client.admin.command("listDatabases"))["databases"] diff --git a/test/test_client.py b/test/test_client.py index 936c38b8c6..23241f9616 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1041,14 +1041,21 @@ def test_uri_connect_option(self): self.assertFalse(client._topology._opened) # Ensure kill cursors thread has not been started. - kc_thread = client._kill_cursors_executor._thread - self.assertFalse(kc_thread and kc_thread.is_alive()) - + if _IS_SYNC: + kc_thread = client._kill_cursors_executor._thread + self.assertFalse(kc_thread and kc_thread.is_alive()) + else: + kc_task = client._kill_cursors_executor._task + self.assertFalse(kc_task and not kc_task.done()) # Using the client should open topology and start the thread. client.admin.command("ping") self.assertTrue(client._topology._opened) - kc_thread = client._kill_cursors_executor._thread - self.assertTrue(kc_thread and kc_thread.is_alive()) + if _IS_SYNC: + kc_thread = client._kill_cursors_executor._thread + self.assertTrue(kc_thread and kc_thread.is_alive()) + else: + kc_task = client._kill_cursors_executor._task + self.assertTrue(kc_task and not kc_task.done()) def test_close_does_not_open_servers(self): client = self.rs_client(connect=False) From 65301cfc3ed9e823fa952e96441cb0ee3d636a2f Mon Sep 17 00:00:00 2001 From: Iris Date: Mon, 7 Oct 2024 08:35:45 -0700 Subject: [PATCH 04/25] found some more lock changes to undo and remove atexit for async --- pymongo/asynchronous/monitor.py | 15 ++++----------- pymongo/asynchronous/periodic_executor.py | 4 ++-- pymongo/asynchronous/settings.py | 5 ++--- pymongo/asynchronous/topology.py | 7 ++++--- pymongo/synchronous/monitor.py | 11 ++--------- pymongo/synchronous/settings.py | 5 ++--- pymongo/synchronous/topology.py | 7 ++++--- 7 files changed, 20 insertions(+), 34 deletions(-) diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index a61c98cdba..7cd3e81224 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -16,7 +16,6 @@ from __future__ import annotations -import asyncio import atexit import logging import time @@ -522,21 +521,15 @@ def _shutdown_monitors() -> None: monitor = None -async def _shutdown_resources() -> None: +def _shutdown_resources() -> None: # _shutdown_monitors/_shutdown_executors may already be GC'd at shutdown. shutdown = _shutdown_monitors if shutdown: # type:ignore[truthy-function] shutdown() shutdown = _shutdown_executors if shutdown: # type:ignore[truthy-function] - await shutdown() - - -def _run_shutdown_resources(): - if _IS_SYNC: - _shutdown_resources() - else: - asyncio.run(_shutdown_resources()) + shutdown() -atexit.register(_run_shutdown_resources) +if _IS_SYNC: + atexit.register(_shutdown_resources) diff --git a/pymongo/asynchronous/periodic_executor.py b/pymongo/asynchronous/periodic_executor.py index 122e2ea643..853698fae6 100644 --- a/pymongo/asynchronous/periodic_executor.py +++ b/pymongo/asynchronous/periodic_executor.py @@ -23,7 +23,7 @@ import weakref from typing import Any, Optional -from pymongo.lock import _create_lock +from pymongo.lock import _ALock, _create_lock _IS_SYNC = False @@ -152,7 +152,7 @@ def __init__( self._name = name self._skip_sleep = False self._thread_will_exit = False - self._lock = _create_lock() + self._lock = _ALock(_create_lock()) def __repr__(self) -> str: return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" diff --git a/pymongo/asynchronous/settings.py b/pymongo/asynchronous/settings.py index ec24587d9a..1103e1bd18 100644 --- a/pymongo/asynchronous/settings.py +++ b/pymongo/asynchronous/settings.py @@ -15,7 +15,6 @@ """Represent MongoClient's configuration.""" from __future__ import annotations -import asyncio import threading import traceback from typing import Any, Collection, Optional, Type, Union @@ -68,7 +67,7 @@ def __init__( self._pool_class: Type[Pool] = pool_class or pool.Pool self._pool_options: PoolOptions = pool_options or PoolOptions() self._monitor_class: Type[monitor.Monitor] = monitor_class or monitor.Monitor - self._condition_class: Type[asyncio.Condition] = condition_class or asyncio.Condition + self._condition_class: Type[threading.Condition] = condition_class or threading.Condition self._local_threshold_ms = local_threshold_ms self._server_selection_timeout = server_selection_timeout self._server_selector = server_selector @@ -107,7 +106,7 @@ def monitor_class(self) -> Type[monitor.Monitor]: return self._monitor_class @property - def condition_class(self) -> Type[asyncio.Condition]: + def condition_class(self) -> Type[threading.Condition]: return self._condition_class @property diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 334ac166cd..31a87e9cca 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -45,7 +45,7 @@ WriteError, ) from pymongo.hello import Hello -from pymongo.lock import _create_lock +from pymongo.lock import _ACondition, _ALock, _create_lock from pymongo.logger import ( _SDAM_LOGGER, _SERVER_SELECTION_LOGGER, @@ -171,8 +171,9 @@ def __init__(self, topology_settings: TopologySettings): self._seed_addresses = list(topology_description.server_descriptions()) self._opened = False self._closed = False - self._lock = _create_lock() - self._condition = self._settings.condition_class(self._lock) # type: ignore[arg-type] + _lock = _create_lock() + self._lock = _ALock(_lock) + self._condition = _ACondition(self._settings.condition_class(_lock)) # type: ignore[arg-type] self._servers: dict[_Address, Server] = {} self._pid: Optional[int] = None self._max_cluster_time: Optional[ClusterTime] = None diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index dd5a051c16..7f4539d756 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -16,7 +16,6 @@ from __future__ import annotations -import asyncio import atexit import logging import time @@ -532,11 +531,5 @@ def _shutdown_resources() -> None: shutdown() -def _run_shutdown_resources(): - if _IS_SYNC: - _shutdown_resources() - else: - asyncio.run(_shutdown_resources()) - - -atexit.register(_run_shutdown_resources) +if _IS_SYNC: + atexit.register(_shutdown_resources) diff --git a/pymongo/synchronous/settings.py b/pymongo/synchronous/settings.py index 24ef533410..040776713f 100644 --- a/pymongo/synchronous/settings.py +++ b/pymongo/synchronous/settings.py @@ -15,7 +15,6 @@ """Represent MongoClient's configuration.""" from __future__ import annotations -import asyncio import threading import traceback from typing import Any, Collection, Optional, Type, Union @@ -68,7 +67,7 @@ def __init__( self._pool_class: Type[Pool] = pool_class or pool.Pool self._pool_options: PoolOptions = pool_options or PoolOptions() self._monitor_class: Type[monitor.Monitor] = monitor_class or monitor.Monitor - self._condition_class: Type[asyncio.Condition] = condition_class or asyncio.Condition + self._condition_class: Type[threading.Condition] = condition_class or threading.Condition self._local_threshold_ms = local_threshold_ms self._server_selection_timeout = server_selection_timeout self._server_selector = server_selector @@ -107,7 +106,7 @@ def monitor_class(self) -> Type[monitor.Monitor]: return self._monitor_class @property - def condition_class(self) -> Type[asyncio.Condition]: + def condition_class(self) -> Type[threading.Condition]: return self._condition_class @property diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 1cc56cb160..64a4c7506f 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -40,7 +40,7 @@ WriteError, ) from pymongo.hello import Hello -from pymongo.lock import _create_lock +from pymongo.lock import _create_lock, _Lock from pymongo.logger import ( _SDAM_LOGGER, _SERVER_SELECTION_LOGGER, @@ -171,8 +171,9 @@ def __init__(self, topology_settings: TopologySettings): self._seed_addresses = list(topology_description.server_descriptions()) self._opened = False self._closed = False - self._lock = _create_lock() - self._condition = self._settings.condition_class(self._lock) # type: ignore[arg-type] + _lock = _create_lock() + self._lock = _Lock(_lock) + self._condition = self._settings.condition_class(_lock) # type: ignore[arg-type] self._servers: dict[_Address, Server] = {} self._pid: Optional[int] = None self._max_cluster_time: Optional[ClusterTime] = None From 8852bc9d99f1dd222b463d18be5e37bed4190b2c Mon Sep 17 00:00:00 2001 From: Iris Date: Mon, 7 Oct 2024 09:27:17 -0700 Subject: [PATCH 05/25] hacky fix to PeriodicExecutor being defined twice and fix unwanted coroutine in sync verison --- pymongo/asynchronous/topology.py | 22 ++++++++++++++-------- pymongo/synchronous/periodic_executor.py | 2 +- pymongo/synchronous/topology.py | 22 ++++++++++++++-------- tools/synchro.py | 2 +- 4 files changed, 30 insertions(+), 18 deletions(-) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 31a87e9cca..f32063015d 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -355,10 +355,13 @@ async def _select_servers_loop( # change, or for a timeout. We won't miss any changes that # came after our most recent apply_selector call, since we've # held the lock until now. - try: - await asyncio.wait_for(self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL) - except asyncio.TimeoutError: - pass + if _IS_SYNC: + self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) + else: + try: + await asyncio.wait_for(self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL) + except asyncio.TimeoutError: + pass self._description.check_compatible() now = time.monotonic() server_descriptions = self._description.apply_selector( @@ -658,10 +661,13 @@ async def request_check_all(self, wait_time: int = 5) -> None: """Wake all monitors, wait for at least one to check its server.""" async with self._lock: self._request_check_all() - try: - await asyncio.wait_for(self._condition.wait(), wait_time) - except TimeoutError: - pass + if _IS_SYNC: + self._condition.wait(wait_time) + else: + try: + await asyncio.wait_for(self._condition.wait(), wait_time) + except TimeoutError: + pass def data_bearing_servers(self) -> list[ServerDescription]: """Return a list of all data-bearing servers. diff --git a/pymongo/synchronous/periodic_executor.py b/pymongo/synchronous/periodic_executor.py index bff6972a35..322229840e 100644 --- a/pymongo/synchronous/periodic_executor.py +++ b/pymongo/synchronous/periodic_executor.py @@ -28,7 +28,7 @@ _IS_SYNC = True -class PeriodicExecutor: +class SyncPeriodicExecutor: def __init__( self, interval: float, diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 64a4c7506f..9ac8c779e0 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -355,10 +355,13 @@ def _select_servers_loop( # change, or for a timeout. We won't miss any changes that # came after our most recent apply_selector call, since we've # held the lock until now. - try: - asyncio.wait_for(self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL) - except asyncio.TimeoutError: - pass + if _IS_SYNC: + self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) + else: + try: + asyncio.wait_for(self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL) + except asyncio.TimeoutError: + pass self._description.check_compatible() now = time.monotonic() server_descriptions = self._description.apply_selector( @@ -656,10 +659,13 @@ def request_check_all(self, wait_time: int = 5) -> None: """Wake all monitors, wait for at least one to check its server.""" with self._lock: self._request_check_all() - try: - asyncio.wait_for(self._condition.wait(), wait_time) - except TimeoutError: - pass + if _IS_SYNC: + self._condition.wait(wait_time) + else: + try: + asyncio.wait_for(self._condition.wait(), wait_time) + except TimeoutError: + pass def data_bearing_servers(self) -> list[ServerDescription]: """Return a list of all data-bearing servers. diff --git a/tools/synchro.py b/tools/synchro.py index f0a9e91d1b..bc4ae125d3 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -108,7 +108,7 @@ "async_set_fail_point": "set_fail_point", "async_ensure_all_connected": "ensure_all_connected", "async_repl_set_step_down": "repl_set_step_down", - "AsyncPeriodicExecutor": "PeriodicExecutor", + "AsyncPeriodicExecutor(": "PeriodicExecutor(", } docstring_replacements: dict[tuple[str, str], str] = { From 5c9b1d9f6fbd915dec3e978f08bba6b5efebebf6 Mon Sep 17 00:00:00 2001 From: Iris Date: Mon, 7 Oct 2024 09:43:36 -0700 Subject: [PATCH 06/25] add missing awaits --- pymongo/asynchronous/topology.py | 4 ++-- test/asynchronous/test_cursor.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index f32063015d..dd9ca9ec20 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -356,7 +356,7 @@ async def _select_servers_loop( # came after our most recent apply_selector call, since we've # held the lock until now. if _IS_SYNC: - self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) + await self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) else: try: await asyncio.wait_for(self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL) @@ -662,7 +662,7 @@ async def request_check_all(self, wait_time: int = 5) -> None: async with self._lock: self._request_check_all() if _IS_SYNC: - self._condition.wait(wait_time) + await self._condition.wait(wait_time) else: try: await asyncio.wait_for(self._condition.wait(), wait_time) diff --git a/test/asynchronous/test_cursor.py b/test/asynchronous/test_cursor.py index 5d640ba305..f7b795cdae 100644 --- a/test/asynchronous/test_cursor.py +++ b/test/asynchronous/test_cursor.py @@ -1345,7 +1345,7 @@ async def assertCursorKilled(): with self.assertRaises(ExecutionTimeout): await cursor.next() - assertCursorKilled() + await assertCursorKilled() def test_delete_not_initialized(self): # Creating a cursor with invalid arguments will not run __init__ From ce322f52bbec24aa76ebdd62dc06273cb570343d Mon Sep 17 00:00:00 2001 From: Iris Date: Mon, 7 Oct 2024 11:38:34 -0700 Subject: [PATCH 07/25] attempt to fix test --- test/asynchronous/test_client.py | 2 +- test/utils.py | 6 ++++++ tools/synchro.py | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index 3f72eddaad..50a8c3acc8 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -2542,7 +2542,7 @@ async def test_rs_client_does_not_maintain_pool_to_arbiters(self): self.assertEqual(await c.address, ("a", 1)) self.assertEqual(await c.arbiters, {("c", 3)}) # Assert that we create 2 and only 2 pooled connections. - listener.wait_for_event(monitoring.ConnectionReadyEvent, 2) + listener.async_wait_for_event(monitoring.ConnectionReadyEvent, 2) self.assertEqual(listener.event_count(monitoring.ConnectionCreatedEvent), 2) # Assert that we do not create connections to arbiters. arbiter = c._topology.get_server_by_address(("c", 3)) diff --git a/test/utils.py b/test/utils.py index 4d7dc58378..174b1708ba 100644 --- a/test/utils.py +++ b/test/utils.py @@ -98,6 +98,12 @@ def wait_for_event(self, event, count): """Wait for a number of events to be published, or fail.""" wait_until(lambda: self.event_count(event) >= count, f"find {count} {event} event(s)") + async def async_wait_for_event(self, event, count): + """Wait for a number of events to be published, or fail.""" + await async_wait_until( + lambda: self.event_count(event) >= count, f"find {count} {event} event(s)" + ) + class CMAPListener(BaseListener, monitoring.ConnectionPoolListener): def connection_created(self, event): diff --git a/tools/synchro.py b/tools/synchro.py index bc4ae125d3..4793f525b7 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -109,6 +109,7 @@ "async_ensure_all_connected": "ensure_all_connected", "async_repl_set_step_down": "repl_set_step_down", "AsyncPeriodicExecutor(": "PeriodicExecutor(", + "async_wait_for_event": "wait_for_event", } docstring_replacements: dict[tuple[str, str], str] = { From 549645b644dc7b8b30223c638b0c37849961fc64 Mon Sep 17 00:00:00 2001 From: Iris Date: Mon, 7 Oct 2024 11:43:52 -0700 Subject: [PATCH 08/25] left out an await --- pymongo/asynchronous/topology.py | 2 +- pymongo/synchronous/topology.py | 2 +- test/asynchronous/test_client.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index dd9ca9ec20..95864387cf 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -173,7 +173,7 @@ def __init__(self, topology_settings: TopologySettings): self._closed = False _lock = _create_lock() self._lock = _ALock(_lock) - self._condition = _ACondition(self._settings.condition_class(_lock)) # type: ignore[arg-type] + self._condition = _ACondition(self._settings.condition_class(_lock)) self._servers: dict[_Address, Server] = {} self._pid: Optional[int] = None self._max_cluster_time: Optional[ClusterTime] = None diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 9ac8c779e0..ac100922ec 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -173,7 +173,7 @@ def __init__(self, topology_settings: TopologySettings): self._closed = False _lock = _create_lock() self._lock = _Lock(_lock) - self._condition = self._settings.condition_class(_lock) # type: ignore[arg-type] + self._condition = self._settings.condition_class(_lock) self._servers: dict[_Address, Server] = {} self._pid: Optional[int] = None self._max_cluster_time: Optional[ClusterTime] = None diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index 50a8c3acc8..3f515200cf 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -2542,7 +2542,7 @@ async def test_rs_client_does_not_maintain_pool_to_arbiters(self): self.assertEqual(await c.address, ("a", 1)) self.assertEqual(await c.arbiters, {("c", 3)}) # Assert that we create 2 and only 2 pooled connections. - listener.async_wait_for_event(monitoring.ConnectionReadyEvent, 2) + await listener.async_wait_for_event(monitoring.ConnectionReadyEvent, 2) self.assertEqual(listener.event_count(monitoring.ConnectionCreatedEvent), 2) # Assert that we do not create connections to arbiters. arbiter = c._topology.get_server_by_address(("c", 3)) From c92f6afbd3ef80f23976fc9a522332e4c96f8beb Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 8 Oct 2024 10:35:17 -0700 Subject: [PATCH 09/25] add missing await and fix typing errors --- pymongo/asynchronous/monitor.py | 2 +- pymongo/asynchronous/periodic_executor.py | 6 +++--- pymongo/asynchronous/topology.py | 10 +++++++--- pymongo/synchronous/periodic_executor.py | 2 +- pymongo/synchronous/topology.py | 8 +++++--- 5 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index 7cd3e81224..8855cfb24d 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -250,7 +250,7 @@ async def _check_server(self) -> ServerDescription: except (OperationFailure, NotPrimaryError) as exc: # Update max cluster time even when hello fails. details = cast(Mapping[str, Any], exc.details) - self._topology.receive_cluster_time(details.get("$clusterTime")) + await self._topology.receive_cluster_time(details.get("$clusterTime")) raise except ReferenceError: raise diff --git a/pymongo/asynchronous/periodic_executor.py b/pymongo/asynchronous/periodic_executor.py index 853698fae6..57df4d82c0 100644 --- a/pymongo/asynchronous/periodic_executor.py +++ b/pymongo/asynchronous/periodic_executor.py @@ -81,7 +81,7 @@ def close(self, dummy: Any = None) -> None: async def join(self, timeout: Optional[int] = None) -> None: if self._task is not None: try: - await asyncio.wait_for(self._task, timeout=timeout) + await asyncio.wait_for(self._task, timeout=timeout) # type-ignore: [arg-type] except asyncio.TimeoutError: # Task timed out pass @@ -274,7 +274,7 @@ def _on_executor_deleted(ref: weakref.ReferenceType[PeriodicExecutor]) -> None: _EXECUTORS.remove(ref) -async def _shutdown_executors() -> None: +def _shutdown_executors() -> None: if _EXECUTORS is None: return @@ -291,6 +291,6 @@ async def _shutdown_executors() -> None: for ref in executors: executor = ref() if executor: - await executor.join(1) + executor.join(1) executor = None diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 95864387cf..7b7fecaf7a 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -359,7 +359,9 @@ async def _select_servers_loop( await self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) else: try: - await asyncio.wait_for(self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL) + await asyncio.wait_for( + self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL + ) # type-ignore: [arg-type] except asyncio.TimeoutError: pass self._description.check_compatible() @@ -665,8 +667,10 @@ async def request_check_all(self, wait_time: int = 5) -> None: await self._condition.wait(wait_time) else: try: - await asyncio.wait_for(self._condition.wait(), wait_time) - except TimeoutError: + await asyncio.wait_for( + self._condition.wait(), wait_time + ) # type-ignore: [arg-type] + except asyncio.TimeoutError: pass def data_bearing_servers(self) -> list[ServerDescription]: diff --git a/pymongo/synchronous/periodic_executor.py b/pymongo/synchronous/periodic_executor.py index 322229840e..b136f49f88 100644 --- a/pymongo/synchronous/periodic_executor.py +++ b/pymongo/synchronous/periodic_executor.py @@ -81,7 +81,7 @@ def close(self, dummy: Any = None) -> None: def join(self, timeout: Optional[int] = None) -> None: if self._task is not None: try: - asyncio.wait_for(self._task, timeout=timeout) + asyncio.wait_for(self._task, timeout=timeout) # type-ignore: [arg-type] except asyncio.TimeoutError: # Task timed out pass diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index ac100922ec..0d5a5f6fbc 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -359,7 +359,9 @@ def _select_servers_loop( self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) else: try: - asyncio.wait_for(self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL) + asyncio.wait_for( + self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL + ) # type-ignore: [arg-type] except asyncio.TimeoutError: pass self._description.check_compatible() @@ -663,8 +665,8 @@ def request_check_all(self, wait_time: int = 5) -> None: self._condition.wait(wait_time) else: try: - asyncio.wait_for(self._condition.wait(), wait_time) - except TimeoutError: + asyncio.wait_for(self._condition.wait(), wait_time) # type-ignore: [arg-type] + except asyncio.TimeoutError: pass def data_bearing_servers(self) -> list[ServerDescription]: From c58448fa22ef18c5f9cf78683b06d1b445b66dbb Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 8 Oct 2024 10:46:38 -0700 Subject: [PATCH 10/25] realizing condition needs to be reverted too --- pymongo/asynchronous/topology.py | 29 ++++++++++------------------- pymongo/synchronous/topology.py | 27 ++++++++++----------------- 2 files changed, 20 insertions(+), 36 deletions(-) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 7b7fecaf7a..e8a90146ef 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -16,7 +16,6 @@ from __future__ import annotations -import asyncio import logging import os import queue @@ -355,15 +354,7 @@ async def _select_servers_loop( # change, or for a timeout. We won't miss any changes that # came after our most recent apply_selector call, since we've # held the lock until now. - if _IS_SYNC: - await self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) - else: - try: - await asyncio.wait_for( - self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL - ) # type-ignore: [arg-type] - except asyncio.TimeoutError: - pass + await self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) self._description.check_compatible() now = time.monotonic() server_descriptions = self._description.apply_selector( @@ -663,15 +654,15 @@ async def request_check_all(self, wait_time: int = 5) -> None: """Wake all monitors, wait for at least one to check its server.""" async with self._lock: self._request_check_all() - if _IS_SYNC: - await self._condition.wait(wait_time) - else: - try: - await asyncio.wait_for( - self._condition.wait(), wait_time - ) # type-ignore: [arg-type] - except asyncio.TimeoutError: - pass + # if _IS_SYNC: + await self._condition.wait(wait_time) + # else: + # try: + # await asyncio.wait_for( + # self._condition.wait(), wait_time + # ) # type-ignore: [arg-type] + # except asyncio.TimeoutError: + # pass def data_bearing_servers(self) -> list[ServerDescription]: """Return a list of all data-bearing servers. diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 0d5a5f6fbc..0cee2ac0c9 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -16,7 +16,6 @@ from __future__ import annotations -import asyncio import logging import os import queue @@ -355,15 +354,7 @@ def _select_servers_loop( # change, or for a timeout. We won't miss any changes that # came after our most recent apply_selector call, since we've # held the lock until now. - if _IS_SYNC: - self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) - else: - try: - asyncio.wait_for( - self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL - ) # type-ignore: [arg-type] - except asyncio.TimeoutError: - pass + self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) self._description.check_compatible() now = time.monotonic() server_descriptions = self._description.apply_selector( @@ -661,13 +652,15 @@ def request_check_all(self, wait_time: int = 5) -> None: """Wake all monitors, wait for at least one to check its server.""" with self._lock: self._request_check_all() - if _IS_SYNC: - self._condition.wait(wait_time) - else: - try: - asyncio.wait_for(self._condition.wait(), wait_time) # type-ignore: [arg-type] - except asyncio.TimeoutError: - pass + # if _IS_SYNC: + self._condition.wait(wait_time) + # else: + # try: + # asyncio.wait_for( + # self._condition.wait(), wait_time + # ) # type-ignore: [arg-type] + # except asyncio.TimeoutError: + # pass def data_bearing_servers(self) -> list[ServerDescription]: """Return a list of all data-bearing servers. From 43660d9cdc4f67adca788863002ec104c93b16d9 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 8 Oct 2024 19:56:45 -0700 Subject: [PATCH 11/25] reset client context after primary steps down --- test/__init__.py | 6 ++++++ test/asynchronous/__init__.py | 6 ++++++ .../test_connections_survive_primary_stepdown_spec.py | 8 +++++++- test/asynchronous/test_monitoring.py | 1 + test/test_connections_survive_primary_stepdown_spec.py | 8 +++++++- test/test_monitoring.py | 1 + 6 files changed, 28 insertions(+), 2 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index af12bc032a..ce3f92b592 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -859,6 +859,12 @@ def max_message_size_bytes(self): client_context = ClientContext() +def recreate_client_context(): + global client_context + client_context = ClientContext() + client_context.init() + + class PyMongoTestCase(unittest.TestCase): def assertEqualCommand(self, expected, actual, msg=None): self.assertEqual(sanitize_cmd(expected), sanitize_cmd(actual), msg) diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 2a44785b2f..e63ac0d7d7 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -861,6 +861,12 @@ async def max_message_size_bytes(self): async_client_context = AsyncClientContext() +async def recreate_client_context(): + global async_client_context + async_client_context = AsyncClientContext() + await async_client_context.init() + + class AsyncPyMongoTestCase(unittest.IsolatedAsyncioTestCase): def assertEqualCommand(self, expected, actual, msg=None): self.assertEqual(sanitize_cmd(expected), sanitize_cmd(actual), msg) diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py index 289cf49751..b26a080e3a 100644 --- a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -19,7 +19,12 @@ sys.path[0:0] = [""] -from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest +from test.asynchronous import ( + AsyncIntegrationTest, + async_client_context, + recreate_client_context, + unittest, +) from test.asynchronous.helpers import async_repl_set_step_down from test.utils import ( CMAPListener, @@ -60,6 +65,7 @@ async def _setup_class(cls): @classmethod async def _tearDown_class(cls): await cls.client.close() + await recreate_client_context() async def asyncSetUp(self): # Note that all ops use same write-concern as self.db (majority). diff --git a/test/asynchronous/test_monitoring.py b/test/asynchronous/test_monitoring.py index b5d8708dc3..4ee5f2c621 100644 --- a/test/asynchronous/test_monitoring.py +++ b/test/asynchronous/test_monitoring.py @@ -413,6 +413,7 @@ async def test_not_primary_error(self): try: await client.pymongo_test.test.find_one_and_delete({}) except NotPrimaryError as exc: + print("an error was raised") error = exc.errors started = self.listener.started_events[0] failed = self.listener.failed_events[0] diff --git a/test/test_connections_survive_primary_stepdown_spec.py b/test/test_connections_survive_primary_stepdown_spec.py index 54cc4e0482..2b9eb87be1 100644 --- a/test/test_connections_survive_primary_stepdown_spec.py +++ b/test/test_connections_survive_primary_stepdown_spec.py @@ -19,7 +19,12 @@ sys.path[0:0] = [""] -from test import IntegrationTest, client_context, unittest +from test import ( + IntegrationTest, + client_context, + recreate_client_context, + unittest, +) from test.helpers import repl_set_step_down from test.utils import ( CMAPListener, @@ -60,6 +65,7 @@ def _setup_class(cls): @classmethod def _tearDown_class(cls): cls.client.close() + recreate_client_context() def setUp(self): # Note that all ops use same write-concern as self.db (majority). diff --git a/test/test_monitoring.py b/test/test_monitoring.py index a0c520ed27..6d9661a7f2 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -413,6 +413,7 @@ def test_not_primary_error(self): try: client.pymongo_test.test.find_one_and_delete({}) except NotPrimaryError as exc: + print("an error was raised") error = exc.errors started = self.listener.started_events[0] failed = self.listener.failed_events[0] From dbb51e2721b03deec1ad4053586c9a6922b90579 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 8 Oct 2024 20:00:29 -0700 Subject: [PATCH 12/25] ignore type error on sync --- pymongo/asynchronous/periodic_executor.py | 2 +- pymongo/synchronous/periodic_executor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pymongo/asynchronous/periodic_executor.py b/pymongo/asynchronous/periodic_executor.py index 57df4d82c0..d6ce617b83 100644 --- a/pymongo/asynchronous/periodic_executor.py +++ b/pymongo/asynchronous/periodic_executor.py @@ -68,7 +68,7 @@ def open(self) -> None: started = self._task and not self._task.done() if not started: - self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) + self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) # type: ignore[func-returns-value] def close(self, dummy: Any = None) -> None: """Stop. To restart, call open(). diff --git a/pymongo/synchronous/periodic_executor.py b/pymongo/synchronous/periodic_executor.py index b136f49f88..88fc40cc8b 100644 --- a/pymongo/synchronous/periodic_executor.py +++ b/pymongo/synchronous/periodic_executor.py @@ -68,7 +68,7 @@ def open(self) -> None: started = self._task and not self._task.done() if not started: - self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) + self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) # type: ignore[func-returns-value] def close(self, dummy: Any = None) -> None: """Stop. To restart, call open(). From 3c640e8be8791f40e85126344b2efb13aed41a62 Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 9 Oct 2024 07:55:10 -0700 Subject: [PATCH 13/25] temp changing pytests's setup and teardown are run to see if this will fix everything --- test/__init__.py | 5 ++++- test/asynchronous/__init__.py | 5 ++++- test/asynchronous/conftest.py | 2 +- .../test_connections_survive_primary_stepdown_spec.py | 2 +- test/asynchronous/test_monitoring.py | 1 - test/conftest.py | 2 +- test/test_connections_survive_primary_stepdown_spec.py | 2 +- test/test_monitoring.py | 1 - 8 files changed, 12 insertions(+), 8 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index ce3f92b592..bdbf3eb352 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -861,8 +861,9 @@ def max_message_size_bytes(self): def recreate_client_context(): global client_context + teardown() client_context = ClientContext() - client_context.init() + setup() class PyMongoTestCase(unittest.TestCase): @@ -1233,6 +1234,8 @@ def tearDown(self): def setup(): + global client_context + client_context = ClientContext() client_context.init() warnings.resetwarnings() warnings.simplefilter("always") diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index e63ac0d7d7..5ad00fcb68 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -863,8 +863,9 @@ async def max_message_size_bytes(self): async def recreate_client_context(): global async_client_context + await async_teardown() async_client_context = AsyncClientContext() - await async_client_context.init() + await async_setup() class AsyncPyMongoTestCase(unittest.IsolatedAsyncioTestCase): @@ -1251,6 +1252,8 @@ def tearDown(self): async def async_setup(): + global async_client_context + async_client_context = AsyncClientContext() await async_client_context.init() warnings.resetwarnings() warnings.simplefilter("always") diff --git a/test/asynchronous/conftest.py b/test/asynchronous/conftest.py index c08f224abd..e520ea7d0c 100644 --- a/test/asynchronous/conftest.py +++ b/test/asynchronous/conftest.py @@ -24,7 +24,7 @@ def event_loop_policy(): return asyncio.get_event_loop_policy() -@pytest_asyncio.fixture(scope="session", autouse=True) +@pytest_asyncio.fixture(scope="module", autouse=True) async def test_setup_and_teardown(): await async_setup() yield diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py index b26a080e3a..5b3ef1fae3 100644 --- a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -65,7 +65,7 @@ async def _setup_class(cls): @classmethod async def _tearDown_class(cls): await cls.client.close() - await recreate_client_context() + # await recreate_client_context() async def asyncSetUp(self): # Note that all ops use same write-concern as self.db (majority). diff --git a/test/asynchronous/test_monitoring.py b/test/asynchronous/test_monitoring.py index 4ee5f2c621..b5d8708dc3 100644 --- a/test/asynchronous/test_monitoring.py +++ b/test/asynchronous/test_monitoring.py @@ -413,7 +413,6 @@ async def test_not_primary_error(self): try: await client.pymongo_test.test.find_one_and_delete({}) except NotPrimaryError as exc: - print("an error was raised") error = exc.errors started = self.listener.started_events[0] failed = self.listener.failed_events[0] diff --git a/test/conftest.py b/test/conftest.py index ca817a5a62..a0548aed0c 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -22,7 +22,7 @@ def event_loop_policy(): return asyncio.get_event_loop_policy() -@pytest.fixture(scope="session", autouse=True) +@pytest.fixture(scope="module", autouse=True) def test_setup_and_teardown(): setup() yield diff --git a/test/test_connections_survive_primary_stepdown_spec.py b/test/test_connections_survive_primary_stepdown_spec.py index 2b9eb87be1..1943336484 100644 --- a/test/test_connections_survive_primary_stepdown_spec.py +++ b/test/test_connections_survive_primary_stepdown_spec.py @@ -65,7 +65,7 @@ def _setup_class(cls): @classmethod def _tearDown_class(cls): cls.client.close() - recreate_client_context() + # recreate_client_context() def setUp(self): # Note that all ops use same write-concern as self.db (majority). diff --git a/test/test_monitoring.py b/test/test_monitoring.py index 6d9661a7f2..a0c520ed27 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -413,7 +413,6 @@ def test_not_primary_error(self): try: client.pymongo_test.test.find_one_and_delete({}) except NotPrimaryError as exc: - print("an error was raised") error = exc.errors started = self.listener.started_events[0] failed = self.listener.failed_events[0] From 0fc60ba326a6e6bd49615ff42d76c4d27d75c9c3 Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 9 Oct 2024 09:40:23 -0700 Subject: [PATCH 14/25] only reset client context after primary_stepdown --- test/__init__.py | 2 -- test/asynchronous/__init__.py | 2 -- test/asynchronous/conftest.py | 2 +- .../test_connections_survive_primary_stepdown_spec.py | 2 +- test/conftest.py | 2 +- test/test_connections_survive_primary_stepdown_spec.py | 2 +- 6 files changed, 4 insertions(+), 8 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index bdbf3eb352..7f0cc49700 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -1234,8 +1234,6 @@ def tearDown(self): def setup(): - global client_context - client_context = ClientContext() client_context.init() warnings.resetwarnings() warnings.simplefilter("always") diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 5ad00fcb68..20bb6e7f18 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -1252,8 +1252,6 @@ def tearDown(self): async def async_setup(): - global async_client_context - async_client_context = AsyncClientContext() await async_client_context.init() warnings.resetwarnings() warnings.simplefilter("always") diff --git a/test/asynchronous/conftest.py b/test/asynchronous/conftest.py index e520ea7d0c..c08f224abd 100644 --- a/test/asynchronous/conftest.py +++ b/test/asynchronous/conftest.py @@ -24,7 +24,7 @@ def event_loop_policy(): return asyncio.get_event_loop_policy() -@pytest_asyncio.fixture(scope="module", autouse=True) +@pytest_asyncio.fixture(scope="session", autouse=True) async def test_setup_and_teardown(): await async_setup() yield diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py index 5b3ef1fae3..b26a080e3a 100644 --- a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -65,7 +65,7 @@ async def _setup_class(cls): @classmethod async def _tearDown_class(cls): await cls.client.close() - # await recreate_client_context() + await recreate_client_context() async def asyncSetUp(self): # Note that all ops use same write-concern as self.db (majority). diff --git a/test/conftest.py b/test/conftest.py index a0548aed0c..ca817a5a62 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -22,7 +22,7 @@ def event_loop_policy(): return asyncio.get_event_loop_policy() -@pytest.fixture(scope="module", autouse=True) +@pytest.fixture(scope="session", autouse=True) def test_setup_and_teardown(): setup() yield diff --git a/test/test_connections_survive_primary_stepdown_spec.py b/test/test_connections_survive_primary_stepdown_spec.py index 1943336484..2b9eb87be1 100644 --- a/test/test_connections_survive_primary_stepdown_spec.py +++ b/test/test_connections_survive_primary_stepdown_spec.py @@ -65,7 +65,7 @@ def _setup_class(cls): @classmethod def _tearDown_class(cls): cls.client.close() - # recreate_client_context() + recreate_client_context() def setUp(self): # Note that all ops use same write-concern as self.db (majority). From f78ee91c47755600192e7850684af79d43bbf369 Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 9 Oct 2024 10:46:27 -0700 Subject: [PATCH 15/25] only recreate client context in async version --- test/__init__.py | 3 +++ test/asynchronous/__init__.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/test/__init__.py b/test/__init__.py index 7f0cc49700..bcb26d4919 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -860,6 +860,9 @@ def max_message_size_bytes(self): def recreate_client_context(): + if _IS_SYNC: + # sync tests don't need to recreate a client context + return global client_context teardown() client_context = ClientContext() diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 20bb6e7f18..5d03395199 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -862,6 +862,9 @@ async def max_message_size_bytes(self): async def recreate_client_context(): + if _IS_SYNC: + # sync tests don't need to recreate a client context + return global async_client_context await async_teardown() async_client_context = AsyncClientContext() From cebbd6f08def2717cd28204145414333a6c88f1a Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 9 Oct 2024 16:55:30 -0700 Subject: [PATCH 16/25] move periodic_executor.py up a lvl --- pymongo/asynchronous/mongo_client.py | 4 +- pymongo/asynchronous/monitor.py | 5 +- pymongo/asynchronous/topology.py | 3 +- .../{asynchronous => }/periodic_executor.py | 0 pymongo/synchronous/mongo_client.py | 4 +- pymongo/synchronous/monitor.py | 5 +- pymongo/synchronous/periodic_executor.py | 296 ------------------ pymongo/synchronous/topology.py | 3 +- tools/synchro.py | 2 +- 9 files changed, 11 insertions(+), 311 deletions(-) rename pymongo/{asynchronous => }/periodic_executor.py (100%) delete mode 100644 pymongo/synchronous/periodic_executor.py diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 005cbb6a23..5318870ca3 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -59,8 +59,8 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS, CodecOptions, TypeRegistry from bson.timestamp import Timestamp -from pymongo import _csot, common, helpers_shared, uri_parser -from pymongo.asynchronous import client_session, database, periodic_executor +from pymongo import _csot, common, helpers_shared, periodic_executor, uri_parser +from pymongo.asynchronous import client_session, database from pymongo.asynchronous.change_stream import AsyncChangeStream, AsyncClusterChangeStream from pymongo.asynchronous.client_bulk import _AsyncClientBulk from pymongo.asynchronous.client_session import _EmptyServerSession diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index 8855cfb24d..1f8640c915 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -22,14 +22,13 @@ import weakref from typing import TYPE_CHECKING, Any, Mapping, Optional, cast -from pymongo import common +from pymongo import common, periodic_executor from pymongo._csot import MovingMinimum -from pymongo.asynchronous import periodic_executor -from pymongo.asynchronous.periodic_executor import _shutdown_executors from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _create_lock from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage +from pymongo.periodic_executor import _shutdown_executors from pymongo.pool_options import _is_faas from pymongo.read_preferences import MovingAverage from pymongo.server_description import ServerDescription diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index e8a90146ef..bb82a815c3 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -27,8 +27,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Mapping, Optional, cast -from pymongo import _csot, common, helpers_shared -from pymongo.asynchronous import periodic_executor +from pymongo import _csot, common, helpers_shared, periodic_executor from pymongo.asynchronous.client_session import _ServerSession, _ServerSessionPool from pymongo.asynchronous.monitor import SrvMonitor from pymongo.asynchronous.pool import Pool diff --git a/pymongo/asynchronous/periodic_executor.py b/pymongo/periodic_executor.py similarity index 100% rename from pymongo/asynchronous/periodic_executor.py rename to pymongo/periodic_executor.py diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index b2dff5b4ab..88c8b940ff 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -58,7 +58,7 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS, CodecOptions, TypeRegistry from bson.timestamp import Timestamp -from pymongo import _csot, common, helpers_shared, uri_parser +from pymongo import _csot, common, helpers_shared, periodic_executor, uri_parser from pymongo.client_options import ClientOptions from pymongo.errors import ( AutoReconnect, @@ -91,7 +91,7 @@ from pymongo.results import ClientBulkWriteResult from pymongo.server_selectors import writable_server_selector from pymongo.server_type import SERVER_TYPE -from pymongo.synchronous import client_session, database, periodic_executor +from pymongo.synchronous import client_session, database from pymongo.synchronous.change_stream import ChangeStream, ClusterChangeStream from pymongo.synchronous.client_bulk import _ClientBulk from pymongo.synchronous.client_session import _EmptyServerSession diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index 7f4539d756..a806670f2c 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -22,18 +22,17 @@ import weakref from typing import TYPE_CHECKING, Any, Mapping, Optional, cast -from pymongo import common +from pymongo import common, periodic_executor from pymongo._csot import MovingMinimum from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _create_lock from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage +from pymongo.periodic_executor import _shutdown_executors from pymongo.pool_options import _is_faas from pymongo.read_preferences import MovingAverage from pymongo.server_description import ServerDescription from pymongo.srv_resolver import _SrvResolver -from pymongo.synchronous import periodic_executor -from pymongo.synchronous.periodic_executor import _shutdown_executors if TYPE_CHECKING: from pymongo.synchronous.pool import Connection, Pool, _CancellationContext diff --git a/pymongo/synchronous/periodic_executor.py b/pymongo/synchronous/periodic_executor.py deleted file mode 100644 index 88fc40cc8b..0000000000 --- a/pymongo/synchronous/periodic_executor.py +++ /dev/null @@ -1,296 +0,0 @@ -# Copyright 2014-present MongoDB, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you -# may not use this file except in compliance with the License. You -# may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. See the License for the specific language governing -# permissions and limitations under the License. - -"""Run a target function on a background thread.""" - -from __future__ import annotations - -import asyncio -import sys -import threading -import time -import weakref -from typing import Any, Optional - -from pymongo.lock import _create_lock - -_IS_SYNC = True - - -class SyncPeriodicExecutor: - def __init__( - self, - interval: float, - min_interval: float, - target: Any, - name: Optional[str] = None, - ): - """Run a target function periodically on a background thread. - - If the target's return value is false, the executor stops. - - :param interval: Seconds between calls to `target`. - :param min_interval: Minimum seconds between calls if `wake` is - called very often. - :param target: A function. - :param name: A name to give the underlying thread. - """ - # threading.Event and its internal condition variable are expensive - # in Python 2, see PYTHON-983. Use a boolean to know when to wake. - # The executor's design is constrained by several Python issues, see - # "periodic_executor.rst" in this repository. - self._event = False - self._interval = interval - self._min_interval = min_interval - self._target = target - self._stopped = False - self._task: Optional[asyncio.Task] = None - self._name = name - self._skip_sleep = False - - def __repr__(self) -> str: - return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" - - def open(self) -> None: - """Start. Multiple calls have no effect.""" - self._stopped = False - started = self._task and not self._task.done() - - if not started: - self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) # type: ignore[func-returns-value] - - def close(self, dummy: Any = None) -> None: - """Stop. To restart, call open(). - - The dummy parameter allows an executor's close method to be a weakref - callback; see monitor.py. - """ - self._stopped = True - - def join(self, timeout: Optional[int] = None) -> None: - if self._task is not None: - try: - asyncio.wait_for(self._task, timeout=timeout) # type-ignore: [arg-type] - except asyncio.TimeoutError: - # Task timed out - pass - except asyncio.exceptions.CancelledError: - # Task was already finished, or not yet started. - pass - - def wake(self) -> None: - """Execute the target function soon.""" - self._event = True - - def update_interval(self, new_interval: int) -> None: - self._interval = new_interval - - def skip_sleep(self) -> None: - self._skip_sleep = True - - def _run(self) -> None: - while not self._stopped: - try: - if not self._target(): - self._stopped = True - break - except BaseException: - self._stopped = True - raise - - if self._skip_sleep: - self._skip_sleep = False - else: - deadline = time.monotonic() + self._interval - while not self._stopped and time.monotonic() < deadline: - time.sleep(self._min_interval) - if self._event: - break # Early wake. - - self._event = False - - -class PeriodicExecutor: - def __init__( - self, - interval: float, - min_interval: float, - target: Any, - name: Optional[str] = None, - ): - """Run a target function periodically on a background thread. - - If the target's return value is false, the executor stops. - - :param interval: Seconds between calls to `target`. - :param min_interval: Minimum seconds between calls if `wake` is - called very often. - :param target: A function. - :param name: A name to give the underlying thread. - """ - # threading.Event and its internal condition variable are expensive - # in Python 2, see PYTHON-983. Use a boolean to know when to wake. - # The executor's design is constrained by several Python issues, see - # "periodic_executor.rst" in this repository. - self._event = False - self._interval = interval - self._min_interval = min_interval - self._target = target - self._stopped = False - self._thread: Optional[threading.Thread] = None - self._name = name - self._skip_sleep = False - self._thread_will_exit = False - self._lock = _create_lock() - - def __repr__(self) -> str: - return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" - - def open(self) -> None: - """Start. Multiple calls have no effect. - - Not safe to call from multiple threads at once. - """ - with self._lock: - if self._thread_will_exit: - # If the background thread has read self._stopped as True - # there is a chance that it has not yet exited. The call to - # join should not block indefinitely because there is no - # other work done outside the while loop in self._run. - try: - assert self._thread is not None - self._thread.join() - except ReferenceError: - # Thread terminated. - pass - self._thread_will_exit = False - self._stopped = False - started: Any = False - try: - started = self._thread and self._thread.is_alive() - except ReferenceError: - # Thread terminated. - pass - - if not started: - thread = threading.Thread(target=self._run, name=self._name) - thread.daemon = True - self._thread = weakref.proxy(thread) - _register_executor(self) - # Mitigation to RuntimeError firing when thread starts on shutdown - # https://github.com/python/cpython/issues/114570 - try: - thread.start() - except RuntimeError as e: - if "interpreter shutdown" in str(e) or sys.is_finalizing(): - self._thread = None - return - raise - - def close(self, dummy: Any = None) -> None: - """Stop. To restart, call open(). - - The dummy parameter allows an executor's close method to be a weakref - callback; see monitor.py. - """ - self._stopped = True - - def join(self, timeout: Optional[int] = None) -> None: - if self._thread is not None: - try: - self._thread.join(timeout) - except (ReferenceError, RuntimeError): - # Thread already terminated, or not yet started. - pass - - def wake(self) -> None: - """Execute the target function soon.""" - self._event = True - - def update_interval(self, new_interval: int) -> None: - self._interval = new_interval - - def skip_sleep(self) -> None: - self._skip_sleep = True - - def _should_stop(self) -> bool: - with self._lock: - if self._stopped: - self._thread_will_exit = True - return True - return False - - def _run(self) -> None: - while not self._should_stop(): - try: - if not self._target(): - self._stopped = True - break - except BaseException: - with self._lock: - self._stopped = True - self._thread_will_exit = True - - raise - - if self._skip_sleep: - self._skip_sleep = False - else: - deadline = time.monotonic() + self._interval - while not self._stopped and time.monotonic() < deadline: - time.sleep(self._min_interval) - if self._event: - break # Early wake. - - self._event = False - - -# _EXECUTORS has a weakref to each running PeriodicExecutor. Once started, -# an executor is kept alive by a strong reference from its thread and perhaps -# from other objects. When the thread dies and all other referrers are freed, -# the executor is freed and removed from _EXECUTORS. If any threads are -# running when the interpreter begins to shut down, we try to halt and join -# them to avoid spurious errors. -_EXECUTORS = set() - - -def _register_executor(executor: PeriodicExecutor) -> None: - ref = weakref.ref(executor, _on_executor_deleted) - _EXECUTORS.add(ref) - - -def _on_executor_deleted(ref: weakref.ReferenceType[PeriodicExecutor]) -> None: - _EXECUTORS.remove(ref) - - -def _shutdown_executors() -> None: - if _EXECUTORS is None: - return - - # Copy the set. Stopping threads has the side effect of removing executors. - executors = list(_EXECUTORS) - - # First signal all executors to close... - for ref in executors: - executor = ref() - if executor: - executor.close() - - # ...then try to join them. - for ref in executors: - executor = ref() - if executor: - executor.join(1) - - executor = None diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 0cee2ac0c9..977177480b 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -27,7 +27,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Mapping, Optional, cast -from pymongo import _csot, common, helpers_shared +from pymongo import _csot, common, helpers_shared, periodic_executor from pymongo.errors import ( ConnectionFailure, InvalidOperation, @@ -56,7 +56,6 @@ secondary_server_selector, writable_server_selector, ) -from pymongo.synchronous import periodic_executor from pymongo.synchronous.client_session import _ServerSession, _ServerSessionPool from pymongo.synchronous.monitor import SrvMonitor from pymongo.synchronous.pool import Pool diff --git a/tools/synchro.py b/tools/synchro.py index 4793f525b7..99f8d1c49a 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -108,7 +108,7 @@ "async_set_fail_point": "set_fail_point", "async_ensure_all_connected": "ensure_all_connected", "async_repl_set_step_down": "repl_set_step_down", - "AsyncPeriodicExecutor(": "PeriodicExecutor(", + "AsyncPeriodicExecutor": "PeriodicExecutor", "async_wait_for_event": "wait_for_event", } From adf050456b00b1fc937ce71b9e8560f504e21bcc Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 9 Oct 2024 17:22:17 -0700 Subject: [PATCH 17/25] mutate client context to reset it --- test/__init__.py | 8 +++++--- test/asynchronous/__init__.py | 8 +++++--- .../test_connections_survive_primary_stepdown_spec.py | 4 ++-- test/test_connections_survive_primary_stepdown_spec.py | 4 ++-- test/test_monitor.py | 2 +- 5 files changed, 15 insertions(+), 11 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index bcb26d4919..63ceb2ab42 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -127,6 +127,9 @@ def __init__(self): server_api = ServerApi(MONGODB_API_VERSION) self.default_client_options["server_api"] = server_api + def reset(self): + self.__init__() + @property def client_options(self): """Return the MongoClient options for creating a duplicate client.""" @@ -859,13 +862,12 @@ def max_message_size_bytes(self): client_context = ClientContext() -def recreate_client_context(): +def reset_client_context(): if _IS_SYNC: # sync tests don't need to recreate a client context return - global client_context teardown() - client_context = ClientContext() + client_context.reset() setup() diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 5d03395199..b1388ac2b0 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -127,6 +127,9 @@ def __init__(self): server_api = ServerApi(MONGODB_API_VERSION) self.default_client_options["server_api"] = server_api + def reset(self): + self.__init__() + @property def client_options(self): """Return the MongoClient options for creating a duplicate client.""" @@ -861,13 +864,12 @@ async def max_message_size_bytes(self): async_client_context = AsyncClientContext() -async def recreate_client_context(): +async def reset_client_context(): if _IS_SYNC: # sync tests don't need to recreate a client context return - global async_client_context await async_teardown() - async_client_context = AsyncClientContext() + async_client_context.reset() await async_setup() diff --git a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py index b26a080e3a..dc04cb28a7 100644 --- a/test/asynchronous/test_connections_survive_primary_stepdown_spec.py +++ b/test/asynchronous/test_connections_survive_primary_stepdown_spec.py @@ -22,7 +22,7 @@ from test.asynchronous import ( AsyncIntegrationTest, async_client_context, - recreate_client_context, + reset_client_context, unittest, ) from test.asynchronous.helpers import async_repl_set_step_down @@ -65,7 +65,7 @@ async def _setup_class(cls): @classmethod async def _tearDown_class(cls): await cls.client.close() - await recreate_client_context() + await reset_client_context() async def asyncSetUp(self): # Note that all ops use same write-concern as self.db (majority). diff --git a/test/test_connections_survive_primary_stepdown_spec.py b/test/test_connections_survive_primary_stepdown_spec.py index 2b9eb87be1..984d700fb3 100644 --- a/test/test_connections_survive_primary_stepdown_spec.py +++ b/test/test_connections_survive_primary_stepdown_spec.py @@ -22,7 +22,7 @@ from test import ( IntegrationTest, client_context, - recreate_client_context, + reset_client_context, unittest, ) from test.helpers import repl_set_step_down @@ -65,7 +65,7 @@ def _setup_class(cls): @classmethod def _tearDown_class(cls): cls.client.close() - recreate_client_context() + reset_client_context() def setUp(self): # Note that all ops use same write-concern as self.db (majority). diff --git a/test/test_monitor.py b/test/test_monitor.py index f8e9443fae..a704f3d8cb 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -29,7 +29,7 @@ wait_until, ) -from pymongo.synchronous.periodic_executor import _EXECUTORS +from pymongo.periodic_executor import _EXECUTORS def unregistered(ref): From 56b37c286ec2db2da10afd1ee7f9f03ea1976a4c Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 9 Oct 2024 17:25:29 -0700 Subject: [PATCH 18/25] remove type ignore --- pymongo/periodic_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymongo/periodic_executor.py b/pymongo/periodic_executor.py index d6ce617b83..57df4d82c0 100644 --- a/pymongo/periodic_executor.py +++ b/pymongo/periodic_executor.py @@ -68,7 +68,7 @@ def open(self) -> None: started = self._task and not self._task.done() if not started: - self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) # type: ignore[func-returns-value] + self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) def close(self, dummy: Any = None) -> None: """Stop. To restart, call open(). From 938c8a7693b1429bc331639ae9b444c4943494a5 Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 9 Oct 2024 17:39:25 -0700 Subject: [PATCH 19/25] make PeriodicExecutor sync only --- pymongo/periodic_executor.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pymongo/periodic_executor.py b/pymongo/periodic_executor.py index 57df4d82c0..7bc050f96a 100644 --- a/pymongo/periodic_executor.py +++ b/pymongo/periodic_executor.py @@ -152,7 +152,7 @@ def __init__( self._name = name self._skip_sleep = False self._thread_will_exit = False - self._lock = _ALock(_create_lock()) + self._lock = _create_lock() def __repr__(self) -> str: return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>" @@ -224,21 +224,21 @@ def update_interval(self, new_interval: int) -> None: def skip_sleep(self) -> None: self._skip_sleep = True - async def _should_stop(self) -> bool: - async with self._lock: + def _should_stop(self) -> bool: + with self._lock: if self._stopped: self._thread_will_exit = True return True return False - async def _run(self) -> None: - while not await self._should_stop(): + def _run(self) -> None: + while not self._should_stop(): try: - if not await self._target(): + if not self._target(): self._stopped = True break except BaseException: - async with self._lock: + with self._lock: self._stopped = True self._thread_will_exit = True @@ -249,7 +249,7 @@ async def _run(self) -> None: else: deadline = time.monotonic() + self._interval while not self._stopped and time.monotonic() < deadline: - await asyncio.sleep(self._min_interval) + time.sleep(self._min_interval) if self._event: break # Early wake. From 99c381572bbdc8a5c7b4429261da4dbe85c82768 Mon Sep 17 00:00:00 2001 From: Iris Date: Thu, 10 Oct 2024 10:29:43 -0700 Subject: [PATCH 20/25] fix reset_client_context --- pymongo/asynchronous/monitor.py | 2 +- pymongo/periodic_executor.py | 2 +- pymongo/synchronous/monitor.py | 2 +- test/__init__.py | 6 +++--- test/asynchronous/__init__.py | 6 +++--- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index 1f8640c915..54bcd3bc1a 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -138,7 +138,7 @@ def __init__( """ super().__init__( topology, - "pymongo_server_monitor_thread", + "pymongo_server_monitor_task", topology_settings.heartbeat_frequency, common.MIN_HEARTBEAT_INTERVAL, ) diff --git a/pymongo/periodic_executor.py b/pymongo/periodic_executor.py index 7bc050f96a..f63f79eb32 100644 --- a/pymongo/periodic_executor.py +++ b/pymongo/periodic_executor.py @@ -23,7 +23,7 @@ import weakref from typing import Any, Optional -from pymongo.lock import _ALock, _create_lock +from pymongo.lock import _create_lock _IS_SYNC = False diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index a806670f2c..b9f325cee8 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -138,7 +138,7 @@ def __init__( """ super().__init__( topology, - "pymongo_server_monitor_thread", + "pymongo_server_monitor_task", topology_settings.heartbeat_frequency, common.MIN_HEARTBEAT_INTERVAL, ) diff --git a/test/__init__.py b/test/__init__.py index 63ceb2ab42..df48ae7a91 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -866,9 +866,9 @@ def reset_client_context(): if _IS_SYNC: # sync tests don't need to recreate a client context return - teardown() - client_context.reset() - setup() + client_context.client.close() + client_context.client = None + client_context._init_client() class PyMongoTestCase(unittest.TestCase): diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index b1388ac2b0..5298e38334 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -868,9 +868,9 @@ async def reset_client_context(): if _IS_SYNC: # sync tests don't need to recreate a client context return - await async_teardown() - async_client_context.reset() - await async_setup() + await async_client_context.client.close() + async_client_context.client = None + await async_client_context._init_client() class AsyncPyMongoTestCase(unittest.IsolatedAsyncioTestCase): From 7ece8126fecc870f4fa2f7bec26b5349c1e912b5 Mon Sep 17 00:00:00 2001 From: Iris Date: Thu, 10 Oct 2024 11:16:04 -0700 Subject: [PATCH 21/25] change pytest fixture scope --- test/asynchronous/conftest.py | 2 +- test/conftest.py | 2 +- tools/synchro.py | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/conftest.py b/test/asynchronous/conftest.py index c08f224abd..ad414f6a7c 100644 --- a/test/asynchronous/conftest.py +++ b/test/asynchronous/conftest.py @@ -24,7 +24,7 @@ def event_loop_policy(): return asyncio.get_event_loop_policy() -@pytest_asyncio.fixture(scope="session", autouse=True) +@pytest_asyncio.fixture(scope="packages", autouse=True) async def test_setup_and_teardown(): await async_setup() yield diff --git a/test/conftest.py b/test/conftest.py index ca817a5a62..b799b66f03 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -22,7 +22,7 @@ def event_loop_policy(): return asyncio.get_event_loop_policy() -@pytest.fixture(scope="session", autouse=True) +@pytest.fixture(scope="packages", autouse=True) def test_setup_and_teardown(): setup() yield diff --git a/tools/synchro.py b/tools/synchro.py index 99f8d1c49a..5589a8f9b9 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -110,6 +110,7 @@ "async_repl_set_step_down": "repl_set_step_down", "AsyncPeriodicExecutor": "PeriodicExecutor", "async_wait_for_event": "wait_for_event", + "pymongo_server_monitor_thread": "pymongo_server_monitor_task", } docstring_replacements: dict[tuple[str, str], str] = { From 3924d80b80eef91496a2012eb9a99e42110785e7 Mon Sep 17 00:00:00 2001 From: Iris Date: Thu, 10 Oct 2024 11:17:40 -0700 Subject: [PATCH 22/25] update AsyncPeriodicExecutors docs --- pymongo/periodic_executor.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pymongo/periodic_executor.py b/pymongo/periodic_executor.py index f63f79eb32..216a4457c7 100644 --- a/pymongo/periodic_executor.py +++ b/pymongo/periodic_executor.py @@ -36,7 +36,7 @@ def __init__( target: Any, name: Optional[str] = None, ): - """Run a target function periodically on a background thread. + """Run a target function periodically on a background task. If the target's return value is false, the executor stops. @@ -44,12 +44,8 @@ def __init__( :param min_interval: Minimum seconds between calls if `wake` is called very often. :param target: A function. - :param name: A name to give the underlying thread. + :param name: A name to give the underlying task. """ - # threading.Event and its internal condition variable are expensive - # in Python 2, see PYTHON-983. Use a boolean to know when to wake. - # The executor's design is constrained by several Python issues, see - # "periodic_executor.rst" in this repository. self._event = False self._interval = interval self._min_interval = min_interval From 9631ed6b110be366de53fd8ee43daae4f8812210 Mon Sep 17 00:00:00 2001 From: Iris Date: Thu, 10 Oct 2024 11:27:06 -0700 Subject: [PATCH 23/25] fix scope typo --- test/asynchronous/conftest.py | 2 +- test/conftest.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/conftest.py b/test/asynchronous/conftest.py index ad414f6a7c..d4a6fa53b9 100644 --- a/test/asynchronous/conftest.py +++ b/test/asynchronous/conftest.py @@ -24,7 +24,7 @@ def event_loop_policy(): return asyncio.get_event_loop_policy() -@pytest_asyncio.fixture(scope="packages", autouse=True) +@pytest_asyncio.fixture(scope="package", autouse=True) async def test_setup_and_teardown(): await async_setup() yield diff --git a/test/conftest.py b/test/conftest.py index b799b66f03..2799a8f41a 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -22,7 +22,7 @@ def event_loop_policy(): return asyncio.get_event_loop_policy() -@pytest.fixture(scope="packages", autouse=True) +@pytest.fixture(scope="package", autouse=True) def test_setup_and_teardown(): setup() yield From 0aa907526ac82199aab48930e25741e2cd19d249 Mon Sep 17 00:00:00 2001 From: Iris Date: Thu, 10 Oct 2024 15:24:14 -0700 Subject: [PATCH 24/25] fix task/thread name and delete commented out code --- pymongo/asynchronous/monitor.py | 6 +++++- pymongo/asynchronous/topology.py | 8 -------- pymongo/synchronous/monitor.py | 6 +++++- pymongo/synchronous/topology.py | 8 -------- 4 files changed, 10 insertions(+), 18 deletions(-) diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index 54bcd3bc1a..fe577ff380 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -136,9 +136,13 @@ def __init__( The Topology is weakly referenced. The Pool must be exclusive to this Monitor. """ + if _IS_SYNC: + name = "pymongo_server_monitor_thread" + else: + name = "pymongo_server_monitor_task" super().__init__( topology, - "pymongo_server_monitor_task", + name, topology_settings.heartbeat_frequency, common.MIN_HEARTBEAT_INTERVAL, ) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index bb82a815c3..f0cb56cbf1 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -653,15 +653,7 @@ async def request_check_all(self, wait_time: int = 5) -> None: """Wake all monitors, wait for at least one to check its server.""" async with self._lock: self._request_check_all() - # if _IS_SYNC: await self._condition.wait(wait_time) - # else: - # try: - # await asyncio.wait_for( - # self._condition.wait(), wait_time - # ) # type-ignore: [arg-type] - # except asyncio.TimeoutError: - # pass def data_bearing_servers(self) -> list[ServerDescription]: """Return a list of all data-bearing servers. diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index b9f325cee8..48b4b8a818 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -136,9 +136,13 @@ def __init__( The Topology is weakly referenced. The Pool must be exclusive to this Monitor. """ + if _IS_SYNC: + name = "pymongo_server_monitor_task" + else: + name = "pymongo_server_monitor_task" super().__init__( topology, - "pymongo_server_monitor_task", + name, topology_settings.heartbeat_frequency, common.MIN_HEARTBEAT_INTERVAL, ) diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 977177480b..e34de6bc50 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -651,15 +651,7 @@ def request_check_all(self, wait_time: int = 5) -> None: """Wake all monitors, wait for at least one to check its server.""" with self._lock: self._request_check_all() - # if _IS_SYNC: self._condition.wait(wait_time) - # else: - # try: - # asyncio.wait_for( - # self._condition.wait(), wait_time - # ) # type-ignore: [arg-type] - # except asyncio.TimeoutError: - # pass def data_bearing_servers(self) -> list[ServerDescription]: """Return a list of all data-bearing servers. From 65978031f3768f77b064ccf8dc6812152b888a3d Mon Sep 17 00:00:00 2001 From: Iris Date: Thu, 10 Oct 2024 16:56:27 -0700 Subject: [PATCH 25/25] fix string typo in synchro and comment string --- pymongo/asynchronous/monitor.py | 8 ++------ pymongo/synchronous/monitor.py | 6 +----- test/__init__.py | 5 +---- test/asynchronous/__init__.py | 5 +---- tools/synchro.py | 3 ++- 5 files changed, 7 insertions(+), 20 deletions(-) diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index fe577ff380..bbfd6a2998 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -136,13 +136,9 @@ def __init__( The Topology is weakly referenced. The Pool must be exclusive to this Monitor. """ - if _IS_SYNC: - name = "pymongo_server_monitor_thread" - else: - name = "pymongo_server_monitor_task" super().__init__( topology, - name, + "pymongo_server_monitor_task", topology_settings.heartbeat_frequency, common.MIN_HEARTBEAT_INTERVAL, ) @@ -437,7 +433,7 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool """ super().__init__( topology, - "pymongo_server_rtt_thread", + "pymongo_server_rtt_task", topology_settings.heartbeat_frequency, common.MIN_HEARTBEAT_INTERVAL, ) diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index 48b4b8a818..a806670f2c 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -136,13 +136,9 @@ def __init__( The Topology is weakly referenced. The Pool must be exclusive to this Monitor. """ - if _IS_SYNC: - name = "pymongo_server_monitor_task" - else: - name = "pymongo_server_monitor_task" super().__init__( topology, - name, + "pymongo_server_monitor_thread", topology_settings.heartbeat_frequency, common.MIN_HEARTBEAT_INTERVAL, ) diff --git a/test/__init__.py b/test/__init__.py index df48ae7a91..6be3b49ce6 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -127,9 +127,6 @@ def __init__(self): server_api = ServerApi(MONGODB_API_VERSION) self.default_client_options["server_api"] = server_api - def reset(self): - self.__init__() - @property def client_options(self): """Return the MongoClient options for creating a duplicate client.""" @@ -864,7 +861,7 @@ def max_message_size_bytes(self): def reset_client_context(): if _IS_SYNC: - # sync tests don't need to recreate a client context + # sync tests don't need to reset a client context return client_context.client.close() client_context.client = None diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 5298e38334..1a386fe766 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -127,9 +127,6 @@ def __init__(self): server_api = ServerApi(MONGODB_API_VERSION) self.default_client_options["server_api"] = server_api - def reset(self): - self.__init__() - @property def client_options(self): """Return the MongoClient options for creating a duplicate client.""" @@ -866,7 +863,7 @@ async def max_message_size_bytes(self): async def reset_client_context(): if _IS_SYNC: - # sync tests don't need to recreate a client context + # sync tests don't need to reset a client context return await async_client_context.client.close() async_client_context.client = None diff --git a/tools/synchro.py b/tools/synchro.py index 5589a8f9b9..50ba4b4943 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -110,7 +110,8 @@ "async_repl_set_step_down": "repl_set_step_down", "AsyncPeriodicExecutor": "PeriodicExecutor", "async_wait_for_event": "wait_for_event", - "pymongo_server_monitor_thread": "pymongo_server_monitor_task", + "pymongo_server_monitor_task": "pymongo_server_monitor_thread", + "pymongo_server_rtt_task": "pymongo_server_rtt_thread", } docstring_replacements: dict[tuple[str, str], str] = {