Skip to content

Commit 33f5dc2

Browse files
Divjot Aroraskriptble
authored andcommitted
Fix MaxAwaitTimeMS implementation for change streams.
GODRIVER-805 Change-Id: Ie31d84652485d59d1865bd11f75de1dcb777c327
1 parent dbba268 commit 33f5dc2

File tree

2 files changed

+49
-27
lines changed

2 files changed

+49
-27
lines changed

mongo/change_stream.go

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,15 @@ type ChangeStream struct {
4646
// make a copy of it.
4747
Current bson.Raw
4848

49-
cmd bsonx.Doc // aggregate command to run to create stream and rebuild cursor
50-
pipeline bsonx.Arr
51-
options *options.ChangeStreamOptions
52-
coll *Collection
53-
db *Database
54-
ns command.Namespace
55-
cursor *Cursor
56-
cursorOpts bsonx.Doc
49+
cmd bsonx.Doc // aggregate command to run to create stream and rebuild cursor
50+
pipeline bsonx.Arr
51+
options *options.ChangeStreamOptions
52+
coll *Collection
53+
db *Database
54+
ns command.Namespace
55+
cursor *Cursor
56+
cursorOpts bsonx.Doc
57+
getMoreOpts bsonx.Doc
5758

5859
resumeToken bsonx.Doc
5960
err error
@@ -88,11 +89,12 @@ func (cs *ChangeStream) replaceOptions(desc description.SelectedServer) {
8889

8990
// Create options docs for the pipeline and cursor
9091
func createCmdDocs(csType StreamType, opts *options.ChangeStreamOptions, registry *bsoncodec.Registry) (bsonx.Doc,
91-
bsonx.Doc, bsonx.Doc, error) {
92+
bsonx.Doc, bsonx.Doc, bsonx.Doc, error) {
9293

9394
pipelineDoc := bsonx.Doc{}
9495
cursorDoc := bsonx.Doc{}
9596
optsDoc := bsonx.Doc{}
97+
getMoreOptsDoc := bsonx.Doc{}
9698

9799
if csType == ClientStream {
98100
pipelineDoc = pipelineDoc.Append("allChangesForCluster", bsonx.Boolean(true))
@@ -109,12 +111,12 @@ func createCmdDocs(csType StreamType, opts *options.ChangeStreamOptions, registr
109111
}
110112
if opts.MaxAwaitTime != nil {
111113
ms := int64(time.Duration(*opts.MaxAwaitTime) / time.Millisecond)
112-
pipelineDoc = pipelineDoc.Append("maxAwaitTimeMS", bsonx.Int64(ms))
114+
getMoreOptsDoc = getMoreOptsDoc.Append("maxTimeMS", bsonx.Int64(ms))
113115
}
114116
if opts.ResumeAfter != nil {
115117
rt, err := transformDocument(registry, opts.ResumeAfter)
116118
if err != nil {
117-
return nil, nil, nil, err
119+
return nil, nil, nil, nil, err
118120
}
119121

120122
pipelineDoc = pipelineDoc.Append("resumeAfter", bsonx.Document(rt))
@@ -124,7 +126,7 @@ func createCmdDocs(csType StreamType, opts *options.ChangeStreamOptions, registr
124126
bsonx.Timestamp(opts.StartAtOperationTime.T, opts.StartAtOperationTime.I))
125127
}
126128

127-
return pipelineDoc, cursorDoc, optsDoc, nil
129+
return pipelineDoc, cursorDoc, optsDoc, getMoreOptsDoc, nil
128130
}
129131

130132
func getSession(ctx context.Context, client *Client) (Session, error) {
@@ -154,18 +156,18 @@ func getSession(ctx context.Context, client *Client) (Session, error) {
154156
}
155157

156158
func parseOptions(csType StreamType, opts *options.ChangeStreamOptions, registry *bsoncodec.Registry) (bsonx.Doc,
157-
bsonx.Doc, bsonx.Doc, error) {
159+
bsonx.Doc, bsonx.Doc, bsonx.Doc, error) {
158160

159161
if opts.FullDocument == nil {
160162
opts = opts.SetFullDocument(options.Default)
161163
}
162164

163-
pipelineDoc, cursorDoc, optsDoc, err := createCmdDocs(csType, opts, registry)
165+
pipelineDoc, cursorDoc, optsDoc, getMoreOptsDoc, err := createCmdDocs(csType, opts, registry)
164166
if err != nil {
165-
return nil, nil, nil, err
167+
return nil, nil, nil, nil, err
166168
}
167169

168-
return pipelineDoc, cursorDoc, optsDoc, nil
170+
return pipelineDoc, cursorDoc, optsDoc, getMoreOptsDoc, nil
169171
}
170172

171173
func (cs *ChangeStream) runCommand(ctx context.Context, replaceOptions bool) error {
@@ -183,7 +185,7 @@ func (cs *ChangeStream) runCommand(ctx context.Context, replaceOptions bool) err
183185

184186
if replaceOptions {
185187
cs.replaceOptions(desc)
186-
optionsDoc, _, _, err := createCmdDocs(cs.streamType, cs.options, cs.registry)
188+
optionsDoc, _, _, _, err := createCmdDocs(cs.streamType, cs.options, cs.registry)
187189
if err != nil {
188190
return err
189191
}
@@ -210,7 +212,7 @@ func (cs *ChangeStream) runCommand(ctx context.Context, replaceOptions bool) err
210212
return err
211213
}
212214

213-
batchCursor, err := driver.NewBatchCursor(bsoncore.Document(rdr), readCmd.Session, readCmd.Clock, ss.Server)
215+
batchCursor, err := driver.NewBatchCursor(bsoncore.Document(rdr), readCmd.Session, readCmd.Clock, ss.Server, cs.getMoreOpts...)
214216
if err != nil {
215217
cs.sess.EndSession(ctx)
216218
return err
@@ -241,7 +243,7 @@ func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{}
241243
}
242244

243245
csOpts := options.MergeChangeStreamOptions(opts...)
244-
pipelineDoc, cursorDoc, optsDoc, err := parseOptions(CollectionStream, csOpts, coll.registry)
246+
pipelineDoc, cursorDoc, optsDoc, getMoreDoc, err := parseOptions(CollectionStream, csOpts, coll.registry)
245247
if err != nil {
246248
return nil, err
247249
}
@@ -275,6 +277,7 @@ func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{}
275277
options: csOpts,
276278
registry: coll.registry,
277279
cursorOpts: cursorDoc,
280+
getMoreOpts: getMoreDoc,
278281
}
279282

280283
err = cs.runCommand(ctx, false)
@@ -294,7 +297,7 @@ func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
294297
}
295298

296299
csOpts := options.MergeChangeStreamOptions(opts...)
297-
pipelineDoc, cursorDoc, optsDoc, err := parseOptions(DatabaseStream, csOpts, db.registry)
300+
pipelineDoc, cursorDoc, optsDoc, getMoreDoc, err := parseOptions(DatabaseStream, csOpts, db.registry)
298301
if err != nil {
299302
return nil, err
300303
}
@@ -327,6 +330,7 @@ func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
327330
options: csOpts,
328331
registry: db.registry,
329332
cursorOpts: cursorDoc,
333+
getMoreOpts: getMoreDoc,
330334
}
331335

332336
err = cs.runCommand(ctx, false)
@@ -346,7 +350,7 @@ func newClientChangeStream(ctx context.Context, client *Client, pipeline interfa
346350
}
347351

348352
csOpts := options.MergeChangeStreamOptions(opts...)
349-
pipelineDoc, cursorDoc, optsDoc, err := parseOptions(ClientStream, csOpts, client.registry)
353+
pipelineDoc, cursorDoc, optsDoc, getMoreDoc, err := parseOptions(ClientStream, csOpts, client.registry)
350354
if err != nil {
351355
return nil, err
352356
}
@@ -379,6 +383,7 @@ func newClientChangeStream(ctx context.Context, client *Client, pipeline interfa
379383
options: csOpts,
380384
registry: client.registry,
381385
cursorOpts: cursorDoc,
386+
getMoreOpts: getMoreDoc,
382387
}
383388

384389
err = cs.runCommand(ctx, false)

mongo/change_stream_test.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package mongo
88

99
import (
1010
"context"
11+
"github.com/mongodb/mongo-go-driver/mongo/options"
1112
"os"
1213
"testing"
1314
"time"
@@ -79,7 +80,7 @@ func skipIfBelow36(t *testing.T) {
7980
}
8081
}
8182

82-
func createStream(t *testing.T, client *Client, dbName string, collName string, pipeline interface{}) (*Collection, *ChangeStream) {
83+
func createStream(t *testing.T, client *Client, dbName string, collName string, pipeline interface{}, opts ...*options.ChangeStreamOptions) (*Collection, *ChangeStream) {
8384
client.writeConcern = wcMajority
8485
db := client.Database(dbName)
8586
err := db.Drop(ctx)
@@ -90,7 +91,7 @@ func createStream(t *testing.T, client *Client, dbName string, collName string,
9091
_, err = coll.InsertOne(ctx, collectionStartingDoc) // create collection on server for 3.6
9192

9293
drainChannels()
93-
stream, err := coll.Watch(ctx, pipeline)
94+
stream, err := coll.Watch(ctx, pipeline, opts...)
9495
testhelpers.RequireNil(t, err, "error creating stream: %s", err)
9596

9697
return coll, stream
@@ -105,20 +106,20 @@ func skipIfBelow32(t *testing.T) {
105106
}
106107
}
107108

108-
func createCollectionStream(t *testing.T, dbName string, collName string, pipeline interface{}) (*Collection, *ChangeStream) {
109+
func createCollectionStream(t *testing.T, dbName string, collName string, pipeline interface{}, opts ...*options.ChangeStreamOptions) (*Collection, *ChangeStream) {
109110
if pipeline == nil {
110111
pipeline = Pipeline{}
111112
}
112113
client := createTestClient(t)
113-
return createStream(t, client, dbName, collName, pipeline)
114+
return createStream(t, client, dbName, collName, pipeline, opts...)
114115
}
115116

116-
func createMonitoredStream(t *testing.T, dbName string, collName string, pipeline interface{}) (*Collection, *ChangeStream) {
117+
func createMonitoredStream(t *testing.T, dbName string, collName string, pipeline interface{}, opts ...*options.ChangeStreamOptions) (*Collection, *ChangeStream) {
117118
if pipeline == nil {
118119
pipeline = Pipeline{}
119120
}
120121
client := createMonitoredClient(t, monitor)
121-
return createStream(t, client, dbName, collName, pipeline)
122+
return createStream(t, client, dbName, collName, pipeline, opts...)
122123
}
123124

124125
func compareOptions(t *testing.T, expected bsonx.Doc, actual bsonx.Doc) {
@@ -675,6 +676,22 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
675676
killChangeStreamCursor(t, cs)
676677
ensureResumeToken(t, coll, stream)
677678
})
679+
t.Run("MaxAwaitTimeMS", func(t *testing.T) {
680+
coll, stream := createMonitoredStream(t, "MaxAwaitTimeMSDB", "MaxAwaitTimeMSColl", nil, options.ChangeStream().SetMaxAwaitTime(100*time.Millisecond))
681+
drainChannels()
682+
_, err := coll.InsertOne(ctx, bsonx.Doc{{"x", bsonx.Int32(1)}})
683+
testhelpers.RequireNil(t, err, "error inserting doc: %v", err)
684+
drainChannels()
685+
686+
if !stream.Next(ctx) {
687+
t.Fatal("Next returned false, expected true")
688+
}
689+
690+
e := <-startedChan
691+
if _, err := e.Command.LookupErr("maxTimeMS"); err != nil {
692+
t.Fatalf("maxTimeMS not found in getMore command")
693+
}
694+
})
678695
}
679696

680697
// ensure that a resume token has been recorded by a change stream

0 commit comments

Comments
 (0)