@@ -17,6 +17,7 @@ import (
17
17
"github.com/mongodb/mongo-go-driver/mongo/readconcern"
18
18
"github.com/mongodb/mongo-go-driver/mongo/readpref"
19
19
"github.com/mongodb/mongo-go-driver/x/bsonx"
20
+ "github.com/mongodb/mongo-go-driver/x/bsonx/bsoncore"
20
21
"github.com/mongodb/mongo-go-driver/x/mongo/driver"
21
22
"github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
22
23
"github.com/mongodb/mongo-go-driver/x/network/command"
@@ -34,14 +35,25 @@ var ErrMissingResumeToken = errors.New("cannot provide resume functionality when
34
35
// ErrNilCursor indicates that the cursor for the change stream is nil.
35
36
var ErrNilCursor = errors .New ("cursor is nil" )
36
37
37
- type changeStream struct {
38
- cmd bsonx.Doc // aggregate command to run to create stream and rebuild cursor
39
- pipeline bsonx.Arr
40
- options * options.ChangeStreamOptions
41
- coll * Collection
42
- db * Database
43
- ns command.Namespace
44
- cursor Cursor
38
+ // ChangeStream instances iterate a stream of change documents. Each document can be decoded via the
39
+ // Decode method. Resume tokens should be retrieved via the ResumeToken method and can be stored to
40
+ // resume the change stream at a specific point in time.
41
+ //
42
+ // A typical usage of the ChangeStream type would be:
43
+ type ChangeStream struct {
44
+ // Current is the BSON bytes of the current change document. This property is only valid until
45
+ // the next call to Next or Close. If continued access is required to the bson.Raw, you must
46
+ // make a copy of it.
47
+ Current bson.Raw
48
+
49
+ cmd bsonx.Doc // aggregate command to run to create stream and rebuild cursor
50
+ pipeline bsonx.Arr
51
+ options * options.ChangeStreamOptions
52
+ coll * Collection
53
+ db * Database
54
+ ns command.Namespace
55
+ cursor * Cursor
56
+ cursorOpts bsonx.Doc
45
57
46
58
resumeToken bsonx.Doc
47
59
err error
@@ -53,7 +65,7 @@ type changeStream struct {
53
65
registry * bsoncodec.Registry
54
66
}
55
67
56
- func (cs * changeStream ) replaceOptions (desc description.SelectedServer ) {
68
+ func (cs * ChangeStream ) replaceOptions (desc description.SelectedServer ) {
57
69
// if cs has not received any changes and resumeAfter not specified and max wire version >= 7, run known agg cmd
58
70
// with startAtOperationTime set to startAtOperationTime provided by user or saved from initial agg
59
71
// must not send resumeAfter key
@@ -156,7 +168,7 @@ func parseOptions(csType StreamType, opts *options.ChangeStreamOptions, registry
156
168
return pipelineDoc , cursorDoc , optsDoc , nil
157
169
}
158
170
159
- func (cs * changeStream ) runCommand (ctx context.Context , replaceOptions bool ) error {
171
+ func (cs * ChangeStream ) runCommand (ctx context.Context , replaceOptions bool ) error {
160
172
ss , err := cs .client .topology .SelectServer (ctx , cs .db .writeSelector )
161
173
if err != nil {
162
174
return err
@@ -198,7 +210,12 @@ func (cs *changeStream) runCommand(ctx context.Context, replaceOptions bool) err
198
210
return err
199
211
}
200
212
201
- cursor , err := ss .BuildCursor (rdr , readCmd .Session , readCmd .Clock )
213
+ batchCursor , err := driver .NewBatchCursor (bsoncore .Document (rdr ), readCmd .Session , readCmd .Clock , ss .Server )
214
+ if err != nil {
215
+ cs .sess .EndSession (ctx )
216
+ return err
217
+ }
218
+ cursor , err := newCursor (batchCursor , cs .registry )
202
219
if err != nil {
203
220
cs .sess .EndSession (ctx )
204
221
return err
@@ -216,7 +233,7 @@ func (cs *changeStream) runCommand(ctx context.Context, replaceOptions bool) err
216
233
}
217
234
218
235
func newChangeStream (ctx context.Context , coll * Collection , pipeline interface {},
219
- opts ... * options.ChangeStreamOptions ) (* changeStream , error ) {
236
+ opts ... * options.ChangeStreamOptions ) (* ChangeStream , error ) {
220
237
221
238
pipelineArr , err := transformAggregatePipeline (coll .registry , pipeline )
222
239
if err != nil {
@@ -245,7 +262,7 @@ func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{}
245
262
}
246
263
cmd = append (cmd , optsDoc ... )
247
264
248
- cs := & changeStream {
265
+ cs := & ChangeStream {
249
266
client : coll .client ,
250
267
sess : sess ,
251
268
cmd : cmd ,
@@ -257,6 +274,7 @@ func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{}
257
274
readConcern : coll .readConcern ,
258
275
options : csOpts ,
259
276
registry : coll .registry ,
277
+ cursorOpts : cursorDoc ,
260
278
}
261
279
262
280
err = cs .runCommand (ctx , false )
@@ -268,7 +286,7 @@ func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{}
268
286
}
269
287
270
288
func newDbChangeStream (ctx context.Context , db * Database , pipeline interface {},
271
- opts ... * options.ChangeStreamOptions ) (* changeStream , error ) {
289
+ opts ... * options.ChangeStreamOptions ) (* ChangeStream , error ) {
272
290
273
291
pipelineArr , err := transformAggregatePipeline (db .registry , pipeline )
274
292
if err != nil {
@@ -297,7 +315,7 @@ func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
297
315
}
298
316
cmd = append (cmd , optsDoc ... )
299
317
300
- cs := & changeStream {
318
+ cs := & ChangeStream {
301
319
client : db .client ,
302
320
db : db ,
303
321
sess : sess ,
@@ -308,6 +326,7 @@ func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
308
326
readConcern : db .readConcern ,
309
327
options : csOpts ,
310
328
registry : db .registry ,
329
+ cursorOpts : cursorDoc ,
311
330
}
312
331
313
332
err = cs .runCommand (ctx , false )
@@ -319,7 +338,7 @@ func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
319
338
}
320
339
321
340
func newClientChangeStream (ctx context.Context , client * Client , pipeline interface {},
322
- opts ... * options.ChangeStreamOptions ) (* changeStream , error ) {
341
+ opts ... * options.ChangeStreamOptions ) (* ChangeStream , error ) {
323
342
324
343
pipelineArr , err := transformAggregatePipeline (client .registry , pipeline )
325
344
if err != nil {
@@ -348,7 +367,7 @@ func newClientChangeStream(ctx context.Context, client *Client, pipeline interfa
348
367
}
349
368
cmd = append (cmd , optsDoc ... )
350
369
351
- cs := & changeStream {
370
+ cs := & ChangeStream {
352
371
client : client ,
353
372
db : client .Database ("admin" ),
354
373
sess : sess ,
@@ -359,6 +378,7 @@ func newClientChangeStream(ctx context.Context, client *Client, pipeline interfa
359
378
readConcern : client .readConcern ,
360
379
options : csOpts ,
361
380
registry : client .registry ,
381
+ cursorOpts : cursorDoc ,
362
382
}
363
383
364
384
err = cs .runCommand (ctx , false )
@@ -369,13 +389,8 @@ func newClientChangeStream(ctx context.Context, client *Client, pipeline interfa
369
389
return cs , nil
370
390
}
371
391
372
- func (cs * changeStream ) storeResumeToken () error {
373
- br , err := cs .cursor .DecodeBytes ()
374
- if err != nil {
375
- return err
376
- }
377
-
378
- idVal , err := br .LookupErr ("_id" )
392
+ func (cs * ChangeStream ) storeResumeToken () error {
393
+ idVal , err := cs .cursor .Current .LookupErr ("_id" )
379
394
if err != nil {
380
395
_ = cs .Close (context .Background ())
381
396
return ErrMissingResumeToken
@@ -397,15 +412,18 @@ func (cs *changeStream) storeResumeToken() error {
397
412
return nil
398
413
}
399
414
400
- func (cs * changeStream ) ID () int64 {
415
+ // ID returns the cursor ID for this change stream.
416
+ func (cs * ChangeStream ) ID () int64 {
401
417
if cs .cursor == nil {
402
418
return 0
403
419
}
404
420
405
421
return cs .cursor .ID ()
406
422
}
407
423
408
- func (cs * changeStream ) Next (ctx context.Context ) bool {
424
+ // Next gets the next result from this change stream. Returns true if there were no errors and the next
425
+ // result is available for decoding.
426
+ func (cs * ChangeStream ) Next (ctx context.Context ) bool {
409
427
// execute in a loop to retry resume-able errors and advance the underlying cursor
410
428
for {
411
429
if cs .cursor == nil {
@@ -419,6 +437,7 @@ func (cs *changeStream) Next(ctx context.Context) bool {
419
437
return false
420
438
}
421
439
440
+ cs .Current = cs .cursor .Current
422
441
return true
423
442
}
424
443
@@ -447,31 +466,17 @@ func (cs *changeStream) Next(ctx context.Context) bool {
447
466
}
448
467
}
449
468
450
- func (cs * changeStream ) Decode (out interface {}) error {
469
+ // Decode will decode the current document into val.
470
+ func (cs * ChangeStream ) Decode (out interface {}) error {
451
471
if cs .cursor == nil {
452
472
return ErrNilCursor
453
473
}
454
474
455
- br , err := cs .DecodeBytes ()
456
- if err != nil {
457
- return err
458
- }
459
-
460
- return bson .UnmarshalWithRegistry (cs .registry , br , out )
461
- }
462
-
463
- func (cs * changeStream ) DecodeBytes () (bson.Raw , error ) {
464
- if cs .cursor == nil {
465
- return nil , ErrNilCursor
466
- }
467
- if cs .err != nil {
468
- return nil , cs .err
469
- }
470
-
471
- return cs .cursor .DecodeBytes ()
475
+ return bson .UnmarshalWithRegistry (cs .registry , cs .Current , out )
472
476
}
473
477
474
- func (cs * changeStream ) Err () error {
478
+ // Err returns the current error.
479
+ func (cs * ChangeStream ) Err () error {
475
480
if cs .err != nil {
476
481
return cs .err
477
482
}
@@ -482,7 +487,8 @@ func (cs *changeStream) Err() error {
482
487
return cs .cursor .Err ()
483
488
}
484
489
485
- func (cs * changeStream ) Close (ctx context.Context ) error {
490
+ // Close closes this cursor.
491
+ func (cs * ChangeStream ) Close (ctx context.Context ) error {
486
492
if cs .cursor == nil {
487
493
return nil // cursor is already closed
488
494
}
0 commit comments