Skip to content
This repository was archived by the owner on Jul 30, 2025. It is now read-only.

Commit 01dcf48

Browse files
committed
verifier: fix lastConsumableOffsets
This didn't account for tombstones being produced for the same key later in the log. Fix the logic by adding a new `latestKoByPartition` map which tracks the last offset for a given key. By using it in combination with `latestKvByPartition`, we can conclude which non-tombstone record/key has the highest offset for a given partition.
1 parent 300f32e commit 01dcf48

File tree

4 files changed

+52
-16
lines changed

4 files changed

+52
-16
lines changed

pkg/worker/verifier/latest_value_map.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,37 @@ import (
1111
)
1212

1313
type LatestValueMap struct {
14-
topic string
15-
LatestKvByPartition []map[string]string
14+
topic string
15+
// Latest offset for a given key
16+
latestKoByPartition []map[string]int64
17+
// Latest value for a given key
18+
LatestKvByPartition []map[string][]byte
1619
}
1720

18-
func (lvm *LatestValueMap) Get(partition int32, key string) (value string, exists bool) {
21+
func (lvm *LatestValueMap) GetValue(partition int32, key string) (value []byte, exists bool) {
1922
if partition < 0 || partition >= int32(len(lvm.LatestKvByPartition)) {
2023
log.Panicf("Partition %d out of bounds for latestValueMap of size %d", partition, len(lvm.LatestKvByPartition))
2124
}
2225
value, exists = lvm.LatestKvByPartition[partition][key]
2326
return
2427
}
2528

26-
func (lvm *LatestValueMap) Insert(partition int32, key string, value string) {
29+
func (lvm *LatestValueMap) InsertKeyValue(partition int32, key string, value []byte) {
2730
lvm.LatestKvByPartition[partition][key] = value
2831
}
2932

33+
func (lvm *LatestValueMap) GetOffset(partition int32, key string) (offset int64, exists bool) {
34+
if partition < 0 || partition >= int32(len(lvm.latestKoByPartition)) {
35+
log.Panicf("Partition %d out of bounds for latestValueMap of size %d", partition, len(lvm.latestKoByPartition))
36+
}
37+
offset, exists = lvm.latestKoByPartition[partition][key]
38+
return
39+
}
40+
41+
func (lvm *LatestValueMap) InsertKeyOffset(partition int32, key string, offset int64) {
42+
lvm.latestKoByPartition[partition][key] = offset
43+
}
44+
3045
func latestValueMapFile(topic string) string {
3146
return fmt.Sprintf("latest_value_%s.json", topic)
3247
}
@@ -73,20 +88,25 @@ func LoadLatestValues(topic string, nPartitions int32) LatestValueMap {
7388
util.Die("More partitions in latest_value_map file than in topic!")
7489
} else if len(lvm.LatestKvByPartition) < int(nPartitions) {
7590
// Creating new partitions is allowed
76-
blanks := make([]map[string]string, nPartitions-int32(len(lvm.LatestKvByPartition)))
91+
blanks := make([]map[string][]byte, nPartitions-int32(len(lvm.LatestKvByPartition)))
7792
lvm.LatestKvByPartition = append(lvm.LatestKvByPartition, blanks...)
7893
}
7994
log.Infof("Successfully read latest value map")
8095
return lvm
8196
}
8297

8398
func NewLatestValueMap(topic string, nPartitions int32) LatestValueMap {
84-
maps := make([]map[string]string, nPartitions)
85-
for i := range maps {
86-
maps[i] = make(map[string]string)
99+
kvMaps := make([]map[string][]byte, nPartitions)
100+
for i := range kvMaps {
101+
kvMaps[i] = make(map[string][]byte)
102+
}
103+
koMaps := make([]map[string]int64, nPartitions)
104+
for i := range koMaps {
105+
koMaps[i] = make(map[string]int64)
87106
}
88107
return LatestValueMap{
89108
topic: topic,
90-
LatestKvByPartition: maps,
109+
LatestKvByPartition: kvMaps,
110+
latestKoByPartition: koMaps,
91111
}
92112
}

pkg/worker/verifier/offset_ranges.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ func (tors *TopicOffsetRanges) Contains(p int32, o int64) bool {
132132
return tors.PartitionRanges[p].Contains(o)
133133
}
134134

135+
func (tors *TopicOffsetRanges) GetLastConsumableOffset(p int32) int64 {
136+
return tors.LastConsumableOffsets[p]
137+
}
138+
135139
func (tors *TopicOffsetRanges) SetLastConsumableOffset(p int32, o int64) {
136140
tors.LastConsumableOffsets[p] = o
137141
}

pkg/worker/verifier/producer_worker.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,9 @@ func (pw *ProducerWorker) OnAcked(r *kgo.Record) {
210210
pw.Status.OnAcked(r.Partition, r.Offset)
211211

212212
pw.validOffsets.Insert(r.Partition, r.Offset)
213-
if pw.config.producesTombstones && r.Value != nil {
214-
pw.validOffsets.SetLastConsumableOffset(r.Partition, r.Offset)
215-
}
216-
if pw.validateLatestValues {
217-
pw.latestValueProduced.Insert(r.Partition, string(r.Key), string(r.Value))
213+
if pw.validateLatestValues || pw.config.producesTombstones {
214+
pw.latestValueProduced.InsertKeyValue(r.Partition, string(r.Key), r.Value)
215+
pw.latestValueProduced.InsertKeyOffset(r.Partition, string(r.Key), r.Offset)
218216
}
219217
}
220218

@@ -248,6 +246,19 @@ func (self *ProducerWorkerStatus) OnFail() {
248246
self.Fails += 1
249247
}
250248

249+
func (pw *ProducerWorker) parseLastConsumableOffsets() {
250+
for partition, keyOffset := range pw.latestValueProduced.latestKoByPartition {
251+
for key, offset := range keyOffset {
252+
value, exists := pw.latestValueProduced.GetValue(int32(partition), key)
253+
if exists && value != nil {
254+
if offset > pw.validOffsets.GetLastConsumableOffset(int32(partition)) {
255+
pw.validOffsets.SetLastConsumableOffset(int32(partition), offset)
256+
}
257+
}
258+
}
259+
}
260+
}
261+
251262
func (pw *ProducerWorker) Store() {
252263
err := pw.validOffsets.Store()
253264
util.Chk(err, "Error writing offset map: %v", err)
@@ -449,6 +460,7 @@ func (pw *ProducerWorker) produceInner(n int64) (int64, []BadOffset, error) {
449460
client.Close()
450461
log.Info("Closed client.")
451462

463+
pw.parseLastConsumableOffsets()
452464
pw.produceCheckpoint()
453465

454466
if errored {

pkg/worker/verifier/validator_status.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffse
113113
}
114114

115115
if cs.expectFullyCompacted {
116-
latestValue, exists := latestValuesProduced.Get(r.Partition, string(r.Key))
117-
if !exists || latestValue != string(r.Value) {
116+
latestValue, exists := latestValuesProduced.GetValue(r.Partition, string(r.Key))
117+
if !exists || string(latestValue) != string(r.Value) {
118118
log.Panicf("Consumed value for key %s does not match the latest produced value in a compacted topic- did compaction for partition %s/%d occur betwen producing and consuming?", r.Key, r.Topic, r.Partition)
119119
}
120120
}

0 commit comments

Comments
 (0)