
Commit 3aeacf5

Fix reactive streams batch cursor regression. (#652)
The BatchCursorPublisher now waits for demand to be signalled before creating the cursor. The batch cursor publisher also internally respects the batch size: it uses either the configured batch size, if set, or the amount requested. JAVA-3973
1 parent b9ac4a1 commit 3aeacf5
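
In short, when no batch size has been configured, the cursor's initial batch size is now derived from the Reactive Streams demand. A minimal standalone sketch of that selection rule, mirroring the calculateBatchSize logic added in BatchCursorFlux below (the BatchSizeSketch class and its main method are illustrative only, not part of the driver):

    // Hypothetical standalone sketch; mirrors BatchCursorFlux#calculateBatchSize from this commit.
    public final class BatchSizeSketch {

        static int calculateBatchSize(final Integer configuredBatchSize, final long demand) {
            if (configuredBatchSize != null) {
                return configuredBatchSize;       // a configured batch size always wins
            } else if (demand > Integer.MAX_VALUE) {
                return Integer.MAX_VALUE;         // clamp unbounded demand
            }
            return Math.max(2, (int) demand);     // otherwise follow demand, never below 2
        }

        public static void main(final String[] args) {
            System.out.println(calculateBatchSize(null, 5));               // 5   -> demand drives the batch size
            System.out.println(calculateBatchSize(null, 1));               // 2   -> lower bound of 2
            System.out.println(calculateBatchSize(100, 5));                // 100 -> configured batch size wins
            System.out.println(calculateBatchSize(null, Long.MAX_VALUE));  // 2147483647 -> clamped
        }
    }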

20 files changed (+525 -155 lines)

driver-core/src/main/com/mongodb/internal/client/model/FindOptions.java

Lines changed: 34 additions & 0 deletions
@@ -60,6 +60,40 @@ public final class FindOptions {
     public FindOptions() {
     }
 
+    //CHECKSTYLE:OFF
+    FindOptions(
+            final int batchSize, final int limit, final Bson projection, final long maxTimeMS, final long maxAwaitTimeMS, final int skip,
+            final Bson sort, final CursorType cursorType, final boolean noCursorTimeout, final boolean oplogReplay, final boolean partial,
+            final Collation collation, final String comment, final Bson hint, final String hintString, final Bson max, final Bson min,
+            final boolean returnKey, final boolean showRecordId, final Boolean allowDiskUse) {
+        this.batchSize = batchSize;
+        this.limit = limit;
+        this.projection = projection;
+        this.maxTimeMS = maxTimeMS;
+        this.maxAwaitTimeMS = maxAwaitTimeMS;
+        this.skip = skip;
+        this.sort = sort;
+        this.cursorType = cursorType;
+        this.noCursorTimeout = noCursorTimeout;
+        this.oplogReplay = oplogReplay;
+        this.partial = partial;
+        this.collation = collation;
+        this.comment = comment;
+        this.hint = hint;
+        this.hintString = hintString;
+        this.max = max;
+        this.min = min;
+        this.returnKey = returnKey;
+        this.showRecordId = showRecordId;
+        this.allowDiskUse = allowDiskUse;
+    }
+    //CHECKSTYLE:ON
+
+    public FindOptions withBatchSize(final int batchSize) {
+        return new FindOptions(batchSize, limit, projection, maxTimeMS, maxAwaitTimeMS, skip, sort, cursorType, noCursorTimeout,
+                oplogReplay, partial, collation, comment, hint, hintString, max, min, returnKey, showRecordId, allowDiskUse);
+    }
+
     /**
      * Gets the limit to apply. The default is null.
      *

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java

Lines changed: 7 additions & 10 deletions
@@ -142,36 +142,33 @@ public <E> Publisher<E> explain(final Class<E> explainResultClass, final Explain
     private <E> Publisher<E> publishExplain(final Class<E> explainResultClass, @Nullable final ExplainVerbosity verbosity) {
         notNull("explainDocumentClass", explainResultClass);
         return getMongoOperationPublisher().createReadOperationMono(() ->
-                asAggregateOperation().asAsyncExplainableOperation(verbosity,
+                asAggregateOperation(1).asAsyncExplainableOperation(verbosity,
                         getCodecRegistry().get(explainResultClass)),
                 getClientSession());
     }
 
     @Override
-    AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation() {
+    AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation(final int initialBatchSize) {
         MongoNamespace outNamespace = getOutNamespace();
 
         if (outNamespace != null) {
             AsyncWriteOperation<Void> aggregateToCollectionOperation = getAggregateToCollectionOperation();
 
-            FindOptions findOptions = new FindOptions().collation(collation);
-            Integer batchSize = getBatchSize();
-            if (batchSize != null) {
-                findOptions.batchSize(batchSize);
-            }
+            FindOptions findOptions = new FindOptions().collation(collation).batchSize(initialBatchSize);
+
             AsyncReadOperation<AsyncBatchCursor<T>> findOperation =
                     getOperations().find(outNamespace, new BsonDocument(), getDocumentClass(), findOptions);
 
             return new WriteOperationThenCursorReadOperation<>(aggregateToCollectionOperation, findOperation);
         } else {
-            return asAggregateOperation();
+            return asAggregateOperation(initialBatchSize);
         }
     }
 
-    private AggregateOperation<T> asAggregateOperation() {
+    private AggregateOperation<T> asAggregateOperation(final int initialBatchSize) {
         return getOperations()
                 .aggregate(pipeline, getDocumentClass(), maxTimeMS, maxAwaitTimeMS,
-                        getBatchSize(), collation, hint, comment, allowDiskUse, aggregationLevel);
+                        initialBatchSize, collation, hint, comment, allowDiskUse, aggregationLevel);
     }
 
     private AsyncWriteOperation<Void> getAggregateToCollectionOperation() {
driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java

Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.reactivestreams.client.internal;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class BatchCursorFlux<T> implements Publisher<T> {
+
+    private final BatchCursorPublisher<T> batchCursorPublisher;
+    private final AtomicBoolean inProgress = new AtomicBoolean(false);
+    private final AtomicLong demandDelta = new AtomicLong(0);
+    private volatile BatchCursor<T> batchCursor;
+    private FluxSink<T> sink;
+
+    BatchCursorFlux(final BatchCursorPublisher<T> batchCursorPublisher) {
+        this.batchCursorPublisher = batchCursorPublisher;
+    }
+
+    @Override
+    public void subscribe(final Subscriber<? super T> subscriber) {
+        Flux.<T>create(sink -> {
+            this.sink = sink;
+            sink.onRequest(demand -> {
+                if (calculateDemand(demand) > 0 && inProgress.compareAndSet(false, true)) {
+                    if (batchCursor == null) {
+                        int batchSize = calculateBatchSize(sink.requestedFromDownstream());
+                        batchCursorPublisher.batchCursor(batchSize).subscribe(bc -> {
+                            batchCursor = bc;
+                            inProgress.set(false);
+
+                            // Handle any cancelled subscriptions that happen during the time it takes to get the batchCursor
+                            if (sink.isCancelled()) {
+                                closeCursor();
+                            } else {
+                                recurseCursor();
+                            }
+                        }, sink::error);
+                    } else {
+                        inProgress.set(false);
+                        recurseCursor();
+                    }
+                }
+            });
+            sink.onCancel(this::closeCursor);
+            sink.onDispose(this::closeCursor);
+        }, FluxSink.OverflowStrategy.BUFFER)
+                .subscribe(subscriber);
+    }
+
+    private void closeCursor() {
+        if (batchCursor != null) {
+            batchCursor.close();
+        }
+    }
+
+    private void recurseCursor() {
+        if (!sink.isCancelled() && sink.requestedFromDownstream() > 0 && inProgress.compareAndSet(false, true)) {
+            if (batchCursor.isClosed()) {
+                sink.complete();
+            } else {
+                batchCursor.setBatchSize(calculateBatchSize(sink.requestedFromDownstream()));
+                Mono.from(batchCursor.next())
+                        .doOnCancel(this::closeCursor)
+                        .doOnError((e) -> {
+                            try {
+                                closeCursor();
+                            } finally {
+                                sink.error(e);
+                            }
+                        })
+                        .doOnSuccess(results -> {
+                            if (results != null) {
+                                results.forEach(sink::next);
+                                calculateDemand(-results.size());
+                            }
+                            if (batchCursor.isClosed()) {
+                                sink.complete();
+                            } else {
+                                inProgress.set(false);
+                                recurseCursor();
+                            }
+                        })
+                        .subscribe();
+            }
+        }
+    }
+
+    long calculateDemand(final long demand) {
+        return demandDelta.accumulateAndGet(demand, (originalValue, update) -> {
+            long newValue = originalValue + update;
+            return update > 0 && newValue < originalValue ? Long.MAX_VALUE : newValue;
+        });
+    }
+
+    int calculateBatchSize(final long demand) {
+        Integer setBatchSize = batchCursorPublisher.getBatchSize();
+        if (setBatchSize != null) {
+            return setBatchSize;
+        } else if (demand > Integer.MAX_VALUE) {
+            return Integer.MAX_VALUE;
+        }
+        return Math.max(2, (int) demand);
+    }
+
+}
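
For context, a minimal usage sketch of the behaviour this new class restores, assuming an existing MongoCollection<Document> named collection from the reactive streams driver: subscribing alone does not touch the server, the cursor is created only once request(2) signals demand, and with no explicit batchSize configured the initial batch size follows that demand.

    import com.mongodb.reactivestreams.client.MongoCollection;
    import org.bson.Document;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    // Illustrative only: "collection" is an assumed, pre-existing collection reference.
    public final class DemandDrivenFindSketch {

        static void readTwo(final MongoCollection<Document> collection) {
            collection.find().subscribe(new Subscriber<Document>() {
                @Override
                public void onSubscribe(final Subscription s) {
                    // No cursor exists yet; it is created only after this demand is signalled.
                    s.request(2);
                }

                @Override
                public void onNext(final Document document) {
                    System.out.println(document.toJson());
                }

                @Override
                public void onError(final Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {
                    System.out.println("done");
                }
            });
        }
    }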

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java

Lines changed: 7 additions & 48 deletions
@@ -26,30 +26,26 @@
 import org.bson.codecs.configuration.CodecRegistry;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
 import static com.mongodb.assertions.Assertions.notNull;
 
 abstract class BatchCursorPublisher<T> implements Publisher<T> {
     private final ClientSession clientSession;
     private final MongoOperationPublisher<T> mongoOperationPublisher;
-
     private Integer batchSize;
 
     BatchCursorPublisher(@Nullable final ClientSession clientSession, final MongoOperationPublisher<T> mongoOperationPublisher) {
         this.clientSession = clientSession;
         this.mongoOperationPublisher = notNull("mongoOperationPublisher", mongoOperationPublisher);
     }
 
-    abstract AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation();
+    abstract AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation(int initialBatchSize);
 
     AsyncReadOperation<AsyncBatchCursor<T>> asAsyncFirstReadOperation() {
-        return asAsyncReadOperation();
+        return asAsyncReadOperation(1);
     }
 
     @Nullable
@@ -116,48 +112,11 @@ public Publisher<T> first() {
 
     @Override
     public void subscribe(final Subscriber<? super T> subscriber) {
-        batchCursor()
-                .flatMapMany(batchCursor -> {
-                    AtomicBoolean inProgress = new AtomicBoolean(false);
-                    if (batchSize != null) {
-                        batchCursor.setBatchSize(batchSize);
-                    }
-                    return Flux.create((FluxSink<T> sink) ->
-                            sink.onRequest(value -> recurseCursor(sink, batchCursor, inProgress)), FluxSink.OverflowStrategy.BUFFER)
-                            .doOnCancel(batchCursor::close);
-                })
-                .subscribe(subscriber);
-    }
-
-    void recurseCursor(final FluxSink<T> sink, final BatchCursor<T> batchCursor, final AtomicBoolean inProgress) {
-        if (!sink.isCancelled() && sink.requestedFromDownstream() > 0 && inProgress.compareAndSet(false, true)) {
-            if (batchCursor.isClosed()) {
-                sink.complete();
-            } else {
-                Mono.from(batchCursor.next())
-                        .doOnCancel(batchCursor::close)
-                        .doOnError((e) -> {
-                            batchCursor.close();
-                            sink.error(e);
-                        })
-                        .doOnSuccess(results -> {
-                            if (results != null) {
-                                results.forEach(sink::next);
-                            }
-                            if (batchCursor.isClosed()) {
-                                sink.complete();
-                            } else {
-                                inProgress.compareAndSet(true, false);
-                                recurseCursor(sink, batchCursor, inProgress);
-                            }
-                        })
-                        .subscribe();
-            }
-        }
-    }
-
-    public Mono<BatchCursor<T>> batchCursor() {
-        return batchCursor(this::asAsyncReadOperation);
+        new BatchCursorFlux<>(this).subscribe(subscriber);
+    }
+
+    public Mono<BatchCursor<T>> batchCursor(final int initialBatchSize) {
+        return batchCursor(() -> asAsyncReadOperation(initialBatchSize));
     }
 
     Mono<BatchCursor<T>> batchCursor(final Supplier<AsyncReadOperation<AsyncBatchCursor<T>>> supplier) {

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java

Lines changed: 6 additions & 6 deletions
@@ -114,8 +114,8 @@ public ChangeStreamPublisher<T> collation(@Nullable final Collation collation) {
     public <TDocument> Publisher<TDocument> withDocumentClass(final Class<TDocument> clazz) {
         return new BatchCursorPublisher<TDocument>(getClientSession(), getMongoOperationPublisher().withDocumentClass(clazz)) {
             @Override
-            AsyncReadOperation<AsyncBatchCursor<TDocument>> asAsyncReadOperation() {
-                return createChangeStreamOperation(getMongoOperationPublisher().getCodecRegistry().get(clazz));
+            AsyncReadOperation<AsyncBatchCursor<TDocument>> asAsyncReadOperation(final int initialBatchSize) {
+                return createChangeStreamOperation(getMongoOperationPublisher().getCodecRegistry().get(clazz), initialBatchSize);
             }
         };
     }
@@ -133,14 +133,14 @@ public ChangeStreamPublisherImpl<T> startAfter(final BsonDocument startAfter) {
     }
 
     @Override
-    AsyncReadOperation<AsyncBatchCursor<ChangeStreamDocument<T>>> asAsyncReadOperation() {
-        return createChangeStreamOperation(codec);
+    AsyncReadOperation<AsyncBatchCursor<ChangeStreamDocument<T>>> asAsyncReadOperation(final int initialBatchSize) {
+        return createChangeStreamOperation(codec, initialBatchSize);
     }
 
-    private <S> AsyncReadOperation<AsyncBatchCursor<S>> createChangeStreamOperation(final Codec<S> codec) {
+    private <S> AsyncReadOperation<AsyncBatchCursor<S>> createChangeStreamOperation(final Codec<S> codec, final int initialBatchSize) {
         return new ChangeStreamOperation<>(getNamespace(), fullDocument,
                 createBsonDocumentList(pipeline), codec, changeStreamLevel)
-                .batchSize(getBatchSize())
+                .batchSize(initialBatchSize)
                 .collation(collation)
                 .maxAwaitTime(maxAwaitTimeMS, MILLISECONDS)
                 .resumeAfter(resumeToken)
driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/DistinctPublisherImpl.java

Lines changed: 2 additions & 1 deletion
@@ -70,7 +70,8 @@ public DistinctPublisher<T> collation(@Nullable final Collation collation) {
     }
 
     @Override
-    AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation() {
+    AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation(final int initialBatchSize) {
+        // initialBatchSize is ignored for distinct operations.
         return getOperations().distinct(fieldName, filter, getDocumentClass(), maxTimeMS, collation);
     }
 }

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java

Lines changed: 5 additions & 3 deletions
@@ -23,6 +23,7 @@
 import com.mongodb.internal.client.model.FindOptions;
 import com.mongodb.internal.operation.AsyncExplainableReadOperation;
 import com.mongodb.internal.operation.AsyncReadOperation;
+import com.mongodb.internal.operation.FindOperation;
 import com.mongodb.lang.Nullable;
 import com.mongodb.reactivestreams.client.ClientSession;
 import com.mongodb.reactivestreams.client.FindPublisher;
@@ -199,14 +200,15 @@ public <E> Publisher<E> explain(final Class<E> explainResultClass, final Explain
     private <E> Publisher<E> publishExplain(final Class<E> explainResultClass, @Nullable final ExplainVerbosity verbosity) {
         notNull("explainDocumentClass", explainResultClass);
         return getMongoOperationPublisher().createReadOperationMono(() ->
-                asAsyncReadOperation().asAsyncExplainableOperation(verbosity,
+                asAsyncReadOperation(0).asAsyncExplainableOperation(verbosity,
                         getCodecRegistry().get(explainResultClass)),
                 getClientSession());
     }
 
     @Override
-    AsyncExplainableReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation() {
-        return getOperations().find(filter, getDocumentClass(), findOptions);
+    AsyncExplainableReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation(final int initialBatchSize) {
+        FindOperation<T> operation = getOperations().find(filter, getDocumentClass(), findOptions.withBatchSize(initialBatchSize));
+        return operation;
     }
 
     @Override

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ListCollectionsPublisherImpl.java

Lines changed: 2 additions & 2 deletions
@@ -59,8 +59,8 @@ public ListCollectionsPublisherImpl<T> filter(@Nullable final Bson filter) {
         return this;
     }
 
-    AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation() {
+    AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation(final int initialBatchSize) {
         return getOperations().listCollections(getNamespace().getDatabaseName(), getDocumentClass(), filter, collectionNamesOnly,
-                getBatchSize(), maxTimeMS);
+                initialBatchSize, maxTimeMS);
     }
 }

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ListDatabasesPublisherImpl.java

Lines changed: 2 additions & 1 deletion
@@ -67,7 +67,8 @@ public ListDatabasesPublisherImpl<T> authorizedDatabasesOnly(@Nullable final Boo
         return this;
     }
 
-    AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation() {
+    AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation(final int initialBatchSize) {
+        // initialBatchSize is ignored for distinct operations.
         return getOperations().listDatabases(getDocumentClass(), filter, nameOnly, maxTimeMS, authorizedDatabasesOnly);
     }
 }

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ListIndexesPublisherImpl.java

Lines changed: 2 additions & 2 deletions
@@ -48,7 +48,7 @@ public ListIndexesPublisherImpl<T> batchSize(final int batchSize) {
         return this;
     }
 
-    AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation() {
-        return getOperations().listIndexes(getDocumentClass(), getBatchSize(), maxTimeMS);
+    AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation(final int initialBatchSize) {
+        return getOperations().listIndexes(getDocumentClass(), initialBatchSize, maxTimeMS);
     }
 }
