@@ -12,7 +12,6 @@ import (
1212 "go.mongodb.org/mongo-driver/bson/primitive"
1313 "go.mongodb.org/mongo-driver/mongo"
1414 "go.mongodb.org/mongo-driver/mongo/options"
15- "golang.org/x/exp/constraints"
1615)
1716
1817const fauxDocSizeForDeleteEvents = 1024
@@ -138,12 +137,14 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
138137 return err
139138 }
140139
141- readAndHandleOneChangeEventBatch := func () ( bool , error ) {
140+ readAndHandleOneChangeEventBatch := func () error {
142141 eventsRead := 0
143142 var changeEventBatch []ParsedEvent
144143
145144 for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
145+ verifier .logger .Info ().Msg ("reading event" )
146146 gotEvent := cs .TryNext (ctx )
147+ verifier .logger .Info ().Err (cs .Err ()).Msgf ("got event? %v" , gotEvent )
147148
148149 if ! gotEvent || cs .Err () != nil {
149150 break
@@ -154,21 +155,29 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
154155 }
155156
156157 if err := cs .Decode (& changeEventBatch [eventsRead ]); err != nil {
157- return false , errors .Wrap (err , "failed to decode change event" )
158+ return errors .Wrap (err , "failed to decode change event" )
158159 }
159160
161+ verifier .logger .Debug ().Interface ("event" , changeEventBatch [eventsRead ]).Msg ("Got event." )
162+
160163 eventsRead ++
161164 }
162165
163- if eventsRead > 0 {
164- verifier . logger . Debug (). Int ( "eventsCount" , eventsRead ). Msgf ( "Received a batch of events. " )
165- err := verifier . HandleChangeStreamEvents ( ctx , changeEventBatch )
166- if err != nil {
167- return false , errors . Wrap ( err , "failed to handle change events" )
168- }
166+ if cs . Err () != nil {
167+ return errors . Wrap ( cs . Err (), "change stream iteration failed " )
168+ }
169+
170+ if eventsRead == 0 {
171+ return nil
169172 }
170173
171- return eventsRead > 0 , errors .Wrap (cs .Err (), "change stream iteration failed" )
174+ verifier .logger .Debug ().Int ("eventsCount" , eventsRead ).Msgf ("Received a batch of events." )
175+ err := verifier .HandleChangeStreamEvents (ctx , changeEventBatch )
176+ if err != nil {
177+ return errors .Wrap (err , "failed to handle change events" )
178+ }
179+
180+ return nil
172181 }
173182
174183 for {
@@ -184,29 +193,45 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
184193 // If the changeStreamEnderChan has a message, the user has indicated that
185194 // source writes are ended. This means we should exit rather than continue
186195 // reading the change stream since there should be no more events.
187- case <- verifier .changeStreamEnderChan :
196+ case finalTs := <- verifier .changeStreamFinalTsChan :
188197 verifier .logger .Debug ().
189- Msg ("Change stream thread received shutdown request." )
198+ Interface ("finalTimestamp" , finalTs ).
199+ Msg ("Change stream thread received final timestamp. Finalizing change stream." )
190200
191201 changeStreamEnded = true
192202
193203 // Read all change events until the source reports no events.
194204 // (i.e., the `getMore` call returns empty)
195205 for {
196- var gotEvent bool
197- gotEvent , err = readAndHandleOneChangeEventBatch ()
206+ var curTs primitive.Timestamp
207+ curTs , err = extractTimestampFromResumeToken (cs .ResumeToken ())
208+ if err != nil {
209+ err = errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
210+ break
211+ }
212+
213+ if curTs .Compare (finalTs ) >= 0 {
214+ verifier .logger .Debug ().
215+ Interface ("currentTimestamp" , curTs ).
216+ Interface ("finalTimestamp" , finalTs ).
217+ Msg ("Change stream has reached the final timestamp. Shutting down." )
198218
199- if ! gotEvent || err != nil {
219+ break
220+ }
221+
222+ err = readAndHandleOneChangeEventBatch ()
223+
224+ if err != nil {
200225 break
201226 }
202227 }
203228
204229 default :
205- _ , err = readAndHandleOneChangeEventBatch ()
206- }
230+ err = readAndHandleOneChangeEventBatch ()
207231
208- if err == nil {
209- err = persistResumeTokenIfNeeded ()
232+ if err == nil {
233+ err = persistResumeTokenIfNeeded ()
234+ }
210235 }
211236
212237 if err != nil && ! errors .Is (err , context .Canceled ) {
@@ -237,9 +262,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
237262
238263 infoLog := verifier .logger .Info ()
239264 if verifier .lastChangeEventTime == nil {
240- infoLog = infoLog .Str ("changeStreamStopTime " , "none" )
265+ infoLog = infoLog .Str ("lastEventTime " , "none" )
241266 } else {
242- infoLog = infoLog .Interface ("changeStreamStopTime " , * verifier .lastChangeEventTime )
267+ infoLog = infoLog .Interface ("lastEventTime " , * verifier .lastChangeEventTime )
243268 }
244269
245270 infoLog .Msg ("Change stream is done." )
@@ -265,7 +290,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
265290
266291 ts , err := extractTimestampFromResumeToken (savedResumeToken )
267292 if err == nil {
268- logEvent = addUnixTimeToLogEvent (ts . T , logEvent )
293+ logEvent = addTimestampToLogEvent (ts , logEvent )
269294 } else {
270295 verifier .logger .Warn ().
271296 Err (err ).
@@ -284,6 +309,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
284309 return errors .Wrap (err , "failed to start session" )
285310 }
286311 sctx := mongo .NewSessionContext (ctx , sess )
312+ verifier .logger .Debug ().Interface ("pipeline" , pipeline ).Msg ("Opened change stream." )
287313 srcChangeStream , err := verifier .srcClient .Watch (sctx , pipeline , opts )
288314 if err != nil {
289315 return errors .Wrap (err , "failed to open change stream" )
@@ -318,8 +344,10 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
318344 return nil
319345}
320346
321- func addUnixTimeToLogEvent [T constraints.Integer ](unixTime T , event * zerolog.Event ) * zerolog.Event {
322- return event .Time ("clockTime" , time .Unix (int64 (unixTime ), int64 (0 )))
347+ func addTimestampToLogEvent (ts primitive.Timestamp , event * zerolog.Event ) * zerolog.Event {
348+ return event .
349+ Interface ("timestamp" , ts ).
350+ Time ("time" , time .Unix (int64 (ts .T ), int64 (0 )))
323351}
324352
325353func (v * Verifier ) getChangeStreamMetadataCollection () * mongo.Collection {
@@ -358,7 +386,7 @@ func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs
358386 logEvent := verifier .logger .Debug ()
359387
360388 if err == nil {
361- logEvent = addUnixTimeToLogEvent (ts . T , logEvent )
389+ logEvent = addTimestampToLogEvent (ts , logEvent )
362390 } else {
363391 verifier .logger .Warn ().Err (err ).
364392 Msg ("failed to extract resume token timestamp" )
0 commit comments