Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
19f14a9
create AsyncPeriodicExecutor
sleepyStick Oct 4, 2024
59c0733
fix tests
sleepyStick Oct 4, 2024
3f5dda7
undo lock changes
sleepyStick Oct 4, 2024
65301cf
found some more lock changes to undo and remove atexit for async
sleepyStick Oct 7, 2024
8852bc9
hacky fix to PeriodicExecutor being defined twice and fix unwanted co…
sleepyStick Oct 7, 2024
5c9b1d9
add missing awaits
sleepyStick Oct 7, 2024
ce322f5
attempt to fix test
sleepyStick Oct 7, 2024
549645b
left out an await
sleepyStick Oct 7, 2024
c92f6af
add missing await and fix typing errors
sleepyStick Oct 8, 2024
c58448f
realizing condition needs to be reverted too
sleepyStick Oct 8, 2024
43660d9
reset client context after primary steps down
sleepyStick Oct 9, 2024
dbb51e2
ignore type error on sync
sleepyStick Oct 9, 2024
3c640e8
temp changing pytests's setup and teardown are run to see if this wil…
sleepyStick Oct 9, 2024
0fc60ba
only reset client context after primary_stepdown
sleepyStick Oct 9, 2024
f78ee91
only recreate client context in async version
sleepyStick Oct 9, 2024
cebbd6f
move periodic_executor.py up a lvl
sleepyStick Oct 9, 2024
adf0504
mutate client context to reset it
sleepyStick Oct 10, 2024
56b37c2
remove type ignore
sleepyStick Oct 10, 2024
938c8a7
make PeriodicExecutor sync only
sleepyStick Oct 10, 2024
99c3815
fix reset_client_context
sleepyStick Oct 10, 2024
7ece812
change pytest fixture scope
sleepyStick Oct 10, 2024
3924d80
update AsyncPeriodicExecutors docs
sleepyStick Oct 10, 2024
9631ed6
fix scope typo
sleepyStick Oct 10, 2024
0aa9075
fix task/thread name and delete commented out code
sleepyStick Oct 10, 2024
6597803
fix string typo in synchro and comment string
sleepyStick Oct 10, 2024
541b72d
Merge remote-tracking branch 'upstream/async-improvements' into PYTHO…
sleepyStick Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ async def target() -> bool:
await AsyncMongoClient._process_periodic_tasks(client)
return True

