Skip to content

Commit dbaadb8

Browse files
committed
Support $merge aggregation stage
* Recognize pipelines that end with $merge and treat them as writes.
* Recognize the various flavors of the "into" field so that the correct find command can be constructed when attempting to iterate an aggregation ending with a $merge stage.

JAVA-3319
1 parent 4453921 commit dbaadb8

File tree

12 files changed

+365
-50
lines changed

12 files changed

+365
-50
lines changed

driver-async/src/main/com/mongodb/async/client/AggregateIterable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ public interface AggregateIterable<TResult> extends MongoIterable<TResult> {
8181
AggregateIterable<TResult> useCursor(@Nullable Boolean useCursor);
8282

8383
/**
84-
* Aggregates documents according to the specified aggregation pipeline, which must end with a $out stage.
84+
* Aggregates documents according to the specified aggregation pipeline, which must end with a $out or $merge stage.
8585
*
8686
* @param callback the callback, which is called when the aggregation completes
87-
* @throws IllegalStateException if the pipeline does not end with a $out stage
87+
* @throws IllegalStateException if the pipeline does not end with a $out or $merge stage
8888
* @mongodb.driver.manual aggregation/ Aggregation
8989
*/
9090
void toCollection(SingleResultCallback<Void> callback);
@@ -101,7 +101,7 @@ public interface AggregateIterable<TResult> extends MongoIterable<TResult> {
101101
/**
102102
* Sets the bypass document level validation flag.
103103
*
104-
* <p>Note: This only applies when an $out stage is specified</p>.
104+
* <p>Note: This only applies when an $out or $merge stage is specified.</p>
105105
*
106106
* @param bypassDocumentValidation If true, allows the write to opt-out of document level validation.
107107
* @return this

driver-async/src/main/com/mongodb/async/client/AggregateIterableImpl.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import com.mongodb.operation.AsyncReadOperation;
3131
import com.mongodb.operation.AsyncWriteOperation;
3232
import org.bson.BsonDocument;
33-
import org.bson.BsonValue;
33+
import org.bson.BsonString;
3434
import org.bson.codecs.configuration.CodecRegistry;
3535
import org.bson.conversions.Bson;
3636

@@ -84,8 +84,8 @@ class AggregateIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResul
8484
@Override
8585
public void toCollection(final SingleResultCallback<Void> callback) {
8686

87-
if (getOutCollection() == null) {
88-
throw new IllegalStateException("The last stage of the aggregation pipeline must be $out");
87+
if (getOutNamespace() == null) {
88+
throw new IllegalStateException("The last stage of the aggregation pipeline must be $out or $merge");
8989
}
9090

9191
getExecutor().execute(operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint,
@@ -152,9 +152,9 @@ public AggregateIterable<TResult> hint(@Nullable final Bson hint) {
152152
@Override
153153
@SuppressWarnings("deprecation")
154154
AsyncReadOperation<AsyncBatchCursor<TResult>> asAsyncReadOperation() {
155-
BsonValue outCollection = getOutCollection();
155+
MongoNamespace outNamespace = getOutNamespace();
156156

157-
if (outCollection != null) {
157+
if (outNamespace != null) {
158158
AsyncWriteOperation<Void> aggregateToCollectionOperation =
159159
operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint, comment,
160160
aggregationLevel);
@@ -165,8 +165,7 @@ AsyncReadOperation<AsyncBatchCursor<TResult>> asAsyncReadOperation() {
165165
findOptions.batchSize(batchSize);
166166
}
167167
AsyncReadOperation<AsyncBatchCursor<TResult>> findOperation =
168-
operations.find(new MongoNamespace(namespace.getDatabaseName(), outCollection.asString().getValue()),
169-
new BsonDocument(), resultClass, findOptions);
168+
operations.find(outNamespace, new BsonDocument(), resultClass, findOptions);
170169

171170
return new WriteOperationThenCursorReadOperation<TResult>(aggregateToCollectionOperation, findOperation);
172171
} else {
@@ -177,12 +176,26 @@ AsyncReadOperation<AsyncBatchCursor<TResult>> asAsyncReadOperation() {
177176
}
178177

179178
@Nullable
180-
private BsonValue getOutCollection() {
179+
private MongoNamespace getOutNamespace() {
181180
if (pipeline.size() == 0) {
182181
return null;
183182
}
184183

185184
Bson lastStage = notNull("last stage", pipeline.get(pipeline.size() - 1));
186-
return lastStage.toBsonDocument(documentClass, codecRegistry).get("$out");
185+
BsonDocument lastStageDocument = lastStage.toBsonDocument(documentClass, codecRegistry);
186+
if (lastStageDocument.containsKey("$out")) {
187+
return new MongoNamespace(namespace.getDatabaseName(), lastStageDocument.getString("$out").getValue());
188+
} else if (lastStageDocument.containsKey("$merge")) {
189+
BsonDocument mergeDocument = lastStageDocument.getDocument("$merge");
190+
if (mergeDocument.isDocument("into")) {
191+
BsonDocument intoDocument = mergeDocument.getDocument("into");
192+
return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(),
193+
intoDocument.getString("coll").getValue());
194+
} else if (mergeDocument.isString("into")) {
195+
return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue());
196+
}
197+
}
198+
199+
return null;
187200
}
188201
}

driver-async/src/main/com/mongodb/async/client/MapReduceIterable.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,6 @@ public interface MapReduceIterable<TResult> extends MongoIterable<TResult> {
167167
/**
168168
* Sets the bypass document level validation flag.
169169
*
170-
* <p>Note: This only applies when an $out stage is specified</p>.
171-
*
172170
* @param bypassDocumentValidation If true, allows the write to opt-out of document level validation.
173171
* @return this
174172
* @since 3.2

driver-async/src/main/com/mongodb/async/client/MongoCollection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ public interface MongoCollection<TDocument> {
539539
<TResult> FindIterable<TResult> find(ClientSession clientSession, Bson filter, Class<TResult> resultClass);
540540

541541
/**
542-
* Aggregates documents according to the specified aggregation pipeline. If the pipeline ends with a $out stage, the returned
542+
* Aggregates documents according to the specified aggregation pipeline. If the pipeline ends with a $out or $merge stage, the returned
543543
* iterable will be a query of the collection that the aggregation was written to. Note that in this case the pipeline will be
544544
* executed even if the iterable is never iterated.
545545
*
@@ -550,7 +550,7 @@ public interface MongoCollection<TDocument> {
550550
AggregateIterable<TDocument> aggregate(List<? extends Bson> pipeline);
551551

552552
/**
553-
* Aggregates documents according to the specified aggregation pipeline. If the pipeline ends with a $out stage, the returned
553+
* Aggregates documents according to the specified aggregation pipeline. If the pipeline ends with a $out or $merge stage, the returned
554554
* iterable will be a query of the collection that the aggregation was written to. Note that in this case the pipeline will be
555555
* executed even if the iterable is never iterated.
556556
*
@@ -563,7 +563,7 @@ public interface MongoCollection<TDocument> {
563563
<TResult> AggregateIterable<TResult> aggregate(List<? extends Bson> pipeline, Class<TResult> resultClass);
564564

565565
/**
566-
* Aggregates documents according to the specified aggregation pipeline. If the pipeline ends with a $out stage, the returned
566+
* Aggregates documents according to the specified aggregation pipeline. If the pipeline ends with a $out or $merge stage, the returned
567567
* iterable will be a query of the collection that the aggregation was written to. Note that in this case the pipeline will be
568568
* executed even if the iterable is never iterated.
569569
*
@@ -577,7 +577,7 @@ public interface MongoCollection<TDocument> {
577577
AggregateIterable<TDocument> aggregate(ClientSession clientSession, List<? extends Bson> pipeline);
578578

579579
/**
580-
* Aggregates documents according to the specified aggregation pipeline. If the pipeline ends with a $out stage, the returned
580+
* Aggregates documents according to the specified aggregation pipeline. If the pipeline ends with a $out or $merge stage, the returned
581581
* iterable will be a query of the collection that the aggregation was written to. Note that in this case the pipeline will be
582582
* executed even if the iterable is never iterated.
583583
*

driver-async/src/test/unit/com/mongodb/async/client/AggregateIterableSpecification.groovy

Lines changed: 150 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class AggregateIterableSpecification extends Specification {
104104
)
105105
}
106106

107-
def 'should build the expected AggregateToCollectionOperation'() {
107+
def 'should build the expected AggregateToCollectionOperation for $out'() {
108108
given:
109109
def cursor = Stub(AsyncBatchCursor) {
110110
next(_) >> {
@@ -207,6 +207,155 @@ class AggregateIterableSpecification extends Specification {
207207
.comment('this is a comment'))
208208
}
209209

210+
def 'should build the expected AggregateToCollectionOperation for $merge'() {
211+
given:
212+
def cursor = Stub(AsyncBatchCursor) {
213+
next(_) >> {
214+
it[0].onResult(null, null)
215+
}
216+
}
217+
def executor = new TestOperationExecutor([cursor, cursor, cursor, cursor, cursor, cursor, cursor]);
218+
def collectionName = 'collectionName'
219+
def collectionNamespace = new MongoNamespace(namespace.getDatabaseName(), collectionName)
220+
def pipeline = [new Document('$match', 1), new Document('$merge', new Document('into', collectionName))]
221+
def pipelineWithIntoDocument = [new Document('$match', 1), new Document('$merge',
222+
new Document('into', new Document('db', 'db2').append('coll', collectionName)))]
223+
224+
when: 'aggregation includes $merge'
225+
new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
226+
pipeline, AggregationLevel.COLLECTION, true)
227+
.batchSize(99)
228+
.maxAwaitTime(99, MILLISECONDS)
229+
.maxTime(999, MILLISECONDS)
230+
.allowDiskUse(true)
231+
.useCursor(true)
232+
.collation(collation)
233+
.hint(new Document('a', 1))
234+
.comment('this is a comment')
235+
.into([]) { result, t -> }
236+
237+
def operation = executor.getReadOperation() as WriteOperationThenCursorReadOperation
238+
239+
then: 'should use the overrides'
240+
expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
241+
[new BsonDocument('$match', new BsonInt32(1)),
242+
new BsonDocument('$merge', new BsonDocument('into', new BsonString(collectionName)))],
243+
writeConcern)
244+
.maxTime(999, MILLISECONDS)
245+
.allowDiskUse(true)
246+
.collation(collation)
247+
.hint(new BsonDocument('a', new BsonInt32(1)))
248+
.comment('this is a comment'))
249+
250+
when: 'the subsequent read should have the batchSize set'
251+
operation = operation.getReadOperation() as FindOperation
252+
253+
then: 'should use the correct settings'
254+
operation.getNamespace() == collectionNamespace
255+
operation.getCollation() == collation
256+
operation.getBatchSize() == 99
257+
operation.getMaxAwaitTime(MILLISECONDS) == 0
258+
operation.getMaxTime(MILLISECONDS) == 0
259+
260+
when: 'aggregation includes $merge into a different database'
261+
new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
262+
pipelineWithIntoDocument, AggregationLevel.COLLECTION, false)
263+
.batchSize(99)
264+
.maxTime(999, MILLISECONDS)
265+
.allowDiskUse(true)
266+
.useCursor(true)
267+
.collation(collation)
268+
.hint(new Document('a', 1))
269+
.comment('this is a comment')
270+
.into([]) { result, t -> }
271+
272+
operation = executor.getReadOperation() as WriteOperationThenCursorReadOperation
273+
274+
then: 'should use the overrides'
275+
expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
276+
[new BsonDocument('$match', new BsonInt32(1)),
277+
new BsonDocument('$merge', new BsonDocument('into',
278+
new BsonDocument('db', new BsonString('db2')).append('coll', new BsonString(collectionName))))],
279+
writeConcern,
280+
AggregationLevel.COLLECTION)
281+
.maxTime(999, MILLISECONDS)
282+
.allowDiskUse(true)
283+
.collation(collation)
284+
.hint(new BsonDocument('a', new BsonInt32(1)))
285+
.comment('this is a comment')
286+
)
287+
288+
when: 'the subsequent read should have the batchSize set'
289+
operation = operation.getReadOperation() as FindOperation
290+
291+
then: 'should use the correct settings'
292+
operation.getNamespace() == new MongoNamespace('db2', collectionName)
293+
operation.getBatchSize() == 99
294+
operation.getCollation() == collation
295+
operation.getMaxAwaitTime(MILLISECONDS) == 0
296+
operation.getMaxTime(MILLISECONDS) == 0
297+
298+
when: 'aggregation includes $out and is at the database level'
299+
new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
300+
pipeline, AggregationLevel.DATABASE, true)
301+
.batchSize(99)
302+
.maxAwaitTime(99, MILLISECONDS)
303+
.maxTime(999, MILLISECONDS)
304+
.allowDiskUse(true)
305+
.useCursor(true)
306+
.collation(collation)
307+
.hint(new Document('a', 1))
308+
.comment('this is a comment')
309+
.into([]) { result, t -> }
310+
311+
operation = executor.getReadOperation() as WriteOperationThenCursorReadOperation
312+
313+
then: 'should use the overrides'
314+
expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
315+
[new BsonDocument('$match', new BsonInt32(1)),
316+
new BsonDocument('$merge', new BsonDocument('into', new BsonString(collectionName)))],
317+
writeConcern,
318+
AggregationLevel.DATABASE)
319+
.maxTime(999, MILLISECONDS)
320+
.allowDiskUse(true)
321+
.collation(collation)
322+
.hint(new BsonDocument('a', new BsonInt32(1)))
323+
.comment('this is a comment'))
324+
325+
when: 'the subsequent read should have the batchSize set'
326+
operation = operation.getReadOperation() as FindOperation
327+
328+
then: 'should use the correct settings'
329+
operation.getNamespace() == collectionNamespace
330+
operation.getCollation() == collation
331+
operation.getBatchSize() == 99
332+
operation.getMaxAwaitTime(MILLISECONDS) == 0
333+
operation.getMaxTime(MILLISECONDS) == 0
334+
335+
when: 'toCollection should work as expected'
336+
def futureResultCallback = new FutureResultCallback()
337+
new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
338+
pipeline, AggregationLevel.COLLECTION, true)
339+
.allowDiskUse(true)
340+
.collation(collation)
341+
.hint(new Document('a', 1))
342+
.comment('this is a comment')
343+
.toCollection(futureResultCallback)
344+
futureResultCallback.get()
345+
346+
operation = executor.getWriteOperation() as AggregateToCollectionOperation
347+
348+
then:
349+
expect operation, isTheSameAs(new AggregateToCollectionOperation(namespace,
350+
[new BsonDocument('$match', new BsonInt32(1)),
351+
new BsonDocument('$merge', new BsonDocument('into', new BsonString(collectionName)))],
352+
writeConcern)
353+
.allowDiskUse(true)
354+
.collation(collation)
355+
.hint(new BsonDocument('a', new BsonInt32(1)))
356+
.comment('this is a comment'))
357+
}
358+
210359
def 'should handle exceptions correctly'() {
211360
given:
212361
def codecRegistry = fromProviders([new ValueCodecProvider(), new BsonValueCodecProvider()])

driver-core/src/main/com/mongodb/operation/AggregateToCollectionOperation.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,6 @@ public AggregateToCollectionOperation(final MongoNamespace namespace, final List
120120
this.aggregationLevel = notNull("aggregationLevel", aggregationLevel);
121121

122122
isTrueArgument("pipeline is not empty", !pipeline.isEmpty());
123-
isTrueArgument("last stage of pipeline contains an output collection",
124-
pipeline.get(pipeline.size() - 1).get("$out") != null);
125123
}
126124

127125
/**
@@ -208,7 +206,7 @@ public Boolean getBypassDocumentValidation() {
208206
/**
209207
* Sets the bypass document level validation flag.
210208
*
211-
* <p>Note: This only applies when an $out stage is specified</p>.
209+
* <p>Note: This only applies when an $out or $merge stage is specified.</p>
212210
*
213211
* @param bypassDocumentValidation If true, allows the write to opt-out of document level validation.
214212
* @return this

driver-core/src/main/com/mongodb/operation/FindAndReplaceOperation.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@
3636
import java.util.concurrent.TimeUnit;
3737

3838
import static com.mongodb.assertions.Assertions.notNull;
39+
import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionThreeDotTwo;
3940
import static com.mongodb.operation.CommandOperationHelper.CommandCreator;
4041
import static com.mongodb.operation.DocumentHelper.putIfNotNull;
4142
import static com.mongodb.operation.DocumentHelper.putIfNotZero;
4243
import static com.mongodb.operation.DocumentHelper.putIfTrue;
43-
import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionThreeDotTwo;
4444
import static com.mongodb.operation.OperationHelper.validateCollation;
4545
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4646

@@ -261,8 +261,6 @@ public Boolean getBypassDocumentValidation() {
261261
/**
262262
* Sets the bypass document level validation flag.
263263
*
264-
* <p>Note: This only applies when an $out stage is specified</p>.
265-
*
266264
* @param bypassDocumentValidation If true, allows the write to opt-out of document level validation.
267265
* @return this
268266
* @since 3.2

driver-core/src/main/com/mongodb/operation/FindAndUpdateOperation.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@
3838
import java.util.concurrent.TimeUnit;
3939

4040
import static com.mongodb.assertions.Assertions.notNull;
41+
import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionThreeDotTwo;
4142
import static com.mongodb.operation.CommandOperationHelper.CommandCreator;
4243
import static com.mongodb.operation.DocumentHelper.putIfNotNull;
4344
import static com.mongodb.operation.DocumentHelper.putIfNotZero;
4445
import static com.mongodb.operation.DocumentHelper.putIfTrue;
45-
import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionThreeDotTwo;
4646
import static com.mongodb.operation.OperationHelper.validateCollation;
4747
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4848

@@ -264,8 +264,6 @@ public Boolean getBypassDocumentValidation() {
264264
/**
265265
* Sets the bypass document level validation flag.
266266
*
267-
* <p>Note: This only applies when an $out stage is specified</p>.
268-
*
269267
* @param bypassDocumentValidation If true, allows the write to opt-out of document level validation.
270268
* @return this
271269
* @since 3.2

0 commit comments

Comments
 (0)