Skip to content

Commit c212c28

Browse files
committed
PYTHON-2767 Support snapshot reads on secondaries (#656)
Add the MongoClient.start_session snapshot option. (cherry picked from commit 14160ae)
1 parent 8455b76 commit c212c28

12 files changed

+1193
-32
lines changed

pymongo/client_session.py

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,29 @@ class SessionOptions(object):
125125
"""Options for a new :class:`ClientSession`.
126126
127127
:Parameters:
128-
- `causal_consistency` (optional): If True (the default), read
129-
operations are causally ordered within the session.
128+
- `causal_consistency` (optional): If True, read operations are causally
129+
ordered within the session. Defaults to True when the ``snapshot``
130+
option is ``False``.
130131
- `default_transaction_options` (optional): The default
131132
TransactionOptions to use for transactions started on this session.
133+
- `snapshot` (optional): If True, then all reads performed using this
134+
session will read from the same snapshot. This option is incompatible
135+
with ``causal_consistency=True``. Defaults to ``False``.
136+
137+
.. versionchanged:: 3.12
138+
Added the ``snapshot`` parameter.
132139
"""
133140
def __init__(self,
134-
causal_consistency=True,
135-
default_transaction_options=None):
141+
causal_consistency=None,
142+
default_transaction_options=None,
143+
snapshot=False):
144+
if snapshot:
145+
if causal_consistency:
146+
raise ConfigurationError('snapshot reads do not support '
147+
'causal_consistency=True')
148+
causal_consistency = False
149+
elif causal_consistency is None:
150+
causal_consistency = True
136151
self._causal_consistency = causal_consistency
137152
if default_transaction_options is not None:
138153
if not isinstance(default_transaction_options, TransactionOptions):
@@ -141,6 +156,7 @@ def __init__(self,
141156
"pymongo.client_session.TransactionOptions, not: %r" %
142157
(default_transaction_options,))
143158
self._default_transaction_options = default_transaction_options
159+
self._snapshot = snapshot
144160

