Skip to content

Commit 9065102

Browse files
committed
Added ClientSession to async API
MongoClient, MongoDatabase and MongoCollection operations now support being passed a ClientSession. JAVA-2621
1 parent ad996dc commit 9065102

File tree

46 files changed

+3450
-1907
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+3450
-1907
lines changed

driver-async/src/main/com/mongodb/async/client/AggregateIterableImpl.java

Lines changed: 60 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -16,85 +16,82 @@
1616

1717
package com.mongodb.async.client;
1818

19-
import com.mongodb.Block;
20-
import com.mongodb.Function;
2119
import com.mongodb.MongoNamespace;
2220
import com.mongodb.ReadConcern;
2321
import com.mongodb.ReadPreference;
2422
import com.mongodb.WriteConcern;
2523
import com.mongodb.async.AsyncBatchCursor;
2624
import com.mongodb.async.SingleResultCallback;
2725
import com.mongodb.client.model.Collation;
28-
import com.mongodb.client.model.FindOptions;
2926
import com.mongodb.operation.AggregateOperation;
3027
import com.mongodb.operation.AggregateToCollectionOperation;
3128
import com.mongodb.operation.AsyncOperationExecutor;
29+
import com.mongodb.operation.AsyncReadOperation;
30+
import com.mongodb.operation.FindOperation;
31+
import com.mongodb.session.ClientSession;
3232
import org.bson.BsonDocument;
3333
import org.bson.BsonValue;
3434
import org.bson.codecs.configuration.CodecRegistry;
3535
import org.bson.conversions.Bson;
3636

3737
import java.util.ArrayList;
38-
import java.util.Collection;
3938
import java.util.List;
4039
import java.util.concurrent.TimeUnit;
4140

42-
import static com.mongodb.ReadPreference.primary;
4341
import static com.mongodb.assertions.Assertions.notNull;
4442
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4543

4644

