Skip to content

Commit 1324d34

Browse files
rozzajyemin
authored andcommitted
Added db.aggregate method
JAVA-3116
1 parent 7b4ba07 commit 1324d34

38 files changed

+1023
-117
lines changed

driver-async/src/main/com/mongodb/async/client/AggregateIterableImpl.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.mongodb.WriteConcern;
2323
import com.mongodb.async.AsyncBatchCursor;
2424
import com.mongodb.async.SingleResultCallback;
25+
import com.mongodb.client.model.AggregationLevel;
2526
import com.mongodb.client.model.Collation;
2627
import com.mongodb.client.model.FindOptions;
2728
import com.mongodb.internal.operation.AsyncOperations;
@@ -46,6 +47,7 @@ class AggregateIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResul
4647
private final Class<TResult> resultClass;
4748
private final CodecRegistry codecRegistry;
4849
private final List<? extends Bson> pipeline;
50+
private final AggregationLevel aggregationLevel;
4951

5052
private Boolean allowDiskUse;
5153
private long maxTimeMS;
@@ -56,17 +58,26 @@ class AggregateIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResul
5658
private String comment;
5759
private Bson hint;
5860

61+
AggregateIterableImpl(@Nullable final ClientSession clientSession, final String databaseName, final Class<TDocument> documentClass,
62+
final Class<TResult> resultClass, final CodecRegistry codecRegistry, final ReadPreference readPreference,
63+
final ReadConcern readConcern, final WriteConcern writeConcern, final OperationExecutor executor,
64+
final List<? extends Bson> pipeline, final AggregationLevel aggregationLevel) {
65+
this(clientSession, new MongoNamespace(databaseName, "ignored"), documentClass, resultClass, codecRegistry, readPreference,
66+
readConcern, writeConcern, executor, pipeline, aggregationLevel);
67+
}
68+
5969
AggregateIterableImpl(@Nullable final ClientSession clientSession, final MongoNamespace namespace, final Class<TDocument> documentClass,
6070
final Class<TResult> resultClass, final CodecRegistry codecRegistry, final ReadPreference readPreference,
6171
final ReadConcern readConcern, final WriteConcern writeConcern, final OperationExecutor executor,
62-
final List<? extends Bson> pipeline) {
72+
final List<? extends Bson> pipeline, final AggregationLevel aggregationLevel) {
6373
super(clientSession, executor, readConcern, readPreference);
6474
this.operations = new AsyncOperations<TDocument>(namespace, documentClass, readPreference, codecRegistry, writeConcern, false);
6575
this.namespace = notNull("namespace", namespace);
6676
this.documentClass = notNull("documentClass", documentClass);
6777
this.resultClass = notNull("resultClass", resultClass);
6878
this.codecRegistry = notNull("codecRegistry", codecRegistry);
6979
this.pipeline = notNull("pipeline", pipeline);
80+
this.aggregationLevel = notNull("aggregationLevel", aggregationLevel);
7081
}
7182

7283
@Override
@@ -77,7 +88,7 @@ public void toCollection(final SingleResultCallback<Void> callback) {
7788
}
7889

7990
getExecutor().execute(operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint,
80-
comment), getReadConcern(), getClientSession(), callback);
91+
comment, aggregationLevel), getReadConcern(), getClientSession(), callback);
8192
}
8293

8394
@Override
@@ -144,7 +155,8 @@ AsyncReadOperation<AsyncBatchCursor<TResult>> asAsyncReadOperation() {
144155

145156
if (outCollection != null) {
146157
AsyncWriteOperation<Void> aggregateToCollectionOperation =
147-
operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint, comment);
158+
operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint, comment,
159+
aggregationLevel);
148160

149161
FindOptions findOptions = new FindOptions().collation(collation);
150162
Integer batchSize = getBatchSize();
@@ -158,7 +170,7 @@ AsyncReadOperation<AsyncBatchCursor<TResult>> asAsyncReadOperation() {
158170
return new WriteOperationThenCursorReadOperation<TResult>(aggregateToCollectionOperation, findOperation);
159171
} else {
160172
return operations.aggregate(pipeline, resultClass, maxTimeMS, maxAwaitTimeMS, getBatchSize(), collation,
161-
hint, comment, allowDiskUse, useCursor);
173+
hint, comment, allowDiskUse, useCursor, aggregationLevel);
162174
}
163175

164176
}

