Skip to content

Commit 1294de8

Browse files

File tree

47 files changed

+2381
-118
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2381
-118
lines changed

driver-core/src/main/com/mongodb/ClientSessionOptions.java

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.mongodb.lang.Nullable;
2222
import com.mongodb.session.ClientSession;
2323

24+
import java.util.Objects;
25+
2426
import static com.mongodb.assertions.Assertions.notNull;
2527

2628
/**
@@ -35,6 +37,7 @@
3537
public final class ClientSessionOptions {
3638

3739
private final Boolean causallyConsistent;
40+
private final Boolean snapshot;
3841
private final TransactionOptions defaultTransactionOptions;
3942

4043
/**
@@ -49,6 +52,20 @@ public Boolean isCausallyConsistent() {
4952
return causallyConsistent;
5053
}
5154

55+
/**
56+
* Whether read operations using this session should all share the same snapshot.
57+
*
58+
* @return whether read operations using this session should all share the same snapshot. A null value indicates to use the global
59+
* default, which is false.
60+
* @since 4.3
61+
* @mongodb.server.release 5.0
62+
* @mongodb.driver.manual reference/read-concern-snapshot/#read-concern-and-atclustertime Snapshot reads
63+
*/
64+
@Nullable
65+
public Boolean isSnapshot() {
66+
return snapshot;
67+
}
68+
5269
/**
5370
* Gets the default transaction options for the session.
5471
*
@@ -71,11 +88,14 @@ public boolean equals(final Object o) {
7188

7289
ClientSessionOptions that = (ClientSessionOptions) o;
7390

74-
if (causallyConsistent != null ? !causallyConsistent.equals(that.causallyConsistent) : that.causallyConsistent != null) {
91+
if (!Objects.equals(causallyConsistent, that.causallyConsistent)) {
92+
return false;
93+
}
94+
95+
if (!Objects.equals(snapshot, that.snapshot)) {
7596
return false;
7697
}
77-
if (defaultTransactionOptions != null ? !defaultTransactionOptions.equals(that.defaultTransactionOptions)
78-
: that.defaultTransactionOptions != null) {
98+
if (!Objects.equals(defaultTransactionOptions, that.defaultTransactionOptions)) {
7999
return false;
80100
}
81101

@@ -85,6 +105,7 @@ public boolean equals(final Object o) {
85105
@Override
86106
public int hashCode() {
87107
int result = causallyConsistent != null ? causallyConsistent.hashCode() : 0;
108+
result = 31 * result + (snapshot != null ? snapshot.hashCode() : 0);
88109
result = 31 * result + (defaultTransactionOptions != null ? defaultTransactionOptions.hashCode() : 0);
89110
return result;
90111
}
@@ -93,6 +114,7 @@ public int hashCode() {
93114
public String toString() {
94115
return "ClientSessionOptions{"
95116
+ "causallyConsistent=" + causallyConsistent
117+
+ "snapshot=" + snapshot
96118
+ ", defaultTransactionOptions=" + defaultTransactionOptions
97119
+ '}';
98120
}
@@ -117,6 +139,7 @@ public static Builder builder(final ClientSessionOptions options) {
117139
notNull("options", options);
118140
Builder builder = new Builder();
119141
builder.causallyConsistent = options.isCausallyConsistent();
142+
builder.snapshot = options.isSnapshot();
120143
builder.defaultTransactionOptions = options.getDefaultTransactionOptions();
121144
return builder;
122145
}
@@ -127,6 +150,7 @@ public static Builder builder(final ClientSessionOptions options) {
127150
@NotThreadSafe
128151
public static final class Builder {
129152
private Boolean causallyConsistent;
153+
private Boolean snapshot;
130154
private TransactionOptions defaultTransactionOptions = TransactionOptions.builder().build();
131155

132156
/**
@@ -141,6 +165,24 @@ public Builder causallyConsistent(final boolean causallyConsistent) {
141165
return this;
142166
}
143167

168+
/**
169+
* Sets whether read operations using the session should share the same snapshot.
170+
*
171+
* <p>
172+
* The default value is unset, in which case the driver will use the global default value, which is currently false.
173+
* </p>
174+
*
175+
* @param snapshot true for snapshot reads, false otherwise
176+
* @return this
177+
* @since 4.3
178+
* @mongodb.server.release 5.0
179+
* @mongodb.driver.manual reference/read-concern-snapshot/#read-concern-and-atclustertime Snapshot reads
180+
*/
181+
public Builder snapshot(final boolean snapshot) {
182+
this.snapshot = snapshot;
183+
return this;
184+
}
185+
144186
/**
145187
* Sets whether operations using the session should causally consistent with each other.
146188
*
@@ -168,7 +210,13 @@ private Builder() {
168210
}
169211

170212
private ClientSessionOptions(final Builder builder) {
171-
this.causallyConsistent = builder.causallyConsistent;
213+
if (builder.causallyConsistent != null && builder.causallyConsistent && builder.snapshot != null && builder.snapshot) {
214+
throw new IllegalArgumentException("A session can not be both a snapshot and causally consistent");
215+
}
216+
this.causallyConsistent = builder.causallyConsistent != null || builder.snapshot == null
217+
? builder.causallyConsistent
218+
: Boolean.valueOf(!builder.snapshot);
219+
this.snapshot = builder.snapshot;
172220
this.defaultTransactionOptions = builder.defaultTransactionOptions;
173221
}
174222
}

driver-core/src/main/com/mongodb/internal/async/client/ClientSessionBinding.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.mongodb.internal.session.ClientSessionContext;
3131
import com.mongodb.internal.session.SessionContext;
3232
import com.mongodb.lang.Nullable;
33+
import org.bson.BsonTimestamp;
3334

3435
import static com.mongodb.assertions.Assertions.notNull;
3536
import static com.mongodb.connection.ClusterType.LOAD_BALANCED;
@@ -217,6 +218,23 @@ public boolean notifyMessageSent() {
217218
return clientSession.notifyMessageSent();
218219
}
219220

221+
@Override
222+
public boolean isSnapshot() {
223+
Boolean snapshot = clientSession.getOptions().isSnapshot();
224+
return snapshot != null && snapshot;
225+
}
226+
227+
@Override
228+
public void setSnapshotTimestamp(final BsonTimestamp snapshotTimestamp) {
229+
clientSession.setSnapshotTimestamp(snapshotTimestamp);
230+
}
231+
232+
@Override
233+
@Nullable
234+
public BsonTimestamp getSnapshotTimestamp() {
235+
return clientSession.getSnapshotTimestamp();
236+
}
237+
220238
@Override
221239
public boolean hasActiveTransaction() {
222240
return clientSession.hasActiveTransaction();
@@ -226,6 +244,8 @@ public boolean hasActiveTransaction() {
226244
public ReadConcern getReadConcern() {
227245
if (clientSession.hasActiveTransaction()) {
228246
return clientSession.getTransactionOptions().getReadConcern();
247+
} else if (isSnapshot()) {
248+
return ReadConcern.SNAPSHOT;
229249
} else {
230250
return wrapped.getSessionContext().getReadConcern();
231251
}

driver-core/src/main/com/mongodb/internal/connection/ClusterClockAdvancingSessionContext.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.mongodb.ReadConcern;
2020
import com.mongodb.internal.session.SessionContext;
21+
import com.mongodb.lang.Nullable;
2122
import org.bson.BsonDocument;
2223
import org.bson.BsonTimestamp;
2324

@@ -87,6 +88,22 @@ public void advanceClusterTime(final BsonDocument clusterTime) {
8788
clusterClock.advance(clusterTime);
8889
}
8990

91+
@Override
92+
public boolean isSnapshot() {
93+
return wrapped.isSnapshot();
94+
}
95+
96+
@Override
97+
public void setSnapshotTimestamp(final BsonTimestamp snapshotTimestamp) {
98+
wrapped.setSnapshotTimestamp(snapshotTimestamp);
99+
}
100+
101+
@Override
102+
@Nullable
103+
public BsonTimestamp getSnapshotTimestamp() {
104+
return wrapped.getSnapshotTimestamp();
105+
}
106+
90107
@Override
91108
public boolean hasActiveTransaction() {
92109
return wrapped.hasActiveTransaction();

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
import static com.mongodb.ReadPreference.primary;
4646
import static com.mongodb.ReadPreference.primaryPreferred;
47+
import static com.mongodb.assertions.Assertions.assertFalse;
4748
import static com.mongodb.assertions.Assertions.isTrue;
4849
import static com.mongodb.connection.ClusterConnectionMode.MULTIPLE;
4950
import static com.mongodb.connection.ClusterConnectionMode.SINGLE;
@@ -278,6 +279,8 @@ private List<BsonElement> getExtraElements(final SessionContext sessionContext)
278279
extraElements.add(new BsonElement("lsid", sessionContext.getSessionId()));
279280
}
280281
boolean firstMessageInTransaction = sessionContext.notifyMessageSent();
282+
283+
assertFalse(sessionContext.hasActiveTransaction() && sessionContext.isSnapshot());
281284
if (sessionContext.hasActiveTransaction()) {
282285
checkServerVersionForTransactionSupport();
283286
extraElements.add(new BsonElement("txnNumber", new BsonInt64(sessionContext.getTransactionNumber())));
@@ -286,6 +289,8 @@ private List<BsonElement> getExtraElements(final SessionContext sessionContext)
286289
addReadConcernDocument(extraElements, sessionContext);
287290
}
288291
extraElements.add(new BsonElement("autocommit", BsonBoolean.FALSE));
292+
} else if (sessionContext.isSnapshot()) {
293+
addReadConcernDocument(extraElements, sessionContext);
289294
}
290295

291296
if (serverApi != null) {
@@ -322,7 +327,7 @@ private void checkServerVersionForTransactionSupport() {
322327

323328

324329
private void addReadConcernDocument(final List<BsonElement> extraElements, final SessionContext sessionContext) {
325-
BsonDocument readConcernDocument = getReadConcernDocument(sessionContext);
330+
BsonDocument readConcernDocument = getReadConcernDocument(sessionContext, getSettings().getMaxWireVersion());
326331
if (!readConcernDocument.isEmpty()) {
327332
extraElements.add(new BsonElement("readConcern", readConcernDocument));
328333
}

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import static com.mongodb.internal.connection.ProtocolHelper.getMessageSettings;
7373
import static com.mongodb.internal.connection.ProtocolHelper.getOperationTime;
7474
import static com.mongodb.internal.connection.ProtocolHelper.getRecoveryToken;
75+
import static com.mongodb.internal.connection.ProtocolHelper.getSnapshotTimestamp;
7576
import static com.mongodb.internal.connection.ProtocolHelper.isCommandOk;
7677
import static java.lang.String.format;
7778
import static java.util.Arrays.asList;
@@ -642,6 +643,7 @@ private ServerAddress getServerAddress() {
642643
private void updateSessionContext(final SessionContext sessionContext, final ResponseBuffers responseBuffers) {
643644
sessionContext.advanceOperationTime(getOperationTime(responseBuffers));
644645
sessionContext.advanceClusterTime(getClusterTime(responseBuffers));
646+
sessionContext.setSnapshotTimestamp(getSnapshotTimestamp(responseBuffers));
645647
if (sessionContext.hasActiveTransaction()) {
646648
BsonDocument recoveryToken = getRecoveryToken(responseBuffers);
647649
if (recoveryToken != null) {

driver-core/src/main/com/mongodb/internal/connection/NoOpSessionContext.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.mongodb.ReadConcern;
2020
import com.mongodb.internal.session.SessionContext;
21+
import com.mongodb.lang.Nullable;
2122
import org.bson.BsonDocument;
2223
import org.bson.BsonTimestamp;
2324

@@ -86,6 +87,21 @@ public BsonDocument getClusterTime() {
8687
public void advanceClusterTime(final BsonDocument clusterTime) {
8788
}
8889

90+
@Override
91+
public boolean isSnapshot() {
92+
return false;
93+
}
94+
95+
@Override
96+
public void setSnapshotTimestamp(final BsonTimestamp snapshotTimestamp) {
97+
}
98+
99+
@Override
100+
@Nullable
101+
public BsonTimestamp getSnapshotTimestamp() {
102+
return null;
103+
}
104+
89105
@Override
90106
public boolean hasActiveTransaction() {
91107
return false;

driver-core/src/main/com/mongodb/internal/connection/ProtocolHelper.java

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.mongodb.event.CommandListener;
3232
import com.mongodb.event.CommandStartedEvent;
3333
import com.mongodb.event.CommandSucceededEvent;
34+
import com.mongodb.lang.Nullable;
3435
import org.bson.BsonBinaryReader;
3536
import org.bson.BsonDocument;
3637
import org.bson.BsonInt32;
@@ -89,40 +90,49 @@ static MongoException createSpecialWriteConcernException(final ResponseBuffers r
8990
}
9091

9192
static BsonTimestamp getOperationTime(final ResponseBuffers responseBuffers) {
92-
try {
93-
BsonValue operationTime = getField(createBsonReader(responseBuffers), "operationTime");
94-
if (operationTime == null) {
95-
return null;
96-
}
97-
return operationTime.asTimestamp();
98-
} finally {
99-
responseBuffers.reset();
100-
}
93+
return getFieldValueAsTimestamp(responseBuffers, "operationTime");
10194
}
10295

10396
static BsonDocument getClusterTime(final ResponseBuffers responseBuffers) {
104-
return getFieldDocument(responseBuffers, "$clusterTime");
97+
return getFieldValueAsDocument(responseBuffers, "$clusterTime");
10598
}
10699

107-
static BsonDocument getClusterTime(final BsonDocument response) {
108-
BsonValue clusterTime = response.get("$clusterTime");
109-
if (clusterTime == null) {
110-
return null;
100+
@Nullable
101+
static BsonTimestamp getSnapshotTimestamp(final ResponseBuffers responseBuffers) {
102+
BsonValue atClusterTimeValue = getNestedFieldValue(responseBuffers, "cursor", "atClusterTime");
103+
if (atClusterTimeValue == null) {
104+
atClusterTimeValue = getFieldValue(responseBuffers, "atClusterTime");
105+
}
106+
if (atClusterTimeValue != null && atClusterTimeValue.isTimestamp()) {
107+
return atClusterTimeValue.asTimestamp();
111108
}
112-
return clusterTime.asDocument();
109+
return null;
113110
}
114111

115112
static BsonDocument getRecoveryToken(final ResponseBuffers responseBuffers) {
116-
return getFieldDocument(responseBuffers, "recoveryToken");
113+
return getFieldValueAsDocument(responseBuffers, "recoveryToken");
114+
}
115+
116+
@SuppressWarnings("SameParameterValue")
117+
private static BsonTimestamp getFieldValueAsTimestamp(final ResponseBuffers responseBuffers, final String fieldName) {
118+
BsonValue value = getFieldValue(responseBuffers, fieldName);
119+
if (value == null) {
120+
return null;
121+
}
122+
return value.asTimestamp();
117123
}
118124

119-
private static BsonDocument getFieldDocument(final ResponseBuffers responseBuffers, final String fieldName) {
125+
private static BsonDocument getFieldValueAsDocument(final ResponseBuffers responseBuffers, final String fieldName) {
126+
BsonValue value = getFieldValue(responseBuffers, fieldName);
127+
if (value == null) {
128+
return null;
129+
}
130+
return value.asDocument();
131+
}
132+
133+
private static BsonValue getFieldValue(final ResponseBuffers responseBuffers, final String fieldName) {
120134
try {
121-
BsonValue fieldValue = getField(createBsonReader(responseBuffers), fieldName);
122-
if (fieldValue == null) {
123-
return null;
124-
}
125-
return fieldValue.asDocument();
135+
return getField(createBsonReader(responseBuffers), fieldName);
126136
} finally {
127137
responseBuffers.reset();
128138
}
@@ -146,6 +156,25 @@ private static BsonValue getField(final BsonReader bsonReader, final String fiel
146156
return null;
147157
}
148158

159+
@SuppressWarnings("SameParameterValue")
160+
private static BsonValue getNestedFieldValue(final ResponseBuffers responseBuffers, final String topLevelFieldName,
161+
final String nestedFieldName) {
162+
try {
163+
BsonReader bsonReader = createBsonReader(responseBuffers);
164+
bsonReader.readStartDocument();
165+
while (bsonReader.readBsonType() != BsonType.END_OF_DOCUMENT) {
166+
if (bsonReader.readName().equals(topLevelFieldName)) {
167+
return getField(bsonReader, nestedFieldName);
168+
}
169+
bsonReader.skipValue();
170+
}
171+
bsonReader.readEndDocument();
172+
return null;
173+
} finally {
174+
responseBuffers.reset();
175+
}
176+
}
177+
149178
private static boolean isCommandOk(final BsonValue okValue) {
150179
if (okValue == null) {
151180
return false;

0 commit comments

Comments
 (0)