Skip to content

Commit 4d9c948

Browse files
committed
changefeedccl: add Protobuf vs JSON encoder benchmarks
This change adds benchmark testing for the recenlty added changefeed protobuf format. Release note: none Fixes: #149915
1 parent 658cecb commit 4d9c948

File tree

1 file changed

+69
-63
lines changed

1 file changed

+69
-63
lines changed

pkg/cmd/roachtest/tests/cdc_bench.go

Lines changed: 69 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -95,48 +95,52 @@ func registerCDCBench(r registry.Registry) {
9595
for _, scanType := range cdcBenchScanTypes {
9696
for _, ranges := range []int64{100, 100000} {
9797
const (
98-
nodes = 5 // excluding coordinator/workload node
99-
cpus = 16
100-
rows = 1_000_000_000 // 19 GB
101-
format = "json"
98+
nodes = 5 // excluding coordinator/workload node
99+
cpus = 16
100+
rows = 1_000_000_000 // 19 GB
102101
)
103102

104-
r.Add(registry.TestSpec{
105-
Name: fmt.Sprintf(
106-
"cdc/scan/%s/nodes=%d/cpu=%d/rows=%s/ranges=%s/protocol=mux/format=%s/sink=null",
107-
scanType, nodes, cpus, formatSI(rows), formatSI(ranges), format),
108-
Owner: registry.OwnerCDC,
109-
Benchmark: true,
110-
Cluster: r.MakeClusterSpec(nodes+1, spec.CPU(cpus)),
111-
CompatibleClouds: registry.AllExceptAWS,
112-
Suites: registry.Suites(registry.Weekly),
113-
Timeout: 4 * time.Hour, // Allow for the initial import and catchup scans with 100k ranges.
114-
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
115-
runCDCBenchScan(ctx, t, c, scanType, rows, ranges, format)
116-
},
117-
PostProcessPerfMetrics: postProcessScanPerfMetrics,
118-
})
103+
for _, format := range []string{"json", "protobuf"} {
119104

120-
// Enriched envelope benchmarks, using the same parameters.
121-
for _, enrichedProperties := range []string{"none", "source", "source,schema"} {
122105
r.Add(registry.TestSpec{
123-
Name: fmt.Sprintf("cdc/scan/%s/ranges=%s/envelope=enriched/enriched_properties=%s",
124-
scanType, formatSI(ranges), enrichedProperties),
106+
Name: fmt.Sprintf(
107+
"cdc/scan/%s/nodes=%d/cpu=%d/rows=%s/ranges=%s/protocol=mux/format=%s/sink=null",
108+
scanType, nodes, cpus, formatSI(rows), formatSI(ranges), format),
125109
Owner: registry.OwnerCDC,
126110
Benchmark: true,
127111
Cluster: r.MakeClusterSpec(nodes+1, spec.CPU(cpus)),
128112
CompatibleClouds: registry.AllExceptAWS,
129113
Suites: registry.Suites(registry.Weekly),
130-
Timeout: 4 * time.Hour,
114+
Timeout: 4 * time.Hour, // Allow for the initial import and catchup scans with 100k ranges.
131115
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
132-
otherOpts := []kv{{"envelope", "enriched"}}
133-
if enrichedProperties != "none" {
134-
otherOpts = append(otherOpts, kv{"enriched_properties", enrichedProperties})
135-
}
136-
runCDCBenchScan(ctx, t, c, scanType, rows, ranges, format, otherOpts...)
116+
runCDCBenchScan(ctx, t, c, scanType, rows, ranges, format)
137117
},
138118
PostProcessPerfMetrics: postProcessScanPerfMetrics,
139119
})
120+
121+
// Enriched envelope benchmarks, using the same parameters.
122+
if format == "json" { // Skip protobuf as enriched envelopes are not supported yet.
123+
for _, enrichedProperties := range []string{"none", "source", "source,schema"} {
124+
r.Add(registry.TestSpec{
125+
Name: fmt.Sprintf("cdc/scan/%s/ranges=%s/envelope=enriched/enriched_properties=%s",
126+
scanType, formatSI(ranges), enrichedProperties),
127+
Owner: registry.OwnerCDC,
128+
Benchmark: true,
129+
Cluster: r.MakeClusterSpec(nodes+1, spec.CPU(cpus)),
130+
CompatibleClouds: registry.AllExceptAWS,
131+
Suites: registry.Suites(registry.Weekly),
132+
Timeout: 4 * time.Hour,
133+
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
134+
otherOpts := []kv{{"envelope", "enriched"}}
135+
if enrichedProperties != "none" {
136+
otherOpts = append(otherOpts, kv{"enriched_properties", enrichedProperties})
137+
}
138+
runCDCBenchScan(ctx, t, c, scanType, rows, ranges, format, otherOpts...)
139+
},
140+
PostProcessPerfMetrics: postProcessScanPerfMetrics,
141+
})
142+
}
143+
}
140144
}
141145
}
142146
}
@@ -148,9 +152,8 @@ func registerCDCBench(r registry.Registry) {
148152
for _, readPercent := range []int{0} {
149153
for _, ranges := range []int64{100, 100000} {
150154
const (
151-
nodes = 5 // excluding coordinator and workload nodes
152-
cpus = 16
153-
format = "json"
155+
nodes = 5 // excluding coordinator and workload nodes
156+
cpus = 16
154157
)
155158

156159
// Control run that only runs the workload, with no changefeed.
@@ -169,37 +172,40 @@ func registerCDCBench(r registry.Registry) {
169172
},
170173
})
171174

172-
// Workloads with a concurrent changefeed running.
173-
for _, server := range cdcBenchServers {
174-
r.Add(registry.TestSpec{
175-
Name: fmt.Sprintf(
176-
"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=%s/protocol=mux/format=%s/sink=null",
177-
readPercent, nodes, cpus, formatSI(ranges), server, format),
178-
Owner: registry.OwnerCDC,
179-
Benchmark: true,
180-
Cluster: r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
181-
CompatibleClouds: registry.AllExceptAWS,
182-
Suites: registry.Suites(registry.Weekly),
183-
Timeout: time.Hour,
184-
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
185-
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, format, nullSink)
186-
},
187-
})
188-
189-
r.Add(registry.TestSpec{
190-
Name: fmt.Sprintf(
191-
"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=%s/protocol=mux/format=%s/sink=kafka",
192-
readPercent, nodes, cpus, formatSI(ranges), server, format),
193-
Owner: registry.OwnerCDC,
194-
Benchmark: true,
195-
Cluster: r.MakeClusterSpec(nodes+3, spec.CPU(cpus)),
196-
CompatibleClouds: registry.AllExceptAWS,
197-
Suites: registry.Suites(registry.Weekly),
198-
Timeout: time.Hour,
199-
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
200-
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, format, kafkaSink)
201-
},
202-
})
175+
for _, format := range []string{"json", "protobuf"} {
176+
177+
// Workloads with a concurrent changefeed running.
178+
for _, server := range cdcBenchServers {
179+
r.Add(registry.TestSpec{
180+
Name: fmt.Sprintf(
181+
"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=%s/protocol=mux/format=%s/sink=null",
182+
readPercent, nodes, cpus, formatSI(ranges), server, format),
183+
Owner: registry.OwnerCDC,
184+
Benchmark: true,
185+
Cluster: r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
186+
CompatibleClouds: registry.AllExceptAWS,
187+
Suites: registry.Suites(registry.Weekly),
188+
Timeout: time.Hour,
189+
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
190+
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, format, nullSink)
191+
},
192+
})
193+
194+
r.Add(registry.TestSpec{
195+
Name: fmt.Sprintf(
196+
"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=%s/protocol=mux/format=%s/sink=kafka",
197+
readPercent, nodes, cpus, formatSI(ranges), server, format),
198+
Owner: registry.OwnerCDC,
199+
Benchmark: true,
200+
Cluster: r.MakeClusterSpec(nodes+3, spec.CPU(cpus)),
201+
CompatibleClouds: registry.AllExceptAWS,
202+
Suites: registry.Suites(registry.Weekly),
203+
Timeout: time.Hour,
204+
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
205+
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, format, kafkaSink)
206+
},
207+
})
208+
}
203209
}
204210
}
205211
}

0 commit comments

Comments
 (0)