Skip to content

Commit 37b6cf6

Browse files
committed
Change resume token options to be empty interfaces
The ResumeToken property and SetResumeToken method on options.ChangeStreamOptions now uses an empty interface instead of a bsonx.Doc. GODRIVER-644 Change-Id: Ibead5375c64061185d282222f991e83cac9fd693
1 parent 9929f17 commit 37b6cf6

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

mongo/change_stream.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{}
7171
aggOpts.MaxAwaitTime = csOpts.MaxAwaitTime
7272
}
7373
if csOpts.ResumeAfter != nil {
74-
changeStreamOptions = append(changeStreamOptions, bsonx.Elem{"resumeAfter", bsonx.Document(csOpts.ResumeAfter)})
74+
rt, err := transformDocument(coll.registry, csOpts.ResumeAfter)
75+
if err != nil {
76+
return nil, err
77+
}
78+
changeStreamOptions = append(changeStreamOptions, bsonx.Elem{"resumeAfter", bsonx.Document(rt)})
7579
}
7680

7781
pipelineArr = append(pipelineArr, bsonx.Val{})

mongo/options/changestreamoptions.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ package options
88

99
import (
1010
"time"
11-
12-
"github.com/mongodb/mongo-go-driver/x/bsonx"
1311
)
1412

1513
// ChangeStreamOptions represents all possible options to a change stream
@@ -18,7 +16,7 @@ type ChangeStreamOptions struct {
1816
Collation *Collation // Specifies a collation
1917
FullDocument *FullDocument // When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
2018
MaxAwaitTime *time.Duration // The maximum amount of time for the server to wait on new documents to satisfy a change stream query
21-
ResumeAfter bsonx.Doc // Specifies the logical starting point for the new change stream
19+
ResumeAfter interface{} // Specifies the logical starting point for the new change stream
2220
}
2321

2422
// ChangeStream returns a pointer to a new ChangeStreamOptions
@@ -55,8 +53,8 @@ func (cso *ChangeStreamOptions) SetMaxAwaitTime(d time.Duration) *ChangeStreamOp
5553
}
5654

5755
// SetResumeAfter specifies the logical starting point for the new change stream
58-
func (cso *ChangeStreamOptions) SetResumeAfter(d bsonx.Doc) *ChangeStreamOptions {
59-
cso.ResumeAfter = d
56+
func (cso *ChangeStreamOptions) SetResumeAfter(rt interface{}) *ChangeStreamOptions {
57+
cso.ResumeAfter = rt
6058
return cso
6159
}
6260

0 commit comments

Comments
 (0)