Skip to content

Commit c5c821a

Browse files
craig[bot] and erikgrinaker committed
107722: roachtest: add changefeed scan benchmarks r=erikgrinaker a=erikgrinaker This patch adds a set of changefeed scan benchmarks, which measure the scan rate (rows per second) across a simple key/value table with various types of scans: initial scans, catchup scans, and catchup scans with cold data (no new rows). There are no concurrent writes to the table. The average rows per second rate is exported for roachperf graphing, as a single value for the entire run. No time series are exported. These benchmarks don't use the `cdcTester` framework, because we don't want them to be affected by incidental test changes, and the logic is straightforward. The CDC team can change this later if they wish. ``` cdc/scan/catchup-cold/nodes=5/cpu=16/rows=1G/ranges=100/protocol=mux/format=json/sink=null [cdc] cdc/scan/catchup-cold/nodes=5/cpu=16/rows=1G/ranges=100/protocol=rangefeed/format=json/sink=null [cdc] cdc/scan/catchup-cold/nodes=5/cpu=16/rows=1G/ranges=100K/protocol=mux/format=json/sink=null [cdc] cdc/scan/catchup-cold/nodes=5/cpu=16/rows=1G/ranges=100K/protocol=rangefeed/format=json/sink=null [cdc] cdc/scan/catchup/nodes=5/cpu=16/rows=1G/ranges=100/protocol=mux/format=json/sink=null [cdc] cdc/scan/catchup/nodes=5/cpu=16/rows=1G/ranges=100/protocol=rangefeed/format=json/sink=null [cdc] cdc/scan/catchup/nodes=5/cpu=16/rows=1G/ranges=100K/protocol=mux/format=json/sink=null [cdc] cdc/scan/catchup/nodes=5/cpu=16/rows=1G/ranges=100K/protocol=rangefeed/format=json/sink=null [cdc] cdc/scan/initial/nodes=5/cpu=16/rows=1G/ranges=100/protocol=mux/format=json/sink=null [cdc] cdc/scan/initial/nodes=5/cpu=16/rows=1G/ranges=100/protocol=rangefeed/format=json/sink=null [cdc] cdc/scan/initial/nodes=5/cpu=16/rows=1G/ranges=100K/protocol=mux/format=json/sink=null [cdc] cdc/scan/initial/nodes=5/cpu=16/rows=1G/ranges=100K/protocol=rangefeed/format=json/sink=null [cdc] ``` Resolves cockroachdb#107070. Epic: none Release note: None Co-authored-by: Erik Grinaker <[email protected]>
2 parents d87a04b + 65b5fc0 commit c5c821a

File tree

4 files changed

+347
-0
lines changed

4 files changed

+347
-0
lines changed

