Skip to content

Commit 84b5a34

Browse files
committed
JAVA-2858: Ensure all change stream getMore commands are retried
This commit ensures that change streams are resumable even when watching a collection that has no activity between failovers.
1 parent 2d9d65d commit 84b5a34

File tree

2 files changed

+14
-14
lines changed

2 files changed

+14
-14
lines changed

driver-core/src/main/com/mongodb/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void onResult(final AsyncBatchCursor<T> result, final Throwable t) {
125125
} else {
126126
wrapped = ((AsyncChangeStreamBatchCursor<T>) result).getWrapped();
127127
binding.release(); // release the new change stream batch cursor's reference to the binding
128-
asyncBlock.apply(wrapped, callback);
128+
resumeableOperation(asyncBlock, callback);
129129
}
130130
}
131131
});

driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -127,19 +127,19 @@ private List<T> convertResults(final List<RawBsonDocument> rawDocuments) {
127127
}
128128

129129
<R> R resumeableOperation(final Function<BatchCursor<RawBsonDocument>, R> function) {
130-
try {
131-
return function.apply(wrapped);
132-
} catch (MongoNotPrimaryException e) {
133-
// Ignore
134-
} catch (MongoCursorNotFoundException w) {
135-
// Ignore
136-
} catch (MongoSocketException e) {
137-
// Ignore
130+
while (true) {
131+
try {
132+
return function.apply(wrapped);
133+
} catch (MongoNotPrimaryException e) {
134+
// Ignore
135+
} catch (MongoCursorNotFoundException w) {
136+
// Ignore
137+
} catch (MongoSocketException e) {
138+
// Ignore
139+
}
140+
wrapped.close();
141+
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.resumeAfter(resumeToken).execute(binding)).getWrapped();
142+
binding.release(); // release the new change stream batch cursor's reference to the binding
138143
}
139-
wrapped.close();
140-
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.resumeAfter(resumeToken).execute(binding)).getWrapped();
141-
binding.release(); // release the new change stream batch cursor's reference to the binding
142-
return function.apply(wrapped);
143144
}
144-
145145
}

0 commit comments

Comments
 (0)