@@ -87,8 +87,6 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
8787}
8888
8989func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
90- var changeEvent ParsedEvent
91-
9290 var lastPersistedTime time.Time
9391
9492 persistResumeTokenIfNeeded := func () error {
@@ -107,9 +105,11 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
107105 readOneChangeEvent := func () (bool , error ) {
108106 gotEvent := cs .TryNext (ctx )
109107 if gotEvent {
108+ var changeEvent ParsedEvent
110109 if err := cs .Decode (& changeEvent ); err != nil {
111110 return false , errors .Wrap (err , "failed to decode change event" )
112111 }
112+ fmt .Printf ("\n ========= got event: %+v\n \n " , changeEvent )
113113 err := verifier .HandleChangeStreamEvent (ctx , & changeEvent )
114114 if err != nil {
115115 return false , errors .Wrap (err , "failed to handle change event" )
@@ -133,6 +133,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
133133 // source writes are ended. This means we should exit rather than continue
134134 // reading the change stream since there should be no more events.
135135 case <- verifier .changeStreamEnderChan :
136+ verifier .logger .Debug ().
137+ Msg ("Change stream thread received shutdown request." )
138+
136139 changeStreamEnded = true
137140
138141 // Read all change events until the source reports no events.
0 commit comments