Skip to content

Commit 8064395

Browse files
committed
GODRIVER-1648 Add check and test for closed cursor in Next (#428)
1 parent 6ea597f commit 8064395

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

mongo/change_stream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ func (cs *ChangeStream) TryNext(ctx context.Context) bool {
495495
}
496496

497497
func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool {
498-
// return false right away if the change stream has already errored.
498+
// return false right away if the change stream has already errored or if cursor is closed.
499499
if cs.err != nil {
500500
return false
501501
}
@@ -538,6 +538,11 @@ func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) {
538538

539539
cs.err = replaceErrors(cs.cursor.Err())
540540
if cs.err == nil {
541+
// Check if cursor is alive
542+
if cs.ID() == 0 {
543+
return
544+
}
545+
541546
// If a getMore was done but the batch was empty, the batch cursor will return false with no error.
542547
// Update the tracked resume token to catch the post batch resume token from the server response.
543548
cs.updatePbrtFromCommand()

mongo/integration/change_stream_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,28 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
602602
assert.Equal(mt, 2, numClearedEvents, "expected two PoolCleared events, got %d", numClearedEvents)
603603
})
604604
})
605+
// Setting min server version as 4.0 since v3.6 does not send a "dropEvent"
606+
mt.RunOpts("call to cursor.Next after cursor closed", mtest.NewOptions().MinServerVersion("4.0"), func(mt *mtest.T) {
607+
cs, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
608+
assert.Nil(mt, err, "Watch error: %v", err)
609+
defer closeStream(cs)
610+
611+
// Generate insert events
612+
generateEvents(mt, 5)
613+
// Call Coll.Drop to generate drop and invalidate event
614+
err = mt.Coll.Drop(mtest.Background)
615+
assert.Nil(mt, err, "Drop error: %v", err)
616+
617+
// Test that all events were successful
618+
for i := 0; i < 7; i++ {
619+
assert.True(mt, cs.Next(mtest.Background), "Next returned false at index %d; iteration error: %v", i, cs.Err())
620+
}
621+
622+
operationType := cs.Current.Lookup("operationType").StringValue()
623+
assert.Equal(mt, operationType, "invalidate", "expected invalidate event but returned %q event", operationType)
624+
// next call to cs.Next should return False since cursor is closed
625+
assert.False(mt, cs.Next(mtest.Background), "expected to return false, but returned true")
626+
})
605627
}
606628

607629
func closeStream(cs *mongo.ChangeStream) {

0 commit comments

Comments
 (0)