-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-5053 - AsyncMongoClient.close() should await all background tasks #2127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 5 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
68c4a6e
PYTHON-5053 - AsyncMongoClient.close() should await all background tasks
NoahStapp d14d8e8
Don't call join() inside close()
NoahStapp 6c6a32d
Store tasks to be awaited inside Topology
NoahStapp a0a85c5
Merge branch 'master' into PYTHON-5053
NoahStapp 861dbb5
Address review
NoahStapp 2ac4ea3
Address review
NoahStapp 24e96f0
return_exceptions=True for gather calls
NoahStapp a2eb4bf
Cleanup gathers
NoahStapp File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
|
||
from __future__ import annotations | ||
|
||
import asyncio | ||
import logging | ||
import os | ||
import queue | ||
|
@@ -29,7 +30,7 @@ | |
|
||
from pymongo import _csot, common, helpers_shared, periodic_executor | ||
from pymongo.asynchronous.client_session import _ServerSession, _ServerSessionPool | ||
from pymongo.asynchronous.monitor import SrvMonitor | ||
from pymongo.asynchronous.monitor import MonitorBase, SrvMonitor | ||
from pymongo.asynchronous.pool import Pool | ||
from pymongo.asynchronous.server import Server | ||
from pymongo.errors import ( | ||
|
@@ -207,6 +208,9 @@ async def target() -> bool: | |
if self._settings.fqdn is not None and not self._settings.load_balanced: | ||
self._srv_monitor = SrvMonitor(self, self._settings) | ||
|
||
# Stores all monitor tasks that need to be joined on close or server selection | ||
self._monitor_tasks: list[MonitorBase] = [] | ||
|
||
async def open(self) -> None: | ||
"""Start monitoring, or restart after a fork. | ||
|
||
|
@@ -241,6 +245,8 @@ async def open(self) -> None: | |
# Close servers and clear the pools. | ||
for server in self._servers.values(): | ||
await server.close() | ||
if not _IS_SYNC: | ||
self._monitor_tasks.append(server._monitor) | ||
# Reset the session pool to avoid duplicate sessions in | ||
# the child process. | ||
self._session_pool.reset() | ||
|
@@ -288,10 +294,22 @@ async def select_servers( | |
selector, server_timeout, operation, operation_id, address | ||
) | ||
|
||
return [ | ||
servers = [ | ||
cast(Server, self.get_server_by_address(sd.address)) for sd in server_descriptions | ||
] | ||
|
||
if not _IS_SYNC and self._monitor_tasks: | ||
join_tasks = [] | ||
try: | ||
while self._monitor_tasks: | ||
join_tasks.append(self._monitor_tasks.pop()) | ||
except IndexError: | ||
pass | ||
join_tasks = [t.join() for t in join_tasks] # type: ignore[func-returns-value] | ||
await asyncio.gather(*join_tasks) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you refactor this into a method that MongoClient.Close can call too? |
||
|
||
return servers | ||
|
||
async def _select_servers_loop( | ||
self, | ||
selector: Callable[[Selection], Selection], | ||
|
@@ -520,6 +538,8 @@ async def _process_change( | |
and self._description.topology_type not in SRV_POLLING_TOPOLOGIES | ||
): | ||
await self._srv_monitor.close() | ||
if not _IS_SYNC: | ||
self._monitor_tasks.append(self._srv_monitor) | ||
|
||
# Clear the pool from a failed heartbeat. | ||
if reset_pool: | ||
|
@@ -695,6 +715,8 @@ async def close(self) -> None: | |
old_td = self._description | ||
for server in self._servers.values(): | ||
await server.close() | ||
if not _IS_SYNC: | ||
self._monitor_tasks.append(server._monitor) | ||
|
||
# Mark all servers Unknown. | ||
self._description = self._description.reset() | ||
|
@@ -705,6 +727,8 @@ async def close(self) -> None: | |
# Stop SRV polling thread. | ||
if self._srv_monitor: | ||
await self._srv_monitor.close() | ||
if not _IS_SYNC: | ||
self._monitor_tasks.append(self._srv_monitor) | ||
|
||
self._opened = False | ||
self._closed = True | ||
|
@@ -944,6 +968,8 @@ async def _update_servers(self) -> None: | |
for address, server in list(self._servers.items()): | ||
if not self._description.has_server(address): | ||
await server.close() | ||
if not _IS_SYNC: | ||
self._monitor_tasks.append(server._monitor) | ||
self._servers.pop(address) | ||
|
||
def _create_pool_for_server(self, address: _Address) -> Pool: | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth putting a comment here to explain why this code is here. Also this should happen before selecting the server. Doing it after will increase the risk of returning stale information.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The risk being the delay added by the cleanup between selecting the server and actually returning it? Makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep that's it.