@@ -100,45 +100,66 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
100100 return err
101101 }
102102
103+ readOneChangeEvent := func () (bool , error ) {
104+ gotEvent := cs .TryNext (ctx )
105+ if gotEvent {
106+ if err := cs .Decode (& changeEvent ); err != nil {
107+ return false , errors .Wrap (err , "failed to decode change event" )
108+ }
109+ err := verifier .HandleChangeStreamEvent (ctx , & changeEvent )
110+ if err != nil {
111+ return false , errors .Wrap (err , "failed to handle change event" )
112+ }
113+ }
114+
115+ return gotEvent , errors .Wrap (cs .Err (), "change stream iteration failed" )
116+ }
117+
103118 for {
104119 var err error
120+ var changeStreamEnded bool
105121
106- for cs .TryNext (ctx ) {
107- if err = cs .Decode (& changeEvent ); err != nil {
108- err = errors .Wrap (err , "failed to decode change event" )
109- break
122+ select {
123+
124+ // If the context is canceled, return immmediately.
125+ case <- ctx .Done ():
126+ return
127+
128+ // If the changeStreamEnderChan has a message, the user has indicated that
129+ // source writes are ended. This means we should exit rather than continue
130+ // reading the change stream since there should be no more events.
131+ case <- verifier .changeStreamEnderChan :
132+ var gotEvent bool
133+
134+ changeStreamEnded = true
135+
136+ // Read all change events until the source reports no events.
137+ // (i.e., the `getMore` call returns empty)
138+ for {
139+ gotEvent , err = readOneChangeEvent ()
140+
141+ if ! gotEvent || err != nil {
142+ break
143+ }
110144 }
111- err = verifier . HandleChangeStreamEvent ( ctx , & changeEvent )
145+
112146 if err != nil {
113- err = errors .Wrap (err , "failed to handle change event" )
114147 break
115148 }
116- }
117149
118- if cs .Err () != nil {
119- err = errors .Wrap (
120- cs .Err (),
121- "change stream iteration failed" ,
122- )
150+ default :
151+ _ , err = readOneChangeEvent ()
123152 }
124153
125154 if err == nil {
126155 err = persistResumeTokenIfNeeded ()
127156 }
128157
129- if err != nil {
130- if ! errors .Is (err , context .Canceled ) {
131- verifier .changeStreamErrChan <- err
132- }
133-
134- return
158+ if err != nil && ! errors .Is (err , context .Canceled ) {
159+ verifier .changeStreamErrChan <- err
135160 }
136161
137- select {
138- // If the changeStreamEnderChan has a message, the user has indicated that
139- // source writes are ended. This means we should exit rather than continue
140- // reading the change stream since there should be no more events.
141- case <- verifier .changeStreamEnderChan :
162+ if changeStreamEnded {
142163 verifier .mux .Lock ()
143164 verifier .changeStreamRunning = false
144165 if verifier .lastChangeEventTime != nil {
@@ -151,7 +172,6 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
151172 // since the changeStream is exhausted, we now return
152173 verifier .logger .Debug ().Msg ("Change stream is done" )
153174 return
154- default :
155175 }
156176 }
157177}
0 commit comments