@@ -44,7 +44,7 @@ const (
4444func (verifier * Verifier ) HandleChangeStreamEvent (ctx context.Context , changeEvent * ParsedEvent ) error {
4545 if changeEvent .ClusterTime != nil &&
4646 (verifier .lastChangeEventTime == nil ||
47- primitive . CompareTimestamp ( * verifier .lastChangeEventTime , * changeEvent .ClusterTime ) < 0 ) {
47+ verifier .lastChangeEventTime . Compare ( * changeEvent .ClusterTime ) < 0 ) {
4848 verifier .lastChangeEventTime = changeEvent .ClusterTime
4949 }
5050 switch changeEvent .OpType {
@@ -126,13 +126,20 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
126126 // the default case is that we are still in the Check phase, in the check phase we still
127127 // use TryNext, but we do not exit if TryNext returns false.
128128 default :
129- if next := cs .TryNext (ctx ); ! next {
130- continue
131- }
132- if err := cs .Decode (& changeEvent ); err != nil {
133- verifier .logger .Fatal ().Err (err ).Msg ("" )
129+ var err error
130+
131+ if next := cs .TryNext (ctx ); next {
132+ if err = cs .Decode (& changeEvent ); err != nil {
133+ err = errors .Wrapf (err , "failed to decode change event (%v)" , cs .Current )
134+ }
135+
136+ if err == nil {
137+ err = verifier .HandleChangeStreamEvent (ctx , & changeEvent )
138+ if err != nil {
139+ err = errors .Wrapf (err , "failed to handle change event (%+v)" , changeEvent )
140+ }
141+ }
134142 }
135- err := verifier .HandleChangeStreamEvent (ctx , & changeEvent )
136143
137144 if err == nil {
138145 err = persistResumeTokenIfNeeded ()
@@ -231,7 +238,7 @@ func (verifier *Verifier) loadChangeStreamResumeToken(ctx context.Context) (bson
231238 token , err := coll .FindOne (
232239 ctx ,
233240 bson.D {{"_id" , "resumeToken" }},
234- ).DecodeBytes ()
241+ ).Raw ()
235242
236243 if errors .Is (err , mongo .ErrNoDocuments ) {
237244 return nil , nil
0 commit comments