Skip to content

Commit c814e94

Browse files
authored
Support explain for aggregate operations (#623)
Support explain for aggregate using the explain command, which started supporting explaining aggregate commands in MongoDB 3.6. JAVA-3909
1 parent 8d78ec9 commit c814e94

File tree

21 files changed

+363
-350
lines changed

21 files changed

+363
-350
lines changed

driver-core/src/main/com/mongodb/internal/operation/AggregateExplainOperation.java

Lines changed: 0 additions & 210 deletions
This file was deleted.

driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,31 @@
1818

1919
import com.mongodb.ExplainVerbosity;
2020
import com.mongodb.MongoNamespace;
21+
import com.mongodb.client.model.Collation;
2122
import com.mongodb.internal.async.AsyncBatchCursor;
2223
import com.mongodb.internal.async.SingleResultCallback;
23-
import com.mongodb.client.model.Collation;
2424
import com.mongodb.internal.binding.AsyncReadBinding;
2525
import com.mongodb.internal.binding.ReadBinding;
2626
import com.mongodb.internal.client.model.AggregationLevel;
27+
import com.mongodb.internal.connection.NoOpSessionContext;
28+
import com.mongodb.lang.Nullable;
2729
import org.bson.BsonDocument;
2830
import org.bson.BsonValue;
2931
import org.bson.codecs.Decoder;
3032

3133
import java.util.List;
3234
import java.util.concurrent.TimeUnit;
3335

36+
import static com.mongodb.internal.operation.ExplainHelper.asExplainCommand;
37+
3438
/**
3539
* An operation that executes an aggregation query.
3640
*
3741
* @param <T> the operations result type.
3842
* @mongodb.driver.manual aggregation/ Aggregation
3943
* @since 3.0
4044
*/
41-
public class AggregateOperation<T> implements AsyncReadOperation<AsyncBatchCursor<T>>, ReadOperation<BatchCursor<T>> {
45+
public class AggregateOperation<T> implements AsyncExplainableReadOperation<AsyncBatchCursor<T>>, ExplainableReadOperation<BatchCursor<T>> {
4246
private final AggregateOperationImpl<T> wrapped;
4347
/**
4448
* Construct a new instance.
@@ -304,29 +308,26 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
304308
/**
305309
* Gets an operation whose execution explains this operation.
306310
*
307-
* @param explainVerbosity the explain verbosity
311+
* @param verbosity the explain verbosity
308312
* @return a read operation that when executed will explain this operation
309313
*/
310-
public ReadOperation<BsonDocument> asExplainableOperation(final ExplainVerbosity explainVerbosity) {
311-
return new AggregateExplainOperation(getNamespace(), getPipeline())
312-
.allowDiskUse(getAllowDiskUse())
313-
.maxTime(getMaxAwaitTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
314-
.hint(wrapped.getHint())
315-
.retryReads(getRetryReads());
314+
public <R> ReadOperation<R> asExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder<R> resultDecoder) {
315+
return new CommandReadOperation<R>(getNamespace().getDatabaseName(),
316+
asExplainCommand(wrapped.getCommand(NoOpSessionContext.INSTANCE), verbosity),
317+
resultDecoder);
316318
}
317319

318320
/**
319321
* Gets an operation whose execution explains this operation.
320322
*
321-
* @param explainVerbosity the explain verbosity
323+
* @param verbosity the explain verbosity
322324
* @return a read operation that when executed will explain this operation
323325
*/
324-
public AsyncReadOperation<BsonDocument> asExplainableOperationAsync(final ExplainVerbosity explainVerbosity) {
325-
return new AggregateExplainOperation(getNamespace(), getPipeline())
326-
.retryReads(getRetryReads())
327-
.allowDiskUse(getAllowDiskUse())
328-
.maxTime(getMaxAwaitTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
329-
.hint(wrapped.getHint());
326+
public <R> AsyncReadOperation<R> asAsyncExplainableOperation(@Nullable final ExplainVerbosity verbosity,
327+
final Decoder<R> resultDecoder) {
328+
return new CommandReadOperation<R>(getNamespace().getDatabaseName(),
329+
asExplainCommand(wrapped.getCommand(NoOpSessionContext.INSTANCE), verbosity),
330+
resultDecoder);
330331
}
331332

332333

driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -202,16 +202,16 @@ private CommandCreator getCommandCreator(final SessionContext sessionContext) {
202202
@Override
203203
public BsonDocument create(final ServerDescription serverDescription, final ConnectionDescription connectionDescription) {
204204
validateReadConcernAndCollation(connectionDescription, sessionContext.getReadConcern(), collation);
205-
return getCommand(connectionDescription, sessionContext);
205+
return getCommand(sessionContext);
206206
}
207207
};
208208
}
209209

210-
private BsonDocument getCommand(final ConnectionDescription description, final SessionContext sessionContext) {
210+
BsonDocument getCommand(final SessionContext sessionContext) {
211211
BsonDocument commandDocument = new BsonDocument("aggregate", aggregateTarget.create());
212212

213213
appendReadConcernToCommand(sessionContext, commandDocument);
214-
commandDocument.put("pipeline", pipelineCreator.create(description, sessionContext));
214+
commandDocument.put("pipeline", pipelineCreator.create());
215215
if (maxTimeMS > 0) {
216216
commandDocument.put("maxTimeMS", maxTimeMS > Integer.MAX_VALUE
217217
? new BsonInt64(maxTimeMS) : new BsonInt32((int) maxTimeMS));
@@ -270,7 +270,7 @@ interface AggregateTarget {
270270
}
271271

272272
interface PipelineCreator {
273-
BsonArray create(ConnectionDescription connectionDescription, SessionContext sessionContext);
273+
BsonArray create();
274274
}
275275

276276
private static AggregateTarget defaultAggregateTarget(final AggregationLevel aggregationLevel, final String collectionName) {
@@ -289,7 +289,7 @@ public BsonValue create() {
289289
private static PipelineCreator defaultPipelineCreator(final List<BsonDocument> pipeline) {
290290
return new PipelineCreator() {
291291
@Override
292-
public BsonArray create(final ConnectionDescription connectionDescription, final SessionContext sessionContext) {
292+
public BsonArray create() {
293293
return new BsonArray(pipeline);
294294
}
295295
};

driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616

1717
package com.mongodb.internal.operation;
1818

19-
import com.mongodb.ExplainVerbosity;
2019
import com.mongodb.MongoNamespace;
2120
import com.mongodb.ReadConcern;
2221
import com.mongodb.WriteConcern;
23-
import com.mongodb.internal.async.SingleResultCallback;
2422
import com.mongodb.client.model.Collation;
2523
import com.mongodb.connection.ConnectionDescription;
24+
import com.mongodb.internal.async.SingleResultCallback;
2625
import com.mongodb.internal.binding.AsyncWriteBinding;
2726
import com.mongodb.internal.binding.WriteBinding;
2827
import com.mongodb.internal.client.model.AggregationLevel;
@@ -345,19 +344,6 @@ public AggregateToCollectionOperation hint(final BsonDocument hint) {
345344
return this;
346345
}
347346

348-
/**
349-
* Gets an operation whose execution explains this operation.
350-
*
351-
* @param explainVerbosity the explain verbosity
352-
* @return a read operation that when executed will explain this operation
353-
*/
354-
public ReadOperation<BsonDocument> asExplainableOperation(final ExplainVerbosity explainVerbosity) {
355-
return new AggregateExplainOperation(namespace, pipeline)
356-
.allowDiskUse(allowDiskUse)
357-
.maxTime(maxTimeMS, TimeUnit.MILLISECONDS)
358-
.hint(hint);
359-
}
360-
361347
@Override
362348
public Void execute(final WriteBinding binding) {
363349
return withConnection(binding, new CallableWithConnection<Void>() {

driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,18 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoNamespace;
20+
import com.mongodb.client.model.Collation;
21+
import com.mongodb.client.model.changestream.FullDocument;
2022
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
2123
import com.mongodb.internal.async.AsyncBatchCursor;
2224
import com.mongodb.internal.async.SingleResultCallback;
23-
import com.mongodb.client.model.Collation;
24-
import com.mongodb.client.model.changestream.FullDocument;
25-
import com.mongodb.connection.ConnectionDescription;
2625
import com.mongodb.internal.binding.AsyncConnectionSource;
2726
import com.mongodb.internal.binding.AsyncReadBinding;
2827
import com.mongodb.internal.binding.ConnectionSource;
2928
import com.mongodb.internal.binding.ReadBinding;
3029
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
3130
import com.mongodb.internal.operation.OperationHelper.AsyncCallableWithSource;
3231
import com.mongodb.internal.operation.OperationHelper.CallableWithSource;
33-
import com.mongodb.internal.session.SessionContext;
3432
import org.bson.BsonArray;
3533
import org.bson.BsonBoolean;
3634
import org.bson.BsonDocument;
@@ -404,7 +402,7 @@ public BsonValue create() {
404402
private AggregateOperationImpl.PipelineCreator getPipelineCreator() {
405403
return new AggregateOperationImpl.PipelineCreator() {
406404
@Override
407-
public BsonArray create(final ConnectionDescription description, final SessionContext sessionContext) {
405+
public BsonArray create() {
408406
List<BsonDocument> changeStreamPipeline = new ArrayList<BsonDocument>();
409407
BsonDocument changeStream = new BsonDocument();
410408
if (fullDocument != FullDocument.DEFAULT) {

0 commit comments

Comments
 (0)