@@ -783,11 +783,17 @@ func newCDCTester(ctx context.Context, t test.Test, c cluster.Cluster, opts ...o
783
783
settings .ClusterSettings ["changefeed.slow_span_log_threshold" ] = "30s"
784
784
settings .ClusterSettings ["server.child_metrics.enabled" ] = "true"
785
785
786
- // Randomly set a quantization interval since metamorphic settings
787
- // don't extend to roachtests.
788
- quantization := fmt .Sprintf ("%ds" , rand .Intn (30 ))
789
- settings .ClusterSettings ["changefeed.resolved_timestamp.granularity" ] = quantization
790
- t .Status (fmt .Sprintf ("changefeed.resolved_timestamp.granularity: %s" , quantization ))
786
+ // Set cluster settings that we want to test metamorphically to random values
787
+ // since metamorphic settings don't extend to roachtests.
788
+ {
789
+ quantization := fmt .Sprintf ("%ds" , rand .Intn (30 ))
790
+ settings .ClusterSettings ["changefeed.resolved_timestamp.granularity" ] = quantization
791
+ t .Status (fmt .Sprintf ("changefeed.resolved_timestamp.granularity: %s" , quantization ))
792
+
793
+ perTableTracking := fmt .Sprintf ("%t" , rand .Intn (2 ) == 0 )
794
+ settings .ClusterSettings ["changefeed.progress.per_table_tracking.enabled" ] = perTableTracking
795
+ t .Status (fmt .Sprintf ("changefeed.progress.per_table_tracking.enabled: %s" , perTableTracking ))
796
+ }
791
797
792
798
settings .Env = append (settings .Env , envVars ... )
793
799
@@ -3106,7 +3112,6 @@ func registerCDC(r registry.Registry) {
3106
3112
CompatibleClouds : registry .AllExceptIBM ,
3107
3113
Run : runMessageTooLarge ,
3108
3114
})
3109
-
3110
3115
for _ , perTablePTS := range []bool {false , true } {
3111
3116
for _ , config := range []struct {
3112
3117
numTables int
@@ -3138,17 +3143,30 @@ func registerCDC(r registry.Registry) {
3138
3143
})
3139
3144
}
3140
3145
}
3141
- for _ , interval := range []string {"30s" , "5m" , "10m" } {
3142
- for _ , perTableTracking := range []bool {false , true } {
3146
+ for _ , interval := range []string {
3147
+ "5s" , // min interval
3148
+ "30s" , // default interval
3149
+ "10m" , // max interval
3150
+ } {
3151
+ for _ , cfg := range []struct {
3152
+ tables int
3153
+ ranges int
3154
+ }{
3155
+ {tables : 1 , ranges : 10_000 },
3156
+ {tables : 10 , ranges : 1_000 },
3157
+ {tables : 100 , ranges : 100 },
3158
+ {tables : 1_000 , ranges : 10 },
3159
+ {tables : 10_000 , ranges : 1 },
3160
+ } {
3143
3161
r .Add (registry.TestSpec {
3144
3162
Name : "cdc/frontier-persistence-benchmark" +
3145
- fmt .Sprintf ("/interval=%s/per-table-tracking=%t " , interval , perTableTracking ),
3163
+ fmt .Sprintf ("/interval=%s/tables=%d/ranges=%d " , interval , cfg . tables , cfg . ranges ),
3146
3164
Owner : registry .OwnerCDC ,
3147
3165
Benchmark : true ,
3148
3166
Cluster : r .MakeClusterSpec (4 , spec .CPU (16 ), spec .WorkloadNode ()),
3149
3167
CompatibleClouds : registry .AllClouds ,
3150
3168
Suites : registry .Suites (registry .Nightly ),
3151
- Timeout : time .Hour ,
3169
+ Timeout : 2 * time .Hour ,
3152
3170
Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
3153
3171
ct := newCDCTester (ctx , t , c )
3154
3172
defer ct .Close ()
@@ -3158,7 +3176,6 @@ func registerCDC(r registry.Registry) {
3158
3176
// Configure various cluster settings.
3159
3177
for name , value := range map [string ]string {
3160
3178
"changefeed.progress.frontier_persistence.interval" : fmt .Sprintf ("'%s'" , interval ),
3161
- "changefeed.progress.per_table_tracking.enabled" : fmt .Sprintf ("%t" , perTableTracking ),
3162
3179
// Disable span-level checkpointing since it's not necessary
3163
3180
// when frontier persistence is on.
3164
3181
"changefeed.span_checkpoint.interval" : "'0'" ,
@@ -3171,13 +3188,13 @@ func registerCDC(r registry.Registry) {
3171
3188
}
3172
3189
}
3173
3190
3174
- // Initialize bank workload with multiple tables.
3175
- numTables := 1_000
3176
- numRows := 1_000
3177
- numRanges := 10
3191
+ // Initialize bank workload with multiple tables with multiple ranges .
3192
+ // Each range will have a single row (or 2 when there's a single range)
3193
+ // to maximize the likelihood of unmerged spans in the span frontier.
3194
+ rows := max ( cfg . ranges , 2 )
3178
3195
initCmd := fmt .Sprintf (
3179
- "./cockroach workload init bank --rows =%d --ranges=%d --tables =%d {pgurl%s}" ,
3180
- numRows , numRanges , numTables , ct .crdbNodes .RandNode ())
3196
+ "./cockroach workload init bank --tables =%d --ranges=%d --rows =%d {pgurl%s}" ,
3197
+ cfg . tables , cfg . ranges , rows , ct .crdbNodes .RandNode ())
3181
3198
if err := c .RunE (ctx , option .WithNodes (ct .workloadNode ), initCmd ); err != nil {
3182
3199
t .Fatalf ("failed to initialize bank tables: %v" , err )
3183
3200
}
@@ -3187,20 +3204,24 @@ func registerCDC(r registry.Registry) {
3187
3204
ct .mon .Go (func (ctx context.Context ) error {
3188
3205
defer ct .workloadWg .Done ()
3189
3206
workloadCmd := fmt .Sprintf (
3190
- "./cockroach workload run bank --rows =%d --duration=30m --tables =%d {pgurl%s}" ,
3191
- numRows , numTables , ct .crdbNodes )
3207
+ "./cockroach workload run bank --tables =%d --ranges=%d --rows =%d --duration=30m {pgurl%s}" ,
3208
+ cfg . tables , cfg . ranges , rows , ct .crdbNodes )
3192
3209
return c .RunE (ctx , option .WithNodes (ct .workloadNode ), workloadCmd )
3193
3210
})
3194
3211
3195
3212
// Create changefeed targeting all the bank tables.
3196
- targetNames := make ([]string , 0 , numTables )
3197
- for i := range numTables {
3198
- targetNames = append (targetNames , fmt .Sprintf ("bank.bank_%d" , i ))
3213
+ targets := make ([]string , cfg .tables )
3214
+ if cfg .tables == 1 {
3215
+ targets [0 ] = "bank.bank"
3216
+ } else {
3217
+ for i := range targets {
3218
+ targets [i ] = fmt .Sprintf ("bank.bank_%d" , i )
3219
+ }
3199
3220
}
3200
3221
3201
3222
feed := ct .newChangefeed (feedArgs {
3202
3223
sinkType : nullSink ,
3203
- targets : targetNames ,
3224
+ targets : targets ,
3204
3225
opts : map [string ]string {
3205
3226
"initial_scan" : "'no'" ,
3206
3227
"resolved" : "'3s'" ,
0 commit comments