Skip to content

Commit 14c8432

Browse files
committed
PYTHON-4579 Stop gossiping $clusterTime on SDAM connections
1 parent 9ba780c commit 14c8432

File tree

5 files changed

+23
-37
lines changed

5 files changed

+23
-37
lines changed

pymongo/asynchronous/monitor.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020
import logging
2121
import time
2222
import weakref
23-
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
23+
from typing import TYPE_CHECKING, Any, Optional
2424

2525
from pymongo import common
2626
from pymongo._csot import MovingMinimum
2727
from pymongo.asynchronous import periodic_executor
2828
from pymongo.asynchronous.periodic_executor import _shutdown_executors
29-
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
29+
from pymongo.errors import NetworkTimeout, _OperationCancelled
3030
from pymongo.hello import Hello
3131
from pymongo.lock import _create_lock
3232
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
@@ -245,13 +245,7 @@ async def _check_server(self) -> ServerDescription:
245245
"""
246246
start = time.monotonic()
247247
try:
248-
try:
249-
return await self._check_once()
250-
except (OperationFailure, NotPrimaryError) as exc:
251-
# Update max cluster time even when hello fails.
252-
details = cast(Mapping[str, Any], exc.details)
253-
self._topology.receive_cluster_time(details.get("$clusterTime"))
254-
raise
248+
return await self._check_once()
255249
except ReferenceError:
256250
raise
257251
except Exception as error:
@@ -345,7 +339,6 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]
345339
346340
Can raise ConnectionFailure or OperationFailure.
347341
"""
348-
cluster_time = self._topology.max_cluster_time()
349342
start = time.monotonic()
350343
if conn.more_to_come:
351344
# Read the next streaming hello (MongoDB 4.4+).
@@ -355,13 +348,13 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]
355348
):
356349
# Initiate streaming hello (MongoDB 4.4+).
357350
response = await conn._hello(
358-
cluster_time,
351+
None,
359352
self._server_description.topology_version,
360353
self._settings.heartbeat_frequency,
361354
)
362355
else:
363356
# New connection handshake or polling hello (MongoDB <4.4).
364-
response = await conn._hello(cluster_time, None, None)
357+
response = await conn._hello(None, None, None)
365358
duration = _monotonic_duration(start)
366359
return response, duration
367360

pymongo/asynchronous/topology.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,8 @@ async def _process_change(
489489

490490
self._description = new_td
491491
await self._update_servers()
492-
self._receive_cluster_time_no_lock(server_description.cluster_time)
492+
# TODO: Verify that app errors update the $clusterTime.
493+
# self._receive_cluster_time_no_lock(server_description.cluster_time)
493494

494495
if self._publish_tp and not suppress_event:
495496
assert self._events is not None

pymongo/synchronous/monitor.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import logging
2121
import time
2222
import weakref
23-
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
23+
from typing import TYPE_CHECKING, Any, Optional
2424

2525
from pymongo import common
2626
from pymongo._csot import MovingMinimum
27-
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
27+
from pymongo.errors import NetworkTimeout, _OperationCancelled
2828
from pymongo.hello import Hello
2929
from pymongo.lock import _create_lock
3030
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
@@ -245,13 +245,7 @@ def _check_server(self) -> ServerDescription:
245245
"""
246246
start = time.monotonic()
247247
try:
248-
try:
249-
return self._check_once()
250-
except (OperationFailure, NotPrimaryError) as exc:
251-
# Update max cluster time even when hello fails.
252-
details = cast(Mapping[str, Any], exc.details)
253-
self._topology.receive_cluster_time(details.get("$clusterTime"))
254-
raise
248+
return self._check_once()
255249
except ReferenceError:
256250
raise
257251
except Exception as error:
@@ -345,7 +339,6 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:
345339
346340
Can raise ConnectionFailure or OperationFailure.
347341
"""
348-
cluster_time = self._topology.max_cluster_time()
349342
start = time.monotonic()
350343
if conn.more_to_come:
351344
# Read the next streaming hello (MongoDB 4.4+).
@@ -355,13 +348,13 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:
355348
):
356349
# Initiate streaming hello (MongoDB 4.4+).
357350
response = conn._hello(
358-
cluster_time,
351+
None,
359352
self._server_description.topology_version,
360353
self._settings.heartbeat_frequency,
361354
)
362355
else:
363356
# New connection handshake or polling hello (MongoDB <4.4).
364-
response = conn._hello(cluster_time, None, None)
357+
response = conn._hello(None, None, None)
365358
duration = _monotonic_duration(start)
366359
return response, duration
367360

pymongo/synchronous/topology.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,8 @@ def _process_change(
489489

490490
self._description = new_td
491491
self._update_servers()
492-
self._receive_cluster_time_no_lock(server_description.cluster_time)
492+
# TODO: Verify that app errors update the $clusterTime.
493+
# self._receive_cluster_time_no_lock(server_description.cluster_time)
493494

494495
if self._publish_tp and not suppress_event:
495496
assert self._events is not None

test/test_discovery_and_monitoring.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ class TestClusterTimeComparison(unittest.TestCase):
244244
def test_cluster_time_comparison(self):
245245
t = create_mock_topology("mongodb://host")
246246

247-
def send_cluster_time(time, inc, should_update):
247+
def send_cluster_time(time, inc):
248248
old = t.max_cluster_time()
249249
new = {"clusterTime": Timestamp(time, inc)}
250250
got_hello(
@@ -259,16 +259,14 @@ def send_cluster_time(time, inc, should_update):
259259
)
260260

261261
actual = t.max_cluster_time()
262-
if should_update:
263-
self.assertEqual(actual, new)
264-
else:
265-
self.assertEqual(actual, old)
266-
267-
send_cluster_time(0, 1, True)
268-
send_cluster_time(2, 2, True)
269-
send_cluster_time(2, 1, False)
270-
send_cluster_time(1, 3, False)
271-
send_cluster_time(2, 3, True)
262+
# We never update $clusterTime from monitoring connections.
263+
self.assertEqual(actual, old)
264+
265+
send_cluster_time(0, 1)
266+
send_cluster_time(2, 2)
267+
send_cluster_time(2, 1)
268+
send_cluster_time(1, 3)
269+
send_cluster_time(2, 3)
272270

273271

274272
class TestIgnoreStaleErrors(IntegrationTest):

0 commit comments

Comments
 (0)