Skip to content

Commit 871caa4

Browse files
committed
roachtest: add INSPECT performance benchmarks
Add two nightly roachtest benchmarks to measure INSPECT throughput across different test durations: - Short run: 12 nodes × 8 CPUs, 500M rows, 1 index (~1 hour) - Long run: 12 nodes × 8 CPUs, 1B rows, 2 indexes (~5 hours) These configurations ensure that the number of spans exceeds the total CPU count (96), which is important because INSPECT parallelism is based on the number of spans. Informs #154457 Epic: CRDB-30356 Release note: none
1 parent 481e179 commit 871caa4

File tree

3 files changed

+308
-0
lines changed

3 files changed

+308
-0
lines changed

pkg/cmd/roachtest/tests/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ go_library(
9090
"import_cancellation.go",
9191
"inconsistency.go",
9292
"indexes.go",
93+
"inspect_throughput.go",
9394
"invariant_check_detection.go",
9495
"inverted_index.go",
9596
"jasyncsql.go",
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package tests
7+
8+
import (
9+
"bytes"
10+
"context"
11+
gosql "database/sql"
12+
"errors"
13+
"fmt"
14+
"io"
15+
"strings"
16+
"time"
17+
18+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
19+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
20+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
21+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
22+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
23+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
24+
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
25+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
26+
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
27+
"github.com/cockroachdb/cockroach/pkg/workload/histogram/exporter"
28+
"github.com/lib/pq"
29+
)
30+
31+
func registerInspectThoughput(r registry.Registry) {
32+
// Short run: 12 nodes × 8 CPUs, 500M rows, 1 index check, ~1 hour (in v25.4)
33+
r.Add(makeInspectThroughputTest(r, 12, 8, 500_000_000, 3*time.Hour, 1))
34+
35+
// Long run: 12 nodes × 8 CPUs, 1B rows, 2 index checks (runs INSPECT twice: 1 index, then 2 indexes), ~5 hours (in v25.4)
36+
// TODO(148365): Until we have INSPECT syntax, we cannot execute the INSPECT
37+
// job on a subset of indexes. Set this to 2 when INSPECT SQL is available.
38+
const indexesForLongRun = 1
39+
r.Add(makeInspectThroughputTest(r, 12, 8, 1_000_000_000, 8*time.Hour, indexesForLongRun))
40+
}
41+
42+
// initInspectHistograms creates a histogram registry with multiple named metrics.
43+
// All metrics share the same registry and are ticked together.
44+
func initInspectHistograms(
45+
timeout time.Duration, t test.Test, e exporter.Exporter, metricNames []string,
46+
) (*histogram.Registry, *bytes.Buffer) {
47+
reg := histogram.NewRegistryWithExporter(
48+
timeout,
49+
histogram.MockWorkloadName,
50+
e,
51+
)
52+
53+
// Register all metric names in the same registry
54+
for _, name := range metricNames {
55+
reg.GetHandle().Get(name)
56+
}
57+
58+
bytesBuf := bytes.NewBuffer([]byte{})
59+
writer := io.Writer(bytesBuf)
60+
e.Init(&writer)
61+
62+
return reg, bytesBuf
63+
}
64+
65+
func makeInspectThroughputTest(
66+
r registry.Registry, numNodes, numCPUs, numRows int, length time.Duration, numChecks int,
67+
) registry.TestSpec {
68+
// Define index names that will be created.
69+
indexNames := []string{
70+
"bulkingest_c_b_a_idx",
71+
"bulkingest_b_a_c_idx",
72+
}
73+
74+
// Cap numChecks to the number of available indexes.
75+
const maxChecks = 2
76+
if numChecks > maxChecks {
77+
numChecks = maxChecks
78+
}
79+
if numChecks < 1 {
80+
numChecks = 1
81+
}
82+
83+
return registry.TestSpec{
84+
Name: fmt.Sprintf("inspect/throughput/bulkingest/rows=%d", numRows),
85+
Owner: registry.OwnerSQLFoundations,
86+
Benchmark: true,
87+
Cluster: r.MakeClusterSpec(numNodes, spec.WorkloadNode(), spec.CPU(numCPUs)),
88+
CompatibleClouds: registry.AllExceptAWS,
89+
Suites: registry.Suites(registry.Nightly),
90+
Leases: registry.LeaderLeases,
91+
Timeout: length,
92+
PostProcessPerfMetrics: func(test string, histogram *roachtestutil.HistogramMetric) (roachtestutil.AggregatedPerfMetrics, error) {
93+
// This callback is invoked once with all histogram summaries.
94+
// histogram.Summaries contains one entry for each named metric (checks=1, checks=2).
95+
metrics := roachtestutil.AggregatedPerfMetrics{}
96+
97+
for _, summary := range histogram.Summaries {
98+
// Each summary corresponds to one INSPECT run with a specific check count.
99+
totalElapsed := summary.TotalElapsed
100+
if totalElapsed == 0 {
101+
continue
102+
}
103+
104+
// Calculate throughput in rows/sec per CPU.
105+
// TotalElapsed is in milliseconds, convert to seconds.
106+
inspectDuration := totalElapsed / 1000
107+
if inspectDuration == 0 {
108+
inspectDuration = 1 // Avoid division by zero.
109+
}
110+
totalCPUs := int64(numNodes * numCPUs)
111+
throughput := roachtestutil.MetricPoint(float64(numRows) / float64(totalCPUs*inspectDuration))
112+
113+
// Use the summary name (e.g., "checks=1") as part of the metric name.
114+
metrics = append(metrics, &roachtestutil.AggregatedMetric{
115+
Name: fmt.Sprintf("%s_%s_throughput", test, summary.Name),
116+
Value: throughput,
117+
Unit: "rows/s/cpu",
118+
IsHigherBetter: true,
119+
})
120+
}
121+
122+
return metrics, nil
123+
},
124+
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
125+
// Configure column a to have sequential ascending values. The payload
126+
// column will be randomized and thus uncorrelated with the primary key
127+
// (a, b, c).
128+
bNum := 1000
129+
cNum := 1000
130+
aNum := numRows / (bNum * cNum)
131+
if c.IsLocal() {
132+
aNum = 100000
133+
bNum = 1
134+
cNum = 1
135+
}
136+
payloadBytes := 40
137+
138+
settings := install.MakeClusterSettings()
139+
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.CRDBNodes())
140+
141+
db := c.Conn(ctx, t.L(), 1)
142+
defer db.Close()
143+
144+
disableRowCountValidation(t, db)
145+
146+
// Import bulkingest data without the default index. We'll create custom
147+
// indexes based on the checks parameter.
148+
cmdImport := fmt.Sprintf(
149+
"./cockroach workload fixtures import bulkingest {pgurl:1} --a %d --b %d --c %d --payload-bytes %d --index-b-c-a=false",
150+
aNum, bNum, cNum, payloadBytes,
151+
)
152+
153+
t.L().Printf("Importing bulkingest data")
154+
c.Run(ctx, option.WithNodes(c.WorkloadNode()), cmdImport)
155+
156+
// Set up a single histogram registry with metrics for the requested number of checks.
157+
var metricNames []string
158+
for i := 1; i <= numChecks; i++ {
159+
metricNames = append(metricNames, fmt.Sprintf("checks=%d", i))
160+
}
161+
exporter := roachtestutil.CreateWorkloadHistogramExporter(t, c)
162+
reg, perfBuf := initInspectHistograms(length*2, t, exporter, metricNames)
163+
defer roachtestutil.CloseExporter(ctx, exporter, t, c, perfBuf, c.Node(1), "")
164+
165+
// Helper to tick a specific named histogram.
166+
tickHistogram := func(name string) {
167+
reg.Tick(func(tick histogram.Tick) {
168+
if tick.Name == name {
169+
_ = tick.Exporter.SnapshotAndWrite(tick.Hist, tick.Now, tick.Elapsed, &tick.Name)
170+
}
171+
})
172+
}
173+
174+
// Build index lists for each check count.
175+
type checkConfig struct {
176+
metricName string
177+
indexList []string
178+
indexListSQL string
179+
}
180+
var configs []checkConfig
181+
for checks := 1; checks <= numChecks; checks++ {
182+
var indexList []string
183+
for i := 0; i < checks; i++ {
184+
indexList = append(indexList, indexNames[i])
185+
}
186+
configs = append(configs, checkConfig{
187+
metricName: metricNames[checks-1],
188+
indexList: indexList,
189+
indexListSQL: strings.Join(indexList, ", "),
190+
})
191+
}
192+
193+
// Create the requested number of indexes with explicit names.
194+
allIndexDefinitions := []struct {
195+
name string
196+
definition string
197+
}{
198+
{indexNames[0], "(c, b, a)"},
199+
{indexNames[1], "(b, a, c)"},
200+
}
201+
indexDefinitions := allIndexDefinitions[:numChecks]
202+
for i, idx := range indexDefinitions {
203+
indexSQL := fmt.Sprintf("CREATE INDEX %s ON bulkingest.bulkingest %s", idx.name, idx.definition)
204+
t.L().Printf("Creating index %d/%d: %s", i+1, len(indexDefinitions), indexSQL)
205+
if _, err := db.Exec(indexSQL); err != nil {
206+
t.Fatal(err)
207+
}
208+
}
209+
210+
t.L().Printf("Computing table statistics manually")
211+
if _, err := db.Exec("CREATE STATISTICS stats FROM bulkingest.bulkingest"); err != nil {
212+
t.Fatal(err)
213+
}
214+
215+
// Enable scrub jobs for EXPERIMENTAL SCRUB to use job system.
216+
if _, err := db.Exec("SET enable_scrub_job = on"); err != nil {
217+
t.Fatal(err)
218+
}
219+
220+
// Run INSPECT, ticking the corresponding histogram.
221+
for i, cfg := range configs {
222+
checks := i + 1
223+
t.L().Printf("Running INSPECT with %d check(s): %s", checks, cfg.indexListSQL)
224+
225+
// Tick before starting INSPECT for this specific metric.
226+
tickHistogram(cfg.metricName)
227+
before := timeutil.Now()
228+
229+
// TODO(148365): Update to use INSPECT syntax when SQL is ready.
230+
scrubSQL := fmt.Sprintf("EXPERIMENTAL SCRUB TABLE bulkingest.bulkingest WITH OPTIONS INDEX (%s)", cfg.indexListSQL)
231+
if _, err := db.Exec(scrubSQL); err != nil {
232+
t.Fatal(err)
233+
}
234+
235+
// Tick after INSPECT completes to capture elapsed time for this specific metric.
236+
tickHistogram(cfg.metricName)
237+
duration := timeutil.Since(before)
238+
t.L().Printf("INSPECT with %d check(s) took %v\n", checks, duration)
239+
240+
// Query the job progress to get the total check count. Since each span
241+
// handles all indexes, we divide by the number of checks to get the
242+
// actual span count. This is important for debugging because INSPECT
243+
// concurrency is based on spans. So, knowing the number of spans vs
244+
// CPUs helps understand how well INSPECT parallelized this job.
245+
var jobTotalCheckCount int64
246+
querySQL := `
247+
SELECT coalesce(
248+
(crdb_internal.pb_to_json(
249+
'cockroach.sql.jobs.jobspb.Progress',
250+
value
251+
)->'inspect'->>'jobTotalCheckCount')::INT8,
252+
0
253+
)
254+
FROM system.job_info
255+
WHERE job_id = (
256+
SELECT job_id
257+
FROM [SHOW JOBS]
258+
WHERE job_type = 'INSPECT'
259+
ORDER BY created DESC
260+
LIMIT 1
261+
)
262+
AND info_key = 'legacy_progress'`
263+
err := db.QueryRow(querySQL).Scan(&jobTotalCheckCount)
264+
if err != nil {
265+
t.L().Printf("Warning: failed to query job total check count: %v", err)
266+
} else {
267+
// Each span handles all indexes, so divide by number of checks to get span count
268+
spanCount := int(jobTotalCheckCount) / checks
269+
totalCPUs := numNodes * numCPUs
270+
spansPerCPU := float64(spanCount) / float64(totalCPUs)
271+
t.L().Printf("INSPECT completed %d checks across %d spans (%.2f spans/CPU with %d total CPUs)\n",
272+
jobTotalCheckCount, spanCount, spansPerCPU, totalCPUs)
273+
}
274+
}
275+
},
276+
}
277+
}
278+
279+
// disableRowCountValidation disables automatic row count validation during import.
280+
// This is necessary for INSPECT tests because row count validation uses INSPECT
281+
// behind the covers, and we want to control when INSPECT runs.
282+
func disableRowCountValidation(t test.Test, db *gosql.DB) {
283+
t.Helper()
284+
t.L().Printf("Disabling automatic row count validation")
285+
_, err := db.Exec("SET CLUSTER SETTING bulkio.import.row_count_validation.unsafe.mode = 'off'")
286+
// If we get an error, it's because the cluster setting is considered unsafe.
287+
// So, we need extract the interlock key from the error.
288+
if err != nil {
289+
var pqErr *pq.Error
290+
if !errors.As(err, &pqErr) {
291+
t.Fatalf("expected pq.Error, got %T: %v", err, err)
292+
}
293+
if !strings.HasPrefix(pqErr.Detail, "key: ") {
294+
t.Fatalf("expected error detail to start with 'key: ', got: %s", pqErr.Detail)
295+
}
296+
interlockKey := strings.TrimPrefix(pqErr.Detail, "key: ")
297+
298+
// Set the interlock key and retry.
299+
if _, err := db.Exec("SET unsafe_setting_interlock_key = $1", interlockKey); err != nil {
300+
t.Fatal(err)
301+
}
302+
if _, err := db.Exec("SET CLUSTER SETTING bulkio.import.row_count_validation.unsafe.mode = 'off'"); err != nil {
303+
t.Fatal(err)
304+
}
305+
}
306+
}

pkg/cmd/roachtest/tests/registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func RegisterTests(r registry.Registry) {
7777
registerImportTPCH(r)
7878
registerInconsistency(r)
7979
registerIndexes(r)
80+
registerInspectThoughput(r)
8081
registerInvariantCheckDetection(r)
8182
registerJasyncSQL(r)
8283
registerJepsen(r)

0 commit comments

Comments
 (0)