
Commit 27986ea

Merge pull request #60 from WillemKauf/tombstones
Compaction verification enhancements
2 parents: 7bbf8c8 + 300f32e

File tree: 12 files changed, +279 -60 lines

cmd/kgo-repeater/main.go

Lines changed: 6 additions & 5 deletions

@@ -50,8 +50,9 @@ var (
     transactionAbortRate = flag.Float64("transaction-abort-rate", 0.0, "The probability that any given transaction should abort")
     msgsPerTransaction   = flag.Uint("msgs-per-transaction", 1, "The number of messages that should be in a given transaction")

-    compressionType      = flag.String("compression-type", "", "One of gzip, snappy, lz4, zstd, or 'mixed' to pick a random codec for each producer")
-    compressiblePayload  = flag.Bool("compressible-payload", false, "If true, use a highly compressible payload instead of the default random payload")
+    compressionType      = flag.String("compression-type", "", "One of gzip, snappy, lz4, zstd, or 'mixed' to pick a random codec for each producer")
+    compressiblePayload  = flag.Bool("compressible-payload", false, "If true, use a highly compressible payload instead of the default random payload")
+    tombstoneProbability = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
 )

 // NewAdmin returns a franz-go admin client.

@@ -150,7 +151,7 @@ func main() {
     // it was refactored.
     wConfig := worker.NewWorkerConfig(
         name, *brokers, *trace, topicsList[0], *linger, *maxBufferedRecords, *useTransactions, *compressionType, *compressiblePayload, *username, *password, *enableTls)
-    config := repeater.NewRepeaterConfig(wConfig, topicsList, *group, *keys, *payloadSize, dataInFlightPerWorker, rateLimitPerWorker)
+    config := repeater.NewRepeaterConfig(wConfig, topicsList, *group, *keys, *payloadSize, dataInFlightPerWorker, rateLimitPerWorker, *tombstoneProbability)
     lv := repeater.NewWorker(config)
     if *useTransactions {
         tconfig := worker.NewTransactionSTMConfig(*transactionAbortRate, *msgsPerTransaction)

@@ -212,9 +213,9 @@ func main() {
     go func() {
         listenAddr := fmt.Sprintf("0.0.0.0:%d", *remotePort)
         if err := http.ListenAndServe(listenAddr, mux); err != nil {
-            panic(fmt.Sprintf("failed to listen on %s: %v", listenAddr, err));
+            panic(fmt.Sprintf("failed to listen on %s: %v", listenAddr, err))
         }
-    }();
+    }()

     if !*remote {
         admin, err := NewAdmin()
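
The new tombstone-probability flag is threaded through repeater.NewRepeaterConfig into worker.ValueGenerator (see the repeater_worker.go hunk further down). The generator's sampling logic is not part of this commit; the following is only a minimal self-contained sketch of how such a draw could work. The NextValue method and the demo main are invented for illustration; only the three struct fields come from the diff.

package main

import (
    "fmt"
    "math/rand"
)

// Mirrors the fields set on worker.ValueGenerator in this commit; the type is
// redeclared here only so the sketch is self-contained.
type ValueGenerator struct {
    PayloadSize          uint64
    Compressible         bool
    TombstoneProbability float64
}

// NextValue is hypothetical: with probability TombstoneProbability it returns
// nil (a tombstone, i.e. a record with a key but no value); otherwise it
// returns a random payload of PayloadSize bytes.
func (vg *ValueGenerator) NextValue() []byte {
    if rand.Float64() < vg.TombstoneProbability {
        return nil
    }
    payload := make([]byte, vg.PayloadSize)
    rand.Read(payload)
    return payload
}

func main() {
    vg := ValueGenerator{PayloadSize: 16, TombstoneProbability: 0.25}
    tombstones := 0
    for i := 0; i < 10000; i++ {
        if vg.NextValue() == nil {
            tombstones++
        }
    }
    fmt.Printf("tombstones: %d of 10000 (expect roughly 2500)\n", tombstones)
}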

cmd/kgo-verifier/main.go

Lines changed: 8 additions & 4 deletions

@@ -72,6 +72,9 @@ var (

     tolerateDataLoss      = flag.Bool("tolerate-data-loss", false, "If true, tolerate data-loss events")
     tolerateFailedProduce = flag.Bool("tolerate-failed-produce", false, "If true, tolerate and retry failed produce")
+    tombstoneProbability  = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
+    compacted             = flag.Bool("compacted", false, "Whether the topic to be verified is compacted or not. This will suppress warnings about offset gaps in consumed values.")
+    validateLatestValues  = flag.Bool("validate-latest-values", false, "If true, values consumed by a worker will be validated against the last produced value by a producer. This value should only be set if compaction has been allowed to fully de-duplicate the entirety of the log before consuming.")
 )

 func makeWorkerConfig() worker.WorkerConfig {

@@ -92,6 +95,7 @@ func makeWorkerConfig() worker.WorkerConfig {
         TolerateDataLoss:      *tolerateDataLoss,
         TolerateFailedProduce: *tolerateFailedProduce,
         Continuous:            *continuous,
+        ValidateLatestValues:  *validateLatestValues,
     }

     return c

@@ -245,7 +249,7 @@ func main() {

     if *pCount > 0 {
         log.Info("Starting producer...")
-        pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId)
+        pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability)
         pw := verifier.NewProducerWorker(pwc)

         if *useTransactions {

@@ -261,7 +265,7 @@ func main() {
         srw := verifier.NewSeqReadWorker(verifier.NewSeqReadConfig(
             makeWorkerConfig(), "sequential", nPartitions, *seqConsumeCount,
             (*consumeTputMb)*1024*1024,
-        ))
+        ), verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
         workers = append(workers, &srw)

         for loopState.Next() {

@@ -280,7 +284,7 @@ func main() {
             workerCfg := verifier.NewRandomReadConfig(
                 makeWorkerConfig(), fmt.Sprintf("random-%03d", i), nPartitions, *cCount,
             )
-            worker := verifier.NewRandomReadWorker(workerCfg)
+            worker := verifier.NewRandomReadWorker(workerCfg, verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
             randomWorkers = append(randomWorkers, &worker)
             workers = append(workers, &worker)
         }

@@ -309,7 +313,7 @@ func main() {
         grw := verifier.NewGroupReadWorker(
             verifier.NewGroupReadConfig(
                 makeWorkerConfig(), *cgName, nPartitions, *cgReaders,
-                *seqConsumeCount, (*consumeTputMb)*1024*1024))
+                *seqConsumeCount, (*consumeTputMb)*1024*1024), verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
         workers = append(workers, &grw)

         for loopState.Next() {
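
Taken together, the three new flags describe a two-phase check: produce with some probability of tombstones into a compacted topic, let compaction fully de-duplicate the log, then consume and compare each key's value against the last value produced. Below is a toy model of the log state that --validate-latest-values assumes (illustrative only; none of this code is from the repository, and it assumes tombstoned keys are eventually removed, as the offset_ranges.go changes further down anticipate).

package main

import "fmt"

// record models a produced record; a nil value is a tombstone.
type record struct {
    key   string
    value *string
}

func v(s string) *string { return &s }

// compact is a toy model of a fully de-duplicated log: one surviving record
// per key holding the last produced value, with keys whose last write was a
// tombstone removed entirely.
func compact(produced []record) map[string]string {
    latest := make(map[string]*string)
    for _, r := range produced {
        latest[r.key] = r.value
    }
    out := make(map[string]string)
    for k, val := range latest {
        if val != nil {
            out[k] = *val
        }
    }
    return out
}

func main() {
    produced := []record{
        {"a", v("1")},
        {"b", v("2")},
        {"a", v("3")},
        {"b", nil}, // tombstone: "b" should eventually disappear
    }
    fmt.Println(compact(produced)) // map[a:3]
}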

pkg/worker/repeater/repeater_worker.go

Lines changed: 2 additions & 2 deletions

@@ -52,13 +52,13 @@ type RepeaterConfig struct {
     RateLimitBps int
 }

-func NewRepeaterConfig(cfg worker.WorkerConfig, topics []string, group string, keys uint64, payloadSize uint64, dataInFlight uint64, rateLimitBps int) RepeaterConfig {
+func NewRepeaterConfig(cfg worker.WorkerConfig, topics []string, group string, keys uint64, payloadSize uint64, dataInFlight uint64, rateLimitBps int, tombstoneProbability float64) RepeaterConfig {
     return RepeaterConfig{
         workerCfg:      cfg,
         Topics:         topics,
         Group:          group,
         KeySpace:       worker.KeySpace{UniqueCount: keys},
-        ValueGenerator: worker.ValueGenerator{PayloadSize: payloadSize, Compressible: cfg.CompressiblePayload},
+        ValueGenerator: worker.ValueGenerator{PayloadSize: payloadSize, Compressible: cfg.CompressiblePayload, TombstoneProbability: tombstoneProbability},
         DataInFlight:   dataInFlight,
         RateLimitBps:   rateLimitBps,
     }

pkg/worker/verifier/group_read_worker.go

Lines changed: 7 additions & 3 deletions

@@ -49,10 +49,10 @@ type GroupReadWorker struct {
     Status GroupWorkerStatus
 }

-func NewGroupReadWorker(cfg GroupReadConfig) GroupReadWorker {
+func NewGroupReadWorker(cfg GroupReadConfig, validatorStatus ValidatorStatus) GroupReadWorker {
     return GroupReadWorker{
         config: cfg,
-        Status: GroupWorkerStatus{Topic: cfg.workerCfg.Topic},
+        Status: GroupWorkerStatus{Topic: cfg.workerCfg.Topic, Validator: validatorStatus},
     }
 }

@@ -240,6 +240,10 @@ func (grw *GroupReadWorker) consumerGroupReadInner(
     defer client.Close()

     validRanges := LoadTopicOffsetRanges(grw.config.workerCfg.Topic, grw.config.nPartitions)
+    var latestValuesProduced LatestValueMap
+    if grw.Status.Validator.expectFullyCompacted {
+        latestValuesProduced = LoadLatestValues(grw.config.workerCfg.Topic, grw.config.nPartitions)
+    }

     for {
         fetches := client.PollFetches(ctx)

@@ -271,7 +275,7 @@ func (grw *GroupReadWorker) consumerGroupReadInner(
             log.Debugf(
                 "fiber %v: Consumer group read %s/%d o=%d...",
                 fiberId, grw.config.workerCfg.Topic, r.Partition, r.Offset)
-            grw.Status.Validator.ValidateRecord(r, &validRanges)
+            grw.Status.Validator.ValidateRecord(r, &validRanges, &latestValuesProduced)
             // Will cancel the context if we have read everything
             cgOffsets.AddRecord(ctx, r)
         })
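
The group reader now loads the latest-value map only when the validator expects a fully compacted log, and hands it to ValidateRecord alongside the valid offset ranges. The checks inside ValidateRecord are not shown in this commit; the sketch below is a hedged stand-in for the kind of per-record comparison the extra argument enables (checkAgainstLatest and its error cases are hypothetical).

package main

import "fmt"

// checkAgainstLatest is an illustrative stand-in for the comparison the extra
// ValidateRecord argument enables; the real method's logic is not shown here.
// It assumes tombstones have already been removed by the time a fully
// compacted log is consumed.
func checkAgainstLatest(key string, value []byte, latest map[string]string, expectFullyCompacted bool) error {
    if !expectFullyCompacted {
        return nil // nothing extra to assert unless the log is fully de-duplicated
    }
    want, ok := latest[key]
    if !ok {
        return fmt.Errorf("consumed key %q was never recorded by the producer", key)
    }
    if value == nil {
        return fmt.Errorf("unexpected tombstone for key %q in a fully compacted log", key)
    }
    if string(value) != want {
        return fmt.Errorf("key %q: consumed %q, latest produced %q", key, value, want)
    }
    return nil
}

func main() {
    latest := map[string]string{"k1": "v3"}
    fmt.Println(checkAgainstLatest("k1", []byte("v3"), latest, true)) // <nil>
    fmt.Println(checkAgainstLatest("k1", []byte("v1"), latest, true)) // mismatch error
}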
Lines changed: 92 additions & 0 deletions

@@ -0,0 +1,92 @@
+package verifier
+
+import (
+    "encoding/json"
+    "fmt"
+    "io/ioutil"
+    "os"
+
+    "github.com/redpanda-data/kgo-verifier/pkg/util"
+    log "github.com/sirupsen/logrus"
+)
+
+type LatestValueMap struct {
+    topic               string
+    LatestKvByPartition []map[string]string
+}
+
+func (lvm *LatestValueMap) Get(partition int32, key string) (value string, exists bool) {
+    if partition < 0 || partition >= int32(len(lvm.LatestKvByPartition)) {
+        log.Panicf("Partition %d out of bounds for latestValueMap of size %d", partition, len(lvm.LatestKvByPartition))
+    }
+    value, exists = lvm.LatestKvByPartition[partition][key]
+    return
+}
+
+func (lvm *LatestValueMap) Insert(partition int32, key string, value string) {
+    lvm.LatestKvByPartition[partition][key] = value
+}
+
+func latestValueMapFile(topic string) string {
+    return fmt.Sprintf("latest_value_%s.json", topic)
+}
+
+func (lvm *LatestValueMap) Store() error {
+    log.Infof("LatestValueMap::Storing %s", latestValueMapFile(lvm.topic))
+
+    data, err := json.Marshal(lvm)
+    if err != nil {
+        return err
+    }
+
+    tmp_file, err := ioutil.TempFile("./", "latest_value_*.tmp")
+    if err != nil {
+        return err
+    }
+
+    _, err = tmp_file.Write(data)
+    if err != nil {
+        return err
+    }
+
+    err = os.Rename(tmp_file.Name(), latestValueMapFile(lvm.topic))
+    if err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func LoadLatestValues(topic string, nPartitions int32) LatestValueMap {
+    data, err := ioutil.ReadFile(latestValueMapFile(topic))
+    if err != nil {
+        util.Die("Can't read topic latest value map: %v", err)
+    }
+
+    var lvm LatestValueMap
+    if len(data) > 0 {
+        err = json.Unmarshal(data, &lvm)
+        util.Chk(err, "Bad JSON %v", err)
+    }
+
+    if int32(len(lvm.LatestKvByPartition)) > nPartitions {
+        util.Die("More partitions in latest_value_map file than in topic!")
+    } else if len(lvm.LatestKvByPartition) < int(nPartitions) {
+        // Creating new partitions is allowed
+        blanks := make([]map[string]string, nPartitions-int32(len(lvm.LatestKvByPartition)))
+        lvm.LatestKvByPartition = append(lvm.LatestKvByPartition, blanks...)
+    }
+    log.Infof("Successfully read latest value map")
+    return lvm
+}
+
+func NewLatestValueMap(topic string, nPartitions int32) LatestValueMap {
+    maps := make([]map[string]string, nPartitions)
+    for i := range maps {
+        maps[i] = make(map[string]string)
+    }
+    return LatestValueMap{
+        topic:               topic,
+        LatestKvByPartition: maps,
+    }
+}
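
This new file (added to the verifier package; its path is not shown in this excerpt) persists the last value produced per (partition, key) so that readers can later compare consumed values against it. A usage sketch based only on the exported API above; the import path is assumed from the repository layout, and the topic and key names are placeholders.

package main

import (
    "fmt"

    // Import path assumed from the repo layout; the new code lives in the
    // verifier package alongside group_read_worker.go and offset_ranges.go.
    "github.com/redpanda-data/kgo-verifier/pkg/worker/verifier"
)

func main() {
    const topic = "compact-test" // placeholder topic name
    const nPartitions = int32(3)

    // Producer side: remember the last value written per (partition, key) and
    // persist it to latest_value_<topic>.json in the working directory.
    lvm := verifier.NewLatestValueMap(topic, nPartitions)
    lvm.Insert(0, "key-0", "value-42")
    if err := lvm.Store(); err != nil {
        panic(err)
    }

    // Reader side (typically a separate process, run once compaction has
    // settled): reload the map and look up the latest value for a key.
    loaded := verifier.LoadLatestValues(topic, nPartitions)
    if v, ok := loaded.Get(0, "key-0"); ok {
        fmt.Println("latest produced value for key-0:", v)
    }
}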

pkg/worker/verifier/offset_ranges.go

Lines changed: 27 additions & 2 deletions

@@ -31,6 +31,14 @@ func LoadTopicOffsetRanges(topic string, nPartitions int32) TopicOffsetRanges {
             tors.PartitionRanges = append(tors.PartitionRanges, blanks...)
         }

+        if int32(len(tors.LastConsumableOffsets)) > nPartitions {
+            util.Die("More partitions in valid_offsets file than in topic!")
+        } else if len(tors.LastConsumableOffsets) < int(nPartitions) {
+            // Creating new partitions is allowed
+            blanks := make([]int64, nPartitions-int32(len(tors.LastConsumableOffsets)))
+            tors.LastConsumableOffsets = append(tors.LastConsumableOffsets, blanks...)
+        }
+
         return tors
     }
 }

@@ -105,6 +113,15 @@ func (ors *OffsetRanges) Contains(o int64) bool {
 type TopicOffsetRanges struct {
     topic           string
     PartitionRanges []OffsetRanges
+
+    // In the case that the topic being consumed from had tombstones produced,
+    // the high watermark may be given by a tombstone record that has been removed.
+    // In trying to consume until this point, readers will become stuck polling for
+    // new records.
+    AdjustConsumableOffsets bool
+    // Persist the last consumable offset here to adjust the offset we attempt to read
+    // up to in the read workers.
+    LastConsumableOffsets []int64
 }

 func (tors *TopicOffsetRanges) Insert(p int32, o int64) {

@@ -115,6 +132,10 @@ func (tors *TopicOffsetRanges) Contains(p int32, o int64) bool {
     return tors.PartitionRanges[p].Contains(o)
 }

+func (tors *TopicOffsetRanges) SetLastConsumableOffset(p int32, o int64) {
+    tors.LastConsumableOffsets[p] = o
+}
+
 func topicOffsetRangeFile(topic string) string {
     return fmt.Sprintf("valid_offsets_%s.json", topic)
 }

@@ -153,8 +174,12 @@ func NewTopicOffsetRanges(topic string, nPartitions int32) TopicOffsetRanges {
     for _, or := range prs {
         or.Ranges = make([]OffsetRange, 0)
     }
+    lcos := make([]int64, nPartitions)
+
     return TopicOffsetRanges{
-        topic:           topic,
-        PartitionRanges: prs,
+        topic:                   topic,
+        PartitionRanges:         prs,
+        AdjustConsumableOffsets: false,
+        LastConsumableOffsets:   lcos,
     }
 }
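
The new struct comment explains the motivation: once compaction removes a tombstone that sat at the high watermark, a reader trying to consume up to that watermark will poll forever. This commit only adds the bookkeeping (LastConsumableOffsets, SetLastConsumableOffset); how readers consult it is not shown here, so the clamping logic below is an illustrative guess.

package main

import "fmt"

// readTarget is a hypothetical sketch of how a read worker could use the new
// LastConsumableOffsets bookkeeping: when AdjustConsumableOffsets is set, stop
// at the last offset the producer knows is still consumable rather than at the
// partition high watermark, which may point just past a removed tombstone.
func readTarget(highWatermark int64, adjustConsumableOffsets bool, lastConsumable int64) int64 {
    if adjustConsumableOffsets && lastConsumable+1 < highWatermark {
        return lastConsumable + 1
    }
    return highWatermark
}

func main() {
    // A tombstone near offset 99 was removed by compaction; the producer
    // recorded 97 as the last consumable offset for this partition.
    fmt.Println(readTarget(100, true, 97))  // 98
    fmt.Println(readTarget(100, false, 97)) // 100
}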