driver-async/src/main/com/mongodb/async/client/MongoCollectionImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.mongodb.async.SingleResultCallback;
3030
import com.mongodb.bulk.BulkWriteResult;
3131
import com.mongodb.bulk.WriteRequest;
32+
import com.mongodb.client.model.AggregationLevel;
3233
import com.mongodb.client.model.BulkWriteOptions;
3334
import com.mongodb.client.model.CountOptions;
3435
import com.mongodb.internal.client.model.CountStrategy;
@@ -346,7 +347,7 @@ private <TResult> AggregateIterable<TResult> createAggregateIterable(@Nullable f
346347
final List<? extends Bson> pipeline,
347348
final Class<TResult> resultClass) {
348349
return new AggregateIterableImpl<TDocument, TResult>(clientSession, namespace, documentClass, resultClass, codecRegistry,
349-
readPreference, readConcern, writeConcern, executor, pipeline);
350+
readPreference, readConcern, writeConcern, executor, pipeline, AggregationLevel.COLLECTION);
350351
}
351352

352353
@Override

driver-async/src/main/com/mongodb/async/client/MongoDatabase.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,4 +502,58 @@ void createView(ClientSession clientSession, String viewName, String viewOn, Lis
502502
*/
503503
<TResult> ChangeStreamIterable<TResult> watch(ClientSession clientSession, List<? extends Bson> pipeline, Class<TResult> resultClass);
504504

505+
/**
506+
* Runs an aggregation framework pipeline on the database for pipeline stages
507+
* that do not require an underlying collection, such as {@code $currentOp} and {@code $listLocalSessions}.
508+
*
509+
* @param pipeline the aggregation pipeline
510+
* @return an iterable containing the result of the aggregation operation
511+
* @since 3.10
512+
* @mongodb.driver.manual reference/command/aggregate/#dbcmd.aggregate Aggregate Command
513+
* @mongodb.server.release 3.6
514+
*/
515+
AggregateIterable<Document> aggregate(List<? extends Bson> pipeline);
516+
517+
/**
518+
* Runs an aggregation framework pipeline on the database for pipeline stages
519+
* that do not require an underlying collection, such as {@code $currentOp} and {@code $listLocalSessions}.
520+
*
521+
* @param pipeline the aggregation pipeline
522+
* @param resultClass the class to decode each document into
523+
* @param <TResult> the target document type of the iterable.
524+
* @return an iterable containing the result of the aggregation operation
525+
* @since 3.10
526+
* @mongodb.driver.manual reference/command/aggregate/#dbcmd.aggregate Aggregate Command
527+
* @mongodb.server.release 3.6
528+
*/
529+
<TResult> AggregateIterable<TResult> aggregate(List<? extends Bson> pipeline, Class<TResult> resultClass);
530+
531+
/**
532+
* Runs an aggregation framework pipeline on the database for pipeline stages
533+
* that do not require an underlying collection, such as {@code $currentOp} and {@code $listLocalSessions}.
534+
*
535+
* @param clientSession the client session with which to associate this operation
536+
* @param pipeline the aggregation pipeline
537+
* @return an iterable containing the result of the aggregation operation
538+
* @since 3.10
539+
* @mongodb.driver.manual reference/command/aggregate/#dbcmd.aggregate Aggregate Command
540+
* @mongodb.server.release 3.6
541+
*/
542+
AggregateIterable<Document> aggregate(ClientSession clientSession, List<? extends Bson> pipeline);
543+
544+
/**
545+
* Runs an aggregation framework pipeline on the database for pipeline stages
546+
* that do not require an underlying collection, such as {@code $currentOp} and {@code $listLocalSessions}.
547+
*
548+
* @param clientSession the client session with which to associate this operation
549+
* @param pipeline the aggregation pipeline
550+
* @param resultClass the class to decode each document into
551+
* @param <TResult> the target document type of the iterable.
552+
* @return an iterable containing the result of the aggregation operation
553+
* @since 3.10
554+
* @mongodb.driver.manual reference/command/aggregate/#dbcmd.aggregate Aggregate Command
555+
* @mongodb.server.release 3.6
556+
*/
557+
<TResult> AggregateIterable<TResult> aggregate(ClientSession clientSession, List<? extends Bson> pipeline, Class<TResult> resultClass);
558+
505559
}

driver-async/src/main/com/mongodb/async/client/MongoDatabaseImpl.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.mongodb.ReadPreference;
2424
import com.mongodb.WriteConcern;
2525
import com.mongodb.async.SingleResultCallback;
26+
import com.mongodb.client.model.AggregationLevel;
2627
import com.mongodb.client.model.CreateCollectionOptions;
2728
import com.mongodb.client.model.CreateViewOptions;
2829
import com.mongodb.client.model.IndexOptionDefaults;
@@ -368,6 +369,34 @@ public <TResult> ChangeStreamIterable<TResult> watch(final ClientSession clientS
368369
return createChangeStreamIterable(clientSession, pipeline, resultClass);
369370
}
370371