executor = periodic_executor.PeriodicExecutor(
executor = periodic_executor.AsyncPeriodicExecutor(
interval=common.KILL_CURSOR_FREQUENCY,
min_interval=common.MIN_HEARTBEAT_INTERVAL,
target=target,
Expand Down
20 changes: 14 additions & 6 deletions pymongo/asynchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

import asyncio
import atexit
import logging
import time
Expand Down Expand Up @@ -76,7 +77,7 @@ async def target() -> bool:
await monitor._run() # type:ignore[attr-defined]
return True

executor = periodic_executor.PeriodicExecutor(
executor = periodic_executor.AsyncPeriodicExecutor(
interval=interval, min_interval=min_interval, target=target, name=name
)

Expand Down Expand Up @@ -112,9 +113,9 @@ async def close(self) -> None:
"""
self.gc_safe_close()

def join(self, timeout: Optional[int] = None) -> None:
async def join(self, timeout: Optional[int] = None) -> None:
"""Wait for the monitor to stop."""
self._executor.join(timeout)
await self._executor.join(timeout)

def request_check(self) -> None:
"""If the monitor is sleeping, wake it soon."""
Expand Down Expand Up @@ -521,14 +522,21 @@ def _shutdown_monitors() -> None:
monitor = None


def _shutdown_resources() -> None:
async def _shutdown_resources() -> None:
# _shutdown_monitors/_shutdown_executors may already be GC'd at shutdown.
shutdown = _shutdown_monitors
if shutdown: # type:ignore[truthy-function]
shutdown()
shutdown = _shutdown_executors
if shutdown: # type:ignore[truthy-function]
shutdown()
await shutdown()


def _run_shutdown_resources():
if _IS_SYNC:
_shutdown_resources()
else:
asyncio.run(_shutdown_resources())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work because there is already an event loop running, we need something like https://github.com/minrk/asyncio-atexit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, something was funky HAHA thanks for showing me a fix!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would i be allowed to import this and use the package? or should i just take the code that we need and put in on our own code base? thoughts?

Copy link
Member

@ShaneHarvey ShaneHarvey Oct 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this at all? What happens if we do nothing for async clients at exit?

Do our SDAM tasks prevent the loop from exiting?

We document that an AsyncMongoClient must always be closed() so I don't believe we need to do anything. If the app leaves the client open, that's a bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took it out of async and it seems to be fine so far.



atexit.register(_shutdown_resources)
atexit.register(_run_shutdown_resources)
119 changes: 98 additions & 21 deletions pymongo/asynchronous/periodic_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import weakref
from typing import Any, Optional

from pymongo.lock import _ALock, _create_lock
from pymongo.lock import _create_lock

_IS_SYNC = False

Expand Down Expand Up @@ -59,24 +59,11 @@ def __init__(
self._name = name
self._skip_sleep = False
self._thread_will_exit = False
self._lock = _ALock(_create_lock())
self._lock = _create_lock()

def __repr__(self) -> str:
return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>"

def _run_async(self) -> None:
# The default asyncio loop implementation on Windows
# has issues with sharing sockets across loops (https://github.com/python/cpython/issues/122240)
# We explicitly use a different loop implementation here to prevent that issue
if sys.platform == "win32":
loop = asyncio.SelectorEventLoop()
try:
loop.run_until_complete(self._run()) # type: ignore[func-returns-value]
finally:
loop.close()
else:
asyncio.run(self._run()) # type: ignore[func-returns-value]

def open(self) -> None:
"""Start. Multiple calls have no effect.

Expand Down Expand Up @@ -104,10 +91,7 @@ def open(self) -> None:
pass

if not started:
if _IS_SYNC:
thread = threading.Thread(target=self._run, name=self._name)
else:
thread = threading.Thread(target=self._run_async, name=self._name)
thread = threading.Thread(target=self._run, name=self._name)
thread.daemon = True
self._thread = weakref.proxy(thread)
_register_executor(self)
Expand Down Expand Up @@ -179,6 +163,99 @@ async def _run(self) -> None:
self._event = False


class AsyncPeriodicExecutor:
def __init__(
self,
interval: float,
min_interval: float,
target: Any,
name: Optional[str] = None,
):
"""Run a target function periodically on a background thread.

If the target's return value is false, the executor stops.

:param interval: Seconds between calls to `target`.
:param min_interval: Minimum seconds between calls if `wake` is
called very often.
:param target: A function.
:param name: A name to give the underlying thread.
"""
# threading.Event and its internal condition variable are expensive
# in Python 2, see PYTHON-983. Use a boolean to know when to wake.
# The executor's design is constrained by several Python issues, see
# "periodic_executor.rst" in this repository.
self._event = False
self._interval = interval
self._min_interval = min_interval
self._target = target
self._stopped = False
self._task: Optional[asyncio.Task] = None
self._name = name
self._skip_sleep = False

def __repr__(self) -> str:
return f"<{self.__class__.__name__}(name={self._name}) object at 0x{id(self):x}>"

def open(self) -> None:
"""Start. Multiple calls have no effect."""
self._stopped = False
started = self._task and not self._task.done()

if not started:
self._task = asyncio.get_event_loop().create_task(self._run(), name=self._name)

def close(self, dummy: Any = None) -> None:
"""Stop. To restart, call open().

The dummy parameter allows an executor's close method to be a weakref
callback; see monitor.py.
"""
self._stopped = True

async def join(self, timeout: Optional[int] = None) -> None:
if self._task is not None:
try:
await asyncio.wait_for(self._task, timeout=timeout)
except asyncio.TimeoutError:
# Task timed out
pass
except asyncio.exceptions.CancelledError:
# Task was already finished, or not yet started.
pass

def wake(self) -> None:
"""Execute the target function soon."""
self._event = True

def update_interval(self, new_interval: int) -> None:
self._interval = new_interval

def skip_sleep(self) -> None:
self._skip_sleep = True

async def _run(self) -> None:
while not self._stopped:
try:
if not await self._target():
self._stopped = True
break
except BaseException:
self._stopped = True
raise

if self._skip_sleep:
self._skip_sleep = False
else:
deadline = time.monotonic() + self._interval
while not self._stopped and time.monotonic() < deadline:
await asyncio.sleep(self._min_interval)
if self._event:
break # Early wake.

self._event = False


# _EXECUTORS has a weakref to each running PeriodicExecutor. Once started,
# an executor is kept alive by a strong reference from its thread and perhaps
# from other objects. When the thread dies and all other referrers are freed,
Expand All @@ -197,7 +274,7 @@ def _on_executor_deleted(ref: weakref.ReferenceType[PeriodicExecutor]) -> None:
_EXECUTORS.remove(ref)


def _shutdown_executors() -> None:
async def _shutdown_executors() -> None:
if _EXECUTORS is None:
return

Expand All @@ -214,6 +291,6 @@ def _shutdown_executors() -> None:
for ref in executors:
executor = ref()
if executor:
executor.join(1)
await executor.join(1)

executor = None
5 changes: 3 additions & 2 deletions pymongo/asynchronous/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Represent MongoClient's configuration."""
from __future__ import annotations

import asyncio
import threading
import traceback
from typing import Any, Collection, Optional, Type, Union
Expand Down Expand Up @@ -67,7 +68,7 @@ def __init__(
self._pool_class: Type[Pool] = pool_class or pool.Pool
self._pool_options: PoolOptions = pool_options or PoolOptions()
self._monitor_class: Type[monitor.Monitor] = monitor_class or monitor.Monitor
self._condition_class: Type[threading.Condition] = condition_class or threading.Condition
self._condition_class: Type[asyncio.Condition] = condition_class or asyncio.Condition
self._local_threshold_ms = local_threshold_ms
self._server_selection_timeout = server_selection_timeout
self._server_selector = server_selector
Expand Down Expand Up @@ -106,7 +107,7 @@ def monitor_class(self) -> Type[monitor.Monitor]:
return self._monitor_class

@property
def condition_class(self) -> Type[threading.Condition]:
def condition_class(self) -> Type[asyncio.Condition]:
return self._condition_class

@property
Expand Down
21 changes: 13 additions & 8 deletions pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

import asyncio
import logging
import os
import queue
Expand Down Expand Up @@ -44,7 +45,6 @@
WriteError,
)
from pymongo.hello import Hello
from pymongo.lock import _ACondition, _ALock, _create_lock
from pymongo.logger import (
_SDAM_LOGGER,
_SERVER_SELECTION_LOGGER,
Expand Down Expand Up @@ -170,9 +170,8 @@ def __init__(self, topology_settings: TopologySettings):
self._seed_addresses = list(topology_description.server_descriptions())
self._opened = False
self._closed = False
_lock = _create_lock()
self._lock = _ALock(_lock)
self._condition = _ACondition(self._settings.condition_class(_lock))
self._lock = asyncio.Lock()
self._condition = self._settings.condition_class(self._lock) # type: ignore[arg-type]
self._servers: dict[_Address, Server] = {}
self._pid: Optional[int] = None
self._max_cluster_time: Optional[ClusterTime] = None
Expand All @@ -185,7 +184,7 @@ def __init__(self, topology_settings: TopologySettings):
async def target() -> bool:
return process_events_queue(weak)

executor = periodic_executor.PeriodicExecutor(
executor = periodic_executor.AsyncPeriodicExecutor(
interval=common.EVENTS_QUEUE_FREQUENCY,
min_interval=common.MIN_HEARTBEAT_INTERVAL,
target=target,
Expand Down Expand Up @@ -354,7 +353,10 @@ async def _select_servers_loop(
# change, or for a timeout. We won't miss any changes that
# came after our most recent apply_selector call, since we've
# held the lock until now.
await self._condition.wait(common.MIN_HEARTBEAT_INTERVAL)
try:
await asyncio.wait_for(self._condition.wait(), common.MIN_HEARTBEAT_INTERVAL)
except asyncio.TimeoutError:
pass
self._description.check_compatible()
now = time.monotonic()
server_descriptions = self._description.apply_selector(
Expand Down Expand Up @@ -654,7 +656,10 @@ async def request_check_all(self, wait_time: int = 5) -> None:
"""Wake all monitors, wait for at least one to check its server."""
async with self._lock:
self._request_check_all()
await self._condition.wait(wait_time)
try:
await asyncio.wait_for(self._condition.wait(), wait_time)
except TimeoutError:
pass

def data_bearing_servers(self) -> list[ServerDescription]:
"""Return a list of all data-bearing servers.
Expand Down Expand Up @@ -742,7 +747,7 @@ async def close(self) -> None:
if self._publish_server or self._publish_tp:
# Make sure the events executor thread is fully closed before publishing the remaining events
self.__events_executor.close()
self.__events_executor.join(1)
await self.__events_executor.join(1)
process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type]

@property
Expand Down
10 changes: 9 additions & 1 deletion pymongo/synchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

import asyncio
import atexit
import logging
import time
Expand Down Expand Up @@ -531,4 +532,11 @@ def _shutdown_resources() -> None:
shutdown()


atexit.register(_shutdown_resources)
def _run_shutdown_resources():
if _IS_SYNC:
_shutdown_resources()
else:
asyncio.run(_shutdown_resources())


atexit.register(_run_shutdown_resources)
Loading
Loading