145161
@property
146162
def causal_consistency(self):
@@ -156,6 +172,14 @@ def default_transaction_options(self):
156172
"""
157173
return self._default_transaction_options
158174

175+
@property
176+
def snapshot(self):
177+
"""Whether snapshot reads are configured.
178+
179+
.. versionadded:: 3.12
180+
"""
181+
return self._snapshot
182+
159183

160184
class TransactionOptions(object):
161185
"""Options for :meth:`ClientSession.start_transaction`.
@@ -388,6 +412,7 @@ def __init__(self, client, server_session, options, authset, implicit):
388412
self._authset = authset
389413
self._cluster_time = None
390414
self._operation_time = None
415+
self._snapshot_time = None
391416
# Is this an implicitly created session?
392417
self._implicit = implicit
393418
self._transaction = _Transaction(None, client)
@@ -603,6 +628,10 @@ def start_transaction(self, read_concern=None, write_concern=None,
603628
"""
604629
self._check_ended()
605630

631+
if self.options.snapshot:
632+
raise InvalidOperation("Transactions are not supported in "
633+
"snapshot sessions")
634+
606635
if self.in_transaction:
607636
raise InvalidOperation("Transaction already in progress")
608637

@@ -781,6 +810,12 @@ def _process_response(self, reply):
781810
"""Process a response to a command that was run with this session."""
782811
self._advance_cluster_time(reply.get('$clusterTime'))
783812
self._advance_operation_time(reply.get('operationTime'))
813+
if self._options.snapshot and self._snapshot_time is None:
814+
if 'cursor' in reply:
815+
ct = reply['cursor'].get('atClusterTime')
816+
else:
817+
ct = reply.get('atClusterTime')
818+
self._snapshot_time = ct
784819
if self.in_transaction and self._transaction.sharded:
785820
recovery_token = reply.get('recoveryToken')
786821
if recovery_token:
@@ -854,15 +889,9 @@ def _apply_to(self, command, is_retryable, read_preference):
854889

855890
if self._transaction.opts.read_concern:
856891
rc = self._transaction.opts.read_concern.document
857-
else:
858-
rc = {}
859-
860-
if (self.options.causal_consistency
861-
and self.operation_time is not None):
862-
rc['afterClusterTime'] = self.operation_time
863-
864-
if rc:
865-
command['readConcern'] = rc
892+
if rc:
893+
command['readConcern'] = rc
894+
self._update_read_concern(command)
866895

867896
command['txnNumber'] = self._server_session.transaction_id
868897
command['autocommit'] = False
@@ -871,6 +900,17 @@ def _start_retryable_write(self):
871900
self._check_ended()
872901
self._server_session.inc_transaction_id()
873902

903+
def _update_read_concern(self, cmd):
904+
if (self.options.causal_consistency
905+
and self.operation_time is not None):
906+
cmd.setdefault('readConcern', {})[
907+
'afterClusterTime'] = self.operation_time
908+
if self.options.snapshot:
909+
rc = cmd.setdefault('readConcern', {})
910+
rc['level'] = 'snapshot'
911+
if self._snapshot_time is not None:
912+
rc['atClusterTime'] = self._snapshot_time
913+
874914

875915
class _ServerSession(object):
876916
def __init__(self, generation):

pymongo/message.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -317,12 +317,8 @@ def as_command(self, sock_info):
317317
if session:
318318
session._apply_to(cmd, False, self.read_preference)
319319
# Explain does not support readConcern.
320-
if (not explain and session.options.causal_consistency
321-
and session.operation_time is not None
322-
and not session.in_transaction):
323-
cmd.setdefault(
324-
'readConcern', {})[
325-
'afterClusterTime'] = session.operation_time
320+
if not explain and not session.in_transaction:
321+
session._update_read_concern(cmd)
326322
sock_info.send_cluster_time(cmd, session, self.client)
327323
# Support auto encryption
328324
client = self.client

pymongo/mongo_client.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1878,8 +1878,9 @@ def __start_session(self, implicit, **kwargs):
18781878
self, server_session, opts, authset, implicit)
18791879

18801880
def start_session(self,
1881-
causal_consistency=True,
1882-
default_transaction_options=None):
1881+
causal_consistency=None,
1882+
default_transaction_options=None,
1883+
snapshot=False):
18831884
"""Start a logical session.
18841885
18851886
This method takes the same parameters as
@@ -1904,7 +1905,8 @@ def start_session(self,
19041905
return self.__start_session(
19051906
False,
19061907
causal_consistency=causal_consistency,
1907-
default_transaction_options=default_transaction_options)
1908+
default_transaction_options=default_transaction_options,
1909+
snapshot=snapshot)
19081910

19091911
def _get_server_session(self):
19101912
"""Internal: start or resume a _ServerSession."""

pymongo/network.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,8 @@ def command(sock_info, dbname, spec, slave_ok, is_mongos,
9393
if read_concern and not (session and session.in_transaction):
9494
if read_concern.level:
9595
spec['readConcern'] = read_concern.document
96-
if (session and session.options.causal_consistency
97-
and session.operation_time is not None):
98-
spec.setdefault(
99-
'readConcern', {})['afterClusterTime'] = session.operation_time
96+
if session:
97+
session._update_read_concern(spec)
10098
if collation is not None:
10199
spec['collation'] = collation
102100

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright 2021-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import sys
16+
17+
sys.path[0:0] = [""]
18+
19+
from test import unittest
20+
from test.test_sessions_unified import *
21+
22+
if __name__ == '__main__':
23+
unittest.main()

test/sessions/dirty-session-errors.json renamed to test/sessions/legacy/dirty-session-errors.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,4 +668,4 @@
668668
}
669669
}
670670
]
671-
}
671+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
{
2+
"description": "snapshot-sessions-not-supported-server-error",
3+
"schemaVersion": "1.0",
4+
"runOnRequirements": [
5+
{
6+
"minServerVersion": "3.6",
7+
"maxServerVersion": "4.4.99",
8+
"topologies": [
9+
"replicaset, sharded-replicaset"
10+
]
11+
}
12+
],
13+
"createEntities": [
14+
{
15+
"client": {
16+
"id": "client0",
17+
"observeEvents": [
18+
"commandStartedEvent",
19+
"commandFailedEvent"
20+
]
21+
}
22+
},
23+
{
24+
"database": {
25+
"id": "database0",
26+
"client": "client0",
27+
"databaseName": "database0"
28+
}
29+
},
30+
{
31+
"collection": {
32+
"id": "collection0",
33+
"database": "database0",
34+
"collectionName": "collection0"
35+
}
36+
},
37+
{
38+
"session": {
39+
"id": "session0",
40+
"client": "client0",
41+
"sessionOptions": {
42+
"snapshot": true
43+
}
44+
}
45+
}
46+
],
47+
"initialData": [
48+
{
49+
"collectionName": "collection0",
50+
"databaseName": "database0",
51+
"documents": [
52+
{
53+
"_id": 1,
54+
"x": 11
55+
}
56+
]
57+
}
58+
],
59+
"tests": [
60+
{
61+
"description": "Server returns an error on find with snapshot",
62+
"operations": [
63+
{
64+
"name": "find",
65+
"object": "collection0",
66+
"arguments": {
67+
"session": "session0",
68+
"filter": {}
69+
},
70+
"expectError": {
71+
"isError": true,
72+
"isClientError": false
73+
}
74+
}
75+
],
76+
"expectEvents": [
77+
{
78+
"client": "client0",
79+
"events": [
80+
{
81+
"commandStartedEvent": {
82+
"command": {
83+
"find": "collection0",
84+
"readConcern": {
85+
"level": "snapshot",
86+
"atClusterTime": {
87+
"$$exists": false
88+
}
89+
}
90+
},
91+
"commandName": "find",
92+
"databaseName": "database0"
93+
}
94+
},
95+
{
96+
"commandFailedEvent": {
97+
"commandName": "find"
98+
}
99+
}
100+
]
101+
}
102+
]
103+
}
104+
]
105+
}

0 commit comments

Comments
 (0)