Skip to content

Commit 397cb6a

Browse files
author
Isabella Siu
committed
GODRIVER-603 changestream shouldn't panic when a user changes resumetoken
Change-Id: Ib495b53b1ca1867dc1c4f48a654e8f8405737839
1 parent 802b0ba commit 397cb6a

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

mongo/change_stream.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,14 @@ func (cs *changeStream) DecodeBytes() (bson.Raw, error) {
197197
return nil, ErrMissingResumeToken
198198
}
199199

200-
cs.resumeToken, err = bsonx.ReadDoc(id.Document())
200+
var idDoc bson.Raw
201+
idDoc, ok := id.DocumentOK()
202+
if !ok {
203+
_ = cs.Close(context.Background())
204+
return nil, ErrMissingResumeToken
205+
}
206+
cs.resumeToken, err = bsonx.ReadDoc(idDoc)
207+
201208
if err != nil {
202209
_ = cs.Close(context.Background())
203210
return nil, ErrMissingResumeToken

mongo/change_stream_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"time"
1515

1616
"github.com/google/go-cmp/cmp"
17+
"github.com/mongodb/mongo-go-driver/bson/objectid"
1718
"github.com/mongodb/mongo-go-driver/x/bsonx"
1819
"github.com/mongodb/mongo-go-driver/x/network/command"
1920
"github.com/stretchr/testify/require"
@@ -71,6 +72,44 @@ func TestChangeStream_firstStage(t *testing.T) {
7172
require.NoError(t, err)
7273
}
7374

75+
func TestChangeStream_replaceRoot(t *testing.T) {
76+
t.Parallel()
77+
78+
if testing.Short() {
79+
t.Skip()
80+
}
81+
skipIfBelow36(t)
82+
83+
if os.Getenv("TOPOLOGY") != "replica_set" {
84+
t.Skip()
85+
}
86+
87+
coll := createTestCollection(t, nil, nil)
88+
89+
// Ensure the database is created.
90+
_, err := coll.InsertOne(context.Background(), bsonx.Doc{{"x", bsonx.Int32(7)}})
91+
require.NoError(t, err)
92+
93+
pipeline := make(bsonx.Arr, 0)
94+
pipeline = append(pipeline,
95+
bsonx.Document(bsonx.Doc{{"$replaceRoot",
96+
bsonx.Document(bsonx.Doc{{"newRoot",
97+
bsonx.Document(bsonx.Doc{{"_id", bsonx.ObjectID(objectid.New())}, {"x", bsonx.Int32(1)}})}}),
98+
}}))
99+
changes, err := coll.Watch(context.Background(), pipeline)
100+
require.NoError(t, err)
101+
102+
_, err = coll.InsertOne(context.Background(), bsonx.Doc{{"x", bsonx.Int32(4)}})
103+
require.NoError(t, err)
104+
105+
getNextChange(changes)
106+
var doc *bsonx.Doc
107+
108+
//Ensure the cursor returns an error when the resume token is changed.
109+
err = changes.Decode(&doc)
110+
require.Equal(t, err, ErrMissingResumeToken)
111+
}
112+
74113
func TestChangeStream_noCustomStandaloneError(t *testing.T) {
75114
t.Parallel()
76115

0 commit comments

Comments
 (0)