Commit b47b8a0

Make implementations of AsyncAggregateResponseBatchCursor more responsive to close signals (#654)
1) Change implementations of AsyncAggregateResponseBatchCursor to make them more responsive to close signals

AsyncAggregateResponseBatchCursor wraps AsyncQueryBatchCursor. AsyncQueryBatchCursor implements pending cancel functionality: it may enter a pending close state when close is called concurrently with the object being in the pending operation state. AsyncAggregateResponseBatchCursor does not need to duplicate the pending functionality; in fact, the duplicate prevented AsyncQueryBatchCursor from receiving close signals while in the pending operation state.

2) Release resources in AsyncQueryBatchCursor

The changes made in this commit appear to be correct from the standpoint of tests, but it is very hard for me to prove that by inspecting the code. There are two reasons:
- the reference counting approach used in the code
- the presence of multiple places in the code where the instance field connectionSource is released, combined with the fact that it is not always retained in the constructor and must be released no more than once per lifetime of an AsyncQueryBatchCursor object.

The following text expresses my thoughts about reference counting and is not directly related to the changes made in this commit.

The current approach to reference counting in the driver code is what we may call a release-early approach, similar to the one described in the Netty documentation (https://github.com/netty/netty/wiki/Reference-counted-objects#who-destroys-it). In this approach "the party that accesses a reference-counted object last is also responsible for the destruction of that reference-counted object". The problem with this approach is that it is not structured.

"Structured" reference counting.

The idea is to bring our reference counting code closer to the structured programming approach (https://en.wikipedia.org/wiki/Structured_programming) while fixing a bug: AsyncQueryBatchCursor does not always release a connection when it is not needed anymore. In simple words, the following is desirable: given a place in the code where retain/release is called, it should be simple to tell where the corresponding release/retain is called.

As a side note, structured programming applied to concurrent code is called structured concurrency. This idea is realized in Kotlin: https://kotlinlang.org/docs/reference/coroutines/basics.html#structured-concurrency, https://elizarov.medium.com/structured-concurrency-722d765aa952 (the article is written by the author of the Kotlin coroutines functionality).

As much as I would like us to use the structured approach, it is generally not possible to reconcile it with the release-early approach in the same codebase. In other words, a complete refactoring may be needed to switch to the structured approach. The following example demonstrates a situation in which synchronous code written with the release-early approach behaves differently when combined with the structured approach.

Consider the following legacy code in which we want to refactor releaseEarlyLegacyMethod (the only code that will stay unchanged is the releaseEarlyUnchangedCode method):

    void releaseEarlyLegacyMethod() {
        ReferenceCounted resource = pool.borrow();//count is 1
        /* Since ReleaseEarlyLegacyOneTimeConsumer is the last party using the resource, it must release the resource.
         * A reader cannot find the release call corresponding to the borrow call inside the releaseEarlyLegacyMethod method.*/
        releaseEarlyUnchangedCode(new ReleaseEarlyLegacyOneTimeConsumer(resource));
    }

    class ReleaseEarlyLegacyOneTimeConsumer implements Runnable {
        private final ReferenceCounted resource;

        ReleaseEarlyLegacyOneTimeConsumer(ReferenceCounted resource) {
            this.resource = resource;//count is 1
        }

        public void run() {
            try {
                System.out.println(resource.toString());//count is 1; use the resource
            } finally {//release the resource because ReleaseEarlyLegacyOneTimeConsumer.run is the last party accessing it
                /* Note that by looking at this release call, a reader cannot easily find the corresponding retain/borrow call.
                 * The corresponding retain/borrow call is not even in this class.*/
                resource.release();//count is 0; the resource returns to the pool
            }
        }
    }

    void releaseEarlyUnchangedCode(Runnable action) {
        action.run();
        ReferenceCounted resource = pool.borrow();//count is 1; succeeds only if the single resource has been returned to the pool
        try {//use the resource
            System.out.println(resource.toString());//count is 1
        } finally {//release the resource because releaseEarlyUnchangedCode is the last party accessing it
            resource.release();//count is 0; the resource is returned to the pool
        }
    }

The pool.borrow call in releaseEarlyUnchangedCode works because the resource is returned to the pool before action.run() returns.

Here is the refactored code that uses structured reference counting:

    void structuredNewMethod() {
        ReferenceCounted resource = pool.borrow();//count is 1
        //a reader can easily find the release call corresponding to the borrow call because it is in the finally block in this method
        try {
            releaseEarlyUnchangedCode(new StructuredNewOneTimeConsumer(resource));
        } finally {
            resource.release();//count is 0; the resource returns to the pool
        }
    }

    class StructuredNewOneTimeConsumer implements Runnable {
        private final ReferenceCounted resource;

        StructuredNewOneTimeConsumer(ReferenceCounted resource) {
            this.resource = resource.retain();//count is 2; we store a new reference to the resource
        }

        public void run() {
            try {
                System.out.println(resource.toString());//count is 2; use the resource
            } finally {//release the resource because StructuredNewOneTimeConsumer is not going to use it anymore
                /* The corresponding retain call is in the constructor of the class because resource is an instance field.
                 * A reader may easily locate it.*/
                resource.release();//count is 1
            }
        }
    }

With this new code, the pool.borrow call in releaseEarlyUnchangedCode fails because the resource is not returned to the pool before action.run() returns. It is only returned to the pool before structuredNewMethod returns. Thereby, we have shown that the release-early and the structured approaches are not reconcilable. This would not be a problem if the code had been written following the structured approach from the beginning, because in that case releaseEarlyUnchangedCode would have been written differently (using either release-early or structured reference counting definitely affects the structure of the rest of the code).

JAVA-3907
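
The incompatibility argued in the commit message can be reproduced in a small self-contained program. The following is only a sketch under stated assumptions: `Resource`, `SingleResourcePool`, `tryBorrow`, and `giveBack` are hypothetical names invented for illustration and are not driver classes; the pool holds exactly one resource, and `tryBorrow` returns null instead of blocking so that the failure is observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ReferenceCountingSketch {
    /** Hypothetical reference-counted resource; goes back to its pool when the count drops to 0. */
    static final class Resource {
        private final AtomicInteger count = new AtomicInteger();
        private final SingleResourcePool pool;

        Resource(SingleResourcePool pool) {
            this.pool = pool;
        }

        Resource retain() {
            count.incrementAndGet();
            return this;
        }

        void release() {
            if (count.decrementAndGet() == 0) {
                pool.giveBack();
            }
        }
    }

    /** Hypothetical pool that owns exactly one resource (single-threaded use only). */
    static final class SingleResourcePool {
        private final Resource resource = new Resource(this);
        private boolean available = true;

        /** Returns the resource with count 1, or null if it is currently borrowed. */
        Resource tryBorrow() {
            if (!available) {
                return null;
            }
            available = false;
            return resource.retain();
        }

        void giveBack() {
            available = true;
        }
    }

    /** Stands in for releaseEarlyUnchangedCode: runs the action, then needs the resource itself. */
    static boolean unchangedCode(SingleResourcePool pool, Runnable action) {
        action.run();
        Resource resource = pool.tryBorrow();
        if (resource == null) {
            return false; // the single resource has not been returned yet
        }
        resource.release();
        return true;
    }

    /** Release-early: the consumer is the last user, so the consumer releases. */
    static boolean releaseEarly(SingleResourcePool pool) {
        Resource resource = pool.tryBorrow(); // count is 1
        return unchangedCode(pool, resource::release); // count is 0 after the action runs
    }

    /** Structured: every retain/borrow is paired with a release in the same scope. */
    static boolean structured(SingleResourcePool pool) {
        Resource resource = pool.tryBorrow(); // count is 1
        try {
            Resource held = resource.retain(); // count is 2
            return unchangedCode(pool, held::release); // count is 1 after the action runs
        } finally {
            resource.release(); // count is 0; only now does the resource return to the pool
        }
    }

    public static void main(String[] args) {
        System.out.println("release-early: second borrow succeeded = " + releaseEarly(new SingleResourcePool()));
        System.out.println("structured: second borrow succeeded = " + structured(new SingleResourcePool()));
    }
}
```

In this sketch `releaseEarly` returns true and `structured` returns false, which mirrors the claim that code written for one convention can break when its caller switches to the other.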
1 parent 3aeacf5 commit b47b8a0

4 files changed: +65 -56 lines changed

driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 13 additions & 43 deletions
@@ -30,6 +30,7 @@

 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;

 import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
 import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isRetryableError;
@@ -44,12 +45,7 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat

     private volatile BsonDocument resumeToken;
     private volatile AsyncAggregateResponseBatchCursor<RawBsonDocument> wrapped;
-
-    /* protected by `this` */
-    private boolean isClosed = false;
-    private boolean isOperationInProgress = false;
-    private boolean isClosePending = false;
-    /* protected by `this` */
+    private final AtomicBoolean isClosed;

     AsyncChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
                                  final AsyncAggregateResponseBatchCursor<RawBsonDocument> wrapped,
@@ -62,6 +58,7 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
         binding.retain();
         this.resumeToken = resumeToken;
         this.maxWireVersion = maxWireVersion;
+        isClosed = new AtomicBoolean();
     }

     AsyncAggregateResponseBatchCursor<RawBsonDocument> getWrapped() {
@@ -94,22 +91,13 @@ public void apply(final AsyncAggregateResponseBatchCursor<RawBsonDocument> curso

     @Override
     public void close() {
-        boolean closeCursor = false;
-
-        synchronized (this) {
-            if (isOperationInProgress) {
-                isClosePending = true;
-            } else {
-                closeCursor = !isClosed;
-                isClosed = true;
-                isClosePending = false;
+        if (isClosed.compareAndSet(false, true)) {
+            try {
+                wrapped.close();
+            } finally {
+                binding.release();
             }
         }
-
-        if (closeCursor) {
-            wrapped.close();
-            binding.release();
-        }
     }

     @Override
@@ -124,9 +112,7 @@ public int getBatchSize() {

     @Override
     public boolean isClosed() {
-        synchronized (this) {
-            return isClosed;
-        }
+        return isClosed.get();
     }

     @Override
@@ -155,17 +141,6 @@ private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor<R
         }
     }

-    private void endOperationInProgress() {
-        boolean closePending = false;
-        synchronized (this) {
-            isOperationInProgress = false;
-            closePending = this.isClosePending;
-        }
-        if (closePending) {
-            close();
-        }
-    }
-
     private SingleResultCallback<List<RawBsonDocument>> convertResultsCallback(final SingleResultCallback<List<T>> callback) {
         return errorHandlingCallback(new SingleResultCallback<List<RawBsonDocument>>() {
             @Override
@@ -203,25 +178,20 @@ private interface AsyncBlock {

     private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<RawBsonDocument>> callback,
                                      final boolean tryNext) {
-        synchronized (this) {
-            if (isClosed) {
-                callback.onResult(null, new MongoException(format("%s called after the cursor was closed.",
-                        tryNext ? "tryNext()" : "next()")));
-                return;
-            }
-            isOperationInProgress = true;
+        if (isClosed.get()) {
+            callback.onResult(null, new MongoException(format("%s called after the cursor was closed.",
+                    tryNext ? "tryNext()" : "next()")));
+            return;
         }
         asyncBlock.apply(wrapped, new SingleResultCallback<List<RawBsonDocument>>() {
             @Override
             public void onResult(final List<RawBsonDocument> result, final Throwable t) {
                 if (t == null) {
-                    endOperationInProgress();
                     callback.onResult(result, null);
                 } else if (isRetryableError(t, maxWireVersion)) {
                     wrapped.close();
                     retryOperation(asyncBlock, callback, tryNext);
                 } else {
-                    endOperationInProgress();
                     callback.onResult(null, t);
                 }
             }
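
The new close() in AsyncChangeStreamBatchCursor relies on AtomicBoolean.compareAndSet so that only one caller performs the cleanup, however many threads call close concurrently. The following is a minimal sketch of that pattern in isolation; `WrapperCursor`, the two counters, and `closeConcurrently` are hypothetical stand-ins (the counters replace wrapped.close() and binding.release()) and are not part of the driver.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class IdempotentCloseSketch {
    /** Hypothetical stand-in for a wrapper cursor that owns one reference to a binding. */
    static final class WrapperCursor {
        private final AtomicBoolean isClosed = new AtomicBoolean();
        private final AtomicInteger wrappedCloseCalls = new AtomicInteger();
        private final AtomicInteger bindingReleaseCalls = new AtomicInteger();

        void close() {
            // Only the caller that flips false -> true runs the cleanup.
            if (isClosed.compareAndSet(false, true)) {
                try {
                    wrappedCloseCalls.incrementAndGet(); // stands in for wrapped.close()
                } finally {
                    bindingReleaseCalls.incrementAndGet(); // stands in for binding.release()
                }
            }
        }

        boolean isClosed() {
            return isClosed.get();
        }

        int wrappedCloseCalls() {
            return wrappedCloseCalls.get();
        }

        int bindingReleaseCalls() {
            return bindingReleaseCalls.get();
        }
    }

    /** Calls close() from the given number of threads and waits for all of them to finish. */
    static WrapperCursor closeConcurrently(int threads) {
        WrapperCursor cursor = new WrapperCursor();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(cursor::close);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return cursor;
    }

    public static void main(String[] args) {
        WrapperCursor cursor = closeConcurrently(8);
        System.out.println("closed = " + cursor.isClosed());
        System.out.println("wrapped.close() calls = " + cursor.wrappedCloseCalls());
        System.out.println("binding.release() calls = " + cursor.bindingReleaseCalls());
    }
}
```

The try/finally mirrors the diff: the binding reference is released even if closing the wrapped cursor throws, and the compare-and-set guarantees it is released at most once.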

driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java

Lines changed: 22 additions & 12 deletions
@@ -69,15 +69,15 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
     private volatile int batchSize;
     private final AtomicInteger count = new AtomicInteger();
     private volatile BsonDocument postBatchResumeToken;
-    private volatile BsonTimestamp operationTime;
-    private volatile boolean firstBatchEmpty;
-    private volatile int maxWireVersion = 0;
+    private final BsonTimestamp operationTime;
+    private final boolean firstBatchEmpty;
+    private final int maxWireVersion;

     /* protected by `this` */
     private boolean isOperationInProgress = false;
     private boolean isClosed = false;
-    private boolean isClosePending = false;
     /* protected by `this` */
+    private volatile boolean isClosePending = false;

     AsyncQueryBatchCursor(final QueryResult<T> firstBatch, final int limit, final int batchSize, final long maxTimeMS,
                           final Decoder<T> decoder, final AsyncConnectionSource connectionSource, final AsyncConnection connection) {
@@ -100,6 +100,8 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
         if (result != null) {
             this.operationTime = result.getTimestamp(OPERATION_TIME, null);
             this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
+        } else {
+            this.operationTime = null;
         }

         firstBatchEmpty = firstBatch.getResults().isEmpty();
@@ -109,26 +111,24 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
                 killCursor(connection);
             }
         }
-        if (connection != null) {
-            this.maxWireVersion = connection.getDescription().getMaxWireVersion();
-        }
+        this.maxWireVersion = connection == null ? 0 : connection.getDescription().getMaxWireVersion();
     }

     @Override
     public void close() {
-        boolean killCursor = false;
+        boolean doClose = false;

         synchronized (this) {
             if (isOperationInProgress) {
                 isClosePending = true;
-            } else {
-                killCursor = !isClosed;
+            } else if (!isClosed) {
                 isClosed = true;
                 isClosePending = false;
+                doClose = true;
             }
         }

-        if (killCursor) {
+        if (doClose) {
             killCursorOnClose();
         }
     }
@@ -336,7 +336,17 @@ private void endOperationInProgress() {
     private void handleGetMoreQueryResult(final AsyncConnection connection, final SingleResultCallback<List<T>> callback,
                                           final QueryResult<T> result, final boolean tryNext) {
         cursor.set(result.getCursor());
-        if (!tryNext && result.getResults().isEmpty() && result.getCursor() != null) {
+        if (isClosePending) {
+            try {
+                connection.release();
+                if (result.getCursor() == null) {
+                    connectionSource.release();
+                }
+                endOperationInProgress();
+            } finally {
+                callback.onResult(null, null);
+            }
+        } else if (!tryNext && result.getResults().isEmpty() && result.getCursor() != null) {
             getMore(connection, result.getCursor(), callback, false);
         } else {
             count.addAndGet(result.getResults().size());
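
The pending close behaviour that AsyncQueryBatchCursor keeps can be read as a small state machine: a close that arrives while an operation is in progress is only recorded, and the operation's completion path then performs the actual close. The sketch below models just that state machine, assuming a shape for endOperationInProgress similar to the one removed from AsyncChangeStreamBatchCursor in this commit; the class `Cursor`, `beginOperation`, `endOperation`, and the `cleanupCalls` counter are hypothetical and stand in for the driver's connection handling and killCursorOnClose().

```java
public class PendingCloseSketch {
    /** Hypothetical cursor that defers close() while an operation is in progress. */
    static final class Cursor {
        /* protected by `this` */
        private boolean isOperationInProgress;
        private boolean isClosed;
        private boolean isClosePending;
        private int cleanupCalls;
        /* protected by `this` */

        /** Marks an operation as started; returns false if the cursor is already closed. */
        synchronized boolean beginOperation() {
            if (isClosed) {
                return false;
            }
            isOperationInProgress = true;
            return true;
        }

        void close() {
            boolean doClose = false;
            synchronized (this) {
                if (isOperationInProgress) {
                    isClosePending = true; // remember the signal; the operation will act on it
                } else if (!isClosed) {
                    isClosed = true;
                    isClosePending = false;
                    doClose = true;
                }
            }
            if (doClose) {
                cleanup(); // stands in for killCursorOnClose()
            }
        }

        /** Called on the completion path of an operation; honours a pending close. */
        void endOperation() {
            boolean closePending;
            synchronized (this) {
                isOperationInProgress = false;
                closePending = isClosePending;
            }
            if (closePending) {
                close();
            }
        }

        private synchronized void cleanup() {
            cleanupCalls++;
        }

        synchronized boolean isClosed() {
            return isClosed;
        }

        synchronized int cleanupCalls() {
            return cleanupCalls;
        }
    }

    public static void main(String[] args) {
        Cursor cursor = new Cursor();
        cursor.beginOperation();
        cursor.close(); // deferred because an operation is in progress
        System.out.println("closed while operation runs = " + cursor.isClosed());
        cursor.endOperation(); // the pending close is applied here
        System.out.println("closed after operation ends = " + cursor.isClosed());
        System.out.println("cleanup calls = " + cursor.cleanupCalls());
    }
}
```

The change to handleGetMoreQueryResult adds the missing piece for long-running getMore loops: it checks isClosePending after each server response, so a tailable cursor stops issuing further getMore commands instead of waiting for data that may never arrive.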

driver-core/src/test/functional/com/mongodb/internal/operation/AsyncQueryBatchCursorFunctionalSpecification.groovy

Lines changed: 24 additions & 0 deletions
@@ -35,6 +35,7 @@ import com.mongodb.internal.connection.QueryResult
 import org.bson.BsonBoolean
 import org.bson.BsonDocument
 import org.bson.BsonInt32
+import org.bson.BsonNull
 import org.bson.BsonString
 import org.bson.BsonTimestamp
 import org.bson.Document
@@ -303,6 +304,29 @@ class AsyncQueryBatchCursorFunctionalSpecification extends OperationFunctionalSp
         false | 0
     }

+    @Slow
+    def 'should unblock if closed while waiting for more data from tailable cursor'() {
+        given:
+        collectionHelper.create(collectionName, new CreateCollectionOptions().capped(true).sizeInBytes(1000))
+        collectionHelper.insertDocuments(new DocumentCodec(), Document.parse('{}'))
+        def firstBatch = executeQuery(new BsonDocument('_id', BsonNull.VALUE), 0, 1, true, true);
+
+        when:
+        cursor = new AsyncQueryBatchCursor<Document>(firstBatch, 0, 1, 500, new DocumentCodec(), connectionSource, connection)
+        Thread.start {
+            Thread.sleep(SECONDS.toMillis(2))
+            cursor.close()
+        }
+        def batch = nextBatch()
+
+        then:
+        cursor.isClosed()
+        batch == null
+        //both connection and connectionSource have reference count 1 when we pass them to the AsyncQueryBatchCursor constructor
+        connection.getCount() == 1
+        waitForRelease(connectionSource, 1)
+    }
+
     def 'should respect limit'() {
         given:
         cursor = new AsyncQueryBatchCursor<Document>(executeQuery(6, 3), 6, 2, 0, new DocumentCodec(), connectionSource, connection)

driver-core/src/test/unit/com/mongodb/internal/operation/AsyncQueryBatchCursorSpecification.groovy

Lines changed: 6 additions & 1 deletion
@@ -474,7 +474,9 @@ class AsyncQueryBatchCursorSpecification extends Specification {

         then:
         connectionA.getCount() == 0
-        connectionSource.getCount() == 0
+        if (response2 == null) { //otherwise connectionSource is released asynchronously, which is not easy to verify
+            connectionSource.getCount() == 0
+        }
         cursor.isClosed()

         where:
@@ -709,6 +711,9 @@ class AsyncQueryBatchCursorSpecification extends Specification {
         int counter = 0
         def mock = Mock(AsyncConnectionSource)
         mock.getConnection(_) >> {
+            if (counter == 0) {
+                throw new IllegalStateException('Tried to use released AsyncConnectionSource')
+            }
             def (result, error) = connectionCallbackResults()
             it[0].onResult(result, error)
         }
