Skip to content

Commit c091973

Browse files
author
Divjot Arora
committed
Add nil checks for change stream cursors.
GODRIVER-464 Change-Id: I62b06a9fd52c9cd78d2fd32648d795705a98a102
1 parent 62004e6 commit c091973

File tree

2 files changed

+49
-0
lines changed

2 files changed

+49
-0
lines changed

mongo/change_stream.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ const errorCursorKilled int32 = 237
3131
// contain a resume token.
3232
var ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing")
3333

34+
// ErrNilCursor indicates that the cursor for the change stream is nil.
35+
var ErrNilCursor = errors.New("cursor is nil")
36+
3437
type changeStream struct {
3538
cmd bsonx.Doc // aggregate command to run to create stream and rebuild cursor
3639
pipeline bsonx.Arr
@@ -396,10 +399,18 @@ func newClientChangeStream(ctx context.Context, client *Client, pipeline interfa
396399
}
397400

398401
func (cs *changeStream) ID() int64 {
402+
if cs.cursor == nil {
403+
return 0
404+
}
405+
399406
return cs.cursor.ID()
400407
}
401408

402409
func (cs *changeStream) Next(ctx context.Context) bool {
410+
if cs.cursor == nil {
411+
return false
412+
}
413+
403414
if cs.cursor.Next(ctx) {
404415
return true
405416
}
@@ -431,6 +442,10 @@ func (cs *changeStream) Next(ctx context.Context) bool {
431442
}
432443

433444
func (cs *changeStream) Decode(out interface{}) error {
445+
if cs.cursor == nil {
446+
return ErrNilCursor
447+
}
448+
434449
br, err := cs.DecodeBytes()
435450
if err != nil {
436451
return err
@@ -440,6 +455,10 @@ func (cs *changeStream) Decode(out interface{}) error {
440455
}
441456

442457
func (cs *changeStream) DecodeBytes() (bson.Raw, error) {
458+
if cs.cursor == nil {
459+
return nil, ErrNilCursor
460+
}
461+
443462
br, err := cs.cursor.DecodeBytes()
444463
if err != nil {
445464
return nil, err
@@ -471,11 +490,18 @@ func (cs *changeStream) Err() error {
471490
if cs.err != nil {
472491
return cs.err
473492
}
493+
if cs.cursor == nil {
494+
return nil
495+
}
474496

475497
return cs.cursor.Err()
476498
}
477499

478500
func (cs *changeStream) Close(ctx context.Context) error {
501+
if cs.cursor == nil {
502+
return nil // cursor is already closed
503+
}
504+
479505
return cs.cursor.Close(ctx)
480506
}
481507

mongo/change_stream_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,29 @@ func TestChangeStream(t *testing.T) {
239239
t.Errorf("Should have returned command error, but got %T", err)
240240
}
241241
})
242+
243+
t.Run("TestNilCursor", func(t *testing.T) {
244+
cs := &changeStream{}
245+
246+
if id := cs.ID(); id != 0 {
247+
t.Fatalf("Wrong ID returned. Expected 0 got %d", id)
248+
}
249+
if cs.Next(ctx) {
250+
t.Fatalf("Next returned true, expected false")
251+
}
252+
if err := cs.Decode(nil); err != ErrNilCursor {
253+
t.Fatalf("Wrong decode err. Expected ErrNilCursor got %s", err)
254+
}
255+
if _, err := cs.DecodeBytes(); err != ErrNilCursor {
256+
t.Fatalf("Wrong decode bytes err. Expected ErrNilCursor got %s", err)
257+
}
258+
if err := cs.Err(); err != nil {
259+
t.Fatalf("Wrong Err error. Expected nil got %s", err)
260+
}
261+
if err := cs.Close(ctx); err != nil {
262+
t.Fatalf("Wrong Close error. Expected nil got %s", err)
263+
}
264+
})
242265
}
243266

244267
func TestChangeStream_ReplicaSet(t *testing.T) {

0 commit comments

Comments
 (0)