-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-4579 Stop gossiping $clusterTime on SDAM connections #1925
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
46e4660
d38d930
f44b31d
cc875fc
8e11997
bdb810c
28c598f
c42cceb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,11 +21,11 @@ | |
import logging | ||
import time | ||
import weakref | ||
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast | ||
from typing import TYPE_CHECKING, Any, Optional | ||
|
||
from pymongo import common, periodic_executor | ||
from pymongo._csot import MovingMinimum | ||
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled | ||
from pymongo.errors import NetworkTimeout, _OperationCancelled | ||
from pymongo.hello import Hello | ||
from pymongo.lock import _async_create_lock | ||
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage | ||
|
@@ -255,13 +255,9 @@ async def _check_server(self) -> ServerDescription: | |
self._conn_id = None | ||
start = time.monotonic() | ||
try: | ||
try: | ||
return await self._check_once() | ||
except (OperationFailure, NotPrimaryError) as exc: | ||
# Update max cluster time even when hello fails. | ||
details = cast(Mapping[str, Any], exc.details) | ||
await self._topology.receive_cluster_time(details.get("$clusterTime")) | ||
raise | ||
return await self._check_once() | ||
except asyncio.CancelledError: | ||
raise | ||
except ReferenceError: | ||
raise | ||
except Exception as error: | ||
|
@@ -358,7 +354,6 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float] | |
|
||
Can raise ConnectionFailure or OperationFailure. | ||
""" | ||
cluster_time = self._topology.max_cluster_time() | ||
start = time.monotonic() | ||
if conn.more_to_come: | ||
# Read the next streaming hello (MongoDB 4.4+). | ||
|
@@ -368,13 +363,13 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float] | |
): | ||
# Initiate streaming hello (MongoDB 4.4+). | ||
response = await conn._hello( | ||
cluster_time, | ||
None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call, done. |
||
self._server_description.topology_version, | ||
self._settings.heartbeat_frequency, | ||
) | ||
else: | ||
# New connection handshake or polling hello (MongoDB <4.4). | ||
response = await conn._hello(cluster_time, None, None) | ||
response = await conn._hello(None, None, None) | ||
duration = _monotonic_duration(start) | ||
return response, duration | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -503,7 +503,8 @@ async def _process_change( | |
|
||
self._description = new_td | ||
await self._update_servers() | ||
self._receive_cluster_time_no_lock(server_description.cluster_time) | ||
# TODO: Verify that app errors update the $clusterTime. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this still TODO? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tracked in https://jira.mongodb.org/browse/DRIVERS-3118. I'll remove this TODO. |
||
# self._receive_cluster_time_no_lock(server_description.cluster_time) | ||
|
||
if self._publish_tp and not suppress_event: | ||
assert self._events is not None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this extra except? We only catch
Exception
instead ofBaseException
below, soCancelledError
should be thrown uncaught.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah this was a merge conflict artifact. I'll remove.