Skip to content

Commit 2c7dc57

Browse files
committed
Make ChangeStreamOperationSpecification more resilient
Remove assumption that expected change stream documents are returned in a single batch. JAVA-3742
1 parent ecbce07 commit 2c7dc57

File tree

2 files changed

+35
-28
lines changed

2 files changed

+35
-28
lines changed

driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,16 @@ class OperationFunctionalSpecification extends Specification {
156156
results
157157
}
158158

159+
def next(cursor, boolean async, int minimumCount) {
160+
List<BsonDocument> retVal = []
161+
162+
while (retVal.size() < minimumCount) {
163+
retVal.addAll(next(cursor, async))
164+
}
165+
166+
retVal
167+
}
168+
159169
def next(cursor, boolean async) {
160170
if (async) {
161171
def futureResultCallback = new FutureResultCallback<List<BsonDocument>>()

driver-core/src/test/functional/com/mongodb/operation/ChangeStreamOperationSpecification.groovy

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.mongodb.operation
1818

19-
2019
import com.mongodb.MongoException
2120
import com.mongodb.MongoNamespace
2221
import com.mongodb.OperationFunctionalSpecification
@@ -155,7 +154,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
155154
def expected = insertDocuments(helper, [1, 2])
156155

157156
then:
158-
def next = nextAndClean(cursor, async)
157+
def next = nextAndClean(cursor, async, expected.size())
159158
next == expected
160159

161160
when:
@@ -164,7 +163,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
164163

165164
then:
166165
cursor.getBatchSize() == 5
167-
nextAndClean(cursor, async) == expected
166+
nextAndClean(cursor, async, expected.size()) == expected
168167

169168
then:
170169
if (async) {
@@ -193,7 +192,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
193192
when:
194193
def cursor = execute(operation, false)
195194
helper.insertDocuments(BsonDocument.parse('{ _id : 2, x : 2 }'))
196-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
195+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
197196

198197
then:
199198
next.getResumeToken() != null
@@ -220,7 +219,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
220219
when:
221220
def cursor = execute(operation, false)
222221
helper.updateOne(BsonDocument.parse('{ _id : 2}'), BsonDocument.parse('{ $set : {x : 3}, $unset : {y : 1}}'))
223-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
222+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
224223

225224
then:
226225
next.getResumeToken() != null
@@ -247,7 +246,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
247246
when:
248247
def cursor = execute(operation, false)
249248
helper.replaceOne(BsonDocument.parse('{ _id : 2}'), BsonDocument.parse('{ _id : 2, x : 3}'), false)
250-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
249+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
251250

252251
then:
253252
next.getResumeToken() != null
@@ -274,7 +273,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
274273
when:
275274
def cursor = execute(operation, false)
276275
helper.deleteOne(BsonDocument.parse('{ _id : 2}'))
277-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
276+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
278277

279278
then:
280279
next.getResumeToken() != null
@@ -301,7 +300,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
301300
when:
302301
def cursor = execute(operation, false)
303302
helper.drop()
304-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
303+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
305304

306305
then:
307306
next.getResumeToken() != null
@@ -329,7 +328,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
329328
when:
330329
def cursor = execute(operation, false)
331330
helper.drop()
332-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
331+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
333332

334333
then:
335334
next.getResumeToken() != null
@@ -358,7 +357,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
358357
when:
359358
def cursor = execute(operation, false)
360359
helper.dropDatabase('JavaDriverTest')
361-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
360+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
362361

363362
then:
364363
next.getResumeToken() != null
@@ -387,7 +386,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
387386
when:
388387
def cursor = execute(operation, false)
389388
helper.renameCollection(newNamespace)
390-
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
389+
ChangeStreamDocument<BsonDocument> next = next(cursor, false, 1).get(0)
391390

392391
then:
393392
next.getResumeToken() != null
@@ -442,7 +441,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
442441
def expected = insertDocuments(helper, [1, 2])
443442

444443
then:
445-
nextAndClean(cursor, async) == expected
444+
nextAndClean(cursor, async, expected.size()) == expected
446445

447446
then:
448447
tryNextAndClean(cursor, async) == null
@@ -451,7 +450,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
451450
expected = insertDocuments(helper, [3, 4])
452451

453452
then:
454-
nextAndClean(cursor, async) == expected
453+
nextAndClean(cursor, async, expected.size()) == expected
455454

456455
cleanup:
457456
cursor?.close()
@@ -473,15 +472,12 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
473472
def expected = insertDocuments(helper, [1, 2])
474473

475474
then:
476-
nextAndClean(cursor, async) == expected
475+
nextAndClean(cursor, async, expected.size()) == expected
477476

478477
when:
479478
helper.killCursor(helper.getNamespace(), cursor.getWrapped().getServerCursor())
480479
expected = insertDocuments(helper, [3, 4])
481-
def results = nextAndClean(cursor, async)
482-
if (results.size() < expected.size()) {
483-
results.addAll(nextAndClean(cursor, async))
484-
}
480+
def results = nextAndClean(cursor, async, expected.size())
485481

486482
then:
487483
results == expected
@@ -493,10 +489,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
493489
expected = insertDocuments(helper, [5, 6])
494490
helper.killCursor(helper.getNamespace(), cursor.getWrapped().getServerCursor())
495491

496-
results = nextAndClean(cursor, async)
497-
if (results.size() < expected.size()) {
498-
results.addAll(nextAndClean(cursor, async))
499-
}
492+
results = nextAndClean(cursor, async, expected.size())
500493

501494
then:
502495
results == expected
@@ -521,7 +514,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
521514

522515
when:
523516
def expected = insertDocuments(helper, [1, 2])
524-
def result = next(cursor, async)
517+
def result = next(cursor, async, 2)
525518

526519
then:
527520
result.size() == 2
@@ -532,7 +525,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
532525

533526
operation.startAtOperationTime(result.last().getTimestamp('clusterTime'))
534527
cursor = execute(operation, async)
535-
result = nextAndClean(cursor, async)
528+
result = nextAndClean(cursor, async, expected.tail.size())
536529

537530
then:
538531
result == expected.tail()
@@ -556,7 +549,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
556549

557550
when:
558551
def expected = insertDocuments(helper, [1, 2])
559-
def result = next(cursor, async)
552+
def result = next(cursor, async, 2)
560553

561554
then:
562555
result.size() == 2
@@ -567,7 +560,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
567560

568561
operation.resumeAfter(result.head().getDocument('_id')).startAtOperationTime(null)
569562
cursor = execute(operation, async)
570-
result = nextAndClean(cursor, async)
563+
result = nextAndClean(cursor, async, expected.tail().size())
571564

572565
then:
573566
result == expected.tail()
@@ -592,7 +585,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
592585

593586
when:
594587
def expected = insertDocuments(helper, [1, 2])
595-
def result = next(cursor, async)
588+
def result = next(cursor, async, 2)
596589

597590
then:
598591
result.size() == 2
@@ -602,7 +595,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
602595
waitForLastRelease(async ? getAsyncCluster() : getCluster())
603596

604597
cursor = execute(operation.startAfter(result.head().getDocument('_id')).startAtOperationTime(null), async)
605-
result = nextAndClean(cursor, async)
598+
result = nextAndClean(cursor, async, expected.tail().size())
606599

607600
then:
608601
result == expected.tail()
@@ -762,6 +755,10 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
762755
removeExtra(tryNext(cursor, async))
763756
}
764757

758+
def nextAndClean(cursor, boolean async, int minimumCount) {
759+
removeExtra(next(cursor, async, minimumCount))
760+
}
761+
765762
def nextAndClean(cursor, boolean async) {
766763
removeExtra(next(cursor, async))
767764
}

0 commit comments

Comments
 (0)