@@ -1191,23 +1191,23 @@ func runCDCInitialScanRollingRestart(
1191
1191
}
1192
1192
}
1193
1193
1194
+ type fineGrainedCheckpointingParams struct {
1195
+ numRanges int
1196
+ transientErrorFrequency time.Duration
1197
+ rangeDelays []time.Duration
1198
+ maxVal int
1199
+ }
1200
+
1194
1201
// runCDCFineGrainedCheckpointingBenchmark runs a changefeed
1195
1202
// on a 4-node cluster, using node 1 as the coordinator. It will split the
1196
1203
// table into many ranges and start a sink which will be artificially slower
1197
1204
// on some of the ranges so that our fine grained checkpoints are exercised.
1198
1205
// This sink will also occasionally error which should force restarts and
1199
1206
// restore from these fine-grained checkpoints.
1200
1207
func runCDCFineGrainedCheckpointingBenchmark (
1201
- ctx context.Context ,
1202
- t test.Test ,
1203
- c cluster.Cluster ,
1204
- numRanges int ,
1205
- transientErrorFrequency time.Duration ,
1206
- rangeDelays []time.Duration ,
1207
- maxVal int ,
1208
- dupeLimit int ,
1208
+ ctx context.Context , t test.Test , c cluster.Cluster , params fineGrainedCheckpointingParams ,
1209
1209
) {
1210
- if len (rangeDelays ) > numRanges {
1210
+ if len (params . rangeDelays ) > params . numRanges {
1211
1211
t .Fatalf ("too many range delays provided" )
1212
1212
}
1213
1213
@@ -1287,11 +1287,11 @@ func runCDCFineGrainedCheckpointingBenchmark(
1287
1287
}
1288
1288
1289
1289
values := []string {}
1290
- for i := 0 ; i < numRanges ; i ++ {
1290
+ for i := 0 ; i < params . numRanges ; i ++ {
1291
1291
values = append (values , fmt .Sprintf ("(%d, 0)" , i * 10 ))
1292
1292
}
1293
1293
setupStmts = append (setupStmts , fmt .Sprintf ("INSERT INTO foo VALUES %s" , strings .Join (values , ", " )))
1294
- setupStmts = append (setupStmts , fmt .Sprintf ("ALTER TABLE foo SPLIT AT SELECT generate_series(0, %d, 10)" , numRanges * 10 ))
1294
+ setupStmts = append (setupStmts , fmt .Sprintf ("ALTER TABLE foo SPLIT AT SELECT generate_series(0, %d, 10)" , params . numRanges * 10 ))
1295
1295
1296
1296
for _ , s := range setupStmts {
1297
1297
t .L ().Printf (s )
@@ -1301,15 +1301,15 @@ func runCDCFineGrainedCheckpointingBenchmark(
1301
1301
}
1302
1302
1303
1303
delayStrings := []string {}
1304
- for _ , delay := range rangeDelays {
1304
+ for _ , delay := range params . rangeDelays {
1305
1305
delayStrings = append (delayStrings , fmt .Sprint (delay .Milliseconds ()))
1306
1306
}
1307
1307
1308
1308
// Run the sink server.
1309
1309
m .Go (func (ctx context.Context ) error {
1310
1310
t .L ().Printf ("starting up sink server at %s..." , sinkURL )
1311
1311
err := c .RunE (ctx , option .WithNodes (c .Node (c .Spec ().NodeCount )),
1312
- fmt .Sprintf ("./cockroach workload debug webhook-server-slow %d %s" , transientErrorFrequency .Milliseconds (), strings .Join (delayStrings , " " )))
1312
+ fmt .Sprintf ("./cockroach workload debug webhook-server-slow %d %s" , params . transientErrorFrequency .Milliseconds (), strings .Join (delayStrings , " " )))
1313
1313
if err != nil {
1314
1314
return err
1315
1315
}
@@ -1330,7 +1330,7 @@ func runCDCFineGrainedCheckpointingBenchmark(
1330
1330
}
1331
1331
1332
1332
var inserts []string
1333
- for i := 0 ; i < numRanges ; i ++ {
1333
+ for i := 0 ; i < params . numRanges ; i ++ {
1334
1334
for j := 1 ; j < 10 ; j ++ {
1335
1335
inserts = append (inserts , fmt .Sprintf ("(%d, 0)" , i * 10 + j ))
1336
1336
}
@@ -1341,7 +1341,7 @@ func runCDCFineGrainedCheckpointingBenchmark(
1341
1341
t .Fatal (err )
1342
1342
}
1343
1343
1344
- for c := 1 ; c <= maxVal ; c ++ {
1344
+ for c := 1 ; c <= params . maxVal ; c ++ {
1345
1345
if _ , err := db .Exec (fmt .Sprintf (
1346
1346
"UPDATE foo SET val = %d" , c )); err != nil {
1347
1347
t .Fatal (err )
@@ -1369,7 +1369,7 @@ func runCDCFineGrainedCheckpointingBenchmark(
1369
1369
// 10 keys per range are each updated maxVal + 1 times
1370
1370
// except for one key per range which is set to 0 before
1371
1371
// the changefeed starts and only updated maxVal times.
1372
- expected := 10 * numRanges * (maxVal + 1 ) - numRanges
1372
+ expected := 10 * params . numRanges * (params . maxVal + 1 ) - params . numRanges
1373
1373
t .L ().Printf ("expecting %d rows" , expected )
1374
1374
1375
1375
var dupes int
@@ -1388,11 +1388,6 @@ func runCDCFineGrainedCheckpointingBenchmark(
1388
1388
return fmt .Errorf ("expected %d, got %d" , expected , unique )
1389
1389
}
1390
1390
1391
- if dupes > dupeLimit {
1392
- t .Fatalf ("expected dupes <= %d, got %d" , dupeLimit , dupes )
1393
- return nil
1394
- }
1395
-
1396
1391
return nil
1397
1392
}, 30 * time .Minute )
1398
1393
@@ -1703,10 +1698,12 @@ func registerCDC(r registry.Registry) {
1703
1698
Cluster : r .MakeClusterSpec (4 ),
1704
1699
CompatibleClouds : registry .AllClouds ,
1705
1700
Suites : registry .Suites (registry .Nightly ),
1706
- Timeout : 15 * time .Minute ,
1701
+ Timeout : 30 * time .Minute ,
1707
1702
Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
1708
- runCDCFineGrainedCheckpointingBenchmark (ctx , t , c , 1000 , 500 * time .Millisecond ,
1709
- []time.Duration {
1703
+ runCDCFineGrainedCheckpointingBenchmark (ctx , t , c , fineGrainedCheckpointingParams {
1704
+ numRanges : 1000 ,
1705
+ transientErrorFrequency : 500 * time .Millisecond ,
1706
+ rangeDelays : []time.Duration {
1710
1707
2 * time .Millisecond ,
1711
1708
4 * time .Millisecond ,
1712
1709
8 * time .Millisecond ,
@@ -1717,7 +1714,9 @@ func registerCDC(r registry.Registry) {
1717
1714
8 * time .Millisecond ,
1718
1715
16 * time .Millisecond ,
1719
1716
32 * time .Millisecond ,
1720
- }, 100 , 50000 )
1717
+ },
1718
+ maxVal : 100 ,
1719
+ })
1721
1720
},
1722
1721
})
1723
1722
r .Add (registry.TestSpec {
0 commit comments