Skip to content

Commit fed2ac5

Browse files
committed
Support for majority read concern level to aggregation $out
JAVA-3211
1 parent 00aac43 commit fed2ac5

File tree

19 files changed

+591
-104
lines changed

19 files changed

+591
-104
lines changed

driver-async/src/main/com/mongodb/async/client/AggregateIterableImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ class AggregateIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResul
7171
final ReadConcern readConcern, final WriteConcern writeConcern, final OperationExecutor executor,
7272
final List<? extends Bson> pipeline, final AggregationLevel aggregationLevel, final boolean retryReads) {
7373
super(clientSession, executor, readConcern, readPreference, retryReads);
74-
this.operations = new AsyncOperations<TDocument>(namespace, documentClass, readPreference, codecRegistry, writeConcern, false,
75-
retryReads);
74+
this.operations = new AsyncOperations<TDocument>(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcern,
75+
false, retryReads);
7676
this.namespace = notNull("namespace", namespace);
7777
this.documentClass = notNull("documentClass", documentClass);
7878
this.resultClass = notNull("resultClass", resultClass);

driver-async/src/main/com/mongodb/async/client/MapReduceIterableImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class MapReduceIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResul
7171
final ReadConcern readConcern, final WriteConcern writeConcern, final OperationExecutor executor,
7272
final String mapFunction, final String reduceFunction) {
7373
super(clientSession, executor, readConcern, readPreference, false);
74-
this.operations = new AsyncOperations<TDocument>(namespace, documentClass, readPreference, codecRegistry, writeConcern,
74+
this.operations = new AsyncOperations<TDocument>(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcern,
7575
false, false);
7676
this.namespace = notNull("namespace", namespace);
7777
this.resultClass = notNull("resultClass", resultClass);

driver-async/src/main/com/mongodb/async/client/MongoCollectionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class MongoCollectionImpl<TDocument> implements MongoCollection<TDocument> {
9797
this.retryReads = retryReads;
9898
this.readConcern = notNull("readConcern", readConcern);
9999
this.executor = notNull("executor", executor);
100-
this.operations = new AsyncOperations<TDocument>(namespace, documentClass, readPreference, codecRegistry, writeConcern,
100+
this.operations = new AsyncOperations<TDocument>(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcern,
101101
retryWrites, retryReads);
102102
}
103103

driver-async/src/test/functional/com/mongodb/async/client/CrudTest.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.mongodb.event.CommandEvent;
2424
import com.mongodb.internal.connection.TestCommandListener;
2525
import org.bson.BsonArray;
26+
import org.bson.BsonBoolean;
2627
import org.bson.BsonDocument;
2728
import org.bson.BsonString;
2829
import org.bson.BsonValue;
@@ -58,6 +59,7 @@ public class CrudTest {
5859
private final String filename;
5960
private final String description;
6061
private final String databaseName;
62+
private final String collectionName;
6163
private final BsonArray data;
6264
private final BsonDocument definition;
6365
private final boolean skipTest;
@@ -67,13 +69,12 @@ public class CrudTest {
6769
private JsonPoweredCrudTestHelper helper;
6870
private final TestCommandListener commandListener;
6971

70-
private static final long MIN_HEARTBEAT_FREQUENCY_MS = 50L;
71-
72-
public CrudTest(final String filename, final String description, final String databaseName,
72+
public CrudTest(final String filename, final String description, final String databaseName, final String collectionName,
7373
final BsonArray data, final BsonDocument definition, final boolean skipTest) {
7474
this.filename = filename;
7575
this.description = description;
7676
this.databaseName = databaseName;
77+
this.collectionName = collectionName;
7778
this.data = data;
7879
this.definition = definition;
7980
this.skipTest = skipTest;
@@ -86,9 +87,7 @@ public void setUp() {
8687
// No runOn syntax for legacy CRUD, so skipping these manually for now
8788
assumeFalse(isSharded() && description.startsWith("Aggregate with $currentOp"));
8889

89-
String collectionName = "test";
9090
collectionHelper = new CollectionHelper<Document>(new DocumentCodec(), new MongoNamespace(databaseName, collectionName));
91-
9291
collectionHelper.killAllSessions();
9392
collectionHelper.create(collectionName, new CreateCollectionOptions(), WriteConcern.MAJORITY);
9493

@@ -123,25 +122,33 @@ public void cleanUp() {
123122

124123
@Test
125124
public void shouldPassAllOutcomes() {
126-
BsonDocument expectedOutcome = definition.getDocument("outcome");
125+
BsonDocument expectedOutcome = definition.getDocument("outcome", null);
127126

128127
// check if v1 test
129128
if (definition.containsKey("operation")) {
130-
runOperation(expectedOutcome, helper.getOperationResults(definition.getDocument("operation")),
129+
runOperation(expectedOutcome, definition.getDocument("operation"),
131130
expectedOutcome.containsKey("result") && expectedOutcome.isDocument("result")
132131
? expectedOutcome.get("result").asDocument() : null);
133132
} else { // v2 test
134133
BsonArray operations = definition.getArray("operations");
135134
for (BsonValue operation : operations) {
136-
runOperation(expectedOutcome, helper.getOperationResults(operation.asDocument()),
135+
runOperation(expectedOutcome, operation.asDocument(),
137136
operation.asDocument().containsKey("result") ? operation.asDocument().getDocument("result") : null);
138137
}
139138
}
140139
}
141140

142-
private void runOperation(final BsonDocument expectedOutcome, final BsonDocument outcome, final BsonDocument expectedResult) {
143-
if (expectedOutcome.containsKey("error")) {
144-
assertEquals("Expected error", expectedOutcome.getBoolean("error"), outcome.get("error"));
141+
private void runOperation(final BsonDocument expectedOutcome, final BsonDocument operation, final BsonDocument expectedResult) {
142+
BsonDocument outcome = null;
143+
boolean wasException = false;
144+
try {
145+
outcome = helper.getOperationResults(operation);
146+
} catch (Exception e) {
147+
wasException = true;
148+
}
149+
150+
if (operation.getBoolean("error", BsonBoolean.FALSE).getValue()) {
151+
assertEquals(operation.containsKey("error"), wasException);
145152
}
146153

147154
if (expectedResult != null) {
@@ -161,15 +168,14 @@ private void runOperation(final BsonDocument expectedOutcome, final BsonDocument
161168

162169
assertEquals(description, expectedResult, actualResult);
163170
}
164-
165-
166171
if (definition.containsKey("expectations")) {
167172
List<CommandEvent> expectedEvents = getExpectedEvents(definition.getArray("expectations"), databaseName, null);
168173
List<CommandEvent> events = commandListener.getCommandStartedEvents();
169174

170-
assertEventsEquality(expectedEvents, events);
175+
176+
assertEventsEquality(expectedEvents, events.subList(0, expectedEvents.size()));
171177
}
172-
if (expectedOutcome.containsKey("collection")) {
178+
if (expectedOutcome != null && expectedOutcome.containsKey("collection")) {
173179
assertCollectionEquals(expectedOutcome.getDocument("collection"));
174180
}
175181
}
@@ -182,6 +188,7 @@ public static Collection<Object[]> data() throws URISyntaxException, IOException
182188
for (BsonValue test : testDocument.getArray("tests")) {
183189
data.add(new Object[]{file.getName(), test.asDocument().getString("description").getValue(),
184190
testDocument.getString("database_name", new BsonString(getDefaultDatabaseName())).getValue(),
191+
testDocument.getString("collection_name", new BsonString("test")).getValue(),
185192
testDocument.getArray("data"), test.asDocument(), skipTest(testDocument, test.asDocument())});
186193
}
187194
}

driver-async/src/test/unit/com/mongodb/async/client/AggregateIterableSpecification.groovy

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ class AggregateIterableSpecification extends Specification {
133133

134134
then: 'should use the overrides'
135135
expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
136-
[new BsonDocument('$match', new BsonInt32(1)), new BsonDocument('$out', new BsonString(collectionName))], writeConcern)
136+
[new BsonDocument('$match', new BsonInt32(1)), new BsonDocument('$out', new BsonString(collectionName))],
137+
readConcern, writeConcern)
137138
.maxTime(999, MILLISECONDS)
138139
.allowDiskUse(true)
139140
.collation(collation)
@@ -167,8 +168,8 @@ class AggregateIterableSpecification extends Specification {
167168

168169
then: 'should use the overrides'
169170
expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
170-
[new BsonDocument('$match', new BsonInt32(1)), new BsonDocument('$out', new BsonString(collectionName))], writeConcern,
171-
AggregationLevel.DATABASE)
171+
[new BsonDocument('$match', new BsonInt32(1)), new BsonDocument('$out', new BsonString(collectionName))],
172+
readConcern, writeConcern, AggregationLevel.DATABASE)
172173
.maxTime(999, MILLISECONDS)
173174
.allowDiskUse(true)
174175
.collation(collation)
@@ -200,7 +201,8 @@ class AggregateIterableSpecification extends Specification {
200201

201202
then:
202203
expect operation, isTheSameAs(new AggregateToCollectionOperation(namespace,
203-
[new BsonDocument('$match', new BsonInt32(1)), new BsonDocument('$out', new BsonString(collectionName))], writeConcern)
204+
[new BsonDocument('$match', new BsonInt32(1)), new BsonDocument('$out', new BsonString(collectionName))],
205+
readConcern, writeConcern)
204206
.allowDiskUse(true)
205207
.collation(collation)
206208
.hint(new BsonDocument('a', new BsonInt32(1)))
@@ -240,7 +242,7 @@ class AggregateIterableSpecification extends Specification {
240242
expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
241243
[new BsonDocument('$match', new BsonInt32(1)),
242244
new BsonDocument('$merge', new BsonDocument('into', new BsonString(collectionName)))],
243-
writeConcern)
245+
readConcern, writeConcern)
244246
.maxTime(999, MILLISECONDS)
245247
.allowDiskUse(true)
246248
.collation(collation)
@@ -276,7 +278,7 @@ class AggregateIterableSpecification extends Specification {
276278
[new BsonDocument('$match', new BsonInt32(1)),
277279
new BsonDocument('$merge', new BsonDocument('into',
278280
new BsonDocument('db', new BsonString('db2')).append('coll', new BsonString(collectionName))))],
279-
writeConcern,
281+
readConcern, writeConcern,
280282
AggregationLevel.COLLECTION)
281283
.maxTime(999, MILLISECONDS)
282284
.allowDiskUse(true)
@@ -314,7 +316,7 @@ class AggregateIterableSpecification extends Specification {
314316
expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
315317
[new BsonDocument('$match', new BsonInt32(1)),
316318
new BsonDocument('$merge', new BsonDocument('into', new BsonString(collectionName)))],
317-
writeConcern,
319+
readConcern, writeConcern,
318320
AggregationLevel.DATABASE)
319321
.maxTime(999, MILLISECONDS)
320322
.allowDiskUse(true)
@@ -349,7 +351,7 @@ class AggregateIterableSpecification extends Specification {
349351
expect operation, isTheSameAs(new AggregateToCollectionOperation(namespace,
350352
[new BsonDocument('$match', new BsonInt32(1)),
351353
new BsonDocument('$merge', new BsonDocument('into', new BsonString(collectionName)))],
352-
writeConcern)
354+
readConcern, writeConcern)
353355
.allowDiskUse(true)
354356
.collation(collation)
355357
.hint(new BsonDocument('a', new BsonInt32(1)))

driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoNamespace;
20+
import com.mongodb.ReadConcern;
2021
import com.mongodb.ReadPreference;
2122
import com.mongodb.WriteConcern;
2223
import com.mongodb.async.AsyncBatchCursor;
@@ -58,34 +59,39 @@ public final class AsyncOperations<TDocument> {
5859

5960
public AsyncOperations(final Class<TDocument> documentClass, final ReadPreference readPreference,
6061
final CodecRegistry codecRegistry) {
61-
this(null, documentClass, readPreference, codecRegistry, WriteConcern.ACKNOWLEDGED, false, true);
62+
this(null, documentClass, readPreference, codecRegistry, ReadConcern.DEFAULT, WriteConcern.ACKNOWLEDGED, false, true);
6263
}
6364

6465
public AsyncOperations(final Class<TDocument> documentClass, final ReadPreference readPreference,
6566
final CodecRegistry codecRegistry, final boolean retryReads) {
66-
this(null, documentClass, readPreference, codecRegistry, WriteConcern.ACKNOWLEDGED, false, retryReads);
67+
this(null, documentClass, readPreference, codecRegistry, ReadConcern.DEFAULT, WriteConcern.ACKNOWLEDGED, false, retryReads);
6768
}
6869

6970
public AsyncOperations(final MongoNamespace namespace, final Class<TDocument> documentClass, final ReadPreference readPreference,
7071
final CodecRegistry codecRegistry) {
71-
this(namespace, documentClass, readPreference, codecRegistry, WriteConcern.ACKNOWLEDGED, false, true);
72+
this(namespace, documentClass, readPreference, codecRegistry, ReadConcern.DEFAULT, WriteConcern.ACKNOWLEDGED, false, true);
7273
}
7374

7475
public AsyncOperations(final MongoNamespace namespace, final Class<TDocument> documentClass, final ReadPreference readPreference,
7576
final CodecRegistry codecRegistry, final boolean retryReads) {
76-
this(namespace, documentClass, readPreference, codecRegistry, WriteConcern.ACKNOWLEDGED, false, retryReads);
77+
this(namespace, documentClass, readPreference, codecRegistry, ReadConcern.DEFAULT, WriteConcern.ACKNOWLEDGED, false, retryReads);
7778
}
7879

7980
public AsyncOperations(final MongoNamespace namespace, final Class<TDocument> documentClass, final ReadPreference readPreference,
8081
final CodecRegistry codecRegistry, final WriteConcern writeConcern) {
81-
this(namespace, documentClass, readPreference, codecRegistry, writeConcern, false, true);
82+
this(namespace, documentClass, readPreference, codecRegistry, ReadConcern.DEFAULT, writeConcern, false, true);
8283
}
8384

8485
public AsyncOperations(final MongoNamespace namespace, final Class<TDocument> documentClass, final ReadPreference readPreference,
85-
final CodecRegistry codecRegistry, final WriteConcern writeConcern, final boolean retryWrites,
86-
final boolean retryReads) {
87-
this.operations = new Operations<TDocument>(namespace, documentClass, readPreference, codecRegistry, writeConcern, retryWrites,
88-
retryReads);
86+
final CodecRegistry codecRegistry, final ReadConcern readConcern, final WriteConcern writeConcern) {
87+
this(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcern, false, true);
88+
}
89+
90+
public AsyncOperations(final MongoNamespace namespace, final Class<TDocument> documentClass, final ReadPreference readPreference,
91+
final CodecRegistry codecRegistry, final ReadConcern readConcern, final WriteConcern writeConcern,
92+
final boolean retryWrites, final boolean retryReads) {
93+
this.operations = new Operations<TDocument>(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcern,
94+
retryWrites, retryReads);
8995
}
9096

9197
public AsyncReadOperation<Long> count(final Bson filter, final CountOptions options, final CountStrategy countStrategy) {

driver-core/src/main/com/mongodb/internal/operation/Operations.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoNamespace;
20+
import com.mongodb.ReadConcern;
2021
import com.mongodb.ReadPreference;
2122
import com.mongodb.WriteConcern;
2223
import com.mongodb.bulk.DeleteRequest;
@@ -94,17 +95,19 @@ final class Operations<TDocument> {
9495
private final Class<TDocument> documentClass;
9596
private final ReadPreference readPreference;
9697
private final CodecRegistry codecRegistry;
98+
private final ReadConcern readConcern;
9799
private final WriteConcern writeConcern;
98100
private final boolean retryWrites;
99101
private boolean retryReads;
100102

101103
Operations(final MongoNamespace namespace, final Class<TDocument> documentClass, final ReadPreference readPreference,
102-
final CodecRegistry codecRegistry, final WriteConcern writeConcern, final boolean retryWrites,
104+
final CodecRegistry codecRegistry, final ReadConcern readConcern, final WriteConcern writeConcern, final boolean retryWrites,
103105
final boolean retryReads) {
104106
this.namespace = namespace;
105107
this.documentClass = documentClass;
106108
this.readPreference = readPreference;
107109
this.codecRegistry = codecRegistry;
110+
this.readConcern = readConcern;
108111
this.writeConcern = writeConcern;
109112
this.retryWrites = retryWrites;
110113
this.retryReads = retryReads;
@@ -205,7 +208,7 @@ AggregateToCollectionOperation aggregateToCollection(final List<? extends Bson>
205208
final Boolean allowDiskUse, final Boolean bypassDocumentValidation,
206209
final Collation collation, final Bson hint, final String comment,
207210
final AggregationLevel aggregationLevel) {
208-
return new AggregateToCollectionOperation(namespace, toBsonDocumentList(pipeline), writeConcern, aggregationLevel)
211+
return new AggregateToCollectionOperation(namespace, toBsonDocumentList(pipeline), readConcern, writeConcern, aggregationLevel)
209212
.maxTime(maxTimeMS, MILLISECONDS)
210213
.allowDiskUse(allowDiskUse)
211214
.bypassDocumentValidation(bypassDocumentValidation)

0 commit comments

Comments
 (0)