Skip to content

Commit 6e39ae0

Browse files
committed
PYTHON-2170 Add support for 4.4 hedged reads
1 parent 8747837 commit 6e39ae0

File tree

4 files changed

+151
-27
lines changed

4 files changed

+151
-27
lines changed

pymongo/message.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,21 +95,18 @@ def _randint():
9595
def _maybe_add_read_preference(spec, read_preference):
9696
"""Add $readPreference to spec when appropriate."""
9797
mode = read_preference.mode
98-
tag_sets = read_preference.tag_sets
99-
max_staleness = read_preference.max_staleness
98+
document = read_preference.document
10099
# Only add $readPreference if it's something other than primary to avoid
101100
# problems with mongos versions that don't support read preferences. Also,
102101
# for maximum backwards compatibility, don't add $readPreference for
103102
# secondaryPreferred unless tags or maxStalenessSeconds are in use (setting
104103
# the slaveOkay bit has the same effect).
105104
if mode and (
106-
mode != ReadPreference.SECONDARY_PREFERRED.mode
107-
or tag_sets != [{}]
108-
or max_staleness != -1):
109-
105+
mode != ReadPreference.SECONDARY_PREFERRED.mode or
106+
len(document) > 1):
110107
if "$query" not in spec:
111108
spec = SON([("$query", spec)])
112-
spec["$readPreference"] = read_preference.document
109+
spec["$readPreference"] = document
113110
return spec
114111

115112

pymongo/read_preferences.py

Lines changed: 76 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,30 @@ def _validate_max_staleness(max_staleness):
8181
return max_staleness
8282

8383

84+
def _validate_hedge(hedge):
85+
"""Validate hedge."""
86+
if hedge is None:
87+
return None
88+
89+
if not isinstance(hedge, dict):
90+
raise TypeError("hedge must be a dictionary, not %r" % (hedge,))
91+
92+
return hedge
93+
94+
8495
class _ServerMode(object):
8596
"""Base class for all read preferences.
8697
"""
8798

88-
__slots__ = ("__mongos_mode", "__mode", "__tag_sets", "__max_staleness")
99+
__slots__ = ("__mongos_mode", "__mode", "__tag_sets", "__max_staleness",
100+
"__hedge")
89101

90-
def __init__(self, mode, tag_sets=None, max_staleness=-1):
102+
def __init__(self, mode, tag_sets=None, max_staleness=-1, hedge=None):
91103
self.__mongos_mode = _MONGOS_MODES[mode]
92104
self.__mode = mode
93105
self.__tag_sets = _validate_tag_sets(tag_sets)
94106
self.__max_staleness = _validate_max_staleness(max_staleness)
107+
self.__hedge = _validate_hedge(hedge)
95108

96109
@property
97110
def name(self):
@@ -114,6 +127,8 @@ def document(self):
114127
doc['tags'] = self.__tag_sets
115128
if self.__max_staleness != -1:
116129
doc['maxStalenessSeconds'] = self.__max_staleness
130+
if self.__hedge not in (None, {}):
131+
doc['hedge'] = self.__hedge
117132
return doc
118133

119134
@property
@@ -144,6 +159,30 @@ def max_staleness(self):
144159
no longer be selected for operations, or -1 for no maximum."""
145160
return self.__max_staleness
146161

162+
@property
163+
def hedge(self):
164+
"""The read preference ``hedge`` parameter.
165+
166+
A dictionary that configures how the server will perform hedged reads.
167+
It consists of the following keys:
168+
169+
- ``enabled``: Enables or disables hedged reads in sharded clusters.
170+
171+
Hedged reads are automatically enabled in MongoDB 4.4+ when using a
172+
``nearest`` read preference. To explicitly enable hedged reads, set
173+
the ``enabled`` key to ``true``::
174+
175+
>>> Nearest(hedge={'enabled': True})
176+
177+
To explicitly disable hedged reads, set the ``enabled`` key to
178+
``False``::
179+
180+
>>> Nearest(hedge={'enabled': False})
181+
182+
.. versionadded:: 3.11
183+
"""
184+
return self.__hedge
185+
147186
@property
148187
def min_wire_version(self):
149188
"""The wire protocol version the server must support.
@@ -158,14 +197,15 @@ def min_wire_version(self):
158197
return 0 if self.__max_staleness == -1 else 5
159198

160199
def __repr__(self):
161-
return "%s(tag_sets=%r, max_staleness=%r)" % (
162-
self.name, self.__tag_sets, self.__max_staleness)
200+
return "%s(tag_sets=%r, max_staleness=%r, hedge=%r)" % (
201+
self.name, self.__tag_sets, self.__max_staleness, self.__hedge)
163202

