Skip to content

Commit 954304d

Browse files
authored
GODRIVER-1648 Add check and test for closed cursor in Next (#428)
1 parent fedbabd commit 954304d

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

mongo/change_stream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ func (cs *ChangeStream) TryNext(ctx context.Context) bool {
516516
}
517517

518518
func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool {
519-
// return false right away if the change stream has already errored.
519+
// return false right away if the change stream has already errored or if cursor is closed.
520520
if cs.err != nil {
521521
return false
522522
}
@@ -559,6 +559,11 @@ func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) {
559559

560560
cs.err = replaceErrors(cs.cursor.Err())
561561
if cs.err == nil {
562+
// Check if cursor is alive
563+
if cs.ID() == 0 {
564+
return
565+
}
566+
562567
// If a getMore was done but the batch was empty, the batch cursor will return false with no error.
563568
// Update the tracked resume token to catch the post batch resume token from the server response.
564569
cs.updatePbrtFromCommand()

mongo/integration/change_stream_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,28 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
586586
assert.Equal(mt, 2, numClearedEvents, "expected two PoolCleared events, got %d", numClearedEvents)
587587
})
588588
})
589+
// Setting min server version as 4.0 since v3.6 does not send a "dropEvent"
590+
mt.RunOpts("call to cursor.Next after cursor closed", mtest.NewOptions().MinServerVersion("4.0"), func(mt *mtest.T) {
591+
cs, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
592+
assert.Nil(mt, err, "Watch error: %v", err)
593+
defer closeStream(cs)
594+
595+
// Generate insert events
596+
generateEvents(mt, 5)
597+
// Call Coll.Drop to generate drop and invalidate event
598+
err = mt.Coll.Drop(mtest.Background)
599+
assert.Nil(mt, err, "Drop error: %v", err)
600+
601+
// Test that all events were successful
602+
for i := 0; i < 7; i++ {
603+
assert.True(mt, cs.Next(mtest.Background), "Next returned false at index %d; iteration error: %v", i, cs.Err())
604+
}
605+
606+
operationType := cs.Current.Lookup("operationType").StringValue()
607+
assert.Equal(mt, operationType, "invalidate", "expected invalidate event but returned %q event", operationType)
608+
// next call to cs.Next should return False since cursor is closed
609+
assert.False(mt, cs.Next(mtest.Background), "expected to return false, but returned true")
610+
})
589611
}
590612

591613
func closeStream(cs *mongo.ChangeStream) {

0 commit comments

Comments
 (0)