@@ -7,10 +7,12 @@ import (
77
88 "github.com/10gen/migration-verifier/internal/keystring"
99 "github.com/pkg/errors"
10+ "github.com/rs/zerolog"
1011 "go.mongodb.org/mongo-driver/bson"
1112 "go.mongodb.org/mongo-driver/bson/primitive"
1213 "go.mongodb.org/mongo-driver/mongo"
1314 "go.mongodb.org/mongo-driver/mongo/options"
15+ "golang.org/x/exp/constraints"
1416)
1517
1618// ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'.
@@ -32,6 +34,19 @@ type DocKey struct {
3234 ID interface {} `bson:"_id"`
3335}
3436
37+ const (
38+ minChangeStreamPersistInterval = time .Second * 10
39+ metadataChangeStreamCollectionName = "changeStream"
40+ )
41+
42+ type UnknownEventError struct {
43+ Event * ParsedEvent
44+ }
45+
46+ func (uee UnknownEventError ) Error () string {
47+ return fmt .Sprintf ("Unknown event type: %#q" , uee .Event .OpType )
48+ }
49+
3550// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
3651// operation.
3752func (verifier * Verifier ) HandleChangeStreamEvent (ctx context.Context , changeEvent * ParsedEvent ) error {
@@ -54,7 +69,7 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
5469
5570 return verifier .InsertChangeEventRecheckDoc (ctx , changeEvent )
5671 default :
57- return errors . New ( `Not supporting: "` + changeEvent . OpType + `" events` )
72+ return UnknownEventError { Event : changeEvent }
5873 }
5974}
6075
@@ -71,121 +86,241 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
7186 return []bson.D {stage }
7287}
7388
74- // StartChangeStream starts the change stream.
75- func (verifier * Verifier ) StartChangeStream (ctx context.Context , startTime * primitive.Timestamp ) error {
76- streamReader := func (cs * mongo.ChangeStream ) {
77- var changeEvent ParsedEvent
78- for {
79- select {
80- // if the context is cancelled return immmediately
81- case <- ctx .Done ():
82- return
83- // if the changeStreamEnderChan has a message, we have moved to the Recheck phase, obtain
84- // the remaining changes, but when TryNext returns false, we will exit, since there should
85- // be no message until the user has guaranteed writes to the source have ended.
86- case <- verifier .changeStreamEnderChan :
87- for cs .TryNext (ctx ) {
88- if err := cs .Decode (& changeEvent ); err != nil {
89- verifier .logger .Fatal ().Err (err ).Msg ("Failed to decode change event" )
90- }
91- err := verifier .HandleChangeStreamEvent (ctx , & changeEvent )
92- if err != nil {
93- verifier .changeStreamErrChan <- err
94- verifier .logger .Fatal ().Err (err ).Msg ("Error handling change event" )
95- }
96- }
97- verifier .mux .Lock ()
98- verifier .changeStreamRunning = false
99- if verifier .lastChangeEventTime != nil {
100- verifier .srcStartAtTs = verifier .lastChangeEventTime
101- }
102- verifier .mux .Unlock ()
103- // since we have started Recheck, we must signal that we have
104- // finished the change stream changes so that Recheck can continue.
105- verifier .changeStreamDoneChan <- struct {}{}
106- // since the changeStream is exhausted, we now return
107- verifier .logger .Debug ().Msg ("Change stream is done" )
108- return
109- // the default case is that we are still in the Check phase, in the check phase we still
110- // use TryNext, but we do not exit if TryNext returns false.
111- default :
112- if next := cs .TryNext (ctx ); ! next {
113- continue
114- }
115- if err := cs .Decode (& changeEvent ); err != nil {
116- verifier .logger .Fatal ().Err (err ).Msg ("" )
117- }
118- err := verifier .HandleChangeStreamEvent (ctx , & changeEvent )
119- if err != nil {
120- verifier .changeStreamErrChan <- err
121- return
122- }
89+ func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
90+ var changeEvent ParsedEvent
91+
92+ var lastPersistedTime time.Time
93+
94+ persistResumeTokenIfNeeded := func () error {
95+ if time .Since (lastPersistedTime ) <= minChangeStreamPersistInterval {
96+ return nil
97+ }
98+
99+ err := verifier .persistChangeStreamResumeToken (ctx , cs )
100+ if err == nil {
101+ lastPersistedTime = time .Now ()
102+ }
103+
104+ return err
105+ }
106+
107+ for {
108+ var err error
109+
110+ for cs .TryNext (ctx ) {
111+ if err = cs .Decode (& changeEvent ); err != nil {
112+ err = errors .Wrap (err , "failed to decode change event" )
113+ break
114+ }
115+ err = verifier .HandleChangeStreamEvent (ctx , & changeEvent )
116+ if err != nil {
117+ err = errors .Wrap (err , "failed to handle change event" )
118+ break
119+ }
120+ }
121+
122+ if cs .Err () != nil {
123+ err = errors .Wrap (
124+ cs .Err (),
125+ "change stream iteration failed" ,
126+ )
127+ }
128+
129+ if err == nil {
130+ err = persistResumeTokenIfNeeded ()
131+ }
132+
133+ if err != nil {
134+ if ! errors .Is (err , context .Canceled ) {
135+ verifier .changeStreamErrChan <- err
123136 }
137+
138+ return
139+ }
140+
141+ select {
142+ // If the changeStreamEnderChan has a message, the user has indicated that
143+ // source writes are ended. This means we should exit rather than continue
144+ // reading the change stream since there should be no more events.
145+ case <- verifier .changeStreamEnderChan :
146+ verifier .mux .Lock ()
147+ verifier .changeStreamRunning = false
148+ if verifier .lastChangeEventTime != nil {
149+ verifier .srcStartAtTs = verifier .lastChangeEventTime
150+ }
151+ verifier .mux .Unlock ()
152+ // since we have started Recheck, we must signal that we have
153+ // finished the change stream changes so that Recheck can continue.
154+ verifier .changeStreamDoneChan <- struct {}{}
155+ // since the changeStream is exhausted, we now return
156+ verifier .logger .Debug ().Msg ("Change stream is done" )
157+ return
158+ default :
124159 }
125160 }
161+ }
162+
163+ // StartChangeStream starts the change stream.
164+ func (verifier * Verifier ) StartChangeStream (ctx context.Context ) error {
126165 pipeline := verifier .GetChangeStreamFilter ()
127166 opts := options .ChangeStream ().SetMaxAwaitTime (1 * time .Second )
128- if startTime != nil {
129- opts = opts .SetStartAtOperationTime (startTime )
130- verifier .srcStartAtTs = startTime
167+
168+ savedResumeToken , err := verifier .loadChangeStreamResumeToken (ctx )
169+ if err != nil {
170+ return errors .Wrap (err , "failed to load persisted change stream resume token" )
171+ }
172+
173+ csStartLogEvent := verifier .logger .Info ()
174+
175+ if savedResumeToken != nil {
176+ logEvent := csStartLogEvent .
177+ Stringer ("resumeToken" , savedResumeToken )
178+
179+ ts , err := extractTimestampFromResumeToken (savedResumeToken )
180+ if err == nil {
181+ logEvent = addUnixTimeToLogEvent (ts .T , logEvent )
182+ } else {
183+ verifier .logger .Warn ().
184+ Err (err ).
185+ Msg ("Failed to extract timestamp from persisted resume token." )
186+ }
187+
188+ logEvent .Msg ("Starting change stream from persisted resume token." )
189+
190+ opts = opts .SetStartAfter (savedResumeToken )
191+ } else {
192+ csStartLogEvent .Msg ("Starting change stream from current source cluster time." )
131193 }
194+
132195 sess , err := verifier .srcClient .StartSession ()
133196 if err != nil {
134- return err
197+ return errors . Wrap ( err , "failed to start session" )
135198 }
136199 sctx := mongo .NewSessionContext (ctx , sess )
137200 srcChangeStream , err := verifier .srcClient .Watch (sctx , pipeline , opts )
201+ if err != nil {
202+ return errors .Wrap (err , "failed to open change stream" )
203+ }
204+
205+ err = verifier .persistChangeStreamResumeToken (ctx , srcChangeStream )
138206 if err != nil {
139207 return err
140208 }
141- if startTime == nil {
142- resumeToken := srcChangeStream .ResumeToken ()
143- if resumeToken == nil {
144- return errors .New ("Resume token is missing; cannot choose start time" )
145- }
146- // Change stream token is always a V1 keystring in the _data field
147- resumeTokenDataValue := resumeToken .Lookup ("_data" )
148- resumeTokenData , ok := resumeTokenDataValue .StringValueOK ()
149- if ! ok {
150- return fmt .Errorf ("Resume token _data is missing or the wrong type: %v" ,
151- resumeTokenDataValue .Type )
152- }
153- resumeTokenBson , err := keystring .KeystringToBson (keystring .V1 , resumeTokenData )
154- if err != nil {
155- return err
156- }
157- // First element is the cluster time we want
158- resumeTokenTime , ok := resumeTokenBson [0 ].Value .(primitive.Timestamp )
159- if ! ok {
160- return errors .New ("Resume token lacks a cluster time" )
161- }
162- verifier .srcStartAtTs = & resumeTokenTime
163209
164- // On sharded servers the resume token time can be ahead of the actual cluster time by one
165- // increment. In that case we must use the actual cluster time or we will get errors.
166- clusterTimeRaw := sess .ClusterTime ()
167- clusterTimeInner , err := clusterTimeRaw .LookupErr ("$clusterTime" )
168- if err != nil {
169- return err
170- }
171- clusterTimeTsVal , err := bson .Raw (clusterTimeInner .Value ).LookupErr ("clusterTime" )
172- if err != nil {
173- return err
174- }
175- var clusterTimeTs primitive.Timestamp
176- clusterTimeTs .T , clusterTimeTs .I , ok = clusterTimeTsVal .TimestampOK ()
177- if ! ok {
178- return errors .New ("Cluster time is not a timestamp" )
179- }
210+ csTimestamp , err := extractTimestampFromResumeToken (srcChangeStream .ResumeToken ())
211+ if err != nil {
212+ return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
213+ }
180214
181- verifier .logger .Debug ().Msgf ("Initial cluster time is %+v" , clusterTimeTs )
182- if clusterTimeTs .Compare (resumeTokenTime ) < 0 {
183- verifier .srcStartAtTs = & clusterTimeTs
184- }
215+ clusterTime , err := getClusterTimeFromSession (sess )
216+ if err != nil {
217+ return errors .Wrap (err , "failed to read cluster time from session" )
218+ }
219+
220+ verifier .srcStartAtTs = & csTimestamp
221+ if csTimestamp .After (clusterTime ) {
222+ verifier .srcStartAtTs = & clusterTime
185223 }
224+
186225 verifier .mux .Lock ()
187226 verifier .changeStreamRunning = true
188227 verifier .mux .Unlock ()
189- go streamReader (srcChangeStream )
228+
229+ go verifier .iterateChangeStream (ctx , srcChangeStream )
230+
190231 return nil
191232}
233+
234+ func addUnixTimeToLogEvent [T constraints.Integer ](unixTime T , event * zerolog.Event ) * zerolog.Event {
235+ return event .Time ("clockTime" , time .Unix (int64 (unixTime ), int64 (0 )))
236+ }
237+
238+ func (v * Verifier ) getChangeStreamMetadataCollection () * mongo.Collection {
239+ return v .metaClient .Database (v .metaDBName ).Collection (metadataChangeStreamCollectionName )
240+ }
241+
242+ func (verifier * Verifier ) loadChangeStreamResumeToken (ctx context.Context ) (bson.Raw , error ) {
243+ coll := verifier .getChangeStreamMetadataCollection ()
244+
245+ token , err := coll .FindOne (
246+ ctx ,
247+ bson.D {{"_id" , "resumeToken" }},
248+ ).Raw ()
249+
250+ if errors .Is (err , mongo .ErrNoDocuments ) {
251+ return nil , nil
252+ }
253+
254+ return token , err
255+ }
256+
257+ func (verifier * Verifier ) persistChangeStreamResumeToken (ctx context.Context , cs * mongo.ChangeStream ) error {
258+ token := cs .ResumeToken ()
259+
260+ coll := verifier .getChangeStreamMetadataCollection ()
261+ _ , err := coll .ReplaceOne (
262+ ctx ,
263+ bson.D {{"_id" , "resumeToken" }},
264+ token ,
265+ options .Replace ().SetUpsert (true ),
266+ )
267+
268+ if err == nil {
269+ ts , err := extractTimestampFromResumeToken (token )
270+
271+ logEvent := verifier .logger .Debug ()
272+
273+ if err == nil {
274+ logEvent = addUnixTimeToLogEvent (ts .T , logEvent )
275+ } else {
276+ verifier .logger .Warn ().Err (err ).
277+ Msg ("failed to extract resume token timestamp" )
278+ }
279+
280+ logEvent .Msg ("Persisted change stream resume token." )
281+
282+ return nil
283+ }
284+
285+ return errors .Wrapf (err , "failed to persist change stream resume token (%v)" , token )
286+ }
287+
288+ func extractTimestampFromResumeToken (resumeToken bson.Raw ) (primitive.Timestamp , error ) {
289+ tokenStruct := struct {
290+ Data string `bson:"_data"`
291+ }{}
292+
293+ // Change stream token is always a V1 keystring in the _data field
294+ err := bson .Unmarshal (resumeToken , & tokenStruct )
295+ if err != nil {
296+ return primitive.Timestamp {}, errors .Wrapf (err , "failed to extract %#q from resume token (%v)" , "_data" , resumeToken )
297+ }
298+
299+ resumeTokenBson , err := keystring .KeystringToBson (keystring .V1 , tokenStruct .Data )
300+ if err != nil {
301+ return primitive.Timestamp {}, err
302+ }
303+ // First element is the cluster time we want
304+ resumeTokenTime , ok := resumeTokenBson [0 ].Value .(primitive.Timestamp )
305+ if ! ok {
306+ return primitive.Timestamp {}, errors .Errorf ("resume token data's (%+v) first element is of type %T, not a timestamp" , resumeTokenBson , resumeTokenBson [0 ].Value )
307+ }
308+
309+ return resumeTokenTime , nil
310+ }
311+
312+ func getClusterTimeFromSession (sess mongo.Session ) (primitive.Timestamp , error ) {
313+ ctStruct := struct {
314+ ClusterTime struct {
315+ ClusterTime primitive.Timestamp `bson:"clusterTime"`
316+ } `bson:"$clusterTime"`
317+ }{}
318+
319+ clusterTimeRaw := sess .ClusterTime ()
320+ err := bson .Unmarshal (sess .ClusterTime (), & ctStruct )
321+ if err != nil {
322+ return primitive.Timestamp {}, errors .Wrapf (err , "failed to find clusterTime in session cluster time document (%v)" , clusterTimeRaw )
323+ }
324+
325+ return ctStruct .ClusterTime .ClusterTime , nil
326+ }
0 commit comments