Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
19f14a9
create AsyncPeriodicExecutor
sleepyStick Oct 4, 2024
59c0733
fix tests
sleepyStick Oct 4, 2024
3f5dda7
undo lock changes
sleepyStick Oct 4, 2024
65301cf
found some more lock changes to undo and remove atexit for async
sleepyStick Oct 7, 2024
8852bc9
hacky fix to PeriodicExecutor being defined twice and fix unwanted co…
sleepyStick Oct 7, 2024
5c9b1d9
add missing awaits
sleepyStick Oct 7, 2024
ce322f5
attempt to fix test
sleepyStick Oct 7, 2024
549645b
left out an await
sleepyStick Oct 7, 2024
c92f6af
add missing await and fix typing errors
sleepyStick Oct 8, 2024
c58448f
realizing condition needs to be reverted too
sleepyStick Oct 8, 2024
43660d9
reset client context after primary steps down
sleepyStick Oct 9, 2024
dbb51e2
ignore type error on sync
sleepyStick Oct 9, 2024
3c640e8
temp changing pytests's setup and teardown are run to see if this wil…
sleepyStick Oct 9, 2024
0fc60ba
only reset client context after primary_stepdown
sleepyStick Oct 9, 2024
f78ee91
only recreate client context in async version
sleepyStick Oct 9, 2024
cebbd6f
move periodic_executor.py up a lvl
sleepyStick Oct 9, 2024
adf0504
mutate client context to reset it
sleepyStick Oct 10, 2024
56b37c2
remove type ignore
sleepyStick Oct 10, 2024
938c8a7
make PeriodicExecutor sync only
sleepyStick Oct 10, 2024
99c3815
fix reset_client_context
sleepyStick Oct 10, 2024
7ece812
change pytest fixture scope
sleepyStick Oct 10, 2024
3924d80
update AsyncPeriodicExecutors docs
sleepyStick Oct 10, 2024
9631ed6
fix scope typo
sleepyStick Oct 10, 2024
0aa9075
fix task/thread name and delete commented out code
sleepyStick Oct 10, 2024
6597803
fix string typo in synchro and comment string
sleepyStick Oct 10, 2024
541b72d
Merge remote-tracking branch 'upstream/async-improvements' into PYTHO…
sleepyStick Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ async def target() -> bool:
await AsyncMongoClient._process_periodic_tasks(client)
return True

