Skip to content

Commit 612a2e1

Browse files
committed
Delay close cursor in AsyncChangeStreamBatchCursor and
AsyncQueryBatchCursor while next/tryNext is in progress JAVA-3487
1 parent d256d1b commit 612a2e1

File tree

4 files changed

+274
-46
lines changed

4 files changed

+274
-46
lines changed

driver-core/src/main/com/mongodb/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.operation;
1818

1919
import com.mongodb.MongoChangeStreamException;
20+
import com.mongodb.MongoException;
2021
import com.mongodb.async.AsyncAggregateResponseBatchCursor;
2122
import com.mongodb.async.AsyncBatchCursor;
2223
import com.mongodb.async.SingleResultCallback;
@@ -34,6 +35,7 @@
3435
import static com.mongodb.operation.ChangeStreamBatchCursorHelper.isRetryableError;
3536
import static com.mongodb.operation.OperationHelper.LOGGER;
3637
import static com.mongodb.operation.OperationHelper.withAsyncReadConnection;
38+
import static java.lang.String.format;
3739

3840
final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
3941
private final AsyncReadBinding binding;
@@ -42,6 +44,12 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
4244
private volatile BsonDocument resumeToken;
4345
private volatile AsyncAggregateResponseBatchCursor<RawBsonDocument> wrapped;
4446

47+
/* protected by `this` */
48+
private boolean isClosed = false;
49+
private boolean isOperationInProgress = false;
50+
private boolean isClosePending = false;
51+
/* protected by `this` */
52+
4553
AsyncChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
4654
final AsyncAggregateResponseBatchCursor<RawBsonDocument> wrapped,
4755
final AsyncReadBinding binding,
@@ -66,7 +74,7 @@ public void apply(final AsyncAggregateResponseBatchCursor<RawBsonDocument> curso
6674
cursor.next(callback);
6775
cachePostBatchResumeToken(cursor);
6876
}
69-
}, convertResultsCallback(callback));
77+
}, convertResultsCallback(callback), false);
7078
}
7179

7280
@Override
@@ -78,12 +86,24 @@ public void apply(final AsyncAggregateResponseBatchCursor<RawBsonDocument> curso
7886
cursor.tryNext(callback);
7987
cachePostBatchResumeToken(cursor);
8088
}
81-
}, convertResultsCallback(callback));
89+
}, convertResultsCallback(callback), true);
8290
}
8391

