Skip to content

Commit de9675c

Browse files
committed
tweak verbiage
1 parent 2abca23 commit de9675c

File tree

1 file changed

+13
-13
lines changed

1 file changed

+13
-13
lines changed

internal/verifier/change_stream.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
138138
return err
139139
}
140140

141-
readAndHandleOneChangeEventBatch := func() (bool, error) {
141+
readAndHandleOneChangeEventBatch := func() error {
142142
eventsRead := 0
143143
var changeEventBatch []ParsedEvent
144144

@@ -154,27 +154,27 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
154154
}
155155

156156
if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil {
157-
return false, errors.Wrap(err, "failed to decode change event")
157+
return errors.Wrap(err, "failed to decode change event")
158158
}
159159

160160
eventsRead++
161161
}
162162

163163
if cs.Err() != nil {
164-
return false, errors.Wrap(cs.Err(), "change stream iteration failed")
164+
return errors.Wrap(cs.Err(), "change stream iteration failed")
165165
}
166166

167167
if eventsRead == 0 {
168-
return false, nil
168+
return nil
169169
}
170170

171171
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
172172
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
173173
if err != nil {
174-
return false, errors.Wrap(err, "failed to handle change events")
174+
return errors.Wrap(err, "failed to handle change events")
175175
}
176176

177-
return true, nil
177+
return nil
178178
}
179179

180180
for {
@@ -216,19 +216,19 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
216216
break
217217
}
218218

219-
_, err = readAndHandleOneChangeEventBatch()
219+
err = readAndHandleOneChangeEventBatch()
220220

221221
if err != nil {
222222
break
223223
}
224224
}
225225

226226
default:
227-
_, err = readAndHandleOneChangeEventBatch()
228-
}
227+
err = readAndHandleOneChangeEventBatch()
229228

230-
if err == nil {
231-
err = persistResumeTokenIfNeeded()
229+
if err == nil {
230+
err = persistResumeTokenIfNeeded()
231+
}
232232
}
233233

234234
if err != nil && !errors.Is(err, context.Canceled) {
@@ -259,9 +259,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
259259

260260
infoLog := verifier.logger.Info()
261261
if verifier.lastChangeEventTime == nil {
262-
infoLog = infoLog.Str("changeStreamStopTime", "none")
262+
infoLog = infoLog.Str("lastEventTime", "none")
263263
} else {
264-
infoLog = infoLog.Interface("changeStreamStopTime", *verifier.lastChangeEventTime)
264+
infoLog = infoLog.Interface("lastEventTime", *verifier.lastChangeEventTime)
265265
}
266266

267267
infoLog.Msg("Change stream is done.")

0 commit comments

Comments
 (0)