
Commit 8ac4000

crash instantly on data loss errors
We (nvartolomei and bash) found that GroupReadWorker wouldn't report data loss. The reason is that we didn't fail the test instantly when data loss was detected; instead we relied on monotonicity validation to fail, i.e. we would crash only once we detected that we were about to consume an earlier offset than one we had already consumed. In GroupReadWorker we reset the monotonicity validation state on any error, because the next consume attempt is almost certain to re-read offsets that were already consumed, so we would never catch the monotonicity violation. In retrospect, it would have been better to fail instantly when franz-go detects data loss, and this commit does exactly that.
1 parent 27986ea commit 8ac4000
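For context, here is a minimal, self-contained sketch of the fail-fast pattern this commit applies in both read workers. It is not the verifier's actual code: handleFetchError and its tolerateDataLoss parameter are hypothetical stand-ins for the workers' fetch-error callbacks and the workerCfg.TolerateDataLoss option; kgo.ErrDataLoss, errors.As, and the standard library log/fmt packages are the only real APIs used.

package main

import (
	"errors"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// handleFetchError reacts to a single fetch error the way this commit makes
// both read workers behave: crash immediately on data loss unless the run
// was explicitly configured to tolerate it.
func handleFetchError(topic string, partition int32, err error, tolerateDataLoss bool) {
	var lossErr *kgo.ErrDataLoss
	if !errors.As(err, &lossErr) {
		// Not a data-loss error; ordinary fetch errors are handled elsewhere.
		return
	}
	if tolerateDataLoss {
		// franz-go reports the range it had to skip: offsets from ResetTo up to
		// ConsumedTo were lost. A real worker would record this and reset its
		// monotonicity-check state for the partition.
		log.Printf("tolerated data loss on %s/%d: lost %d offsets",
			topic, partition, lossErr.ConsumedTo-lossErr.ResetTo)
		return
	}
	// Fail the test instantly instead of hoping a later monotonicity
	// violation reveals the problem.
	log.Fatalf("Unexpected data loss detected: %v", lossErr)
}

func main() {
	// Synthetic example: wrap an ErrDataLoss the way a fetch error would surface it.
	err := fmt.Errorf("fetch failed: %w", &kgo.ErrDataLoss{
		Topic: "topic-a", Partition: 0, ConsumedTo: 1200, ResetTo: 1000,
	})
	handleFetchError("topic-a", 0, err, true)  // tolerated: logs and returns
	handleFetchError("topic-a", 0, err, false) // not tolerated: log.Fatalf exits
}

Calling log.Fatalf terminates the whole verifier process, which is the point: data loss should fail the run immediately rather than surface later (or never) through the monotonicity checks.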

File tree

2 files changed (+14, -6 lines)


pkg/worker/verifier/group_read_worker.go (7 additions, 3 deletions)

@@ -259,9 +259,13 @@ func (grw *GroupReadWorker) consumerGroupReadInner(
 				"fiber %v: Consumer group fetch %s/%d e=%v...",
 				fiberId, t, p, err)
 			var lossErr *kgo.ErrDataLoss
-			if grw.config.workerCfg.TolerateDataLoss && errors.As(err, &lossErr) {
-				grw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
-				grw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+			if errors.As(err, &lossErr) {
+				if grw.config.workerCfg.TolerateDataLoss {
+					grw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
+					grw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+				} else {
+					log.Fatalf("Unexpected data loss detected: %v", lossErr)
+				}
 			} else {
 				r_err = err
 			}

pkg/worker/verifier/seq_read_worker.go (7 additions, 3 deletions)

@@ -154,9 +154,13 @@ func (srw *SeqReadWorker) sequentialReadInner(ctx context.Context, startAt []int
 	fetches.EachError(func(t string, p int32, err error) {
 		log.Warnf("Sequential fetch %s/%d e=%v...", t, p, err)
 		var lossErr *kgo.ErrDataLoss
-		if srw.config.workerCfg.TolerateDataLoss && errors.As(err, &lossErr) {
-			srw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
-			srw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+		if errors.As(err, &lossErr) {
+			if srw.config.workerCfg.TolerateDataLoss {
+				srw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
+				srw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+			} else {
+				log.Fatalf("Unexpected data loss detected: %v", lossErr)
+			}
 		} else {
 			r_err = err
 		}
