Skip to content

Commit b165339

Browse files
authored
PYTHON-3345 CSOT use connection handshake RTT for load balanced mode (#998)
1 parent b40f13b commit b165339

File tree

3 files changed

+20
-0
lines changed

3 files changed

+20
-0
lines changed

pymongo/mongo_client.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,6 +1336,11 @@ def is_retrying():
13361336
bulk.started_retryable_write = True
13371337

13381338
while True:
1339+
if is_retrying():
1340+
remaining = _csot.remaining()
1341+
if remaining is not None and remaining <= 0:
1342+
assert last_error is not None
1343+
raise last_error
13391344
try:
13401345
server = self._select_server(writable_server_selector, session)
13411346
supports_session = (
@@ -1394,6 +1399,11 @@ def _retryable_read(self, func, read_pref, session, address=None, retryable=True
13941399
multiple_retries = _csot.get_timeout() is not None
13951400

13961401
while True:
1402+
if retrying:
1403+
remaining = _csot.remaining()
1404+
if remaining is not None and remaining <= 0:
1405+
assert last_error is not None
1406+
raise last_error
13971407
try:
13981408
server = self._select_server(read_pref, session, address=address)
13991409
with self._socket_from_server(read_pref, server, session) as (sock_info, read_pref):

pymongo/pool.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ def __init__(self, sock, pool, address, id):
559559
self.pinned_cursor = False
560560
self.active = False
561561
self.last_timeout = self.opts.socket_timeout
562+
self.connect_rtt = 0.0
562563

563564
def set_socket_timeout(self, timeout):
564565
"""Cache last timeout to avoid duplicate calls to sock.settimeout."""
@@ -580,6 +581,8 @@ def apply_timeout(self, client, cmd, write_concern=None):
580581
return None
581582
# RTT validation.
582583
rtt = _csot.get_rtt()
584+
if rtt is None:
585+
rtt = self.connect_rtt
583586
max_time_ms = timeout - rtt
584587
if max_time_ms < 0:
585588
# CSOT: raise an error without running the command since we know it will time out.
@@ -655,7 +658,11 @@ def _hello(self, cluster_time, topology_version, heartbeat_frequency):
655658
else:
656659
auth_ctx = None
657660

661+
if performing_handshake:
662+
start = time.monotonic()
658663
doc = self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable)
664+
if performing_handshake:
665+
self.connect_rtt = time.monotonic() - start
659666
hello = Hello(doc, awaitable=awaitable)
660667
self.is_writable = hello.is_writable
661668
self.max_wire_version = hello.max_wire_version

test/test_csot.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333

3434

3535
class TestCSOT(IntegrationTest):
36+
RUN_ON_SERVERLESS = True
37+
RUN_ON_LOAD_BALANCER = True
38+
3639
def test_timeout_nested(self):
3740
coll = self.db.coll
3841
self.assertEqual(_csot.get_timeout(), None)

0 commit comments

Comments
 (0)