47-
class AggregateIterableImpl<TDocument, TResult> implements AggregateIterable<TResult> {
45+
class AggregateIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResult> implements AggregateIterable<TResult> {
4846
private final MongoNamespace namespace;
4947
private final Class<TDocument> documentClass;
5048
private final Class<TResult> resultClass;
51-
private final ReadPreference readPreference;
52-
private final ReadConcern readConcern;
5349
private final WriteConcern writeConcern;
5450
private final CodecRegistry codecRegistry;
55-
private final AsyncOperationExecutor executor;
5651
private final List<? extends Bson> pipeline;
5752

5853
private Boolean allowDiskUse;
59-
private Integer batchSize;
60-
private long maxAwaitTimeMS;
6154
private long maxTimeMS;
55+
private long maxAwaitTimeMS;
6256
private Boolean useCursor;
6357
private Boolean bypassDocumentValidation;
6458
private Collation collation;
6559
private String comment;
6660
private Bson hint;
6761

68-
AggregateIterableImpl(final MongoNamespace namespace, final Class<TDocument> documentClass, final Class<TResult> resultClass,
69-
final CodecRegistry codecRegistry, final ReadPreference readPreference, final ReadConcern readConcern,
70-
final WriteConcern writeConcern, final AsyncOperationExecutor executor, final List<? extends Bson> pipeline) {
62+
AggregateIterableImpl(final ClientSession clientSession, final MongoNamespace namespace, final Class<TDocument> documentClass,
63+
final Class<TResult> resultClass, final CodecRegistry codecRegistry, final ReadPreference readPreference,
64+
final ReadConcern readConcern, final WriteConcern writeConcern, final AsyncOperationExecutor executor,
65+
final List<? extends Bson> pipeline) {
66+
super(clientSession, executor, readConcern, readPreference);
7167
this.namespace = notNull("namespace", namespace);
7268
this.documentClass = notNull("documentClass", documentClass);
7369
this.resultClass = notNull("resultClass", resultClass);
7470
this.codecRegistry = notNull("codecRegistry", codecRegistry);
75-
this.readPreference = notNull("readPreference", readPreference);
76-
this.readConcern = notNull("readConcern", readConcern);
7771
this.writeConcern = notNull("writeConcern", writeConcern);
78-
this.executor = notNull("executor", executor);
7972
this.pipeline = notNull("pipeline", pipeline);
8073
}
8174

8275
@Override
83-
public AggregateIterable<TResult> allowDiskUse(final Boolean allowDiskUse) {
84-
this.allowDiskUse = allowDiskUse;
85-
return this;
76+
public void toCollection(final SingleResultCallback<Void> callback) {
77+
List<BsonDocument> aggregateList = createBsonDocumentList(pipeline);
78+
79+
if (getOutCollection(aggregateList) == null) {
80+
throw new IllegalStateException("The last stage of the aggregation pipeline must be $out");
81+
}
82+
83+
getExecutor().execute(createAggregateToCollectionOperation(aggregateList), getClientSession(), callback);
8684
}
8785

8886
@Override
89-
public AggregateIterable<TResult> batchSize(final int batchSize) {
90-
this.batchSize = batchSize;
87+
public AggregateIterable<TResult> allowDiskUse(final Boolean allowDiskUse) {
88+
this.allowDiskUse = allowDiskUse;
9189
return this;
9290
}
9391

9492
@Override
95-
public AggregateIterable<TResult> maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) {
96-
notNull("timeUnit", timeUnit);
97-
this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit);
93+
public AggregateIterable<TResult> batchSize(final int batchSize) {
94+
super.batchSize(batchSize);
9895
return this;
9996
}
10097

@@ -105,7 +102,6 @@ public AggregateIterable<TResult> maxTime(final long maxTime, final TimeUnit tim
105102
return this;
106103
}
107104

108-
109105
@Override
110106
@Deprecated
111107
public AggregateIterable<TResult> useCursor(final Boolean useCursor) {
@@ -114,51 +110,10 @@ public AggregateIterable<TResult> useCursor(final Boolean useCursor) {
114110
}
115111

116112
@Override
117-
public void toCollection(final SingleResultCallback<Void> callback) {
118-
List<BsonDocument> aggregateList = createBsonDocumentList();
119-
BsonValue outCollection = getAggregateOutCollection(aggregateList);
120-
121-
if (outCollection == null) {
122-
throw new IllegalStateException("The last stage of the aggregation pipeline must be $out");
123-
}
124-
125-
executor.execute(new AggregateToCollectionOperation(namespace, aggregateList, writeConcern)
126-
.maxTime(maxTimeMS, MILLISECONDS)
127-
.allowDiskUse(allowDiskUse)
128-
.collation(collation)
129-
.hint(hint == null ? null : hint.toBsonDocument(documentClass, codecRegistry))
130-
.comment(comment), callback);
131-
}
132-
133-
@Override
134-
public void first(final SingleResultCallback<TResult> callback) {
135-
notNull("callback", callback);
136-
execute().first(callback);
137-
}
138-
139-
@Override
140-
public void forEach(final Block<? super TResult> block, final SingleResultCallback<Void> callback) {
141-
notNull("block", block);
142-
notNull("callback", callback);
143-
execute().forEach(block, callback);
144-
}
145-
146-
@Override
147-
public <A extends Collection<? super TResult>> void into(final A target, final SingleResultCallback<A> callback) {
148-
notNull("target", target);
149-
notNull("callback", callback);
150-
execute().into(target, callback);
151-
}
152-
153-
@Override
154-
public <U> MongoIterable<U> map(final Function<TResult, U> mapper) {
155-
return new MappingIterable<TResult, U>(this, mapper);
156-
}
157-
158-
@Override
159-
public void batchCursor(final SingleResultCallback<AsyncBatchCursor<TResult>> callback) {
160-
notNull("callback", callback);
161-
execute().batchCursor(callback);
113+
public AggregateIterable<TResult> maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) {
114+
notNull("timeUnit", timeUnit);
115+
this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit);
116+
return this;
162117
}
163118

164119
@Override
@@ -185,53 +140,59 @@ public AggregateIterable<TResult> hint(final Bson hint) {
185140
return this;
186141
}
187142

143+
@Override
188144
@SuppressWarnings("deprecation")
189-
private MongoIterable<TResult> execute() {
190-
List<BsonDocument> aggregateList = createBsonDocumentList();
191-
BsonValue outCollection = getAggregateOutCollection(aggregateList);
145+
AsyncReadOperation<AsyncBatchCursor<TResult>> asAsyncReadOperation() {
146+
List<BsonDocument> aggregateList = createBsonDocumentList(pipeline);
147+
BsonValue outCollection = getOutCollection(aggregateList);
192148

193149
if (outCollection != null) {
194-
AggregateToCollectionOperation operation = new AggregateToCollectionOperation(namespace, aggregateList, writeConcern)
195-
.maxTime(maxTimeMS, MILLISECONDS)
196-
.allowDiskUse(allowDiskUse)
197-
.bypassDocumentValidation(bypassDocumentValidation)
198-
.collation(collation)
199-
.hint(hint == null ? null : hint.toBsonDocument(documentClass, codecRegistry))
200-
.comment(comment);
201-
MongoIterable<TResult> delegated = new FindIterableImpl<TDocument, TResult>(new MongoNamespace(namespace.getDatabaseName(),
202-
outCollection.asString().getValue()), documentClass, resultClass, codecRegistry, primary(), readConcern,
203-
executor, new BsonDocument(), new FindOptions().collation(collation).maxAwaitTime(maxAwaitTimeMS, MILLISECONDS));
204-
if (batchSize != null) {
205-
delegated.batchSize(batchSize);
150+
AggregateToCollectionOperation aggregateToCollectionOperation = createAggregateToCollectionOperation(aggregateList);
151+
FindOperation<TResult> findOperation =
152+
new FindOperation<TResult>(new MongoNamespace(namespace.getDatabaseName(), outCollection.asString().getValue()),
153+
codecRegistry.get(resultClass))
154+
.readConcern(getReadConcern())
155+
.collation(collation)
156+
.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS);
157+
if (getBatchSize() != null) {
158+
findOperation.batchSize(getBatchSize());
206159
}
207-
return new AwaitingWriteOperationIterable<TResult, Void>(operation, executor, delegated);
160+
return new AggregateToCollectionThenFindOperation<TResult>(aggregateToCollectionOperation, findOperation);
208161
} else {
209-
return new OperationIterable<TResult>(new AggregateOperation<TResult>(namespace, aggregateList, codecRegistry.get(resultClass))
210-
.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS)
162+
return new AggregateOperation<TResult>(namespace, aggregateList, codecRegistry.get(resultClass))
211163
.maxTime(maxTimeMS, MILLISECONDS)
164+
.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS)
212165
.allowDiskUse(allowDiskUse)
213-
.batchSize(batchSize)
166+
.batchSize(getBatchSize())
214167
.useCursor(useCursor)
215-
.readConcern(readConcern)
168+
.readConcern(getReadConcern())
216169
.collation(collation)
217170
.hint(hint == null ? null : hint.toBsonDocument(documentClass, codecRegistry))
218-
.comment(comment),
219-
readPreference,
220-
executor);
171+
.comment(comment);
221172
}
222173
}
223174

224-
private BsonValue getAggregateOutCollection(final List<BsonDocument> aggregateList) {
175+
private BsonValue getOutCollection(final List<BsonDocument> aggregateList) {
225176
return aggregateList.size() == 0 ? null : aggregateList.get(aggregateList.size() - 1).get("$out");
226177
}
227178

228-
private List<BsonDocument> createBsonDocumentList() {
179+
private AggregateToCollectionOperation createAggregateToCollectionOperation(final List<BsonDocument> aggregateList) {
180+
return new AggregateToCollectionOperation(namespace, aggregateList, writeConcern)
181+
.maxTime(maxTimeMS, MILLISECONDS)
182+
.allowDiskUse(allowDiskUse)
183+
.bypassDocumentValidation(bypassDocumentValidation)
184+
.collation(collation)
185+
.hint(hint == null ? null : hint.toBsonDocument(documentClass, codecRegistry))
186+
.comment(comment);
187+
}
188+
189+
private List<BsonDocument> createBsonDocumentList(final List<? extends Bson> pipeline) {
229190
List<BsonDocument> aggregateList = new ArrayList<BsonDocument>(pipeline.size());
230-
for (Bson document : pipeline) {
231-
if (document == null) {
191+
for (Bson obj : pipeline) {
192+
if (obj == null) {
232193
throw new IllegalArgumentException("pipeline can not contain a null value");
233194
}
234-
aggregateList.add(document.toBsonDocument(documentClass, codecRegistry));
195+
aggregateList.add(obj.toBsonDocument(documentClass, codecRegistry));
235196
}
236197
return aggregateList;
237198
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2017 MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.async.client;
18+
19+
import com.mongodb.async.AsyncBatchCursor;
20+
import com.mongodb.async.SingleResultCallback;
21+
import com.mongodb.binding.AsyncReadBinding;
22+
import com.mongodb.binding.AsyncWriteBinding;
23+
import com.mongodb.operation.AsyncReadOperation;
24+
import com.mongodb.operation.AsyncWriteOperation;
25+
import com.mongodb.operation.FindOperation;
26+
27+
class AggregateToCollectionThenFindOperation<T> implements AsyncReadOperation<AsyncBatchCursor<T>> {
28+
private final AsyncWriteOperation<Void> aggregateToCollectionOperation;
29+
private final FindOperation<T> findOperation;
30+
31+
AggregateToCollectionThenFindOperation(final AsyncWriteOperation<Void> aggregateToCollectionOperation,
32+
final FindOperation<T> findOperation) {
33+
this.aggregateToCollectionOperation = aggregateToCollectionOperation;
34+
this.findOperation = findOperation;
35+
}
36+
37+
public AsyncWriteOperation<Void> getAggregateToCollectionOperation() {
38+
return aggregateToCollectionOperation;
39+
}
40+
41+
public FindOperation<T> getFindOperation() {
42+
return findOperation;
43+
}
44+
45+
@Override
46+
@SuppressWarnings("unchecked")
47+
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback<AsyncBatchCursor<T>> callback) {
48+
aggregateToCollectionOperation.executeAsync((AsyncWriteBinding) binding, new SingleResultCallback<Void>() {
49+
@Override
50+
public void onResult(final Void result, final Throwable t) {
51+
if (t != null) {
52+
callback.onResult(null, t);
53+
} else {
54+
findOperation.executeAsync(binding, callback);
55+
}
56+
}
57+
});
58+
}
59+
}

0 commit comments

Comments
 (0)