Skip to content

Commit 25cabc3

Browse files
committed
Minor updates to startAtOperation time logic
1 parent 29d2a6a commit 25cabc3

File tree

5 files changed

+106
-46
lines changed

5 files changed

+106
-46
lines changed

driver-core/src/main/com/mongodb/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncBatchCursor<T> {
4040
AsyncChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
4141
final AsyncBatchCursor<RawBsonDocument> wrapped,
4242
final AsyncReadBinding binding) {
43-
changeStreamOperation.setStartOperationTimeForResume(binding.getSessionContext().getOperationTime());
43+
changeStreamOperation.startOperationTimeForResume(binding.getSessionContext().getOperationTime());
4444
this.changeStreamOperation = changeStreamOperation;
4545
this.resumeToken = changeStreamOperation.getResumeToken();
4646
this.wrapped = wrapped;
@@ -115,7 +115,7 @@ public void onResult(final List<RawBsonDocument> result, final Throwable t) {
115115

116116
private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<RawBsonDocument>> callback) {
117117
if (resumeToken != null) {
118-
changeStreamOperation.setStartOperationTimeForResume(null);
118+
changeStreamOperation.startOperationTimeForResume(null);
119119
changeStreamOperation.resumeAfter(resumeToken);
120120
}
121121
changeStreamOperation.executeAsync(binding, new SingleResultCallback<AsyncBatchCursor<T>>() {

driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ final class ChangeStreamBatchCursor<T> implements BatchCursor<T> {
4040
ChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
4141
final BatchCursor<RawBsonDocument> wrapped,
4242
final ReadBinding binding) {
43-
changeStreamOperation.setStartOperationTimeForResume(binding.getSessionContext().getOperationTime());
43+
changeStreamOperation.startOperationTimeForResume(binding.getSessionContext().getOperationTime());
4444
this.changeStreamOperation = changeStreamOperation;
4545
this.resumeToken = changeStreamOperation.getResumeToken();
4646
this.wrapped = wrapped;
@@ -139,7 +139,7 @@ <R> R resumeableOperation(final Function<BatchCursor<RawBsonDocument>, R> functi
139139
wrapped.close();
140140

141141
if (resumeToken != null) {
142-
changeStreamOperation.setStartOperationTimeForResume(null);
142+
changeStreamOperation.startOperationTimeForResume(null);
143143
changeStreamOperation.resumeAfter(resumeToken);
144144
}
145145
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(binding)).getWrapped();

driver-core/src/main/com/mongodb/operation/ChangeStreamOperation.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class ChangeStreamOperation<T> implements AsyncReadOperation<AsyncBatchCu
6161

6262
private BsonDocument resumeToken;
6363
private BsonTimestamp startAtOperationTime;
64-
private boolean startAtOperationTimeForResume;
64+
private BsonTimestamp startAtOperationTimeForResume;
6565

6666
/**
6767
* Construct a new instance.
@@ -243,14 +243,9 @@ public ChangeStreamOperation<T> startAtOperationTime(final BsonTimestamp startAt
243243
}
244244

245245

246-
void setStartOperationTimeForResume(final BsonTimestamp startAtOperationTime) {
247-
if (startAtOperationTime == null && startAtOperationTimeForResume) {
248-
this.startAtOperationTime = null;
249-
startAtOperationTimeForResume = false;
250-
} else if (this.startAtOperationTime == null && resumeToken == null) {
251-
this.startAtOperationTime = startAtOperationTime;
252-
startAtOperationTimeForResume = true;
253-
}
246+
ChangeStreamOperation<T> startOperationTimeForResume(final BsonTimestamp startAtOperationTime) {
247+
startAtOperationTimeForResume = startAtOperationTime;
248+
return this;
254249
}
255250

256251
/**
@@ -309,9 +304,9 @@ public BsonArray create(final ConnectionDescription description, final SessionCo
309304
}
310305

311306
if (startAtOperationTime != null) {
312-
if (!startAtOperationTimeForResume || serverIsAtLeastVersionFourDotZero(description)) {
313-
changeStream.append("startAtOperationTime", startAtOperationTime);
314-
}
307+
changeStream.append("startAtOperationTime", startAtOperationTime);
308+
} else if (resumeToken == null && startAtOperationTimeForResume != null && serverIsAtLeastVersionFourDotZero(description)) {
309+
changeStream.append("startAtOperationTime", startAtOperationTimeForResume);
315310
}
316311

317312
changeStreamPipeline.add(new BsonDocument("$changeStream", changeStream));

driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,9 @@ private void getMore() {
229229
throw translateCommandException(e, serverCursor);
230230
}
231231
} else {
232-
initFromQueryResult(connection.getMore(namespace, serverCursor.getId(),
233-
getNumberToReturn(limit, batchSize, count), decoder));
232+
QueryResult<T> getMore = connection.getMore(namespace, serverCursor.getId(),
233+
getNumberToReturn(limit, batchSize, count), decoder);
234+
initFromQueryResult(getMore);
234235
}
235236
if (limitReached()) {
236237
killCursor(connection);

driver-core/src/test/functional/com/mongodb/operation/ChangeStreamOperationSpecification.groovy

Lines changed: 92 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import com.mongodb.client.model.changestream.UpdateDescription
3434
import com.mongodb.client.test.CollectionHelper
3535
import com.mongodb.connection.AsyncConnection
3636
import com.mongodb.connection.Connection
37+
import com.mongodb.connection.ConnectionDescription
38+
import com.mongodb.connection.ServerVersion
3739
import com.mongodb.session.SessionContext
3840
import org.bson.BsonArray
3941
import org.bson.BsonBoolean
@@ -452,46 +454,78 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
452454

453455
def 'should set the startAtOperationTime on the sync cursor'() {
454456
given:
457+
def changeStream
455458
def binding = Stub(ReadBinding) {
456459
getSessionContext() >> Stub(SessionContext) {
457460
getReadConcern() >> ReadConcern.DEFAULT
458-
getOperationTime() >> new BsonTimestamp(42, 1)
461+
getOperationTime() >> new BsonTimestamp()
459462
}
460463
getReadConnectionSource() >> Stub(ConnectionSource) {
461464
getConnection() >> Stub(Connection) {
462-
command(*_) >> new BsonDocument('cursor', new BsonDocument('id', new BsonInt64(1))
463-
.append('ns', new BsonString(getNamespace().getFullName()))
464-
.append('firstBatch', new BsonArrayWrapper([])))
465+
command(*_) >> {
466+
changeStream = getChangeStream(it[1])
467+
new BsonDocument('cursor', new BsonDocument('id', new BsonInt64(1))
468+
.append('ns', new BsonString(getNamespace().getFullName()))
469+
.append('firstBatch', new BsonArrayWrapper([])))
470+
}
471+
getDescription() >> Stub(ConnectionDescription) {
472+
getServerVersion() >> new ServerVersion([4, 0, 0])
473+
}
465474
}
466475
}
467476
}
468477

469478
when: 'Resume token'
470-
def operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
479+
new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
471480
.resumeAfter(new BsonDocument())
472-
operation.execute(binding)
481+
.execute(binding)
473482

474483
then:
475-
operation.getStartAtOperationTime() == null
484+
changeStream.containsKey('resumeAfter')
485+
!changeStream.containsKey('startAtOperationTime')
476486

477-
when: 'No token or time'
478-
operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
479-
operation.execute(binding)
487+
when: 'Set startAtOperationTime'
488+
def startAtTime = new BsonTimestamp(42)
489+
new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
490+
.startAtOperationTime(startAtTime)
491+
.execute(binding)
480492

481493
then:
482-
operation.getStartAtOperationTime() == new BsonTimestamp(42, 1)
494+
changeStream.getTimestamp('startAtOperationTime') == startAtTime
483495

484-
when: 'Set time'
485-
def startAtTime = new BsonTimestamp(42)
486-
operation = operation.startAtOperationTime(startAtTime)
487-
operation.execute(binding)
496+
when: 'set startAtOperationTimeForResume'
497+
def resumeStartAt = new BsonTimestamp(42, 42)
498+
new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
499+
.startOperationTimeForResume(resumeStartAt)
500+
.execute(binding)
501+
502+
then:
503+
changeStream.getTimestamp('startAtOperationTime') == resumeStartAt
504+
505+
when: 'set startAtOperationTime && startAtOperationTimeForResume'
506+
new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
507+
.startAtOperationTime(startAtTime)
508+
.startOperationTimeForResume(resumeStartAt)
509+
.execute(binding)
510+
511+
then:
512+
changeStream.getTimestamp('startAtOperationTime') == startAtTime
513+
514+
when: 'set resumeToken && startAtOperationTime && startAtOperationTimeForResume'
515+
new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
516+
.resumeAfter(new BsonDocument())
517+
.startAtOperationTime(startAtTime)
518+
.startOperationTimeForResume(resumeStartAt)
519+
.execute(binding)
488520

489521
then:
490-
operation.getStartAtOperationTime() == startAtTime
522+
changeStream.containsKey('resumeAfter')
523+
changeStream.getTimestamp('startAtOperationTime') == startAtTime
491524
}
492525

493526
def 'should set the startAtOperationTime on the async cursor'() {
494527
given:
528+
def changeStream
495529
def binding = Stub(AsyncReadBinding) {
496530
getSessionContext() >> Stub(SessionContext) {
497531
getReadConcern() >> ReadConcern.DEFAULT
@@ -502,39 +536,66 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
502536
getConnection(_) >> {
503537
it.last().onResult(Stub(AsyncConnection) {
504538
commandAsync(*_) >> {
539+
changeStream = getChangeStream(it[1])
505540
it.last().onResult(new BsonDocument('cursor', new BsonDocument('id', new BsonInt64(1))
506541
.append('ns', new BsonString(getNamespace().getFullName()))
507542
.append('firstBatch', new BsonArrayWrapper([]))), null)
508543
}
544+
getDescription() >> Stub(ConnectionDescription) {
545+
getServerVersion() >> new ServerVersion([4, 0, 0])
546+
}
509547
}, null)
510548
}
511549
}, null)
512550
}
513551
}
514552

515553
when: 'Resume Token'
516-
def operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
554+
new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
517555
.resumeAfter(new BsonDocument())
556+
.executeAsync(binding, Stub(SingleResultCallback))
557+
558+
then:
559+
changeStream.containsKey('resumeAfter')
560+
!changeStream.containsKey('startAtOperationTime')
518561

519-
operation.executeAsync(binding, Stub(SingleResultCallback))
562+
when: 'Set startAtOperationTime'
563+
def startAtTime = new BsonTimestamp(42)
564+
new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
565+
.startAtOperationTime(startAtTime)
566+
.executeAsync(binding, Stub(SingleResultCallback))
520567

521568
then:
522-
operation.getStartAtOperationTime() == null
569+
changeStream.getTimestamp('startAtOperationTime') == startAtTime
523570

524-
when: 'No token or time'
525-
operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
526-
operation.executeAsync(binding, Stub(SingleResultCallback))
571+
when: 'set startAtOperationTimeForResume'
572+
def resumeStartAt = new BsonTimestamp(42, 42)
573+
new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
574+
.startOperationTimeForResume(resumeStartAt)
575+
.executeAsync(binding, Stub(SingleResultCallback))
527576

528577
then:
529-
operation.getStartAtOperationTime() == new BsonTimestamp()
578+
changeStream.getTimestamp('startAtOperationTime') == resumeStartAt
530579

531-
when: 'set time'
532-
def startAtTime = new BsonTimestamp(42)
533-
operation = operation.startAtOperationTime(startAtTime)
534-
operation.executeAsync(binding, Stub(SingleResultCallback))
580+
when: 'set startAtOperationTime && startAtOperationTimeForResume'
581+
new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
582+
.startAtOperationTime(startAtTime)
583+
.startOperationTimeForResume(resumeStartAt)
584+
.executeAsync(binding, Stub(SingleResultCallback))
585+
586+
then:
587+
changeStream.getTimestamp('startAtOperationTime') == startAtTime
588+
589+
when: 'set resumeToken && startAtOperationTime && startAtOperationTimeForResume'
590+
new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.DEFAULT, [], CODEC)
591+
.resumeAfter(new BsonDocument())
592+
.startAtOperationTime(startAtTime)
593+
.startOperationTimeForResume(resumeStartAt)
594+
.executeAsync(binding, Stub(SingleResultCallback))
535595

536596
then:
537-
operation.getStartAtOperationTime() == startAtTime
597+
changeStream.containsKey('resumeAfter')
598+
changeStream.getTimestamp('startAtOperationTime') == startAtTime
538599
}
539600

540601
private final static CODEC = new BsonDocumentCodec()
@@ -573,4 +634,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
573634
}
574635
}
575636

637+
def getChangeStream(BsonDocument command) {
638+
command.getArray('pipeline').head().getDocument('$changeStream')
639+
}
576640
}

0 commit comments

Comments
 (0)