This repository was archived by the owner on Jul 30, 2025. It is now read-only.

Commit dee8b80

Merge pull request #64 from dotnwat/random-data-produce
Random data produce
2 parents: 8c57630 + 104fd4d

File tree

4 files changed: +168 / -41 lines

  cmd/kgo-verifier/main.go
  pkg/worker/verifier/producer_worker.go
  pkg/worker/worker.go
  pkg/worker/worker_test.go


cmd/kgo-verifier/main.go

Lines changed: 3 additions & 1 deletion
@@ -75,6 +75,8 @@ var (
 	tombstoneProbability = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
 	compacted = flag.Bool("compacted", false, "Whether the topic to be verified is compacted or not. This will suppress warnings about offset gaps in consumed values.")
 	validateLatestValues = flag.Bool("validate-latest-values", false, "If true, values consumed by a worker will be validated against the last produced value by a producer. This value should only be set if compaction has been allowed to fully de-duplicate the entirety of the log before consuming.")
+
+	produceRandomBytes = flag.Bool("produce-random-bytes", false, "If true, when generating random values, generate random bytes rather than random ascii")
 )

 func makeWorkerConfig() worker.WorkerConfig {
@@ -249,7 +251,7 @@ func main() {

 	if *pCount > 0 {
 		log.Info("Starting producer...")
-		pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability)
+		pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability, *produceRandomBytes)
 		pw := verifier.NewProducerWorker(pwc)

 		if *useTransactions {
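
Usage note (not part of the commit): with this change a produce run opts into raw random-byte values by passing --produce-random-bytes; left at its default of false, payloads still come from the printable-ASCII generator added in pkg/worker/worker.go below.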

pkg/worker/verifier/producer_worker.go

Lines changed: 8 additions & 5 deletions
@@ -35,7 +35,7 @@ type ProducerConfig struct {
 }

 func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32,
-	messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int, tombstoneProbability float64) ProducerConfig {
+	messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int, tombstoneProbability float64, produceRandomBytes bool) ProducerConfig {
 	return ProducerConfig{
 		workerCfg: wc,
 		name: name,
@@ -52,6 +52,7 @@ func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32,
 			PayloadSize: uint64(messageSize),
 			Compressible: wc.CompressiblePayload,
 			TombstoneProbability: tombstoneProbability,
+			ProduceRandomBytes: produceRandomBytes,
 		},
 	}
 }
@@ -62,8 +63,6 @@ type ProducerWorker struct {
 	validOffsets TopicOffsetRanges
 	latestValueProduced LatestValueMap

-	payload []byte
-
 	// Used for enabling transactional produces
 	transactionsEnabled bool
 	transactionSTMConfig worker.TransactionSTMConfig
@@ -94,7 +93,6 @@ func NewProducerWorker(cfg ProducerConfig) ProducerWorker {
 		Status: NewProducerWorkerStatus(cfg.workerCfg.Topic),
 		latestValueProduced: NewLatestValueMap(cfg.workerCfg.Topic, cfg.nPartitions),
 		validOffsets: validOffsets,
-		payload: cfg.valueGenerator.Generate(),
 		churnProducers: cfg.messagesPerProducerId > 0,
 		tolerateDataLoss: cfg.workerCfg.TolerateDataLoss,
 		tolerateFailedProduce: cfg.workerCfg.TolerateFailedProduce,
@@ -135,8 +133,13 @@ func (pw *ProducerWorker) newRecord(producerId int, sequence int64) *kgo.Record
 			value.Write(make([]byte, paddingSize))
 		}
 		payload = value.Bytes()
+	} else if pw.config.valueGenerator.Compressible {
+		payload = pw.config.valueGenerator.GenerateCompressible()
+	} else if pw.config.valueGenerator.ProduceRandomBytes {
+		payload = make([]byte, pw.config.valueGenerator.PayloadSize)
+		rand.Read(payload)
 	} else {
-		payload = make([]byte, pw.config.messageSize)
+		payload = pw.config.valueGenerator.GenerateRandom()
 	}
 }
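
In effect, the producer worker no longer caches a single payload built at construction time (the removed payload field and cfg.valueGenerator.Generate() call); newRecord now builds a value per record, choosing between the compressible generator, raw random bytes when ProduceRandomBytes is set, and the new ASCII fragment generator otherwise.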

pkg/worker/worker.go

Lines changed: 57 additions & 35 deletions
@@ -103,52 +103,74 @@ type ValueGenerator struct {
 	PayloadSize uint64
 	Compressible bool
 	TombstoneProbability float64
+	RandomData []byte
+	ProduceRandomBytes bool
 }

 var compressible_payload []byte

+func min(vars ...int) int {
+	v := vars[0]
+	for _, i := range vars {
+		if i < v {
+			v = i
+		}
+	}
+	return v
+}
+
+func (vg *ValueGenerator) GenerateRandom() []byte {
+	if vg.RandomData == nil {
+		// 2mb should be enough
+		data := make([]byte, 1<<21)
+		for i := range data {
+			// printable ascii range
+			data[i] = byte(rand.Intn(126+1-32) + 32)
+		}
+		vg.RandomData = data
+	}
+	frag_size := 1024 // didn't seem to have much perf impact
+	size := int(vg.PayloadSize)
+	res := make([]byte, 0, size)
+	for {
+		remaining := size - len(res)
+		if remaining == 0 {
+			break
+		}
+		start := rand.Intn(size)
+		view := vg.RandomData[start:]
+		end := min(len(view), remaining, frag_size)
+		res = append(res, view[:end]...)
+	}
+	return res
+}
+
+func (vg *ValueGenerator) GenerateCompressible() []byte {
+	// Zeros, which is about as compressible as an array can be.
+	if len(compressible_payload) == 0 {
+		compressible_payload = make([]byte, vg.PayloadSize)
+	} else if len(compressible_payload) != int(vg.PayloadSize) {
+		// This is an implementation shortcut that lets us use a simple
+		// global array of zeros for compressible payloads, as long
+		// as everyone wants the same size.
+		panic("Can't have multiple compressible generators of different sizes")
+	}
+
+	// Everyone who asks for compressible payload gets a ref to the same array
+	// of zeros: this is worthwhile because a compressible producer might do
+	// huge message sizes (e.g. 128MIB of zeros compresses down to <1MiB.
+	return compressible_payload
+}
+
 func (vg *ValueGenerator) Generate() []byte {
 	isTombstone := rand.Float64() < vg.TombstoneProbability
 	if isTombstone {
 		return nil
 	}
 	if vg.Compressible {
-		// Zeros, which is about as compressible as an array can be.
-		if len(compressible_payload) == 0 {
-			compressible_payload = make([]byte, vg.PayloadSize)
-		} else if len(compressible_payload) != int(vg.PayloadSize) {
-			// This is an implementation shortcut that lets us use a simple
-			// global array of zeros for compressible payloads, as long
-			// as everyone wants the same size.
-			panic("Can't have multiple compressible generators of different sizes")
-		}
-
-		// Everyone who asks for compressible payload gets a ref to the same array
-		// of zeros: this is worthwhile because a compressible producer might do
-		// huge message sizes (e.g. 128MIB of zeros compresses down to <1MiB.
-		return compressible_payload
+		return vg.GenerateCompressible()
 	} else {
-		randBytes := make([]byte, vg.PayloadSize)
-		// An incompressible high entropy payload. This will likely not be UTF-8 decodable.
-		n, err := rand.Read(randBytes)
-		if err != nil {
-			panic(err.Error())
-		}
-		if n != int(vg.PayloadSize) {
-			panic("Unexpected byte count from rand.Read")
-		}
-		// Convert to a valid UTF-8 string, replacing bad chars with " ".
-		// A valid UTF-8 string is needed to avoid any decoding issues
-		// for services on the consuming end.
-		payload := []byte(strings.ToValidUTF8(string(randBytes), " "))
-
-		// In converting to valid UTF-8, we may have lost some bytes.
-		// Append back the difference.
-		diff := int(vg.PayloadSize) - len(payload)
-		if diff > 0 {
-			payload = append(payload, make([]byte, diff)...)
-		}
-		return payload
+		return vg.GenerateRandom()
 	}
 }
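
For orientation, a minimal sketch (not part of the commit) of how the reworked ValueGenerator is exercised; the import path is assumed from the repository layout and may need adjusting:

package main

import (
	"fmt"

	// Assumed import path; adjust to the module's actual name if it differs.
	"github.com/redpanda-data/kgo-verifier/pkg/worker"
)

func main() {
	// Default path: Generate() falls through to GenerateRandom(), which splices
	// ~1KiB fragments out of a lazily built 2MiB pool of printable ASCII.
	vg := worker.ValueGenerator{PayloadSize: 4096}
	fmt.Println(len(vg.Generate())) // 4096

	// Compressible path: every caller shares one all-zeros buffer of PayloadSize.
	cg := worker.ValueGenerator{PayloadSize: 4096, Compressible: true}
	fmt.Println(len(cg.Generate())) // 4096

	// ProduceRandomBytes is not consulted by Generate() itself; the producer
	// worker checks it in newRecord and fills the payload with rand.Read instead.
	_ = worker.ValueGenerator{PayloadSize: 4096, ProduceRandomBytes: true}
}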

pkg/worker/worker_test.go

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+package worker
+
+import (
+	"math/rand"
+	"strings"
+	"testing"
+)
+
+type state struct {
+	data []byte
+}
+
+func newstate(size int) state {
+	data := make([]byte, size)
+	for i := range data {
+		// printable ascii range
+		data[i] = byte(rand.Intn(126+1-32) + 32)
+	}
+	return state{
+		data: data,
+	}
+}
+
+func (s *state) generate(size, frag_size int) []byte {
+	res := make([]byte, 0, size)
+	for {
+		remaining := size - len(res)
+		if remaining == 0 {
+			break
+		}
+		start := rand.Intn(size)
+		view := s.data[start:]
+		end := min(len(view), remaining, frag_size)
+		res = append(res, view[:end]...)
+	}
+	return res
+}
+
+func benchmark_old_random_payload(i int, b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		randBytes := make([]byte, i)
+		// An incompressible high entropy payload. This will likely not be UTF-8 decodable.
+		n, err := rand.Read(randBytes)
+		if err != nil {
+			panic(err.Error())
+		}
+		if n != int(i) {
+			panic("Unexpected byte count from rand.Read")
+		}
+		// Convert to a valid UTF-8 string, replacing bad chars with " ".
+		// A valid UTF-8 string is needed to avoid any decoding issues
+		// for services on the consuming end.
+		payload := []byte(strings.ToValidUTF8(string(randBytes), " "))
+
+		// In converting to valid UTF-8, we may have lost some bytes.
+		// Append back the difference.
+		diff := int(i) - len(payload)
+		if diff > 0 {
+			payload = append(payload, make([]byte, diff)...)
+		}
+	}
+}
+
+func benchmark_random_payload(i int, b *testing.B) {
+	s := newstate(1 << 22)
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		s.generate(i, 1024)
+	}
+}
+
+func benchmark_empty_payload(i int, b *testing.B) {
+	gen := func() []byte {
+		return make([]byte, i)
+	}
+	for n := 0; n < b.N; n++ {
+		gen()
+	}
+}
+
+func Benchmark_old_random_payload1(b *testing.B) { benchmark_old_random_payload(10, b) }
+func Benchmark_old_random_payload2(b *testing.B) { benchmark_old_random_payload(100, b) }
+func Benchmark_old_random_payload3(b *testing.B) { benchmark_old_random_payload(1000, b) }
+func Benchmark_old_random_payload10(b *testing.B) { benchmark_old_random_payload(10000, b) }
+func Benchmark_old_random_payload20(b *testing.B) { benchmark_old_random_payload(100000, b) }
+func Benchmark_old_random_payload40(b *testing.B) { benchmark_old_random_payload(1000000, b) }
+
+func Benchmark_random_payload1(b *testing.B) { benchmark_random_payload(10, b) }
+func Benchmark_random_payload2(b *testing.B) { benchmark_random_payload(100, b) }
+func Benchmark_random_payload3(b *testing.B) { benchmark_random_payload(1000, b) }
+func Benchmark_random_payload10(b *testing.B) { benchmark_random_payload(10000, b) }
+func Benchmark_random_payload20(b *testing.B) { benchmark_random_payload(100000, b) }
+func Benchmark_random_payload40(b *testing.B) { benchmark_random_payload(1000000, b) }
+
+func Benchmark_empty_payload1(b *testing.B) { benchmark_empty_payload(10, b) }
+func Benchmark_empty_payload2(b *testing.B) { benchmark_empty_payload(100, b) }
+func Benchmark_empty_payload3(b *testing.B) { benchmark_empty_payload(1000, b) }
+func Benchmark_empty_payload10(b *testing.B) { benchmark_empty_payload(10000, b) }
+func Benchmark_empty_payload20(b *testing.B) { benchmark_empty_payload(100000, b) }
+func Benchmark_empty_payload40(b *testing.B) { benchmark_empty_payload(1000000, b) }
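
The added benchmarks compare the previous rand.Read + strings.ToValidUTF8 payload construction against the new fragment-splicing generator and an empty-payload baseline, each at sizes from 10 B to 1 MB; they can be run with the standard Go tooling, e.g. go test -bench . ./pkg/worker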
