@@ -12,6 +12,7 @@ import (
1212 "go.mongodb.org/mongo-driver/bson/primitive"
1313 "go.mongodb.org/mongo-driver/mongo"
1414 "go.mongodb.org/mongo-driver/mongo/options"
15+ "golang.org/x/exp/constraints"
1516)
1617
1718const fauxDocSizeForDeleteEvents = 1024
@@ -104,11 +105,6 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
104105// GetChangeStreamFilter returns an aggregation pipeline that filters
105106// namespaces as per configuration.
106107//
107- // Note that this omits verifier.globalFilter because we still need to
108- // recheck any out-filter documents that may have changed in order to
109- // account for filter traversals (i.e., updates that change whether a
110- // document matches the filter).
111- //
112108// NB: Ideally we could make the change stream give $bsonSize(fullDocument)
113109// and omit fullDocument, but $bsonSize was new in MongoDB 4.4, and we still
114110// want to verify migrations from 4.2. fullDocument is unlikely to be a
@@ -126,60 +122,55 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
126122 return []bson.D {stage }
127123}
128124
129- func (verifier * Verifier ) readAndHandleOneChangeEventBatch (ctx context.Context , cs * mongo.ChangeStream ) error {
130- eventsRead := 0
131- var changeEventBatch []ParsedEvent
132-
133- for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
134- gotEvent := cs .TryNext (ctx )
135-
136- if cs .Err () != nil {
137- return errors .Wrap (cs .Err (), "change stream iteration failed" )
138- }
125+ func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
126+ defer cs .Close (ctx )
139127
140- if ! gotEvent {
141- break
142- }
128+ var lastPersistedTime time.Time
143129
144- if changeEventBatch == nil {
145- changeEventBatch = make ([]ParsedEvent , cs .RemainingBatchLength ()+ 1 )
130+ persistResumeTokenIfNeeded := func () error {
131+ if time .Since (lastPersistedTime ) <= minChangeStreamPersistInterval {
132+ return nil
146133 }
147134
148- if err := cs .Decode (& changeEventBatch [eventsRead ]); err != nil {
149- return errors .Wrap (err , "failed to decode change event" )
135+ err := verifier .persistChangeStreamResumeToken (ctx , cs )
136+ if err == nil {
137+ lastPersistedTime = time .Now ()
150138 }
151139
152- eventsRead ++
140+ return err
153141 }
154142
155- if eventsRead == 0 {
156- return nil
157- }
143+ readAndHandleOneChangeEventBatch := func () ( bool , error ) {
144+ eventsRead := 0
145+ var changeEventBatch [] ParsedEvent
158146
159- err := verifier .HandleChangeStreamEvents (ctx , changeEventBatch )
160- if err != nil {
161- return errors .Wrap (err , "failed to handle change events" )
162- }
147+ for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
148+ gotEvent := cs .TryNext (ctx )
163149
164- return nil
165- }
150+ if ! gotEvent || cs .Err () != nil {
151+ break
152+ }
166153
167- func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
168- defer cs .Close (ctx )
154+ if changeEventBatch == nil {
155+ changeEventBatch = make ([]ParsedEvent , cs .RemainingBatchLength ()+ 1 )
156+ }
169157
170- var lastPersistedTime time.Time
158+ if err := cs .Decode (& changeEventBatch [eventsRead ]); err != nil {
159+ return false , errors .Wrap (err , "failed to decode change event" )
160+ }
171161
172- persistResumeTokenIfNeeded := func () error {
173- if time .Since (lastPersistedTime ) <= minChangeStreamPersistInterval {
174- return nil
162+ eventsRead ++
175163 }
176164
177- err := verifier .persistChangeStreamResumeToken (ctx , cs )
178- if err == nil {
179- lastPersistedTime = time .Now ()
165+ if eventsRead > 0 {
166+ verifier .logger .Debug ().Int ("eventsCount" , eventsRead ).Msgf ("Received a batch of events." )
167+ err := verifier .HandleChangeStreamEvents (ctx , changeEventBatch )
168+ if err != nil {
169+ return false , errors .Wrap (err , "failed to handle change events" )
170+ }
180171 }
181172
182- return err
173+ return eventsRead > 0 , errors . Wrap ( cs . Err (), "change stream iteration failed" )
183174 }
184175
185176 for {
@@ -198,45 +189,29 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
198189 // If the changeStreamEnderChan has a message, the user has indicated that
199190 // source writes have ended. This means we should exit rather than continue
200191 // reading the change stream since there should be no more events.
201- case finalTs := <- verifier .changeStreamFinalTsChan :
192+ case <- verifier .changeStreamEnderChan :
202193 verifier .logger .Debug ().
203- Interface ("finalTimestamp" , finalTs ).
204- Msg ("Change stream thread received final timestamp. Finalizing change stream." )
194+ Msg ("Change stream thread received shutdown request." )
205195
206196 changeStreamEnded = true
207197
208198 // Read all change events until the source reports no events.
209199 // (i.e., the `getMore` call returns empty)
210200 for {
211- var curTs primitive.Timestamp
212- curTs , err = extractTimestampFromResumeToken (cs .ResumeToken ())
213- if err != nil {
214- err = errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
215- break
216- }
217-
218- if curTs == finalTs || curTs .After (finalTs ) {
219- verifier .logger .Debug ().
220- Interface ("currentTimestamp" , curTs ).
221- Interface ("finalTimestamp" , finalTs ).
222- Msg ("Change stream has reached the final timestamp. Shutting down." )
201+ var gotEvent bool
202+ gotEvent , err = readAndHandleOneChangeEventBatch ()
223203
224- break
225- }
226-
227- err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
228-
229- if err != nil {
204+ if ! gotEvent || err != nil {
230205 break
231206 }
232207 }
233208
234209 default :
235- err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
210+ _ , err = readAndHandleOneChangeEventBatch ()
211+ }
236212
237- if err == nil {
238- err = persistResumeTokenIfNeeded ()
239- }
213+ if err == nil {
214+ err = persistResumeTokenIfNeeded ()
240215 }
241216
242217 if err != nil && ! errors .Is (err , context .Canceled ) {
@@ -267,9 +242,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
267242
268243 infoLog := verifier .logger .Info ()
269244 if verifier .lastChangeEventTime == nil {
270- infoLog = infoLog .Str ("lastEventTime " , "none" )
245+ infoLog = infoLog .Str ("changeStreamStopTime " , "none" )
271246 } else {
272- infoLog = infoLog .Interface ("lastEventTime " , * verifier .lastChangeEventTime )
247+ infoLog = infoLog .Interface ("changeStreamStopTime " , * verifier .lastChangeEventTime )
273248 }
274249
275250 infoLog .Msg ("Change stream is done." )
@@ -295,7 +270,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
295270
296271 ts , err := extractTimestampFromResumeToken (savedResumeToken )
297272 if err == nil {
298- logEvent = addTimestampToLogEvent (ts , logEvent )
273+ logEvent = addUnixTimeToLogEvent (ts . T , logEvent )
299274 } else {
300275 verifier .logger .Warn ().
301276 Err (err ).
@@ -348,10 +323,8 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
348323 return nil
349324}
350325
351- func addTimestampToLogEvent (ts primitive.Timestamp , event * zerolog.Event ) * zerolog.Event {
352- return event .
353- Interface ("timestamp" , ts ).
354- Time ("time" , time .Unix (int64 (ts .T ), int64 (0 )))
326+ func addUnixTimeToLogEvent [T constraints.Integer ](unixTime T , event * zerolog.Event ) * zerolog.Event {
327+ return event .Time ("timestampTime" , time .Unix (int64 (unixTime ), int64 (0 )))
355328}
356329
357330func (v * Verifier ) getChangeStreamMetadataCollection () * mongo.Collection {
@@ -390,7 +363,7 @@ func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs
390363 logEvent := verifier .logger .Debug ()
391364
392365 if err == nil {
393- logEvent = addTimestampToLogEvent (ts , logEvent )
366+ logEvent = addUnixTimeToLogEvent (ts . T , logEvent )
394367 } else {
395368 verifier .logger .Warn ().Err (err ).
396369 Msg ("failed to extract resume token timestamp" )
0 commit comments