Skip to content

Commit 29d2a6a

Browse files
jyeminrozza
authored andcommitted
Restrict when startAtOperationTime is included for change streams
When resuming a change stream, the driver is required to save the operationTime returned in the aggregate command response and send it on subsequent aggregate commands when resuming the change stream, until a resumeToken is available. This commit ensures that it's only done when connected to a MongoDB 4.0 server, as per specification. JAVA-2892
1 parent 4b541c3 commit 29d2a6a

File tree

4 files changed

+24
-12
lines changed

4 files changed

+24
-12
lines changed

driver-core/src/main/com/mongodb/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncBatchCursor<T> {
4040
AsyncChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
4141
final AsyncBatchCursor<RawBsonDocument> wrapped,
4242
final AsyncReadBinding binding) {
43-
if (changeStreamOperation.getResumeToken() == null && changeStreamOperation.getStartAtOperationTime() == null) {
44-
changeStreamOperation.startAtOperationTime(binding.getSessionContext().getOperationTime());
45-
}
43+
changeStreamOperation.setStartOperationTimeForResume(binding.getSessionContext().getOperationTime());
4644
this.changeStreamOperation = changeStreamOperation;
4745
this.resumeToken = changeStreamOperation.getResumeToken();
4846
this.wrapped = wrapped;
@@ -117,7 +115,8 @@ public void onResult(final List<RawBsonDocument> result, final Throwable t) {
117115

118116
private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<RawBsonDocument>> callback) {
119117
if (resumeToken != null) {
120-
changeStreamOperation.startAtOperationTime(null).resumeAfter(resumeToken);
118+
changeStreamOperation.setStartOperationTimeForResume(null);
119+
changeStreamOperation.resumeAfter(resumeToken);
121120
}
122121
changeStreamOperation.executeAsync(binding, new SingleResultCallback<AsyncBatchCursor<T>>() {
123122
@Override

driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ final class ChangeStreamBatchCursor<T> implements BatchCursor<T> {
4040
ChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
4141
final BatchCursor<RawBsonDocument> wrapped,
4242
final ReadBinding binding) {
43-
if (changeStreamOperation.getResumeToken() == null && changeStreamOperation.getStartAtOperationTime() == null) {
44-
changeStreamOperation.startAtOperationTime(binding.getSessionContext().getOperationTime());
45-
}
43+
changeStreamOperation.setStartOperationTimeForResume(binding.getSessionContext().getOperationTime());
4644
this.changeStreamOperation = changeStreamOperation;
4745
this.resumeToken = changeStreamOperation.getResumeToken();
4846
this.wrapped = wrapped;
@@ -141,7 +139,8 @@ <R> R resumeableOperation(final Function<BatchCursor<RawBsonDocument>, R> functi
141139
wrapped.close();
142140

143141
if (resumeToken != null) {
144-
changeStreamOperation.startAtOperationTime(null).resumeAfter(resumeToken);
142+
changeStreamOperation.setStartOperationTimeForResume(null);
143+
changeStreamOperation.resumeAfter(resumeToken);
145144
}
146145
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(binding)).getWrapped();
147146
binding.release(); // release the new change stream batch cursor's reference to the binding

driver-core/src/main/com/mongodb/operation/ChangeStreamOperation.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.concurrent.TimeUnit;
4343

4444
import static com.mongodb.assertions.Assertions.notNull;
45+
import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotZero;
4546

4647
/**
4748
* An operation that executes an {@code $changeStream} aggregation.
@@ -60,6 +61,7 @@ public class ChangeStreamOperation<T> implements AsyncReadOperation<AsyncBatchCu
6061

6162
private BsonDocument resumeToken;
6263
private BsonTimestamp startAtOperationTime;
64+
private boolean startAtOperationTimeForResume;
6365

6466
/**
6567
* Construct a new instance.
@@ -240,6 +242,17 @@ public ChangeStreamOperation<T> startAtOperationTime(final BsonTimestamp startAt
240242
return this;
241243
}
242244

245+
246+
void setStartOperationTimeForResume(final BsonTimestamp startAtOperationTime) {
247+
if (startAtOperationTime == null && startAtOperationTimeForResume) {
248+
this.startAtOperationTime = null;
249+
startAtOperationTimeForResume = false;
250+
} else if (this.startAtOperationTime == null && resumeToken == null) {
251+
this.startAtOperationTime = startAtOperationTime;
252+
startAtOperationTimeForResume = true;
253+
}
254+
}
255+
243256
/**
244257
* Returns the start at operation time
245258
*
@@ -296,7 +309,9 @@ public BsonArray create(final ConnectionDescription description, final SessionCo
296309
}
297310

298311
if (startAtOperationTime != null) {
299-
changeStream.append("startAtOperationTime", startAtOperationTime);
312+
if (!startAtOperationTimeForResume || serverIsAtLeastVersionFourDotZero(description)) {
313+
changeStream.append("startAtOperationTime", startAtOperationTime);
314+
}
300315
}
301316

302317
changeStreamPipeline.add(new BsonDocument("$changeStream", changeStream));
@@ -305,5 +320,4 @@ public BsonArray create(final ConnectionDescription description, final SessionCo
305320
}
306321
};
307322
}
308-
309323
}

driver-core/src/test/functional/com/mongodb/operation/ChangeStreamOperationSpecification.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
455455
def binding = Stub(ReadBinding) {
456456
getSessionContext() >> Stub(SessionContext) {
457457
getReadConcern() >> ReadConcern.DEFAULT
458-
getOperationTime() >> new BsonTimestamp()
458+
getOperationTime() >> new BsonTimestamp(42, 1)
459459
}
460460
getReadConnectionSource() >> Stub(ConnectionSource) {
461461
getConnection() >> Stub(Connection) {
@@ -479,7 +479,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
479479
operation.execute(binding)
480480

481481
then:
482-
operation.getStartAtOperationTime() == new BsonTimestamp()
482+
operation.getStartAtOperationTime() == new BsonTimestamp(42, 1)
483483

484484
when: 'Set time'
485485
def startAtTime = new BsonTimestamp(42)

0 commit comments

Comments
 (0)