@@ -133,24 +133,19 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
133133 // source writes are ended. This means we should exit rather than continue
134134 // reading the change stream since there should be no more events.
135135 case <- verifier .changeStreamEnderChan :
136- var gotEvent bool
137-
138136 changeStreamEnded = true
139137
140138 // Read all change events until the source reports no events.
141139 // (i.e., the `getMore` call returns empty)
142140 for {
141+ var gotEvent bool
143142 gotEvent , err = readOneChangeEvent ()
144143
145144 if ! gotEvent || err != nil {
146145 break
147146 }
148147 }
149148
150- if err != nil {
151- break
152- }
153-
154149 default :
155150 _ , err = readOneChangeEvent ()
156151 }
@@ -160,17 +155,14 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
160155 }
161156
162157 if err != nil && ! errors .Is (err , context .Canceled ) {
163- if changeStreamEnded {
164- // Avoid potentially blocking if the change stream is done.
165- verifier .logger .Fatal ().
166- Err (err ).
167- Msg ("Failed to finish reading change stream." )
168- } else {
169- verifier .logger .Warn ().
170- Err (err ).
171- Msg ("An error occurred during change stream processing." )
172-
173- verifier .changeStreamErrChan <- err
158+ verifier .logger .Warn ().
159+ Err (err ).
160+ Msg ("An error occurred during change stream processing." )
161+
162+ verifier .changeStreamErrChan <- err
163+
164+ if ! changeStreamEnded {
165+ return
174166 }
175167 }
176168
0 commit comments