pkg/cmd/roachtest/tests/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ go_library(
3131
"canary.go",
3232
"cancel.go",
3333
"cdc.go",
34+
"cdc_bench.go",
3435
"cdc_stats.go",
3536
"chaos.go",
3637
"clearrange.go",

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2374,6 +2374,7 @@ func (cfc *changefeedCreator) Create() (int, error) {
23742374
type changefeedInfo struct {
23752375
status string
23762376
errMsg string
2377+
startedTime time.Time
23772378
statementTime time.Time
23782379
highwaterTime time.Time
23792380
finishedTime time.Time
@@ -2409,6 +2410,7 @@ func getChangefeedInfo(db *gosql.DB, jobID int) (*changefeedInfo, error) {
24092410
return &changefeedInfo{
24102411
status: status,
24112412
errMsg: payload.Error,
2413+
startedTime: time.UnixMicro(payload.StartedMicros),
24122414
statementTime: payload.GetChangefeed().StatementTime.GoTime(),
24132415
highwaterTime: highwaterTime,
24142416
finishedTime: time.UnixMicro(payload.FinishedMicros),
pkg/cmd/roachtest/tests/cdc_bench.go

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
// Copyright 2023 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.txt.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0, included in the file
9+
// licenses/APL.txt.
10+
11+
package tests
12+
13+
import (
14+
"bytes"
15+
"context"
16+
gosql "database/sql"
17+
"encoding/json"
18+
"fmt"
19+
"path/filepath"
20+
"time"
21+
22+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
23+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
24+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
25+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
26+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
27+
"github.com/cockroachdb/cockroach/pkg/jobs"
28+
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
29+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
30+
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
31+
"github.com/cockroachdb/errors"
32+
humanize "github.com/dustin/go-humanize"
33+
"github.com/stretchr/testify/require"
34+
)
35+
36+
type (
	// cdcBenchScanType names the kind of changefeed scan a benchmark runs. Its
	// string value is embedded in the registered test name.
	cdcBenchScanType string

	// cdcBenchProtocol names the rangefeed protocol a benchmark runs with. Its
	// string value is embedded in the registered test name.
	cdcBenchProtocol string
)

// Scan types.
const (
	// cdcBenchInitialScan runs an initial scan across a table, i.e. it scans and
	// emits all rows in the table.
	cdcBenchInitialScan cdcBenchScanType = "initial"

	// cdcBenchCatchupScan runs a catchup scan across a table where all the data
	// is eligible for emission, i.e. it creates a changefeed with a cursor below
	// the data ingestion timestamp and emits all rows in the table.
	cdcBenchCatchupScan cdcBenchScanType = "catchup"

	// cdcBenchColdCatchupScan runs a catchup scan across a table, where none of
	// the data is eligible, i.e. it creates a changefeed with a cursor above the
	// data ingestion timestamp. This is the common case in production clusters,
	// where tables are large and the relative amount of changes is low. This
	// won't emit any rows, but it still needs to scan the entire table to look
	// for data above the cursor, and relies on Pebble's block property filters to
	// do so efficiently. Ideally, this wouldn't take any time at all, but in
	// practice it can.
	cdcBenchColdCatchupScan cdcBenchScanType = "catchup-cold"
)

// Protocols.
const (
	// cdcBenchNoProtocol leaves the protocol unspecified.
	cdcBenchNoProtocol cdcBenchProtocol = ""

	// cdcBenchRangefeedProtocol is the basic rangefeed protocol.
	cdcBenchRangefeedProtocol cdcBenchProtocol = "rangefeed"

	// cdcBenchMuxProtocol is the multiplexing rangefeed protocol.
	cdcBenchMuxProtocol cdcBenchProtocol = "mux"
)

// cdcBenchScanTypes lists the scan types that get a benchmark, in registration
// order.
var cdcBenchScanTypes = []cdcBenchScanType{
	cdcBenchInitialScan,
	cdcBenchCatchupScan,
	cdcBenchColdCatchupScan,
}

// cdcBenchProtocols lists the protocols that get a benchmark, in registration
// order.
var cdcBenchProtocols = []cdcBenchProtocol{
	cdcBenchRangefeedProtocol,
	cdcBenchMuxProtocol,
}
69+
70+
func registerCDCBench(r registry.Registry) {
71+
72+
// Initial/catchup scan benchmarks.
73+
for _, scanType := range cdcBenchScanTypes {
74+
for _, ranges := range []int64{100, 100000} {
75+
for _, protocol := range cdcBenchProtocols {
76+
scanType, ranges, protocol := scanType, ranges, protocol // pin loop variables
77+
const (
78+
nodes = 5 // excluding coordinator/workload node
79+
cpus = 16
80+
rows = 1_000_000_000 // 19 GB
81+
format = "json"
82+
)
83+
r.Add(registry.TestSpec{
84+
Name: fmt.Sprintf(
85+
"cdc/scan/%s/nodes=%d/cpu=%d/rows=%s/ranges=%s/protocol=%s/format=%s/sink=null",
86+
scanType, nodes, cpus, formatSI(rows), formatSI(ranges), protocol, format),
87+
Owner: registry.OwnerCDC,
88+
Benchmark: true,
89+
Cluster: r.MakeClusterSpec(nodes+1, spec.CPU(cpus)),
90+
RequiresLicense: true,
91+
Timeout: 2 * time.Hour, // catchup scans with 100k ranges can take >1 hour
92+
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
93+
if ranges == 100000 && scanType == cdcBenchCatchupScan {
94+
t.Skip("fails to complete, see https://github.com/cockroachdb/cockroach/issues/108157")
95+
}
96+
runCDCBenchScan(ctx, t, c, scanType, rows, ranges, protocol, format)
97+
},
98+
})
99+
}
100+
}
101+
}
102+
}
103+
104+
func formatSI(num int64) string {
105+
numSI, suffix := humanize.ComputeSI(float64(num))
106+
return fmt.Sprintf("%d%s", int64(numSI), suffix)
107+
}
108+
109+
// makeCDCBenchOptions creates common cluster options for CDC benchmarks.
110+
func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) {
111+
opts := option.DefaultStartOpts()
112+
settings := install.MakeClusterSettings()
113+
settings.ClusterSettings["kv.rangefeed.enabled"] = "true"
114+
115+
// Disable the stuck watcher, since it can cause continual catchup scans when
116+
// ranges aren't able to keep up.
117+
settings.ClusterSettings["kv.rangefeed.range_stuck_threshold"] = "0"
118+
119+
// Scheduled backups may interfere with performance, disable them.
120+
opts.RoachprodOpts.ScheduleBackups = false
121+
122+
// Backpressure writers when rangefeed clients can't keep up. This gives more
123+
// reliable results, since we can otherwise randomly hit timeouts and incur
124+
// catchup scans.
125+
settings.Env = append(settings.Env, "COCKROACH_RANGEFEED_SEND_TIMEOUT=0")
126+
127+
return opts, settings
128+
}
129+
130+
// runCDCBenchScan benchmarks throughput for a changefeed initial or catchup
131+
// scan as rows scanned per second.
132+
//
133+
// It sets up a cluster with N-1 data nodes, and a separate changefeed
134+
// coordinator node. The latter is also used as the workload runner, since we
135+
// don't start the coordinator until the data has been imported.
136+
func runCDCBenchScan(
137+
ctx context.Context,
138+
t test.Test,
139+
c cluster.Cluster,
140+
scanType cdcBenchScanType,
141+
numRows, numRanges int64,
142+
protocol cdcBenchProtocol,
143+
format string,
144+
) {
145+
const sink = "null://"
146+
var (
147+
numNodes = c.Spec().NodeCount
148+
nData = c.Range(1, numNodes-1)
149+
nCoord = c.Node(numNodes)
150+
)
151+
152+
// Start data nodes first to place data on them. We'll start the changefeed
153+
// coordinator later, since we don't want any data on it.
154+
opts, settings := makeCDCBenchOptions()
155+
156+
switch protocol {
157+
case cdcBenchMuxProtocol:
158+
settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "true"
159+
case cdcBenchRangefeedProtocol:
160+
settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "false"
161+
case cdcBenchNoProtocol:
162+
default:
163+
t.Fatalf("unknown protocol %q", protocol)
164+
}
165+
166+
c.Put(ctx, t.Cockroach(), "./cockroach")
167+
c.Start(ctx, t.L(), opts, settings, nData)
168+
m := c.NewMonitor(ctx, nData.Merge(nCoord))
169+
170+
conn := c.Conn(ctx, t.L(), nData[0])
171+
defer conn.Close()
172+
173+
// Prohibit ranges on the changefeed coordinator.
174+
t.L().Printf("configuring zones")
175+
for _, target := range getAllZoneTargets(ctx, t, conn) {
176+
_, err := conn.ExecContext(ctx, fmt.Sprintf(
177+
`ALTER %s CONFIGURE ZONE USING num_replicas=3, constraints='[-node%d]'`, target, nCoord[0]))
178+
require.NoError(t, err)
179+
}
180+
181+
// Wait for system ranges to upreplicate.
182+
require.NoError(t, WaitFor3XReplication(ctx, t, conn))
183+
184+
// Create and split the workload table. We don't import data here, because it
185+
// imports before splitting, which takes a very long time.
186+
//
187+
// NB: don't scatter -- the ranges end up fairly well-distributed anyway, and
188+
// the scatter can often fail with 100k ranges.
189+
t.L().Printf("creating table with %s ranges", humanize.Comma(numRanges))
190+
c.Run(ctx, nCoord, fmt.Sprintf(
191+
`./cockroach workload init kv --splits %d {pgurl:%d}`, numRanges, nData[0]))
192+
require.NoError(t, WaitFor3XReplication(ctx, t, conn))
193+
194+
cursor := timeutil.Now() // before data is ingested
195+
196+
// Ingest data. init allows us to import into the existing table. However,
197+
// catchup scans can't operate across an import, so use inserts in that case.
198+
loader := "import"
199+
if scanType == cdcBenchCatchupScan {
200+
loader = "insert"
201+
}
202+
t.L().Printf("ingesting %s rows using %s", humanize.Comma(numRows), loader)
203+
c.Run(ctx, nCoord, fmt.Sprintf(
204+
`./cockroach workload init kv --insert-count %d --data-loader %s {pgurl:%d}`,
205+
numRows, loader, nData[0]))
206+
207+
// Now that the ranges are placed, start the changefeed coordinator.
208+
t.L().Printf("starting coordinator node")
209+
c.Start(ctx, t.L(), opts, settings, nCoord)
210+
211+
conn = c.Conn(ctx, t.L(), nCoord[0])
212+
defer conn.Close()
213+
214+
if scanType == cdcBenchColdCatchupScan {
215+
cursor = timeutil.Now() // after data is ingested
216+
}
217+
218+
// Start the scan on the changefeed coordinator. We set an explicit end time
219+
// in the near future, and compute throughput based on the job's start and
220+
// finish time.
221+
t.L().Printf("running changefeed %s scan", scanType)
222+
with := fmt.Sprintf(`format = '%s', end_time = '%s'`,
223+
format, timeutil.Now().Add(5*time.Second).Format(time.RFC3339))
224+
switch scanType {
225+
case cdcBenchInitialScan:
226+
with += ", initial_scan = 'yes'"
227+
case cdcBenchCatchupScan, cdcBenchColdCatchupScan:
228+
with += fmt.Sprintf(", cursor = '%s'", cursor.Format(time.RFC3339))
229+
default:
230+
t.Fatalf("unknown scan type %q", scanType)
231+
}
232+
var jobID int
233+
require.NoError(t, conn.QueryRowContext(ctx,
234+
fmt.Sprintf(`CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH %s`, sink, with)).
235+
Scan(&jobID))
236+
237+
// Wait for the changefeed to complete, and compute throughput.
238+
m.Go(func(ctx context.Context) error {
239+
t.L().Printf("waiting for changefeed to finish")
240+
info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
241+
switch jobs.Status(info.status) {
242+
case jobs.StatusSucceeded:
243+
return true, nil
244+
case jobs.StatusPending, jobs.StatusRunning:
245+
return false, nil
246+
default:
247+
return false, errors.Errorf("unexpected changefeed status %q", info.status)
248+
}
249+
})
250+
if err != nil {
251+
return err
252+
}
253+
254+
duration := info.finishedTime.Sub(info.startedTime)
255+
rate := int64(float64(numRows) / duration.Seconds())
256+
t.L().Printf("changefeed completed in %s (scanned %s rows per second)",
257+
duration.Truncate(time.Second), humanize.Comma(rate))
258+
259+
// Record scan rate to stats.json.
260+
return writeCDCBenchStats(ctx, t, c, nCoord, "scan-rate", rate)
261+
})
262+
263+
m.Wait()
264+
}
265+
266+
// getAllZoneTargets returns all zone targets (e.g. "RANGE default", "DATABASE
267+
// system", etc).
268+
func getAllZoneTargets(ctx context.Context, t test.Test, conn *gosql.DB) []string {
269+
rows, err := conn.QueryContext(ctx, `SELECT target FROM [SHOW ALL ZONE CONFIGURATIONS]`)
270+
require.NoError(t, err)
271+
var targets []string
272+
for rows.Next() {
273+
var target string
274+
require.NoError(t, rows.Scan(&target))
275+
targets = append(targets, target)
276+
}
277+
require.NoError(t, rows.Err())
278+
return targets
279+
}
280+
281+
// waitForChangefeed waits until the changefeed satisfies the given closure.
282+
func waitForChangefeed(
283+
ctx context.Context, conn *gosql.DB, jobID int, f func(changefeedInfo) (bool, error),
284+
) (changefeedInfo, error) {
285+
ticker := time.NewTicker(5 * time.Second)
286+
defer ticker.Stop()
287+
for {
288+
select {
289+
case <-ticker.C:
290+
case <-ctx.Done():
291+
return changefeedInfo{}, ctx.Err()
292+
}
293+
294+
info, err := getChangefeedInfo(conn, jobID)
295+
if err != nil {
296+
return changefeedInfo{}, err
297+
} else if info.errMsg != "" {
298+
return changefeedInfo{}, errors.Errorf("changefeed error: %s", info.errMsg)
299+
}
300+
if ok, err := f(*info); err != nil {
301+
return changefeedInfo{}, err
302+
} else if ok {
303+
return *info, nil
304+
}
305+
}
306+
}
307+
308+
// writeCDCBenchStats writes a single perf metric into stats.json on the
309+
// given node, for graphing in roachperf.
310+
func writeCDCBenchStats(
311+
ctx context.Context,
312+
t test.Test,
313+
c cluster.Cluster,
314+
node option.NodeListOption,
315+
metric string,
316+
value int64,
317+
) error {
318+
// The easiest way to record a precise metric for roachperf is to cast it as a
319+
// duration in seconds in the histogram's upper bound.
320+
valueS := time.Duration(value) * time.Second
321+
reg := histogram.NewRegistry(valueS, histogram.MockWorkloadName)
322+
bytesBuf := bytes.NewBuffer([]byte{})
323+
jsonEnc := json.NewEncoder(bytesBuf)
324+
325+
var err error
326+
reg.GetHandle().Get(metric).Record(valueS)
327+
reg.Tick(func(tick histogram.Tick) {
328+
err = jsonEnc.Encode(tick.Snapshot())
329+
})
330+
if err != nil {
331+
return err
332+
}
333+
334+
// Upload the perf artifacts to the given node.
335+
path := filepath.Join(t.PerfArtifactsDir(), "stats.json")
336+
if err := c.RunE(ctx, node, "mkdir -p "+filepath.Dir(path)); err != nil {
337+
return err
338+
}
339+
if err := c.PutString(ctx, bytesBuf.String(), path, 0755, node); err != nil {
340+
return err
341+
}
342+
return nil
343+
}

pkg/cmd/roachtest/tests/registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func RegisterTests(r registry.Registry) {
2828
registerBackupNodeShutdown(r)
2929
registerBackupFixtures(r)
3030
registerCDC(r)
31+
registerCDCBench(r)
3132
registerCDCMixedVersions(r)
3233
registerExportParquet(r)
3334
registerCancel(r)

0 commit comments

Comments
 (0)