@@ -104,45 +104,66 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
104104 return err
105105 }
106106
107+ readOneChangeEvent := func () (bool , error ) {
108+ gotEvent := cs .TryNext (ctx )
109+ if gotEvent {
110+ if err := cs .Decode (& changeEvent ); err != nil {
111+ return false , errors .Wrap (err , "failed to decode change event" )
112+ }
113+ err := verifier .HandleChangeStreamEvent (ctx , & changeEvent )
114+ if err != nil {
115+ return false , errors .Wrap (err , "failed to handle change event" )
116+ }
117+ }
118+
119+ return gotEvent , errors .Wrap (cs .Err (), "change stream iteration failed" )
120+ }
121+
107122 for {
108123 var err error
124+ var changeStreamEnded bool
109125
110- for cs .TryNext (ctx ) {
111- if err = cs .Decode (& changeEvent ); err != nil {
112- err = errors .Wrap (err , "failed to decode change event" )
113- break
126+ select {
127+
128+ // If the context is canceled, return immmediately.
129+ case <- ctx .Done ():
130+ return
131+
132+ // If the changeStreamEnderChan has a message, the user has indicated that
133+ // source writes are ended. This means we should exit rather than continue
134+ // reading the change stream since there should be no more events.
135+ case <- verifier .changeStreamEnderChan :
136+ var gotEvent bool
137+
138+ changeStreamEnded = true
139+
140+ // Read all change events until the source reports no events.
141+ // (i.e., the `getMore` call returns empty)
142+ for {
143+ gotEvent , err = readOneChangeEvent ()
144+
145+ if ! gotEvent || err != nil {
146+ break
147+ }
114148 }
115- err = verifier . HandleChangeStreamEvent ( ctx , & changeEvent )
149+
116150 if err != nil {
117- err = errors .Wrap (err , "failed to handle change event" )
118151 break
119152 }
120- }
121153
122- if cs .Err () != nil {
123- err = errors .Wrap (
124- cs .Err (),
125- "change stream iteration failed" ,
126- )
154+ default :
155+ _ , err = readOneChangeEvent ()
127156 }
128157
129158 if err == nil {
130159 err = persistResumeTokenIfNeeded ()
131160 }
132161
133- if err != nil {
134- if ! errors .Is (err , context .Canceled ) {
135- verifier .changeStreamErrChan <- err
136- }
137-
138- return
162+ if err != nil && ! errors .Is (err , context .Canceled ) {
163+ verifier .changeStreamErrChan <- err
139164 }
140165
141- select {
142- // If the changeStreamEnderChan has a message, the user has indicated that
143- // source writes are ended. This means we should exit rather than continue
144- // reading the change stream since there should be no more events.
145- case <- verifier .changeStreamEnderChan :
166+ if changeStreamEnded {
146167 verifier .mux .Lock ()
147168 verifier .changeStreamRunning = false
148169 if verifier .lastChangeEventTime != nil {
@@ -155,7 +176,6 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
155176 // since the changeStream is exhausted, we now return
156177 verifier .logger .Debug ().Msg ("Change stream is done" )
157178 return
158- default :
159179 }
160180 }
161181}
0 commit comments