164203
def __eq__(self, other):
165204
if isinstance(other, _ServerMode):
166205
return (self.mode == other.mode and
167206
self.tag_sets == other.tag_sets and
168-
self.max_staleness == other.max_staleness)
207+
self.max_staleness == other.max_staleness and
208+
self.hedge == other.hedge)
169209
return NotImplemented
170210

171211
def __ne__(self, other):
@@ -178,14 +218,16 @@ def __getstate__(self):
178218
"""
179219
return {'mode': self.__mode,
180220
'tag_sets': self.__tag_sets,
181-
'max_staleness': self.__max_staleness}
221+
'max_staleness': self.__max_staleness,
222+
'hedge': self.__hedge}
182223

183224
def __setstate__(self, value):
184225
"""Restore from pickling."""
185226
self.__mode = value['mode']
186227
self.__mongos_mode = _MONGOS_MODES[self.__mode]
187228
self.__tag_sets = _validate_tag_sets(value['tag_sets'])
188229
self.__max_staleness = _validate_max_staleness(value['max_staleness'])
230+
self.__hedge = _validate_hedge(value['hedge'])
189231

190232

191233
class Primary(_ServerMode):
@@ -234,14 +276,17 @@ class PrimaryPreferred(_ServerMode):
234276
replication before it will no longer be selected for operations.
235277
Default -1, meaning no maximum. If it is set, it must be at least
236278
90 seconds.
279+
- `hedge`: The :attr:`~hedge` to use if the primary is not available.
280+
281+
.. versionchanged:: 3.11
282+
Added ``hedge`` parameter.
237283
"""
238284

239285
__slots__ = ()
240286

241-
def __init__(self, tag_sets=None, max_staleness=-1):
242-
super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED,
243-
tag_sets,
244-
max_staleness)
287+
def __init__(self, tag_sets=None, max_staleness=-1, hedge=None):
288+
super(PrimaryPreferred, self).__init__(
289+
_PRIMARY_PREFERRED, tag_sets, max_staleness, hedge)
245290

246291
def __call__(self, selection):
247292
"""Apply this read preference to Selection."""
@@ -271,12 +316,17 @@ class Secondary(_ServerMode):
271316
replication before it will no longer be selected for operations.
272317
Default -1, meaning no maximum. If it is set, it must be at least
273318
90 seconds.
319+
- `hedge`: The :attr:`~hedge` for this read preference.
320+
321+
.. versionchanged:: 3.11
322+
Added ``hedge`` parameter.
274323
"""
275324

276325
__slots__ = ()
277326

278-
def __init__(self, tag_sets=None, max_staleness=-1):
279-
super(Secondary, self).__init__(_SECONDARY, tag_sets, max_staleness)
327+
def __init__(self, tag_sets=None, max_staleness=-1, hedge=None):
328+
super(Secondary, self).__init__(
329+
_SECONDARY, tag_sets, max_staleness, hedge)
280330

281331
def __call__(self, selection):
282332
"""Apply this read preference to Selection."""
@@ -303,14 +353,17 @@ class SecondaryPreferred(_ServerMode):
303353
replication before it will no longer be selected for operations.
304354
Default -1, meaning no maximum. If it is set, it must be at least
305355
90 seconds.
356+
- `hedge`: The :attr:`~hedge` for this read preference.
357+
358+
.. versionchanged:: 3.11
359+
Added ``hedge`` parameter.
306360
"""
307361

308362
__slots__ = ()
309363

310-
def __init__(self, tag_sets=None, max_staleness=-1):
311-
super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED,
312-
tag_sets,
313-
max_staleness)
364+
def __init__(self, tag_sets=None, max_staleness=-1, hedge=None):
365+
super(SecondaryPreferred, self).__init__(
366+
_SECONDARY_PREFERRED, tag_sets, max_staleness, hedge)
314367

