@@ -2,15 +2,17 @@ package verifier
22
33import (
44 "context"
5- "errors"
65 "fmt"
76 "time"
87
98 "github.com/10gen/migration-verifier/internal/keystring"
9+ "github.com/pkg/errors"
10+ "github.com/rs/zerolog"
1011 "go.mongodb.org/mongo-driver/bson"
1112 "go.mongodb.org/mongo-driver/bson/primitive"
1213 "go.mongodb.org/mongo-driver/mongo"
1314 "go.mongodb.org/mongo-driver/mongo/options"
15+ "golang.org/x/exp/constraints"
1416)
1517
1618// ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'.
@@ -32,6 +34,19 @@ type DocKey struct {
3234 ID interface {} `bson:"_id"`
3335}
3436
37+ const (
38+ minChangeStreamPersistInterval = time .Second * 10
39+ metadataChangeStreamCollectionName = "changeStream"
40+ )
41+
42+ type UnknownEventError struct {
43+ Event * ParsedEvent
44+ }
45+
46+ func (uee UnknownEventError ) Error () string {
47+ return fmt .Sprintf ("Unknown event type: %#q" , uee .Event .OpType )
48+ }
49+
3550// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
3651// operation.
3752func (verifier * Verifier ) HandleChangeStreamEvent (ctx context.Context , changeEvent * ParsedEvent ) error {
@@ -50,7 +65,7 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
5065 case "update" :
5166 return verifier .InsertChangeEventRecheckDoc (ctx , changeEvent )
5267 default :
53- return errors . New ( `Not supporting: "` + changeEvent . OpType + `" events` )
68+ return UnknownEventError { Event : changeEvent }
5469 }
5570}
5671
@@ -67,121 +82,241 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
6782 return []bson.D {stage }
6883}
6984
70- // StartChangeStream starts the change stream.
71- func (verifier * Verifier ) StartChangeStream (ctx context.Context , startTime * primitive.Timestamp ) error {
72- streamReader := func (cs * mongo.ChangeStream ) {
73- var changeEvent ParsedEvent
74- for {
75- select {
76- // if the context is cancelled return immmediately
77- case <- ctx .Done ():
78- return
79- // if the changeStreamEnderChan has a message, we have moved to the Recheck phase, obtain
80- // the remaining changes, but when TryNext returns false, we will exit, since there should
81- // be no message until the user has guaranteed writes to the source have ended.
82- case <- verifier .changeStreamEnderChan :
83- for cs .TryNext (ctx ) {
84- if err := cs .Decode (& changeEvent ); err != nil {
85- verifier .logger .Fatal ().Err (err ).Msg ("Failed to decode change event" )
86- }
87- err := verifier .HandleChangeStreamEvent (ctx , & changeEvent )
88- if err != nil {
89- verifier .changeStreamErrChan <- err
90- verifier .logger .Fatal ().Err (err ).Msg ("Error handling change event" )
91- }
92- }
93- verifier .mux .Lock ()
94- verifier .changeStreamRunning = false
95- if verifier .lastChangeEventTime != nil {
96- verifier .srcStartAtTs = verifier .lastChangeEventTime
97- }
98- verifier .mux .Unlock ()
99- // since we have started Recheck, we must signal that we have
100- // finished the change stream changes so that Recheck can continue.
101- verifier .changeStreamDoneChan <- struct {}{}
102- // since the changeStream is exhausted, we now return
103- verifier .logger .Debug ().Msg ("Change stream is done" )
104- return
105- // the default case is that we are still in the Check phase, in the check phase we still
106- // use TryNext, but we do not exit if TryNext returns false.
107- default :
108- if next := cs .TryNext (ctx ); ! next {
109- continue
110- }
111- if err := cs .Decode (& changeEvent ); err != nil {
112- verifier .logger .Fatal ().Err (err ).Msg ("" )
113- }
114- err := verifier .HandleChangeStreamEvent (ctx , & changeEvent )
115- if err != nil {
116- verifier .changeStreamErrChan <- err
117- return
118- }
85+ func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
86+ var changeEvent ParsedEvent
87+
88+ var lastPersistedTime time.Time
89+
90+ persistResumeTokenIfNeeded := func () error {
91+ if time .Since (lastPersistedTime ) <= minChangeStreamPersistInterval {
92+ return nil
93+ }
94+
95+ err := verifier .persistChangeStreamResumeToken (ctx , cs )
96+ if err == nil {
97+ lastPersistedTime = time .Now ()
98+ }
99+
100+ return err
101+ }
102+
103+ for {
104+ var err error
105+
106+ for cs .TryNext (ctx ) {
107+ if err = cs .Decode (& changeEvent ); err != nil {
108+ err = errors .Wrap (err , "failed to decode change event" )
109+ break
110+ }
111+ err = verifier .HandleChangeStreamEvent (ctx , & changeEvent )
112+ if err != nil {
113+ err = errors .Wrap (err , "failed to handle change event" )
114+ break
115+ }
116+ }
117+
118+ if cs .Err () != nil {
119+ err = errors .Wrap (
120+ cs .Err (),
121+ "change stream iteration failed" ,
122+ )
123+ }
124+
125+ if err == nil {
126+ err = persistResumeTokenIfNeeded ()
127+ }
128+
129+ if err != nil {
130+ if ! errors .Is (err , context .Canceled ) {
131+ verifier .changeStreamErrChan <- err
119132 }
133+
134+ return
135+ }
136+
137+ select {
138+ // If the changeStreamEnderChan has a message, the user has indicated that
139+ // source writes are ended. This means we should exit rather than continue
140+ // reading the change stream since there should be no more events.
141+ case <- verifier .changeStreamEnderChan :
142+ verifier .mux .Lock ()
143+ verifier .changeStreamRunning = false
144+ if verifier .lastChangeEventTime != nil {
145+ verifier .srcStartAtTs = verifier .lastChangeEventTime
146+ }
147+ verifier .mux .Unlock ()
148+ // since we have started Recheck, we must signal that we have
149+ // finished the change stream changes so that Recheck can continue.
150+ verifier .changeStreamDoneChan <- struct {}{}
151+ // since the changeStream is exhausted, we now return
152+ verifier .logger .Debug ().Msg ("Change stream is done" )
153+ return
154+ default :
120155 }
121156 }
157+ }
158+
159+ // StartChangeStream starts the change stream.
160+ func (verifier * Verifier ) StartChangeStream (ctx context.Context ) error {
122161 pipeline := verifier .GetChangeStreamFilter ()
123162 opts := options .ChangeStream ().SetMaxAwaitTime (1 * time .Second )
124- if startTime != nil {
125- opts = opts .SetStartAtOperationTime (startTime )
126- verifier .srcStartAtTs = startTime
163+
164+ savedResumeToken , err := verifier .loadChangeStreamResumeToken (ctx )
165+ if err != nil {
166+ return errors .Wrap (err , "failed to load persisted change stream resume token" )
167+ }
168+
169+ csStartLogEvent := verifier .logger .Info ()
170+
171+ if savedResumeToken != nil {
172+ logEvent := csStartLogEvent .
173+ Stringer ("resumeToken" , savedResumeToken )
174+
175+ ts , err := extractTimestampFromResumeToken (savedResumeToken )
176+ if err == nil {
177+ logEvent = addUnixTimeToLogEvent (ts .T , logEvent )
178+ } else {
179+ verifier .logger .Warn ().
180+ Err (err ).
181+ Msg ("Failed to extract timestamp from persisted resume token." )
182+ }
183+
184+ logEvent .Msg ("Starting change stream from persisted resume token." )
185+
186+ opts = opts .SetStartAfter (savedResumeToken )
187+ } else {
188+ csStartLogEvent .Msg ("Starting change stream from current source cluster time." )
127189 }
190+
128191 sess , err := verifier .srcClient .StartSession ()
129192 if err != nil {
130- return err
193+ return errors . Wrap ( err , "failed to start session" )
131194 }
132195 sctx := mongo .NewSessionContext (ctx , sess )
133196 srcChangeStream , err := verifier .srcClient .Watch (sctx , pipeline , opts )
197+ if err != nil {
198+ return errors .Wrap (err , "failed to open change stream" )
199+ }
200+
201+ err = verifier .persistChangeStreamResumeToken (ctx , srcChangeStream )
134202 if err != nil {
135203 return err
136204 }
137- if startTime == nil {
138- resumeToken := srcChangeStream .ResumeToken ()
139- if resumeToken == nil {
140- return errors .New ("Resume token is missing; cannot choose start time" )
141- }
142- // Change stream token is always a V1 keystring in the _data field
143- resumeTokenDataValue := resumeToken .Lookup ("_data" )
144- resumeTokenData , ok := resumeTokenDataValue .StringValueOK ()
145- if ! ok {
146- return fmt .Errorf ("Resume token _data is missing or the wrong type: %v" ,
147- resumeTokenDataValue .Type )
148- }
149- resumeTokenBson , err := keystring .KeystringToBson (keystring .V1 , resumeTokenData )
150- if err != nil {
151- return err
152- }
153- // First element is the cluster time we want
154- resumeTokenTime , ok := resumeTokenBson [0 ].Value .(primitive.Timestamp )
155- if ! ok {
156- return errors .New ("Resume token lacks a cluster time" )
157- }
158- verifier .srcStartAtTs = & resumeTokenTime
159205
160- // On sharded servers the resume token time can be ahead of the actual cluster time by one
161- // increment. In that case we must use the actual cluster time or we will get errors.
162- clusterTimeRaw := sess .ClusterTime ()
163- clusterTimeInner , err := clusterTimeRaw .LookupErr ("$clusterTime" )
164- if err != nil {
165- return err
166- }
167- clusterTimeTsVal , err := bson .Raw (clusterTimeInner .Value ).LookupErr ("clusterTime" )
168- if err != nil {
169- return err
170- }
171- var clusterTimeTs primitive.Timestamp
172- clusterTimeTs .T , clusterTimeTs .I , ok = clusterTimeTsVal .TimestampOK ()
173- if ! ok {
174- return errors .New ("Cluster time is not a timestamp" )
175- }
206+ csTimestamp , err := extractTimestampFromResumeToken (srcChangeStream .ResumeToken ())
207+ if err != nil {
208+ return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
209+ }
176210
177- verifier .logger .Debug ().Msgf ("Initial cluster time is %+v" , clusterTimeTs )
178- if clusterTimeTs .Compare (resumeTokenTime ) < 0 {
179- verifier .srcStartAtTs = & clusterTimeTs
180- }
211+ clusterTime , err := getClusterTimeFromSession (sess )
212+ if err != nil {
213+ return errors .Wrap (err , "failed to read cluster time from session" )
214+ }
215+
216+ verifier .srcStartAtTs = & csTimestamp
217+ if csTimestamp .After (clusterTime ) {
218+ verifier .srcStartAtTs = & clusterTime
181219 }
220+
182221 verifier .mux .Lock ()
183222 verifier .changeStreamRunning = true
184223 verifier .mux .Unlock ()
185- go streamReader (srcChangeStream )
224+
225+ go verifier .iterateChangeStream (ctx , srcChangeStream )
226+
186227 return nil
187228}
229+
230+ func addUnixTimeToLogEvent [T constraints.Integer ](unixTime T , event * zerolog.Event ) * zerolog.Event {
231+ return event .Time ("clockTime" , time .Unix (int64 (unixTime ), int64 (0 )))
232+ }
233+
234+ func (v * Verifier ) getChangeStreamMetadataCollection () * mongo.Collection {
235+ return v .metaClient .Database (v .metaDBName ).Collection (metadataChangeStreamCollectionName )
236+ }
237+
238+ func (verifier * Verifier ) loadChangeStreamResumeToken (ctx context.Context ) (bson.Raw , error ) {
239+ coll := verifier .getChangeStreamMetadataCollection ()
240+
241+ token , err := coll .FindOne (
242+ ctx ,
243+ bson.D {{"_id" , "resumeToken" }},
244+ ).Raw ()
245+
246+ if errors .Is (err , mongo .ErrNoDocuments ) {
247+ return nil , nil
248+ }
249+
250+ return token , err
251+ }
252+
253+ func (verifier * Verifier ) persistChangeStreamResumeToken (ctx context.Context , cs * mongo.ChangeStream ) error {
254+ token := cs .ResumeToken ()
255+
256+ coll := verifier .getChangeStreamMetadataCollection ()
257+ _ , err := coll .ReplaceOne (
258+ ctx ,
259+ bson.D {{"_id" , "resumeToken" }},
260+ token ,
261+ options .Replace ().SetUpsert (true ),
262+ )
263+
264+ if err == nil {
265+ ts , err := extractTimestampFromResumeToken (token )
266+
267+ logEvent := verifier .logger .Debug ()
268+
269+ if err == nil {
270+ logEvent = addUnixTimeToLogEvent (ts .T , logEvent )
271+ } else {
272+ verifier .logger .Warn ().Err (err ).
273+ Msg ("failed to extract resume token timestamp" )
274+ }
275+
276+ logEvent .Msg ("Persisted change stream resume token." )
277+
278+ return nil
279+ }
280+
281+ return errors .Wrapf (err , "failed to persist change stream resume token (%v)" , token )
282+ }
283+
284+ func extractTimestampFromResumeToken (resumeToken bson.Raw ) (primitive.Timestamp , error ) {
285+ tokenStruct := struct {
286+ Data string `bson:"_data"`
287+ }{}
288+
289+ // Change stream token is always a V1 keystring in the _data field
290+ err := bson .Unmarshal (resumeToken , & tokenStruct )
291+ if err != nil {
292+ return primitive.Timestamp {}, errors .Wrapf (err , "failed to extract %#q from resume token (%v)" , "_data" , resumeToken )
293+ }
294+
295+ resumeTokenBson , err := keystring .KeystringToBson (keystring .V1 , tokenStruct .Data )
296+ if err != nil {
297+ return primitive.Timestamp {}, err
298+ }
299+ // First element is the cluster time we want
300+ resumeTokenTime , ok := resumeTokenBson [0 ].Value .(primitive.Timestamp )
301+ if ! ok {
302+ return primitive.Timestamp {}, errors .Errorf ("resume token data's (%+v) first element is of type %T, not a timestamp" , resumeTokenBson , resumeTokenBson [0 ].Value )
303+ }
304+
305+ return resumeTokenTime , nil
306+ }
307+
308+ func getClusterTimeFromSession (sess mongo.Session ) (primitive.Timestamp , error ) {
309+ ctStruct := struct {
310+ ClusterTime struct {
311+ ClusterTime primitive.Timestamp `bson:"clusterTime"`
312+ } `bson:"$clusterTime"`
313+ }{}
314+
315+ clusterTimeRaw := sess .ClusterTime ()
316+ err := bson .Unmarshal (sess .ClusterTime (), & ctStruct )
317+ if err != nil {
318+ return primitive.Timestamp {}, errors .Wrapf (err , "failed to find clusterTime in session cluster time document (%v)" , clusterTimeRaw )
319+ }
320+
321+ return ctStruct .ClusterTime .ClusterTime , nil
322+ }
0 commit comments