@@ -136,16 +136,23 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
136136 default :
137137 var err error
138138
139- if next := cs .TryNext (ctx ); next {
140- if err = cs .Decode (& changeEvent ); err != nil {
141- err = errors .Wrapf (err , "failed to decode change event (%v)" , cs .Current )
142- }
139+ for err == nil {
140+ next := cs .TryNext (ctx )
141+
142+ if next {
143+ if err = cs .Decode (& changeEvent ); err != nil {
144+ err = errors .Wrapf (err , "failed to decode change event (%v)" , cs .Current )
145+ }
143146
144- if err == nil {
145- err = verifier .HandleChangeStreamEvent (ctx , & changeEvent )
146- if err != nil {
147- err = errors .Wrapf (err , "failed to handle change event (%+v)" , changeEvent )
147+ if err == nil {
148+ err = verifier .HandleChangeStreamEvent (ctx , & changeEvent )
149+ if err != nil {
150+ err = errors .Wrapf (err , "failed to handle change event (%+v)" , changeEvent )
151+ }
148152 }
153+ } else {
154+ err = errors .Wrap (cs .Err (), "change stream iteration failed" )
155+ break
149156 }
150157 }
151158
@@ -266,9 +273,22 @@ func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs
266273 options .Replace ().SetUpsert (true ),
267274 )
268275
269- verifier .logger .Debug ().
270- Stringer ("resumeToken" , token ).
271- Msg ("Persisted change stream resume token." )
276+ if err == nil {
277+ ts , err := extractTimestampFromResumeToken (token )
278+
279+ logEvent := verifier .logger .Debug ()
280+
281+ if err == nil {
282+ logEvent = addUnixTimeToLogEvent (ts .T , logEvent )
283+ } else {
284+ verifier .logger .Warn ().Err (err ).
285+ Msg ("failed to extract resume token timestamp" )
286+ }
287+
288+ logEvent .Msg ("Persisted change stream resume token." )
289+
290+ return nil
291+ }
272292
273293 return errors .Wrapf (err , "failed to persist change stream resume token (%v)" , token )
274294}
0 commit comments