executor = periodic_executor.PeriodicExecutor(
executor = periodic_executor.AsyncPeriodicExecutor(
interval=common.KILL_CURSOR_FREQUENCY,
min_interval=common.MIN_HEARTBEAT_INTERVAL,
target=target,
Expand Down
11 changes: 6 additions & 5 deletions pymongo/asynchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def target() -> bool:
await monitor._run() # type:ignore[attr-defined]
return True

executor = periodic_executor.PeriodicExecutor(
executor = periodic_executor.AsyncPeriodicExecutor(
interval=interval, min_interval=min_interval, target=target, name=name
)

Expand Down Expand Up @@ -112,9 +112,9 @@ async def close(self) -> None:
"""
self.gc_safe_close()

def join(self, timeout: Optional[int] = None) -> None:
async def join(self, timeout: Optional[int] = None) -> None:
"""Wait for the monitor to stop."""
self._executor.join(timeout)
await self._executor.join(timeout)

def request_check(self) -> None:
"""If the monitor is sleeping, wake it soon."""
Expand Down Expand Up @@ -250,7 +250,7 @@ async def _check_server(self) -> ServerDescription:
except (OperationFailure, NotPrimaryError) as exc:
# Update max cluster time even when hello fails.
details = cast(Mapping[str, Any], exc.details)
self._topology.receive_cluster_time(details.get("$clusterTime"))
await self._topology.receive_cluster_time(details.get("$clusterTime"))
raise
except ReferenceError:
raise
Expand Down Expand Up @@ -531,4 +531,5 @@ def _shutdown_resources() -> None:
shutdown()


atexit.register(_shutdown_resources)
if _IS_SYNC:
atexit.register(_shutdown_resources)
111 changes: 94 additions & 17 deletions pymongo/asynchronous/periodic_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,99 @@
_IS_SYNC = False


class AsyncPeriodicExecutor:
    """Periodically await a target coroutine function inside an asyncio task."""

    def __init__(
        self,
        interval: float,
        min_interval: float,
        target: Any,
        name: Optional[str] = None,
    ):
        """Run a target coroutine function periodically in an asyncio task.

        If the target's return value is false, the executor stops.

        :param interval: Seconds between calls to `target`.
        :param min_interval: Minimum seconds between calls if `wake` is
            called very often.
        :param target: A coroutine function taking no arguments; its result
            is awaited on every iteration.
        :param name: A name to give the underlying task.
        """
        # A plain boolean is the wake flag; _run() polls it every
        # `min_interval` seconds while it sleeps between calls.
        self._event = False
        self._interval = interval
        self._min_interval = min_interval
        self._target = target
        self._stopped = False
        self._task: Optional[asyncio.Task] = None
        self._name = name
        self._skip_sleep = False

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>"

    def open(self) -> None:
        """Start. Multiple calls have no effect."""
        self._stopped = False
        started = self._task and not self._task.done()

        if not started:
            # Schedule the run loop as a task on the current event loop.
            self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name)  # type: ignore[func-returns-value]

    def close(self, dummy: Any = None) -> None:
        """Stop. To restart, call open().

        The dummy parameter allows an executor's close method to be a weakref
        callback; see monitor.py.

        This only sets the stop flag; the task exits the next time it checks
        the flag. Use join() to wait for it.
        """
        self._stopped = True

    async def join(self, timeout: Optional[int] = None) -> None:
        """Wait up to `timeout` seconds for the task to finish.

        Returns silently if the executor was never opened, if the timeout
        expires, or if the task was cancelled. An exception raised by the
        target propagates to the caller.

        :param timeout: Seconds to wait, or None to wait indefinitely.
        """
        if self._task is not None:
            try:
                # asyncio.wait_for() cancels whatever it awaits when the
                # timeout expires. Shield the task so that giving up on the
                # wait does not also kill the executor.
                await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
            except asyncio.TimeoutError:
                # Gave up waiting; the task keeps running.
                pass
            except asyncio.exceptions.CancelledError:
                # The task was cancelled.
                pass

    def wake(self) -> None:
        """Execute the target function soon."""
        self._event = True

    def update_interval(self, new_interval: int) -> None:
        """Change the number of seconds between calls to the target."""
        self._interval = new_interval

    def skip_sleep(self) -> None:
        """Skip the sleep that would follow the current/next target call."""
        self._skip_sleep = True

    async def _run(self) -> None:
        """Task body: call the target until stopped or it returns a false value."""
        while not self._stopped:
            try:
                if not await self._target():
                    self._stopped = True
                    break
            except BaseException:
                # Mark the executor stopped before propagating so that a
                # later open() creates a fresh task.
                self._stopped = True
                raise

            if self._skip_sleep:
                self._skip_sleep = False
            else:
                # Sleep in `min_interval` slices so close() and wake() are
                # noticed without waiting out the full interval.
                deadline = time.monotonic() + self._interval
                while not self._stopped and time.monotonic() < deadline:
                    await asyncio.sleep(self._min_interval)
                    if self._event:
                        break  # Early wake.

            self._event = False


class PeriodicExecutor:
def __init__(
self,
Expand Down Expand Up @@ -64,19 +157,6 @@ def __init__(
def __repr__(self) -> str:
return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>"

def _run_async(self) -> None:
# The default asyncio loop implementation on Windows
# has issues with sharing sockets across loops (https://github.com/python/cpython/issues/122240)
# We explicitly use a different loop implementation here to prevent that issue
if sys.platform == "win32":
loop = asyncio.SelectorEventLoop()
try:
loop.run_until_complete(self._run()) # type: ignore[func-returns-value]
finally:
loop.close()
else:
asyncio.run(self._run()) # type: ignore[func-returns-value]

def open(self) -> None:
"""Start. Multiple calls have no effect.

Expand Down Expand Up @@ -104,10 +184,7 @@ def open(self) -> None:
pass

if not started:
if _IS_SYNC:
thread = threading.Thread(target=self._run, name=self._name)
else:
thread = threading.Thread(target=self._run_async, name=self._name)
thread = threading.Thread(target=self._run, name=self._name)
thread.daemon = True
self._thread = weakref.proxy(thread)
_register_executor(self)
Expand Down
12 changes: 10 additions & 2 deletions pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def __init__(self, topology_settings: TopologySettings):
async def target() -> bool:
return process_events_queue(weak)

executor = periodic_executor.PeriodicExecutor(
executor = periodic_executor.AsyncPeriodicExecutor(
interval=common.EVENTS_QUEUE_FREQUENCY,
min_interval=common.MIN_HEARTBEAT_INTERVAL,
target=target,
Expand Down Expand Up @@ -654,7 +654,15 @@ async def request_check_all(self, wait_time: int = 5) -> None:
"""Wake all monitors, wait for at least one to check its server."""
async with self._lock:
self._request_check_all()
# if _IS_SYNC:
await self._condition.wait(wait_time)
# else:
# try:
# await asyncio.wait_for(
# self._condition.wait(), wait_time
# ) # type-ignore: [arg-type]
# except asyncio.TimeoutError:
# pass

def data_bearing_servers(self) -> list[ServerDescription]:
"""Return a list of all data-bearing servers.
Expand Down Expand Up @@ -742,7 +750,7 @@ async def close(self) -> None:
if self._publish_server or self._publish_tp:
# Make sure the events executor thread is fully closed before publishing the remaining events
self.__events_executor.close()
self.__events_executor.join(1)
await self.__events_executor.join(1)
process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type]

@property
Expand Down
3 changes: 2 additions & 1 deletion pymongo/synchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,4 +531,5 @@ def _shutdown_resources() -> None:
shutdown()


atexit.register(_shutdown_resources)
if _IS_SYNC:
atexit.register(_shutdown_resources)
111 changes: 94 additions & 17 deletions pymongo/synchronous/periodic_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,99 @@
_IS_SYNC = True


class SyncPeriodicExecutor:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should organize the code such that we don't sync this code since it will never be used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in move this file out of the async / sync folders right? If so, that makes sense to me :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. We can have one file that has both.

def __init__(
self,
interval: float,
min_interval: float,
target: Any,
name: Optional[str] = None,
):
"""Run a target function periodically on a background thread.

If the target's return value is false, the executor stops.

:param interval: Seconds between calls to `target`.
:param min_interval: Minimum seconds between calls if `wake` is
called very often.
:param target: A function.
:param name: A name to give the underlying thread.
"""
# threading.Event and its internal condition variable are expensive
# in Python 2, see PYTHON-983. Use a boolean to know when to wake.
# The executor's design is constrained by several Python issues, see
# "periodic_executor.rst" in this repository.
self._event = False
self._interval = interval
self._min_interval = min_interval
self._target = target
self._stopped = False
self._task: Optional[asyncio.Task] = None
self._name = name
self._skip_sleep = False

def __repr__(self) -> str:
return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>"

def open(self) -> None:
"""Start. Multiple calls have no effect."""
self._stopped = False
started = self._task and not self._task.done()

if not started:
self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name) # type: ignore[func-returns-value]

def close(self, dummy: Any = None) -> None:
"""Stop. To restart, call open().

The dummy parameter allows an executor's close method to be a weakref
callback; see monitor.py.
"""
self._stopped = True

def join(self, timeout: Optional[int] = None) -> None:
if self._task is not None:
try:
asyncio.wait_for(self._task, timeout=timeout) # type-ignore: [arg-type]
except asyncio.TimeoutError:
# Task timed out
pass
except asyncio.exceptions.CancelledError:
# Task was already finished, or not yet started.
pass

def wake(self) -> None:
"""Execute the target function soon."""
self._event = True

def update_interval(self, new_interval: int) -> None:
self._interval = new_interval

def skip_sleep(self) -> None:
self._skip_sleep = True

def _run(self) -> None:
while not self._stopped:
try:
if not self._target():
self._stopped = True
break
except BaseException:
self._stopped = True
raise

if self._skip_sleep:
self._skip_sleep = False
else:
deadline = time.monotonic() + self._interval
while not self._stopped and time.monotonic() < deadline:
time.sleep(self._min_interval)
if self._event:
break # Early wake.

self._event = False


class PeriodicExecutor:
def __init__(
self,
Expand Down Expand Up @@ -64,19 +157,6 @@ def __init__(
def __repr__(self) -> str:
return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>"

def _run_async(self) -> None:
# The default asyncio loop implementation on Windows
# has issues with sharing sockets across loops (https://github.com/python/cpython/issues/122240)
# We explicitly use a different loop implementation here to prevent that issue
if sys.platform == "win32":
loop = asyncio.SelectorEventLoop()
try:
loop.run_until_complete(self._run()) # type: ignore[func-returns-value]
finally:
loop.close()
else:
asyncio.run(self._run()) # type: ignore[func-returns-value]

def open(self) -> None:
"""Start. Multiple calls have no effect.

Expand Down Expand Up @@ -104,10 +184,7 @@ def open(self) -> None:
pass

if not started:
if _IS_SYNC:
thread = threading.Thread(target=self._run, name=self._name)
else:
thread = threading.Thread(target=self._run_async, name=self._name)
thread = threading.Thread(target=self._run, name=self._name)
thread.daemon = True
self._thread = weakref.proxy(thread)
_register_executor(self)
Expand Down
8 changes: 8 additions & 0 deletions pymongo/synchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,15 @@ def request_check_all(self, wait_time: int = 5) -> None:
"""Wake all monitors, wait for at least one to check its server."""
with self._lock:
self._request_check_all()
# if _IS_SYNC:
self._condition.wait(wait_time)
# else:
# try:
# asyncio.wait_for(
# self._condition.wait(), wait_time
# ) # type-ignore: [arg-type]
# except asyncio.TimeoutError:
# pass

def data_bearing_servers(self) -> list[ServerDescription]:
"""Return a list of all data-bearing servers.
Expand Down
10 changes: 10 additions & 0 deletions test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,16 @@ def max_message_size_bytes(self):
client_context = ClientContext()


def recreate_client_context():
    """Replace the module-level ``client_context`` with a fresh instance.

    Does nothing when ``_IS_SYNC`` is true. Otherwise it runs ``teardown()``,
    rebinds ``client_context`` to a new ``ClientContext`` and then runs
    ``setup()``.
    """
    global client_context
    if not _IS_SYNC:
        teardown()
        client_context = ClientContext()
        setup()


class PyMongoTestCase(unittest.TestCase):
def assertEqualCommand(self, expected, actual, msg=None):
self.assertEqual(sanitize_cmd(expected), sanitize_cmd(actual), msg)
Expand Down
Loading
Loading