315368
def __call__(self, selection):
316369
"""Apply this read preference to Selection."""
@@ -342,12 +395,17 @@ class Nearest(_ServerMode):
342395
replication before it will no longer be selected for operations.
343396
Default -1, meaning no maximum. If it is set, it must be at least
344397
90 seconds.
398+
- `hedge`: The :attr:`~hedge` for this read preference.
399+
400+
.. versionchanged:: 3.11
401+
Added ``hedge`` parameter.
345402
"""
346403

347404
__slots__ = ()
348405

349-
def __init__(self, tag_sets=None, max_staleness=-1):
350-
super(Nearest, self).__init__(_NEAREST, tag_sets, max_staleness)
406+
def __init__(self, tag_sets=None, max_staleness=-1, hedge=None):
407+
super(Nearest, self).__init__(
408+
_NEAREST, tag_sets, max_staleness, hedge)
351409

352410
def __call__(self, selection):
353411
"""Apply this read preference to Selection."""

test/test_read_preferences.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from test.utils import (connected,
4747
ignore_deprecations,
4848
one,
49+
OvertCommandListener,
4950
rs_client,
5051
single_client,
5152
wait_until)
@@ -567,6 +568,73 @@ def test_read_preference_document(self):
567568
with self.assertRaises(ValueError):
568569
Nearest(max_staleness=-2)
569570

571+
def test_read_preference_document_hedge(self):
572+
cases = {
573+
'primaryPreferred': PrimaryPreferred,
574+
'secondary': Secondary,
575+
'secondaryPreferred': SecondaryPreferred,
576+
'nearest': Nearest,
577+
}
578+
for mode, cls in cases.items():
579+
with self.assertRaises(TypeError):
580+
cls(hedge=[])
581+
582+
pref = cls(hedge={})
583+
self.assertEqual(pref.document, {'mode': mode})
584+
out = _maybe_add_read_preference({}, pref)
585+
if cls == SecondaryPreferred:
586+
# SecondaryPreferred without hedge doesn't add $readPreference.
587+
self.assertEqual(out, {})
588+
else:
589+
self.assertEqual(
590+
out,
591+
SON([("$query", {}), ("$readPreference", pref.document)]))
592+
593+
hedge = {'enabled': True}
594+
pref = cls(hedge=hedge)
595+
self.assertEqual(pref.document, {'mode': mode, 'hedge': hedge})
596+
out = _maybe_add_read_preference({}, pref)
597+
self.assertEqual(
598+
out, SON([("$query", {}), ("$readPreference", pref.document)]))
599+
600+
hedge = {'enabled': False}
601+
pref = cls(hedge=hedge)
602+
self.assertEqual(pref.document, {'mode': mode, 'hedge': hedge})
603+
out = _maybe_add_read_preference({}, pref)
604+
self.assertEqual(
605+
out, SON([("$query", {}), ("$readPreference", pref.document)]))
606+
607+
hedge = {'enabled': False, 'extra': 'option'}
608+
pref = cls(hedge=hedge)
609+
self.assertEqual(pref.document, {'mode': mode, 'hedge': hedge})
610+
out = _maybe_add_read_preference({}, pref)
611+
self.assertEqual(
612+
out, SON([("$query", {}), ("$readPreference", pref.document)]))
613+
614+
# Require OP_MSG so that $readPreference is visible in the command event.
615+
@client_context.require_version_min(3, 6)
616+
def test_send_hedge(self):
617+
cases = {
618+
'primaryPreferred': PrimaryPreferred,
619+
'secondary': Secondary,
620+
'secondaryPreferred': SecondaryPreferred,
621+
'nearest': Nearest,
622+
}
623+
listener = OvertCommandListener()
624+
client = rs_client(event_listeners=[listener])
625+
self.addCleanup(client.close)
626+
client.admin.command('ping')
627+
for mode, cls in cases.items():
628+
pref = cls(hedge={'enabled': True})
629+
coll = client.test.get_collection('test', read_preference=pref)
630+
listener.reset()
631+
coll.find_one()
632+
started = listener.results['started']
633+
self.assertEqual(len(started), 1, started)
634+
cmd = started[0].command
635+
self.assertIn('$readPreference', cmd)
636+
self.assertEqual(cmd['$readPreference'], pref.document)
637+
570638
def test_maybe_add_read_preference(self):
571639

572640
# Primary doesn't add $readPreference

test/test_topology.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -788,12 +788,13 @@ def test_no_secondary(self):
788788

789789
self.assertMessage(
790790
'No replica set members match selector'
791-
' "Secondary(tag_sets=None, max_staleness=-1)"',
791+
' "Secondary(tag_sets=None, max_staleness=-1, hedge=None)"',
792792
t, ReadPreference.SECONDARY)
793793

794794
self.assertMessage(
795795
"No replica set members match selector"
796-
" \"Secondary(tag_sets=[{'dc': 'ny'}], max_staleness=-1)\"",
796+
" \"Secondary(tag_sets=[{'dc': 'ny'}], max_staleness=-1, "
797+
"hedge=None)\"",
797798
t, Secondary(tag_sets=[{'dc': 'ny'}]))
798799

799800
def test_bad_replica_set_name(self):

0 commit comments

Comments
 (0)