Skip to content

Commit 4aec8e2

Browse files
author
Divjot Arora
authored
GODRIVER-1383 ChangeStream and Cursor documentation (#229)
1 parent 058ad2b commit 4aec8e2

File tree

3 files changed

+220
-74
lines changed

3 files changed

+220
-74
lines changed

mongo/change_stream.go

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,18 @@ const errorInterrupted int32 = 11601
3131
const errorCappedPositionLost int32 = 136
3232
const errorCursorKilled int32 = 237
3333

34-
// ErrMissingResumeToken indicates that a change stream notification from the server did not
35-
// contain a resume token.
34+
// ErrMissingResumeToken indicates that a change stream notification from the server did not contain a resume token.
3635
var ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing")
3736

38-
// ErrNilCursor indicates that the cursor for the change stream is nil.
37+
// ErrNilCursor indicates that the underlying cursor for the change stream is nil.
3938
var ErrNilCursor = errors.New("cursor is nil")
4039

41-
// ChangeStream instances iterate a stream of change documents. Each document can be decoded via the
42-
// Decode method. Resume tokens should be retrieved via the ResumeToken method and can be stored to
43-
// resume the change stream at a specific point in time.
44-
//
45-
// A typical usage of the ChangeStream type would be:
40+
// ChangeStream is used to iterate over a stream of events. Each event can be decoded into a Go type via the Decode
41+
// method or accessed as raw BSON via the Current field. For more information about change streams, see
42+
// https://docs.mongodb.com/manual/changeStreams/.
4643
type ChangeStream struct {
44+
// Current is the BSON bytes of the current event. This property is only valid until the next call to Next or
45+
// TryNext. If continued access is required, a copy must be made.
4746
Current bson.Raw
4847

4948
aggregate *operation.Aggregate
@@ -405,15 +404,16 @@ func (cs *ChangeStream) replaceOptions(ctx context.Context, wireVersion *descrip
405404
cs.options.SetStartAtOperationTime(nil)
406405
}
407406

408-
// ID returns the cursor ID for this change stream.
407+
// ID returns the ID for this change stream, or 0 if the cursor has been closed or exhausted.
409408
func (cs *ChangeStream) ID() int64 {
410409
if cs.cursor == nil {
411410
return 0
412411
}
413412
return cs.cursor.ID()
414413
}
415414

416-
// Decode will decode the current document into val.
415+
// Decode will unmarshal the current event document into val and return any errors from the unmarshalling process
416+
// without any modification. If val is nil or is a typed nil, an error will be returned.
417417
func (cs *ChangeStream) Decode(val interface{}) error {
418418
if cs.cursor == nil {
419419
return ErrNilCursor
@@ -422,7 +422,7 @@ func (cs *ChangeStream) Decode(val interface{}) error {
422422
return bson.UnmarshalWithRegistry(cs.registry, cs.Current, val)
423423
}
424424

425-
// Err returns the current error.
425+
// Err returns the last error seen by the change stream, or nil if no errors has occurred.
426426
func (cs *ChangeStream) Err() error {
427427
if cs.err != nil {
428428
return replaceErrors(cs.err)
@@ -434,7 +434,8 @@ func (cs *ChangeStream) Err() error {
434434
return replaceErrors(cs.cursor.Err())
435435
}
436436

437-
// Close closes this cursor.
437+
// Close closes this change stream and the underlying cursor. Next and TryNext must not be called after Close has been
438+
// called. Close is idempotent. After the first call, any subsequent calls will not change the state.
438439
func (cs *ChangeStream) Close(ctx context.Context) error {
439440
if ctx == nil {
440441
ctx = context.Background()
@@ -451,26 +452,34 @@ func (cs *ChangeStream) Close(ctx context.Context) error {
451452
return cs.Err()
452453
}
453454

454-
// ResumeToken returns the last cached resume token for this change stream.
455+
// ResumeToken returns the last cached resume token for this change stream, or nil if a resume token has not been
456+
// stored.
455457
func (cs *ChangeStream) ResumeToken() bson.Raw {
456458
return cs.resumeToken
457459
}
458460

459-
// Next gets the next result from this change stream. Returns true if there were no errors and the next
460-
// result is available for decoding. Next blocks until an event is available for decoding or ctx expires.
461-
// If the given context expires during execution, the stream's error will be set and the change stream may be in an
462-
// invalid state and should be re-created. If Next returns false, it must not be called again.
461+
// Next gets the next event for this change stream. It returns true if there were no errors and the next event document
462+
// is available.
463+
//
464+
// Next blocks until an event is available, an error occurs, or ctx expires. If ctx expires, the error
465+
// will be set to ctx.Err(). In an error case, Next will return false.
466+
//
467+
// If Next returns false, subsequent calls will also return false.
463468
func (cs *ChangeStream) Next(ctx context.Context) bool {
464469
return cs.next(ctx, false)
465470
}
466471

467-
// TryNext attempts to get the next result from this change stream. It returns true if there were no errors and the next
468-
// result is available for decoding. It returns false if the change stream was closed by the server, there was an
469-
// error getting more results from the server, the server returned an empty batch of events, or the given context expires.
470-
// If an error occurred or the stream was closed (can be checked with cs.Err() != nil || cs.ID() == 0), TryNext must
471-
// not be called again. If the given context expires during execution, the stream's error will be set and the change
472-
// stream may be in an invalid state and should be re-created.
473-
// Added in version 1.2.0.
472+
// TryNext attempts to get the next event for this change stream. It returns true if there were no errors and the next
473+
// event document is available.
474+
//
475+
// TryNext returns false if the change stream is closed by the server, an error occurs when getting changes from the
476+
// server, the next change is not yet available, or ctx expires. If ctx expires, the error will be set to ctx.Err().
477+
//
478+
// If TryNext returns false and an error occurred or the change stream was closed
479+
// (i.e. cs.Err() != nil || cs.ID() == 0), subsequent attempts will also return false. Otherwise, it is safe to call
480+
// TryNext again until a change is available.
481+
//
482+
// This method requires driver version >= 1.2.0.
474483
func (cs *ChangeStream) TryNext(ctx context.Context) bool {
475484
return cs.next(ctx, true)
476485
}
@@ -549,11 +558,11 @@ func (cs *ChangeStream) emptyBatch() bool {
549558
return cs.cursor.Batch().Empty()
550559
}
551560

552-
// StreamType represents the type of a change stream.
561+
// StreamType represents the cluster type against which a ChangeStream was created.
553562
type StreamType uint8
554563

555564
// These constants represent valid change stream types. A change stream can be initialized over a collection, all
556-
// collections in a database, or over a whole client.
565+
// collections in a database, or over a cluster.
557566
const (
558567
CollectionStream StreamType = iota
559568
DatabaseStream

mongo/crud_examples_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"context"
1111
"fmt"
1212
"log"
13+
"sync"
1314
"time"
1415

1516
"go.mongodb.org/mongo-driver/bson"
@@ -556,3 +557,150 @@ func ExampleClient_StartSession_withTransaction() {
556557
}
557558
fmt.Printf("result: %v\n", result)
558559
}
560+
561+
// Cursor examples
562+
563+
func ExampleCursor_All() {
564+
var cursor *mongo.Cursor
565+
566+
var results []bson.M
567+
if err := cursor.All(context.TODO(), &results); err != nil {
568+
log.Fatal(err)
569+
}
570+
fmt.Println(results)
571+
}
572+
573+
func ExampleCursor_Next() {
574+
var cursor *mongo.Cursor
575+
defer cursor.Close(context.TODO())
576+
577+
// Iterate the cursor and print out each document until the cursor is exhausted or there is an error getting the
578+
// next document.
579+
for cursor.Next(context.TODO()) {
580+
// A new result variable should be declared for each document.
581+
var result bson.M
582+
if err := cursor.Decode(&result); err != nil {
583+
log.Fatal(err)
584+
}
585+
fmt.Println(result)
586+
}
587+
if err := cursor.Err(); err != nil {
588+
log.Fatal(err)
589+
}
590+
}
591+
592+
func ExampleCursor_TryNext() {
593+
var cursor *mongo.Cursor
594+
defer cursor.Close(context.TODO())
595+
596+
// Iterate the cursor and print out each document until the cursor is exhausted or there is an error getting the
597+
// next document.
598+
for {
599+
if cursor.TryNext(context.TODO()) {
600+
// A new result variable should be declared for each document.
601+
var result bson.M
602+
if err := cursor.Decode(&result); err != nil {
603+
log.Fatal(err)
604+
}
605+
fmt.Println(result)
606+
continue
607+
}
608+
609+
// If TryNext returns false, the next document is not yet available, the cursor was exhausted and was closed, or
610+
// an error occured. TryNext should only be called again for the empty batch case.
611+
if err := cursor.Err(); err != nil {
612+
log.Fatal(err)
613+
}
614+
if cursor.ID() == 0 {
615+
break
616+
}
617+
}
618+
}
619+
620+
// ChangeStream examples
621+
622+
func ExampleChangeStream_Next() {
623+
var stream *mongo.ChangeStream
624+
defer stream.Close(context.TODO())
625+
626+
// Iterate the change stream and print out each event.
627+
// Because the Next call blocks until an event is available, another way to iterate the change stream is to call
628+
// Next in a goroutine and pass in a context that can be cancelled to abort the call.
629+
630+
for stream.Next(context.TODO()) {
631+
// A new event variable should be declared for each event.
632+
var event bson.M
633+
if err := stream.Decode(&event); err != nil {
634+
log.Fatal(err)
635+
}
636+
fmt.Println(event)
637+
}
638+
if err := stream.Err(); err != nil {
639+
log.Fatal(err)
640+
}
641+
}
642+
643+
func ExampleChangeStream_TryNext() {
644+
var stream *mongo.ChangeStream
645+
defer stream.Close(context.TODO())
646+
647+
// Iterate the change stream and print out each event until the change stream is closed by the server or there is an
648+
// error getting the next event.
649+
for {
650+
if stream.TryNext(context.TODO()) {
651+
// A new event variable should be declared for each event.
652+
var event bson.M
653+
if err := stream.Decode(&event); err != nil {
654+
log.Fatal(err)
655+
}
656+
fmt.Println(event)
657+
continue
658+
}
659+
660+
// If TryNext returns false, the next change is not yet available, the change stream was closed by the server,
661+
// or an error occurred. TryNext should only be called again for the empty batch case.
662+
if err := stream.Err(); err != nil {
663+
log.Fatal(err)
664+
}
665+
if stream.ID() == 0 {
666+
break
667+
}
668+
}
669+
}
670+
671+
func ExampleChangeStream_ResumeToken() {
672+
var client *mongo.Client
673+
var stream *mongo.ChangeStream // assume stream was created via client.Watch()
674+
675+
cancelCtx, cancel := context.WithCancel(context.TODO())
676+
defer cancel()
677+
var wg sync.WaitGroup
678+
wg.Add(1)
679+
680+
// Run a goroutine to process events.
681+
go func() {
682+
for stream.Next(cancelCtx) {
683+
fmt.Println(stream.Current)
684+
}
685+
wg.Done()
686+
}()
687+
688+
// Assume client needs to be disconnected. Cancel the context being used by the goroutine to abort any
689+
// in-progres Next calls and wait for the goroutine to exit.
690+
cancel()
691+
wg.Wait()
692+
693+
// Before disconnecting the client, store the last seen resume token for the change stream.
694+
resumeToken := stream.ResumeToken()
695+
_ = client.Disconnect(context.TODO())
696+
697+
// Once a new client is created, the change stream can be re-created. Specify resumeToken as the ResumeAfter option
698+
// so only events that occurred after resumeToken will be returned.
699+
var newClient *mongo.Client
700+
opts := options.ChangeStream().SetResumeAfter(resumeToken)
701+
newStream, err := newClient.Watch(context.TODO(), mongo.Pipeline{}, opts)
702+
if err != nil {
703+
log.Fatal(err)
704+
}
705+
defer newStream.Close(context.TODO())
706+
}

0 commit comments

Comments
 (0)