372+
@Override
373+
public AggregateIterable<Document> aggregate(final List<? extends Bson> pipeline) {
374+
return aggregate(pipeline, Document.class);
375+
}
376+
377+
@Override
378+
public <TResult> AggregateIterable<TResult> aggregate(final List<? extends Bson> pipeline, final Class<TResult> resultClass) {
379+
return createAggregateIterable(null, pipeline, resultClass);
380+
}
381+
382+
@Override
383+
public AggregateIterable<Document> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline) {
384+
return aggregate(clientSession, pipeline, Document.class);
385+
}
386+
387+
@Override
388+
public <TResult> AggregateIterable<TResult> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline,
389+
final Class<TResult> resultClass) {
390+
return createAggregateIterable(notNull("clientSession", clientSession), pipeline, resultClass);
391+
}
392+
393+
private <TResult> AggregateIterable<TResult> createAggregateIterable(@Nullable final ClientSession clientSession,
394+
final List<? extends Bson> pipeline,
395+
final Class<TResult> resultClass) {
396+
return new AggregateIterableImpl<Document, TResult>(clientSession, name, Document.class, resultClass, codecRegistry,
397+
readPreference, readConcern, writeConcern, executor, pipeline, AggregationLevel.DATABASE);
398+
}
399+
371400
private <TResult> ChangeStreamIterable<TResult> createChangeStreamIterable(@Nullable final ClientSession clientSession,
372401
final List<? extends Bson> pipeline,
373402
final Class<TResult> resultClass) {

driver-async/src/test/functional/com/mongodb/async/client/CrudTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.mongodb.MongoNamespace;
2020
import org.bson.BsonArray;
2121
import org.bson.BsonDocument;
22+
import org.bson.BsonString;
2223
import org.bson.BsonValue;
2324
import org.junit.Before;
2425
import org.junit.Test;
@@ -37,21 +38,25 @@
3738
import static com.mongodb.ClusterFixture.serverVersionGreaterThan;
3839
import static com.mongodb.ClusterFixture.serverVersionLessThan;
3940
import static com.mongodb.async.client.Fixture.getDefaultDatabase;
41+
import static com.mongodb.async.client.Fixture.getMongoClient;
4042
import static org.junit.Assert.assertEquals;
4143

4244
// See https://github.com/mongodb/specifications/tree/master/source/crud/tests
4345
@RunWith(Parameterized.class)
4446
public class CrudTest extends DatabaseTestCase {
4547
private final String filename;
4648
private final String description;
49+
private final String databaseName;
4750
private final BsonArray data;
4851
private final BsonDocument definition;
4952
private MongoCollection<BsonDocument> collection;
5053
private JsonPoweredCrudTestHelper helper;
5154

52-
public CrudTest(final String filename, final String description, final BsonArray data, final BsonDocument definition) {
55+
public CrudTest(final String filename, final String description, final String databaseName,
56+
final BsonArray data, final BsonDocument definition) {
5357
this.filename = filename;
5458
this.description = description;
59+
this.databaseName = databaseName;
5560
this.data = data;
5661
this.definition = definition;
5762
}
@@ -60,9 +65,9 @@ public CrudTest(final String filename, final String description, final BsonArray
6065
@Override
6166
public void setUp() {
6267
super.setUp();
63-
collection = Fixture.initializeCollection(new MongoNamespace(getDefaultDatabaseName(), getClass().getName()))
68+
collection = Fixture.initializeCollection(new MongoNamespace(databaseName, getClass().getName()))
6469
.withDocumentClass(BsonDocument.class);
65-
helper = new JsonPoweredCrudTestHelper(description, getDefaultDatabase(), collection);
70+
helper = new JsonPoweredCrudTestHelper(description, getMongoClient().getDatabase(databaseName), collection);
6671
if (!data.isEmpty()) {
6772
new MongoOperation<Void>() {
6873
@Override
@@ -124,6 +129,7 @@ && serverVersionGreaterThan(testDocument.getString("maxServerVersion").getValue(
124129
}
125130
for (BsonValue test : testDocument.getArray("tests")) {
126131
data.add(new Object[]{file.getName(), test.asDocument().getString("description").getValue(),
132+
testDocument.getString("database_name", new BsonString(getDefaultDatabaseName())).getValue(),
127133
testDocument.getArray("data"), test.asDocument()});
128134
}
129135
}

driver-async/src/test/functional/com/mongodb/async/client/JsonPoweredCrudTestHelper.java

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.bson.BsonDocument;
5858
import org.bson.BsonInt32;
5959
import org.bson.BsonNull;
60+
import org.bson.BsonString;
6061
import org.bson.BsonValue;
6162
import org.junit.AssumptionViolatedException;
6263

@@ -66,7 +67,9 @@
6667
import java.util.Collections;
6768
import java.util.List;
6869

70+
import static com.mongodb.async.client.Fixture.isSharded;
6971
import static java.lang.String.format;
72+
import static org.junit.Assume.assumeTrue;
7073

7174
public class JsonPoweredCrudTestHelper {
7275
private final String description;
@@ -85,11 +88,11 @@ BsonDocument getOperationResults(final BsonDocument operation) {
8588
}
8689

8790
BsonDocument getOperationResults(final BsonDocument operation, @Nullable final ClientSession clientSession) {
88-
String name = operation.getString("name").getValue();
8991
BsonDocument collectionOptions = operation.getDocument("collectionOptions", new BsonDocument());
9092
BsonDocument arguments = operation.getDocument("arguments");
9193

92-
String methodName = "get" + name.substring(0, 1).toUpperCase() + name.substring(1) + "Result";
94+
String methodName = createMethodName(operation.getString("name").getValue(),
95+
operation.getString("object", new BsonString("")).getValue());
9396
try {
9497
Method method = getClass().getDeclaredMethod(methodName, BsonDocument.class, BsonDocument.class, ClientSession.class);
9598
return (BsonDocument) method.invoke(this, collectionOptions, arguments, clientSession);
@@ -108,6 +111,19 @@ BsonDocument getOperationResults(final BsonDocument operation, @Nullable final C
108111
}
109112
}
110113

114+
private String createMethodName(final String name, final String object) {
115+
StringBuilder builder = new StringBuilder();
116+
builder.append("get");
117+
if (!object.isEmpty() && !object.equals("collection")) {
118+
builder.append(object.substring(0, 1).toUpperCase());
119+
builder.append(object.substring(1));
120+
}
121+
builder.append(name.substring(0, 1).toUpperCase());
122+
builder.append(name.substring(1));
123+
builder.append("Result");
124+
return builder.toString();
125+
}
126+
111127
<T> T futureResult(final FutureResultCallback<T> callback) {
112128
try {
113129
return callback.get();
@@ -195,6 +211,11 @@ BsonDocument toResult(@Nullable final BsonValue results) {
195211
return new BsonDocument("result", results != null ? results : BsonNull.VALUE);
196212
}
197213

214+
BsonDocument getDatabaseRunCommandResult(final BsonDocument collectionOptions, final BsonDocument arguments,
215+
@Nullable final ClientSession clientSession) {
216+
return getRunCommandResult(collectionOptions, arguments, clientSession);
217+
}
218+
198219
BsonDocument getRunCommandResult(final BsonDocument collectionOptions, final BsonDocument arguments,
199220
@Nullable final ClientSession clientSession) {
200221
BsonDocument command = arguments.getDocument("command");
@@ -247,6 +268,44 @@ BsonDocument getAggregateResult(final BsonDocument collectionOptions, final Bson
247268
return toResult(iterable);
248269
}
249270

271+
BsonDocument getDatabaseAggregateResult(final BsonDocument collectionOptions, final BsonDocument arguments,
272+
@Nullable final ClientSession clientSession) {
273+
assumeTrue(!isSharded());
274+
List<BsonDocument> pipeline = new ArrayList<BsonDocument>();
275+
for (BsonValue stage : arguments.getArray("pipeline")) {
276+
pipeline.add(stage.asDocument());
277+
}
278+
279+
AggregateIterable<BsonDocument> iterable;
280+
if (clientSession == null) {
281+
iterable = database.aggregate(pipeline, BsonDocument.class);
282+
} else {
283+
iterable = database.aggregate(clientSession, pipeline, BsonDocument.class);
284+
}
285+
286+
if (arguments.containsKey("allowDiskUse")) {
287+
iterable.allowDiskUse(arguments.getBoolean("allowDiskUse").getValue());
288+
}
289+
if (arguments.containsKey("batchSize")) {
290+
iterable.batchSize(arguments.getNumber("batchSize").intValue());
291+
}
292+
if (arguments.containsKey("collation")) {
293+
iterable.collation(getCollation(arguments.getDocument("collation")));
294+
}
295+
296+
BsonDocument results = toResult(iterable);
297+
for (BsonValue result : results.getArray("result", new BsonArray())) {
298+
if (result.isDocument()) {
299+
BsonDocument command = result.asDocument().getDocument("command", new BsonDocument());
300+
command.remove("$readPreference");
301+
command.remove("$clusterTime");
302+
command.remove("signature");
303+
command.remove("keyId");
304+
}
305+
}
306+
return results;
307+
}
308+
250309
@SuppressWarnings("deprecation")
251310
BsonDocument getCountResult(final BsonDocument collectionOptions, final BsonDocument arguments,
252311
@Nullable final ClientSession clientSession) {

0 commit comments

Comments
 (0)