Skip to content

PYTHON-5053 - AsyncMongoClient.close() should await all background tasks #2127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,8 @@ async def close(self) -> None:
if self._encrypter:
# TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened.
await self._encrypter.close()
if not _IS_SYNC:
await self._kill_cursors_executor.join()
self._closed = True

if not _IS_SYNC:
Expand Down
6 changes: 6 additions & 0 deletions pymongo/asynchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ async def close(self) -> None:
open() restarts the monitor after closing.
"""
self.gc_safe_close()
if not _IS_SYNC:
await self._executor.join()
Copy link
Member

@ShaneHarvey ShaneHarvey Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, it is not safe to call join() in close(). The problem is there are cases (at least one) where the Monitor task itself calls close(). That would attempt to join() itself which will hang forever (or maybe python detects that case and raises an error, either way it's a problem).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we either need to remove self calls to close() or move the join() logic to another method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. It makes more sense to me to have a separate Monitor.join() method that we call in the async API whenever we call Monitor.close() from a non-monitor task.


async def join(self, timeout: Optional[int] = None) -> None:
"""Wait for the monitor to stop."""
Expand Down Expand Up @@ -191,6 +193,8 @@ def gc_safe_close(self) -> None:

async def close(self) -> None:
self.gc_safe_close()
if not _IS_SYNC:
await self._executor.join()
await self._rtt_monitor.close()
# Increment the generation and maybe close the socket. If the executor
# thread has the socket checked out, it will be closed when checked in.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think moving the join to happen after _reset_connection will result in faster close() in some cases.

Expand Down Expand Up @@ -460,6 +464,8 @@ async def close(self) -> None:
self.gc_safe_close()
# Increment the generation and maybe close the socket. If the executor
# thread has the socket checked out, it will be closed when checked in.
if not _IS_SYNC:
await self._executor.join()
await self._pool.reset()

async def add_sample(self, sample: float) -> None:
Expand Down
2 changes: 2 additions & 0 deletions pymongo/asynchronous/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ async def close(self) -> None:
)

await self._monitor.close()
if not _IS_SYNC:
await self._monitor.join()
await self._pool.close()

def request_check(self) -> None:
Expand Down
2 changes: 2 additions & 0 deletions pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,8 @@ async def close(self) -> None:
# Stop SRV polling thread.
if self._srv_monitor:
await self._srv_monitor.close()
if not _IS_SYNC:
await self._srv_monitor.join()

self._opened = False
self._closed = True
Expand Down
2 changes: 2 additions & 0 deletions pymongo/periodic_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ def close(self, dummy: Any = None) -> None:
callback; see monitor.py.
"""
self._stopped = True
if self._task is not None:
self._task.cancel()

async def join(self, timeout: Optional[int] = None) -> None:
if self._task is not None:
Expand Down
2 changes: 2 additions & 0 deletions pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,8 @@ def close(self) -> None:
if self._encrypter:
# TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened.
self._encrypter.close()
if not _IS_SYNC:
self._kill_cursors_executor.join()
self._closed = True

if not _IS_SYNC:
Expand Down
6 changes: 6 additions & 0 deletions pymongo/synchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ def close(self) -> None:
open() restarts the monitor after closing.
"""
self.gc_safe_close()
if not _IS_SYNC:
self._executor.join()

def join(self, timeout: Optional[int] = None) -> None:
"""Wait for the monitor to stop."""
Expand Down Expand Up @@ -191,6 +193,8 @@ def gc_safe_close(self) -> None:

def close(self) -> None:
self.gc_safe_close()
if not _IS_SYNC:
self._executor.join()
self._rtt_monitor.close()
# Increment the generation and maybe close the socket. If the executor
# thread has the socket checked out, it will be closed when checked in.
Expand Down Expand Up @@ -460,6 +464,8 @@ def close(self) -> None:
self.gc_safe_close()
# Increment the generation and maybe close the socket. If the executor
# thread has the socket checked out, it will be closed when checked in.
if not _IS_SYNC:
self._executor.join()
self._pool.reset()

def add_sample(self, sample: float) -> None:
Expand Down
2 changes: 2 additions & 0 deletions pymongo/synchronous/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ def close(self) -> None:
)

self._monitor.close()
if not _IS_SYNC:
self._monitor.join()
self._pool.close()

def request_check(self) -> None:
Expand Down
2 changes: 2 additions & 0 deletions pymongo/synchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ def close(self) -> None:
# Stop SRV polling thread.
if self._srv_monitor:
self._srv_monitor.close()
if not _IS_SYNC:
self._srv_monitor.join()

self._opened = False
self._closed = True
Expand Down
Loading