8492
@Override
8593
public void close() {
86-
if (!isClosed()) {
94+
boolean closeCursor = false;
95+
96+
synchronized (this) {
97+
if (isOperationInProgress) {
98+
isClosePending = true;
99+
} else {
100+
closeCursor = !isClosed;
101+
isClosed = true;
102+
isClosePending = false;
103+
}
104+
}
105+
106+
if (closeCursor) {
87107
wrapped.close();
88108
binding.release();
89109
}
@@ -101,7 +121,9 @@ public int getBatchSize() {
101121

102122
@Override
103123
public boolean isClosed() {
104-
return wrapped.isClosed();
124+
synchronized (this) {
125+
return isClosed;
126+
}
105127
}
106128

107129
@Override
@@ -125,6 +147,17 @@ private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor<R
125147
}
126148
}
127149

150+
private void endOperationInProgress() {
151+
boolean closePending = false;
152+
synchronized (this) {
153+
isOperationInProgress = false;
154+
closePending = this.isClosePending;
155+
}
156+
if (closePending) {
157+
close();
158+
}
159+
}
160+
128161
private SingleResultCallback<List<RawBsonDocument>> convertResultsCallback(final SingleResultCallback<List<T>> callback) {
129162
return errorHandlingCallback(new SingleResultCallback<List<RawBsonDocument>>() {
130163
@Override
@@ -155,23 +188,35 @@ private interface AsyncBlock {
155188
void apply(AsyncAggregateResponseBatchCursor<RawBsonDocument> cursor, SingleResultCallback<List<RawBsonDocument>> callback);
156189
}
157190

158-
private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<RawBsonDocument>> callback) {
191+
private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<RawBsonDocument>> callback,
192+
final boolean tryNext) {
193+
synchronized (this) {
194+
if (isClosed) {
195+
callback.onResult(null, new MongoException(format("%s called after the cursor was closed.",
196+
tryNext ? "tryNext()" : "next()")));
197+
return;
198+
}
199+
isOperationInProgress = true;
200+
}
159201
asyncBlock.apply(wrapped, new SingleResultCallback<List<RawBsonDocument>>() {
160202
@Override
161203
public void onResult(final List<RawBsonDocument> result, final Throwable t) {
162204
if (t == null) {
205+
endOperationInProgress();
163206
callback.onResult(result, null);
164207
} else if (isRetryableError(t)) {
165208
wrapped.close();
166-
retryOperation(asyncBlock, callback);
209+
retryOperation(asyncBlock, callback, tryNext);
167210
} else {
211+
endOperationInProgress();
168212
callback.onResult(null, t);
169213
}
170214
}
171215
});
172216
}
173217

174-
private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<RawBsonDocument>> callback) {
218+
private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<RawBsonDocument>> callback,
219+
final boolean tryNext) {
175220
withAsyncReadConnection(binding, new AsyncCallableWithSource() {
176221
@Override
177222
public void call(final AsyncConnectionSource source, final Throwable t) {
@@ -188,7 +233,7 @@ public void onResult(final AsyncBatchCursor<T> result, final Throwable t) {
188233
} else {
189234
wrapped = ((AsyncChangeStreamBatchCursor<T>) result).getWrapped();
190235
binding.release(); // release the new change stream batch cursor's reference to the binding
191-
resumeableOperation(asyncBlock, callback);
236+
resumeableOperation(asyncBlock, callback, tryNext);
192237
}
193238
}
194239
});

driver-core/src/main/com/mongodb/operation/AsyncQueryBatchCursor.java

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.bson.codecs.Decoder;
3939

4040
import java.util.List;
41-
import java.util.concurrent.atomic.AtomicBoolean;
4241
import java.util.concurrent.atomic.AtomicInteger;
4342
import java.util.concurrent.atomic.AtomicReference;
4443

@@ -65,7 +64,6 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
6564
private final Decoder<T> decoder;
6665
private final long maxTimeMS;
6766
private final AsyncConnectionSource connectionSource;
68-
private final AtomicBoolean isClosed = new AtomicBoolean();
6967
private final AtomicReference<ServerCursor> cursor;
7068
private volatile QueryResult<T> firstBatch;
7169
private volatile int batchSize;
@@ -74,6 +72,12 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
7472
private volatile BsonTimestamp operationTime;
7573
private volatile boolean firstBatchEmpty;
7674

75+
/* protected by `this` */
76+
private boolean isOperationInProgress = false;
77+
private boolean isClosed = false;
78+
private boolean isClosePending = false;
79+
/* protected by `this` */
80+
7781
AsyncQueryBatchCursor(final QueryResult<T> firstBatch, final int limit, final int batchSize, final long maxTimeMS,
7882
final Decoder<T> decoder, final AsyncConnectionSource connectionSource, final AsyncConnection connection) {
7983
this(firstBatch, limit, batchSize, maxTimeMS, decoder, connectionSource, connection, null);
@@ -108,7 +112,19 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
108112

109113
@Override
110114
public void close() {
111-
if (!isClosed.getAndSet(true)) {
115+
boolean killCursor = false;
116+
117+
synchronized (this) {
118+
if (isOperationInProgress) {
119+
isClosePending = true;
120+
} else {
121+
killCursor = !isClosed;
122+
isClosed = true;
123+
isClosePending = false;
124+
}
125+
}
126+
127+
if (killCursor) {
112128
killCursorOnClose();
113129
}
114130
}
@@ -125,19 +141,25 @@ public void tryNext(final SingleResultCallback<List<T>> callback) {
125141

126142
@Override
127143
public void setBatchSize(final int batchSize) {
128-
isTrue("open", !isClosed.get());
144+
synchronized (this) {
145+
isTrue("open", !isClosed);
146+
}
129147
this.batchSize = batchSize;
130148
}
131149

132150
@Override
133151
public int getBatchSize() {
134-
isTrue("open", !isClosed.get());
152+
synchronized (this) {
153+
isTrue("open", !isClosed);
154+
}
135155
return batchSize;
136156
}
137157

138158
@Override
139159
public boolean isClosed() {
140-
return isClosed.get();
160+
synchronized (this) {
161+
return isClosed;
162+
}
141163
}
142164

143165
@Override
@@ -170,9 +192,19 @@ private void next(final SingleResultCallback<List<T>> callback, final boolean tr
170192
} else {
171193
ServerCursor localCursor = getServerCursor();
172194
if (localCursor == null) {
173-
isClosed.set(true);
195+
synchronized (this) {
196+
isClosed = true;
197+
}
174198
callback.onResult(null, null);
175199
} else {
200+
synchronized (this) {
201+
if (isClosed) {
202+
callback.onResult(null, new MongoException(format("%s called after the cursor was closed.",
203+
tryNext ? "tryNext()" : "next()")));
204+
return;
205+
}
206+
isOperationInProgress = true;
207+
}
176208
getMore(localCursor, callback, tryNext);
177209
}
178210
}
@@ -187,6 +219,7 @@ private void getMore(final ServerCursor cursor, final SingleResultCallback<List<
187219
@Override
188220
public void onResult(final AsyncConnection connection, final Throwable t) {
189221
if (t != null) {
222+
endOperationInProgress();
190223
callback.onResult(null, t);
191224
} else {
192225
getMore(connection, cursor, callback, tryNext);
@@ -274,17 +307,20 @@ private BsonDocument asKillCursorsCommandDocument(final ServerCursor localCursor
274307
.append("cursors", new BsonArray(singletonList(new BsonInt64(localCursor.getId()))));
275308
}
276309

310+
private void endOperationInProgress() {
311+
boolean closePending = false;
312+
synchronized (this) {
313+
isOperationInProgress = false;
314+
closePending = this.isClosePending;
315+
}
316+
if (closePending) {
317+
close();
318+
}
319+
}
277320

278321
private void handleGetMoreQueryResult(final AsyncConnection connection, final SingleResultCallback<List<T>> callback,
279322
final QueryResult<T> result, final boolean tryNext) {
280-
if (isClosed()) {
281-
connection.release();
282-
callback.onResult(null, new MongoException(format("The cursor was closed before %s completed.",
283-
tryNext ? "tryNext()" : "next()")));
284-
return;
285-
}
286-
287-
cursor.getAndSet(result.getCursor());
323+
cursor.set(result.getCursor());
288324
if (!tryNext && result.getResults().isEmpty() && result.getCursor() != null) {
289325
getMore(connection, result.getCursor(), callback, false);
290326
} else {
@@ -298,6 +334,7 @@ private void handleGetMoreQueryResult(final AsyncConnection connection, final Si
298334
connectionSource.release();
299335
}
300336
}
337+
endOperationInProgress();
301338

302339
if (result.getResults().isEmpty()) {
303340
callback.onResult(null, null);
@@ -328,6 +365,7 @@ public void onResult(final BsonDocument result, final Throwable t) {
328365
? translateCommandException((MongoCommandException) t, cursor)
329366
: t;
330367
connection.release();
368+
endOperationInProgress();
331369
callback.onResult(null, translatedException);
332370
} else {
333371
QueryResult<T> queryResult = getMoreCursorDocumentToQueryResult(result.getDocument(CURSOR),
@@ -354,6 +392,7 @@ private class QueryResultSingleResultCallback implements SingleResultCallback<Qu
354392
public void onResult(final QueryResult<T> result, final Throwable t) {
355393
if (t != null) {
356394
connection.release();
395+
endOperationInProgress();
357396
callback.onResult(null, t);
358397
} else {
359398
handleGetMoreQueryResult(connection, callback, result, tryNext);

0 commit comments

Comments
 (0)