Skip to content

Commit 752eb26

Browse files
committed
JAVA-2571: Support causal consistency by appending afterClusterTime to the readConcern document
1 parent cdd2a89 commit 752eb26

34 files changed

+1112
-97
lines changed

driver-core/src/main/com/mongodb/ReadConcern.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@
2929
* @since 3.2
3030
*/
3131
public final class ReadConcern {
32-
private final ReadConcernLevel readConcernLevel;
32+
private final ReadConcernLevel level;
3333

3434
/**
3535
* Construct a new read concern
3636
*
37-
* @param readConcernLevel the read concern level
37+
* @param level the read concern level
3838
*/
39-
public ReadConcern(final ReadConcernLevel readConcernLevel) {
40-
this.readConcernLevel = notNull("readConcernLevel", readConcernLevel);
39+
public ReadConcern(final ReadConcernLevel level) {
40+
this.level = notNull("level", level);
4141
}
4242

4343
/**
@@ -67,12 +67,21 @@ public ReadConcern(final ReadConcernLevel readConcernLevel) {
6767
*/
6868
public static final ReadConcern LINEARIZABLE = new ReadConcern(ReadConcernLevel.LINEARIZABLE);
6969

70+
/**
71+
* Gets the read concern level.
72+
*
73+
* @return the read concern level, which may be null (which indicates to use the server's default level)
74+
* @since 3.6
75+
*/
76+
public ReadConcernLevel getLevel() {
77+
return level;
78+
}
7079

7180
/**
7281
* @return true if this is the server default read concern
7382
*/
7483
public boolean isServerDefault() {
75-
return readConcernLevel == null;
84+
return level == null;
7685
}
7786

7887
/**
@@ -82,8 +91,8 @@ public boolean isServerDefault() {
8291
*/
8392
public BsonDocument asDocument() {
8493
BsonDocument readConcern = new BsonDocument();
85-
if (!isServerDefault()){
86-
readConcern.put("level", new BsonString(readConcernLevel.getValue()));
94+
if (level != null) {
95+
readConcern.put("level", new BsonString(level.getValue()));
8796
}
8897
return readConcern;
8998
}
@@ -93,25 +102,21 @@ public boolean equals(final Object o) {
93102
if (this == o) {
94103
return true;
95104
}
96-
if (o == null) {
97-
return false;
98-
}
99-
if (getClass() != o.getClass()) {
105+
if (o == null || getClass() != o.getClass()) {
100106
return false;
101107
}
108+
102109
ReadConcern that = (ReadConcern) o;
103-
if (readConcernLevel != that.readConcernLevel) {
104-
return false;
105-
}
106-
return true;
110+
111+
return level == that.level;
107112
}
108113

109114
@Override
110115
public int hashCode() {
111-
return readConcernLevel != null ? readConcernLevel.hashCode() : 0;
116+
return level != null ? level.hashCode() : 0;
112117
}
113118

114119
private ReadConcern() {
115-
this.readConcernLevel = null;
120+
this.level = null;
116121
}
117122
}

driver-core/src/main/com/mongodb/connection/DefaultServer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,11 +239,21 @@ public BsonDocument getSessionId() {
239239
return wrapped.getSessionId();
240240
}
241241

242+
@Override
243+
public boolean isCausallyConsistent() {
244+
return wrapped.isCausallyConsistent();
245+
}
246+
242247
@Override
243248
public long advanceTransactionNumber() {
244249
return wrapped.advanceTransactionNumber();
245250
}
246251

252+
@Override
253+
public BsonTimestamp getOperationTime() {
254+
return wrapped.getOperationTime();
255+
}
256+
247257
@Override
248258
public void advanceOperationTime(final BsonTimestamp operationTime) {
249259
wrapped.advanceOperationTime(operationTime);

driver-core/src/main/com/mongodb/connection/SessionContext.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,28 +35,50 @@ public interface SessionContext {
3535
boolean hasSession();
3636

3737
/**
38+
* Gets the session identifier if this context has a session backing it.
3839
*
3940
* @return the session id
4041
*/
4142
BsonDocument getSessionId();
4243

4344
/**
45+
* Gets whether this context is associated with a causally consistent session.
46+
*
47+
* @return true ift his context is associated with a causally consistent session
48+
*/
49+
boolean isCausallyConsistent();
50+
51+
/**
52+
* Advance the transaction number.
4453
*
4554
* @return the next transaction number for the session
4655
*/
4756
long advanceTransactionNumber();
4857

4958
/**
59+
* Gets the current operation time for this session context
60+
*
61+
* @return the current operation time, which may be null
62+
*/
63+
BsonTimestamp getOperationTime();
64+
65+
/**
66+
* Advance the operation time. If the current operation time is greater than the given operation time, this method has no effect.
67+
*
5068
* @param operationTime the new operation time time
5169
*/
5270
void advanceOperationTime(BsonTimestamp operationTime);
5371

5472
/**
55-
* @return the cluster time
73+
* Gets the current cluster time for this session context.
74+
*
75+
* @return the cluster time, which may be null
5676
*/
5777
BsonDocument getClusterTime();
5878

5979
/**
80+
* Advance the cluster time. If the current cluster time is greater than the given cluster time, this method has no effect.
81+
*
6082
* @param clusterTime the new cluster time
6183
*/
6284
void advanceClusterTime(BsonDocument clusterTime);

driver-core/src/main/com/mongodb/internal/connection/NoOpSessionContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,21 @@ public BsonDocument getSessionId() {
4343
throw new UnsupportedOperationException();
4444
}
4545

46+
@Override
47+
public boolean isCausallyConsistent() {
48+
return false;
49+
}
50+
4651
@Override
4752
public long advanceTransactionNumber() {
4853
throw new UnsupportedOperationException();
4954
}
5055

56+
@Override
57+
public BsonTimestamp getOperationTime() {
58+
return null;
59+
}
60+
5161
@Override
5262
public void advanceOperationTime(final BsonTimestamp operationTime) {
5363
}

driver-core/src/main/com/mongodb/operation/AggregateOperation.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.mongodb.connection.Connection;
3232
import com.mongodb.connection.ConnectionDescription;
3333
import com.mongodb.connection.QueryResult;
34+
import com.mongodb.connection.SessionContext;
3435
import com.mongodb.operation.CommandOperationHelper.CommandTransformer;
3536
import org.bson.BsonArray;
3637
import org.bson.BsonBoolean;
@@ -57,6 +58,7 @@
5758
import static com.mongodb.operation.OperationHelper.serverIsAtLeastVersionThreeDotSix;
5859
import static com.mongodb.operation.OperationHelper.validateReadConcernAndCollation;
5960
import static com.mongodb.operation.OperationHelper.withConnection;
61+
import static com.mongodb.operation.ReadConcernHelper.appendReadConcernToCommand;
6062

6163
/**
6264
* An operation that executes an aggregation query.
@@ -301,7 +303,8 @@ public BatchCursor<T> execute(final ReadBinding binding) {
301303
@Override
302304
public BatchCursor<T> call(final ConnectionSource source, final Connection connection) {
303305
validateReadConcernAndCollation(connection, readConcern, collation);
304-
return executeWrappedCommandProtocol(binding, namespace.getDatabaseName(), getCommand(connection.getDescription()),
306+
return executeWrappedCommandProtocol(binding, namespace.getDatabaseName(),
307+
getCommand(connection.getDescription(), binding.getSessionContext()),
305308
CommandResultDocumentCodec.create(decoder, FIELD_NAMES_WITH_RESULT),
306309
connection, transformer(source, connection));
307310
}
@@ -327,7 +330,7 @@ public void call(final AsyncConnectionSource source, final AsyncConnection conne
327330
wrappedCallback.onResult(null, t);
328331
} else {
329332
executeWrappedCommandProtocolAsync(binding, namespace.getDatabaseName(),
330-
getCommand(connection.getDescription()),
333+
getCommand(connection.getDescription(), binding.getSessionContext()),
331334
CommandResultDocumentCodec.create(decoder, FIELD_NAMES_WITH_RESULT),
332335
connection, asyncTransformer(source, connection), wrappedCallback);
333336
}
@@ -366,8 +369,10 @@ private boolean isInline(final ConnectionDescription description) {
366369
return !serverIsAtLeastVersionThreeDotSix(description) && ((useCursor != null && !useCursor));
367370
}
368371

369-
private BsonDocument getCommand(final ConnectionDescription description) {
372+
private BsonDocument getCommand(final ConnectionDescription description, final SessionContext sessionContext) {
370373
BsonDocument commandDocument = new BsonDocument("aggregate", new BsonString(namespace.getCollectionName()));
374+
375+
appendReadConcernToCommand(readConcern, sessionContext, commandDocument);
371376
commandDocument.put("pipeline", new BsonArray(pipeline));
372377
if (maxTimeMS > 0) {
373378
commandDocument.put("maxTimeMS", new BsonInt64(maxTimeMS));
@@ -382,9 +387,6 @@ private BsonDocument getCommand(final ConnectionDescription description) {
382387
if (allowDiskUse != null) {
383388
commandDocument.put("allowDiskUse", BsonBoolean.valueOf(allowDiskUse));
384389
}
385-
if (!readConcern.isServerDefault()) {
386-
commandDocument.put("readConcern", readConcern.asDocument());
387-
}
388390
if (collation != null) {
389391
commandDocument.put("collation", collation.asDocument());
390392
}

driver-core/src/main/com/mongodb/operation/CountOperation.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.mongodb.client.model.Collation;
2727
import com.mongodb.connection.AsyncConnection;
2828
import com.mongodb.connection.Connection;
29+
import com.mongodb.connection.SessionContext;
30+
import com.mongodb.internal.connection.NoOpSessionContext;
2931
import com.mongodb.operation.CommandOperationHelper.CommandTransformer;
3032
import com.mongodb.operation.OperationHelper.AsyncCallableWithConnection;
3133
import com.mongodb.operation.OperationHelper.CallableWithConnection;
@@ -42,10 +44,12 @@
4244
import static com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocolAsync;
4345
import static com.mongodb.operation.DocumentHelper.putIfNotNull;
4446
import static com.mongodb.operation.DocumentHelper.putIfNotZero;
47+
import static com.mongodb.operation.ExplainHelper.asExplainCommand;
4548
import static com.mongodb.operation.OperationHelper.LOGGER;
4649
import static com.mongodb.operation.OperationHelper.validateReadConcernAndCollation;
4750
import static com.mongodb.operation.OperationHelper.releasingCallback;
4851
import static com.mongodb.operation.OperationHelper.withConnection;
52+
import static com.mongodb.operation.ReadConcernHelper.appendReadConcernToCommand;
4953

5054
/**
5155
* An operation that executes a count.
@@ -233,8 +237,8 @@ public Long execute(final ReadBinding binding) {
233237
@Override
234238
public Long call(final Connection connection) {
235239
validateReadConcernAndCollation(connection, readConcern, collation);
236-
return executeWrappedCommandProtocol(binding, namespace.getDatabaseName(), getCommand(), new BsonDocumentCodec(),
237-
connection, transformer());
240+
return executeWrappedCommandProtocol(binding, namespace.getDatabaseName(), getCommand(binding.getSessionContext()),
241+
new BsonDocumentCodec(), connection, transformer());
238242
}
239243
});
240244
}
@@ -255,8 +259,9 @@ public void call(final AsyncConnection connection, final Throwable t) {
255259
if (t != null) {
256260
wrappedCallback.onResult(null, t);
257261
} else {
258-
executeWrappedCommandProtocolAsync(binding, namespace.getDatabaseName(), getCommand(),
259-
new BsonDocumentCodec(), connection, transformer(), wrappedCallback);
262+
executeWrappedCommandProtocolAsync(binding, namespace.getDatabaseName(),
263+
getCommand(binding.getSessionContext()), new BsonDocumentCodec(), connection, transformer(),
264+
wrappedCallback);
260265
}
261266
}
262267
});
@@ -287,7 +292,7 @@ public AsyncReadOperation<BsonDocument> asExplainableOperationAsync(final Explai
287292

288293
private CommandReadOperation<BsonDocument> createExplainableOperation(final ExplainVerbosity explainVerbosity) {
289294
return new CommandReadOperation<BsonDocument>(namespace.getDatabaseName(),
290-
ExplainHelper.asExplainCommand(getCommand(), explainVerbosity),
295+
asExplainCommand(getCommand(NoOpSessionContext.INSTANCE), explainVerbosity),
291296
new BsonDocumentCodec());
292297
}
293298

@@ -300,16 +305,17 @@ public Long apply(final BsonDocument result, final ServerAddress serverAddress)
300305
};
301306
}
302307

303-
private BsonDocument getCommand() {
308+
private BsonDocument getCommand(final SessionContext sessionContext) {
304309
BsonDocument document = new BsonDocument("count", new BsonString(namespace.getCollectionName()));
310+
311+
appendReadConcernToCommand(readConcern, sessionContext, document);
312+
305313
putIfNotNull(document, "query", filter);
306314
putIfNotZero(document, "limit", limit);
307315
putIfNotZero(document, "skip", skip);
308316
putIfNotNull(document, "hint", hint);
309317
putIfNotZero(document, "maxTimeMS", maxTimeMS);
310-
if (!readConcern.isServerDefault()) {
311-
document.put("readConcern", readConcern.asDocument());
312-
}
318+
313319
if (collation != null) {
314320
document.put("collation", collation.asDocument());
315321
}

driver-core/src/main/com/mongodb/operation/DistinctOperation.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.mongodb.connection.Connection;
3131
import com.mongodb.connection.ConnectionDescription;
3232
import com.mongodb.connection.QueryResult;
33+
import com.mongodb.connection.SessionContext;
3334
import com.mongodb.operation.CommandOperationHelper.CommandTransformer;
3435
import com.mongodb.operation.OperationHelper.AsyncCallableWithConnectionAndSource;
3536
import com.mongodb.operation.OperationHelper.CallableWithConnectionAndSource;
@@ -50,6 +51,7 @@
5051
import static com.mongodb.operation.OperationHelper.validateReadConcernAndCollation;
5152
import static com.mongodb.operation.OperationHelper.releasingCallback;
5253
import static com.mongodb.operation.OperationHelper.withConnection;
54+
import static com.mongodb.operation.ReadConcernHelper.appendReadConcernToCommand;
5355

5456
/**
5557
* Finds the distinct values for a specified field across a single collection.
@@ -184,8 +186,8 @@ public BatchCursor<T> execute(final ReadBinding binding) {
184186
@Override
185187
public BatchCursor<T> call(final ConnectionSource source, final Connection connection) {
186188
validateReadConcernAndCollation(connection, readConcern, collation);
187-
return executeWrappedCommandProtocol(binding, namespace.getDatabaseName(), getCommand(), createCommandDecoder(),
188-
connection, transformer(source, connection));
189+
return executeWrappedCommandProtocol(binding, namespace.getDatabaseName(), getCommand(binding.getSessionContext()),
190+
createCommandDecoder(), connection, transformer(source, connection));
189191
}
190192
});
191193
}
@@ -208,8 +210,8 @@ public void call(final AsyncConnectionSource source, final AsyncConnection conne
208210
if (t != null) {
209211
wrappedCallback.onResult(null, t);
210212
} else {
211-
executeWrappedCommandProtocolAsync(binding, namespace.getDatabaseName(), getCommand(),
212-
createCommandDecoder(),
213+
executeWrappedCommandProtocolAsync(binding, namespace.getDatabaseName(),
214+
getCommand(binding.getSessionContext()), createCommandDecoder(),
213215
connection, asyncTransformer(connection.getDescription()), wrappedCallback);
214216
}
215217
}
@@ -248,14 +250,12 @@ public AsyncBatchCursor<T> apply(final BsonDocument result, final ServerAddress
248250
};
249251
}
250252

251-
private BsonDocument getCommand() {
253+
private BsonDocument getCommand(final SessionContext sessionContext) {
252254
BsonDocument commandDocument = new BsonDocument("distinct", new BsonString(namespace.getCollectionName()));
255+
appendReadConcernToCommand(readConcern, sessionContext, commandDocument);
253256
commandDocument.put("key", new BsonString(fieldName));
254257
putIfNotNull(commandDocument, "query", filter);
255258
putIfNotZero(commandDocument, "maxTimeMS", maxTimeMS);
256-
if (!readConcern.isServerDefault()) {
257-
commandDocument.put("readConcern", readConcern.asDocument());
258-
}
259259
if (collation != null) {
260260
commandDocument.put("collation", collation.asDocument());
261261
}

0 commit comments

Comments
 (0)