Skip to content

Commit 8fdaf89

Browse files
committed
roachtest: add benchmark for changefeed frontier persistence
Release note: None
1 parent 21e9887 commit 8fdaf89

File tree

1 file changed

+79
-0
lines changed

1 file changed

+79
-0
lines changed

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2941,6 +2941,85 @@ func registerCDC(r registry.Registry) {
29412941
})
29422942
}
29432943
}
2944+
for _, interval := range []string{"30s", "5m", "10m"} {
2945+
for _, perTableTracking := range []bool{false, true} {
2946+
r.Add(registry.TestSpec{
2947+
Name: "cdc/frontier-persistence-benchmark" +
2948+
fmt.Sprintf("/interval=%s/per-table-tracking=%t", interval, perTableTracking),
2949+
Owner: registry.OwnerCDC,
2950+
Benchmark: true,
2951+
Cluster: r.MakeClusterSpec(4, spec.CPU(16), spec.WorkloadNode()),
2952+
CompatibleClouds: registry.AllClouds,
2953+
Suites: registry.Suites(registry.Nightly),
2954+
Timeout: time.Hour,
2955+
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
2956+
ct := newCDCTester(ctx, t, c)
2957+
defer ct.Close()
2958+
2959+
db := ct.DB()
2960+
2961+
// Configure various cluster settings.
2962+
for name, value := range map[string]string{
2963+
"changefeed.progress.frontier_persistence.interval": fmt.Sprintf("'%s'", interval),
2964+
"changefeed.progress.per_table_tracking.enabled": fmt.Sprintf("%t", perTableTracking),
2965+
// Disable span-level checkpointing since it's not necessary
2966+
// when frontier persistence is on.
2967+
"changefeed.span_checkpoint.interval": "'0'",
2968+
// Disable per-table PTS to avoid impact on results.
2969+
"changefeed.protected_timestamp.per_table.enabled": "false",
2970+
} {
2971+
stmt := fmt.Sprintf(`SET CLUSTER SETTING %s = %s`, name, value)
2972+
if _, err := db.ExecContext(ctx, stmt); err != nil {
2973+
t.Fatalf("failed to run %q: %v", stmt, err)
2974+
}
2975+
}
2976+
2977+
// Initialize bank workload with multiple tables.
2978+
numTables := 1_000
2979+
numRows := 1_000
2980+
numRanges := 10
2981+
initCmd := fmt.Sprintf(
2982+
"./cockroach workload init bank --rows=%d --ranges=%d --tables=%d {pgurl%s}",
2983+
numRows, numRanges, numTables, ct.crdbNodes.RandNode())
2984+
if err := c.RunE(ctx, option.WithNodes(ct.workloadNode), initCmd); err != nil {
2985+
t.Fatalf("failed to initialize bank tables: %v", err)
2986+
}
2987+
2988+
// Run bank workload.
2989+
ct.workloadWg.Add(1)
2990+
ct.mon.Go(func(ctx context.Context) error {
2991+
defer ct.workloadWg.Done()
2992+
workloadCmd := fmt.Sprintf(
2993+
"./cockroach workload run bank --rows=%d --duration=30m --tables=%d {pgurl%s}",
2994+
numRows, numTables, ct.crdbNodes)
2995+
return c.RunE(ctx, option.WithNodes(ct.workloadNode), workloadCmd)
2996+
})
2997+
2998+
// Create changefeed targeting all the bank tables.
2999+
targetNames := make([]string, 0, numTables)
3000+
for i := range numTables {
3001+
targetNames = append(targetNames, fmt.Sprintf("bank.bank_%d", i))
3002+
}
3003+
3004+
feed := ct.newChangefeed(feedArgs{
3005+
sinkType: nullSink,
3006+
targets: targetNames,
3007+
opts: map[string]string{
3008+
"initial_scan": "'no'",
3009+
"resolved": "'3s'",
3010+
"min_checkpoint_frequency": "'30s'",
3011+
},
3012+
})
3013+
3014+
ct.runFeedLatencyVerifier(feed, latencyTargets{
3015+
steadyLatency: 2 * time.Minute,
3016+
})
3017+
3018+
ct.waitForWorkload()
3019+
},
3020+
})
3021+
}
3022+
}
29443023
}
29453024

29463025
const (

0 commit comments

Comments
 (0)