@@ -12,7 +12,6 @@ import (
1212 "go.mongodb.org/mongo-driver/bson/primitive"
1313 "go.mongodb.org/mongo-driver/mongo"
1414 "go.mongodb.org/mongo-driver/mongo/options"
15- "golang.org/x/exp/constraints"
1615)
1716
1817const fauxDocSizeForDeleteEvents = 1024
@@ -105,6 +104,11 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
105104// GetChangeStreamFilter returns an aggregation pipeline that filters
106105// namespaces as per configuration.
107106//
107+ // Note that this omits verifier.globalFilter because we still need to
108+ // recheck any out-filter documents that may have changed in order to
109+ // account for filter traversals (i.e., updates that change whether a
110+ // document matches the filter).
111+ //
108112// NB: Ideally we could make the change stream give $bsonSize(fullDocument)
109113// and omit fullDocument, but $bsonSize was new in MongoDB 4.4, and we still
110114// want to verify migrations from 4.2. fullDocument is unlikely to be a
@@ -122,55 +126,60 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
122126 return []bson.D {stage }
123127}
124128
125- func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
126- defer cs .Close (ctx )
129+ func (verifier * Verifier ) readAndHandleOneChangeEventBatch (ctx context.Context , cs * mongo.ChangeStream ) error {
130+ eventsRead := 0
131+ var changeEventBatch []ParsedEvent
127132
128- var lastPersistedTime time.Time
133+ for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
134+ gotEvent := cs .TryNext (ctx )
129135
130- persistResumeTokenIfNeeded := func () error {
131- if time .Since (lastPersistedTime ) <= minChangeStreamPersistInterval {
132- return nil
136+ if cs .Err () != nil {
137+ return errors .Wrap (cs .Err (), "change stream iteration failed" )
133138 }
134139
135- err := verifier .persistChangeStreamResumeToken (ctx , cs )
136- if err == nil {
137- lastPersistedTime = time .Now ()
140+ if ! gotEvent {
141+ break
138142 }
139143
140- return err
144+ if changeEventBatch == nil {
145+ changeEventBatch = make ([]ParsedEvent , cs .RemainingBatchLength ()+ 1 )
146+ }
147+
148+ if err := cs .Decode (& changeEventBatch [eventsRead ]); err != nil {
149+ return errors .Wrap (err , "failed to decode change event" )
150+ }
151+
152+ eventsRead ++
141153 }
142154
143- readAndHandleOneChangeEventBatch := func () ( bool , error ) {
144- eventsRead := 0
145- var changeEventBatch [] ParsedEvent
155+ if eventsRead == 0 {
156+ return nil
157+ }
146158
147- for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
148- gotEvent := cs .TryNext (ctx )
159+ err := verifier .HandleChangeStreamEvents (ctx , changeEventBatch )
160+ if err != nil {
161+ return errors .Wrap (err , "failed to handle change events" )
162+ }
149163
150- if ! gotEvent || cs .Err () != nil {
151- break
152- }
164+ return nil
165+ }
153166
154- if changeEventBatch == nil {
155- changeEventBatch = make ([]ParsedEvent , cs .RemainingBatchLength ()+ 1 )
156- }
167+ func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
168+ defer cs .Close (ctx )
157169
158- if err := cs .Decode (& changeEventBatch [eventsRead ]); err != nil {
159- return false , errors .Wrap (err , "failed to decode change event" )
160- }
170+ var lastPersistedTime time.Time
161171
162- eventsRead ++
172+ persistResumeTokenIfNeeded := func () error {
173+ if time .Since (lastPersistedTime ) <= minChangeStreamPersistInterval {
174+ return nil
163175 }
164176
165- if eventsRead > 0 {
166- verifier .logger .Debug ().Int ("eventsCount" , eventsRead ).Msgf ("Received a batch of events." )
167- err := verifier .HandleChangeStreamEvents (ctx , changeEventBatch )
168- if err != nil {
169- return false , errors .Wrap (err , "failed to handle change events" )
170- }
177+ err := verifier .persistChangeStreamResumeToken (ctx , cs )
178+ if err == nil {
179+ lastPersistedTime = time .Now ()
171180 }
172181
173- return eventsRead > 0 , errors . Wrap ( cs . Err (), "change stream iteration failed" )
182+ return err
174183 }
175184
176185 for {
@@ -189,29 +198,45 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
189198 // If the changeStreamEnderChan has a message, the user has indicated that
190199 // source writes are ended. This means we should exit rather than continue
191200 // reading the change stream since there should be no more events.
192- case <- verifier .changeStreamEnderChan :
201+ case finalTs := <- verifier .changeStreamFinalTsChan :
193202 verifier .logger .Debug ().
194- Msg ("Change stream thread received shutdown request." )
203+ Interface ("finalTimestamp" , finalTs ).
204+ Msg ("Change stream thread received final timestamp. Finalizing change stream." )
195205
196206 changeStreamEnded = true
197207
198208 // Read all change events until the source reports no events.
199209 // (i.e., the `getMore` call returns empty)
200210 for {
201- var gotEvent bool
202- gotEvent , err = readAndHandleOneChangeEventBatch ()
211+ var curTs primitive.Timestamp
212+ curTs , err = extractTimestampFromResumeToken (cs .ResumeToken ())
213+ if err != nil {
214+ err = errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
215+ break
216+ }
217+
218+ if curTs == finalTs || curTs .After (finalTs ) {
219+ verifier .logger .Debug ().
220+ Interface ("currentTimestamp" , curTs ).
221+ Interface ("finalTimestamp" , finalTs ).
222+ Msg ("Change stream has reached the final timestamp. Shutting down." )
203223
204- if ! gotEvent || err != nil {
224+ break
225+ }
226+
227+ err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
228+
229+ if err != nil {
205230 break
206231 }
207232 }
208233
209234 default :
210- _ , err = readAndHandleOneChangeEventBatch ()
211- }
235+ err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
212236
213- if err == nil {
214- err = persistResumeTokenIfNeeded ()
237+ if err == nil {
238+ err = persistResumeTokenIfNeeded ()
239+ }
215240 }
216241
217242 if err != nil && ! errors .Is (err , context .Canceled ) {
@@ -242,9 +267,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
242267
243268 infoLog := verifier .logger .Info ()
244269 if verifier .lastChangeEventTime == nil {
245- infoLog = infoLog .Str ("changeStreamStopTime " , "none" )
270+ infoLog = infoLog .Str ("lastEventTime " , "none" )
246271 } else {
247- infoLog = infoLog .Interface ("changeStreamStopTime " , * verifier .lastChangeEventTime )
272+ infoLog = infoLog .Interface ("lastEventTime " , * verifier .lastChangeEventTime )
248273 }
249274
250275 infoLog .Msg ("Change stream is done." )
@@ -270,7 +295,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
270295
271296 ts , err := extractTimestampFromResumeToken (savedResumeToken )
272297 if err == nil {
273- logEvent = addUnixTimeToLogEvent (ts . T , logEvent )
298+ logEvent = addTimestampToLogEvent (ts , logEvent )
274299 } else {
275300 verifier .logger .Warn ().
276301 Err (err ).
@@ -323,8 +348,10 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
323348 return nil
324349}
325350
326- func addUnixTimeToLogEvent [T constraints.Integer ](unixTime T , event * zerolog.Event ) * zerolog.Event {
327- return event .Time ("timestampTime" , time .Unix (int64 (unixTime ), int64 (0 )))
351+ func addTimestampToLogEvent (ts primitive.Timestamp , event * zerolog.Event ) * zerolog.Event {
352+ return event .
353+ Interface ("timestamp" , ts ).
354+ Time ("time" , time .Unix (int64 (ts .T ), int64 (0 )))
328355}
329356
330357func (v * Verifier ) getChangeStreamMetadataCollection () * mongo.Collection {
@@ -363,7 +390,7 @@ func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs
363390 logEvent := verifier .logger .Debug ()
364391
365392 if err == nil {
366- logEvent = addUnixTimeToLogEvent (ts . T , logEvent )
393+ logEvent = addTimestampToLogEvent (ts , logEvent )
367394 } else {
368395 verifier .logger .Warn ().Err (err ).
369396 Msg ("failed to extract resume token timestamp" )
0 commit comments