Skip to content

Commit 3b26420

Browse files
committed
refactor & solve inconsistency
1 parent 36a6cac commit 3b26420

File tree

1 file changed

+34
-48
lines changed

1 file changed

+34
-48
lines changed

internal/verifier/change_stream.go

Lines changed: 34 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -101,24 +101,42 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
101101
}
102102

103103
for {
104-
select {
105-
// if the context is cancelled return immmediately
106-
case <-ctx.Done():
104+
var err error
105+
106+
for cs.TryNext(ctx) {
107+
if err = cs.Decode(&changeEvent); err != nil {
108+
err = errors.Wrap(err, "failed to decode change event")
109+
break
110+
}
111+
err = verifier.HandleChangeStreamEvent(ctx, &changeEvent)
112+
if err != nil {
113+
err = errors.Wrap(err, "failed to handle change event")
114+
break
115+
}
116+
}
117+
118+
if cs.Err() != nil {
119+
err = errors.Wrap(
120+
cs.Err(),
121+
"change stream iteration failed",
122+
)
123+
}
124+
125+
if err != nil {
126+
if !errors.Is(err, context.Canceled) {
127+
verifier.changeStreamErrChan <- err
128+
}
129+
107130
return
108-
// if the changeStreamEnderChan has a message, we have moved to the Recheck phase, obtain
109-
// the remaining changes, but when TryNext returns false, we will exit, since there should
110-
// be no message until the user has guaranteed writes to the source have ended.
131+
}
132+
133+
persistResumeTokenIfNeeded()
134+
135+
select {
136+
// If the changeStreamEnderChan has a message, the user has indicated that
137+
// source writes are ended. This means we should exit rather than continue
138+
// reading the change stream since there should be no more events.
111139
case <-verifier.changeStreamEnderChan:
112-
for cs.TryNext(ctx) {
113-
if err := cs.Decode(&changeEvent); err != nil {
114-
verifier.logger.Fatal().Err(err).Msg("Failed to decode change event")
115-
}
116-
err := verifier.HandleChangeStreamEvent(ctx, &changeEvent)
117-
if err != nil {
118-
verifier.changeStreamErrChan <- err
119-
verifier.logger.Fatal().Err(err).Msg("Error handling change event")
120-
}
121-
}
122140
verifier.mux.Lock()
123141
verifier.changeStreamRunning = false
124142
if verifier.lastChangeEventTime != nil {
@@ -131,39 +149,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
131149
// since the changeStream is exhausted, we now return
132150
verifier.logger.Debug().Msg("Change stream is done")
133151
return
134-
// the default case is that we are still in the Check phase, in the check phase we still
135-
// use TryNext, but we do not exit if TryNext returns false.
136152
default:
137-
var err error
138-
139-
for err == nil {
140-
next := cs.TryNext(ctx)
141-
142-
if next {
143-
if err = cs.Decode(&changeEvent); err != nil {
144-
err = errors.Wrapf(err, "failed to decode change event (%v)", cs.Current)
145-
}
146-
147-
if err == nil {
148-
err = verifier.HandleChangeStreamEvent(ctx, &changeEvent)
149-
if err != nil {
150-
err = errors.Wrapf(err, "failed to handle change event (%+v)", changeEvent)
151-
}
152-
}
153-
} else {
154-
err = errors.Wrap(cs.Err(), "change stream iteration failed")
155-
break
156-
}
157-
}
158-
159-
if err == nil {
160-
err = persistResumeTokenIfNeeded()
161-
}
162-
163-
if err != nil {
164-
verifier.changeStreamErrChan <- err
165-
return
166-
}
167153
}
168154
}
169155
}

0 commit comments

Comments
 (0)