@@ -95,48 +95,52 @@ func registerCDCBench(r registry.Registry) {
95
95
for _ , scanType := range cdcBenchScanTypes {
96
96
for _ , ranges := range []int64 {100 , 100000 } {
97
97
const (
98
- nodes = 5 // excluding coordinator/workload node
99
- cpus = 16
100
- rows = 1_000_000_000 // 19 GB
101
- format = "json"
98
+ nodes = 5 // excluding coordinator/workload node
99
+ cpus = 16
100
+ rows = 1_000_000_000 // 19 GB
102
101
)
103
102
104
- r .Add (registry.TestSpec {
105
- Name : fmt .Sprintf (
106
- "cdc/scan/%s/nodes=%d/cpu=%d/rows=%s/ranges=%s/protocol=mux/format=%s/sink=null" ,
107
- scanType , nodes , cpus , formatSI (rows ), formatSI (ranges ), format ),
108
- Owner : registry .OwnerCDC ,
109
- Benchmark : true ,
110
- Cluster : r .MakeClusterSpec (nodes + 1 , spec .CPU (cpus )),
111
- CompatibleClouds : registry .AllExceptAWS ,
112
- Suites : registry .Suites (registry .Weekly ),
113
- Timeout : 4 * time .Hour , // Allow for the initial import and catchup scans with 100k ranges.
114
- Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
115
- runCDCBenchScan (ctx , t , c , scanType , rows , ranges , format )
116
- },
117
- PostProcessPerfMetrics : postProcessScanPerfMetrics ,
118
- })
103
+ for _ , format := range []string {"json" , "protobuf" } {
119
104
120
- // Enriched envelope benchmarks, using the same parameters.
121
- for _ , enrichedProperties := range []string {"none" , "source" , "source,schema" } {
122
105
r .Add (registry.TestSpec {
123
- Name : fmt .Sprintf ("cdc/scan/%s/ranges=%s/envelope=enriched/enriched_properties=%s" ,
124
- scanType , formatSI (ranges ), enrichedProperties ),
106
+ Name : fmt .Sprintf (
107
+ "cdc/scan/%s/nodes=%d/cpu=%d/rows=%s/ranges=%s/protocol=mux/format=%s/sink=null" ,
108
+ scanType , nodes , cpus , formatSI (rows ), formatSI (ranges ), format ),
125
109
Owner : registry .OwnerCDC ,
126
110
Benchmark : true ,
127
111
Cluster : r .MakeClusterSpec (nodes + 1 , spec .CPU (cpus )),
128
112
CompatibleClouds : registry .AllExceptAWS ,
129
113
Suites : registry .Suites (registry .Weekly ),
130
- Timeout : 4 * time .Hour ,
114
+ Timeout : 4 * time .Hour , // Allow for the initial import and catchup scans with 100k ranges.
131
115
Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
132
- otherOpts := []kv {{"envelope" , "enriched" }}
133
- if enrichedProperties != "none" {
134
- otherOpts = append (otherOpts , kv {"enriched_properties" , enrichedProperties })
135
- }
136
- runCDCBenchScan (ctx , t , c , scanType , rows , ranges , format , otherOpts ... )
116
+ runCDCBenchScan (ctx , t , c , scanType , rows , ranges , format )
137
117
},
138
118
PostProcessPerfMetrics : postProcessScanPerfMetrics ,
139
119
})
120
+
121
+ // Enriched envelope benchmarks, using the same parameters.
122
+ if format == "json" { // Skip protobuf as enriched envelopes are not supported yet.
123
+ for _ , enrichedProperties := range []string {"none" , "source" , "source,schema" } {
124
+ r .Add (registry.TestSpec {
125
+ Name : fmt .Sprintf ("cdc/scan/%s/ranges=%s/envelope=enriched/enriched_properties=%s" ,
126
+ scanType , formatSI (ranges ), enrichedProperties ),
127
+ Owner : registry .OwnerCDC ,
128
+ Benchmark : true ,
129
+ Cluster : r .MakeClusterSpec (nodes + 1 , spec .CPU (cpus )),
130
+ CompatibleClouds : registry .AllExceptAWS ,
131
+ Suites : registry .Suites (registry .Weekly ),
132
+ Timeout : 4 * time .Hour ,
133
+ Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
134
+ otherOpts := []kv {{"envelope" , "enriched" }}
135
+ if enrichedProperties != "none" {
136
+ otherOpts = append (otherOpts , kv {"enriched_properties" , enrichedProperties })
137
+ }
138
+ runCDCBenchScan (ctx , t , c , scanType , rows , ranges , format , otherOpts ... )
139
+ },
140
+ PostProcessPerfMetrics : postProcessScanPerfMetrics ,
141
+ })
142
+ }
143
+ }
140
144
}
141
145
}
142
146
}
@@ -148,9 +152,8 @@ func registerCDCBench(r registry.Registry) {
148
152
for _ , readPercent := range []int {0 } {
149
153
for _ , ranges := range []int64 {100 , 100000 } {
150
154
const (
151
- nodes = 5 // excluding coordinator and workload nodes
152
- cpus = 16
153
- format = "json"
155
+ nodes = 5 // excluding coordinator and workload nodes
156
+ cpus = 16
154
157
)
155
158
156
159
// Control run that only runs the workload, with no changefeed.
@@ -169,37 +172,40 @@ func registerCDCBench(r registry.Registry) {
169
172
},
170
173
})
171
174
172
- // Workloads with a concurrent changefeed running.
173
- for _ , server := range cdcBenchServers {
174
- r .Add (registry.TestSpec {
175
- Name : fmt .Sprintf (
176
- "cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=%s/protocol=mux/format=%s/sink=null" ,
177
- readPercent , nodes , cpus , formatSI (ranges ), server , format ),
178
- Owner : registry .OwnerCDC ,
179
- Benchmark : true ,
180
- Cluster : r .MakeClusterSpec (nodes + 2 , spec .CPU (cpus )),
181
- CompatibleClouds : registry .AllExceptAWS ,
182
- Suites : registry .Suites (registry .Weekly ),
183
- Timeout : time .Hour ,
184
- Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
185
- runCDCBenchWorkload (ctx , t , c , ranges , readPercent , server , format , nullSink )
186
- },
187
- })
188
-
189
- r .Add (registry.TestSpec {
190
- Name : fmt .Sprintf (
191
- "cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=%s/protocol=mux/format=%s/sink=kafka" ,
192
- readPercent , nodes , cpus , formatSI (ranges ), server , format ),
193
- Owner : registry .OwnerCDC ,
194
- Benchmark : true ,
195
- Cluster : r .MakeClusterSpec (nodes + 3 , spec .CPU (cpus )),
196
- CompatibleClouds : registry .AllExceptAWS ,
197
- Suites : registry .Suites (registry .Weekly ),
198
- Timeout : time .Hour ,
199
- Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
200
- runCDCBenchWorkload (ctx , t , c , ranges , readPercent , server , format , kafkaSink )
201
- },
202
- })
175
+ for _ , format := range []string {"json" , "protobuf" } {
176
+
177
+ // Workloads with a concurrent changefeed running.
178
+ for _ , server := range cdcBenchServers {
179
+ r .Add (registry.TestSpec {
180
+ Name : fmt .Sprintf (
181
+ "cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=%s/protocol=mux/format=%s/sink=null" ,
182
+ readPercent , nodes , cpus , formatSI (ranges ), server , format ),
183
+ Owner : registry .OwnerCDC ,
184
+ Benchmark : true ,
185
+ Cluster : r .MakeClusterSpec (nodes + 2 , spec .CPU (cpus )),
186
+ CompatibleClouds : registry .AllExceptAWS ,
187
+ Suites : registry .Suites (registry .Weekly ),
188
+ Timeout : time .Hour ,
189
+ Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
190
+ runCDCBenchWorkload (ctx , t , c , ranges , readPercent , server , format , nullSink )
191
+ },
192
+ })
193
+
194
+ r .Add (registry.TestSpec {
195
+ Name : fmt .Sprintf (
196
+ "cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%s/server=%s/protocol=mux/format=%s/sink=kafka" ,
197
+ readPercent , nodes , cpus , formatSI (ranges ), server , format ),
198
+ Owner : registry .OwnerCDC ,
199
+ Benchmark : true ,
200
+ Cluster : r .MakeClusterSpec (nodes + 3 , spec .CPU (cpus )),
201
+ CompatibleClouds : registry .AllExceptAWS ,
202
+ Suites : registry .Suites (registry .Weekly ),
203
+ Timeout : time .Hour ,
204
+ Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
205
+ runCDCBenchWorkload (ctx , t , c , ranges , readPercent , server , format , kafkaSink )
206
+ },
207
+ })
208
+ }
203
209
}
204
210
}
205
211
}
0 commit comments