// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package tests

import (
	"bytes"
	"context"
	gosql "database/sql"
	"errors"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/cockroachdb/cockroach/pkg/workload/histogram"
	"github.com/cockroachdb/cockroach/pkg/workload/histogram/exporter"
	"github.com/lib/pq"
)

func registerInspectThroughput(r registry.Registry) {
	// Short run: 12 nodes × 8 CPUs, 500M rows, 1 index check, ~1 hour (in v25.4).
	r.Add(makeInspectThroughputTest(r, 12, 8, 500_000_000, 3*time.Hour, 1))

	// Long run: 12 nodes × 8 CPUs, 1B rows, ~5 hours (in v25.4). Intended to run
	// 2 index checks (INSPECT twice: 1 index, then 2 indexes); see the TODO below.
	// TODO(148365): Until we have INSPECT syntax, we cannot execute the INSPECT
	// job on a subset of indexes. Set this to 2 when INSPECT SQL is available.
	const indexesForLongRun = 1
	r.Add(makeInspectThroughputTest(r, 12, 8, 1_000_000_000, 8*time.Hour, indexesForLongRun))
}

// initInspectHistograms creates a histogram registry with multiple named metrics.
// All metrics share the same registry and are ticked together.
func initInspectHistograms(
	timeout time.Duration, t test.Test, e exporter.Exporter, metricNames []string,
) (*histogram.Registry, *bytes.Buffer) {
	reg := histogram.NewRegistryWithExporter(
		timeout,
		histogram.MockWorkloadName,
		e,
	)

	// Register all metric names in the same registry.
	for _, name := range metricNames {
		reg.GetHandle().Get(name)
	}

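	// The exporter serializes histogram snapshots into this in-memory buffer;
	// the caller later hands the buffer to roachtestutil.CloseExporter so it is
	// written out as the test's perf artifact.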
	bytesBuf := bytes.NewBuffer([]byte{})
	writer := io.Writer(bytesBuf)
	e.Init(&writer)

	return reg, bytesBuf
}

func makeInspectThroughputTest(
	r registry.Registry, numNodes, numCPUs, numRows int, length time.Duration, numChecks int,
) registry.TestSpec {
	// Define index names that will be created.
	indexNames := []string{
		"bulkingest_c_b_a_idx",
		"bulkingest_b_a_c_idx",
	}

	// Clamp numChecks to the range [1, maxChecks], where maxChecks is the
	// number of available indexes.
	const maxChecks = 2
	if numChecks > maxChecks {
		numChecks = maxChecks
	}
	if numChecks < 1 {
		numChecks = 1
	}

	return registry.TestSpec{
		Name:             fmt.Sprintf("inspect/throughput/bulkingest/rows=%d", numRows),
		Owner:            registry.OwnerSQLFoundations,
		Benchmark:        true,
		Cluster:          r.MakeClusterSpec(numNodes, spec.WorkloadNode(), spec.CPU(numCPUs)),
		CompatibleClouds: registry.AllExceptAWS,
		Suites:           registry.Suites(registry.Nightly),
		Leases:           registry.LeaderLeases,
		Timeout:          length,
		PostProcessPerfMetrics: func(testName string, hist *roachtestutil.HistogramMetric) (roachtestutil.AggregatedPerfMetrics, error) {
			// This callback is invoked once with all histogram summaries.
			// hist.Summaries contains one entry for each named metric
			// (checks=1, checks=2).
			metrics := roachtestutil.AggregatedPerfMetrics{}

			for _, summary := range hist.Summaries {
				// Each summary corresponds to one INSPECT run with a specific check count.
				totalElapsed := summary.TotalElapsed
				if totalElapsed == 0 {
					continue
				}

				// Calculate throughput in rows/sec per CPU.
				// TotalElapsed is in milliseconds; convert to seconds.
				inspectDuration := totalElapsed / 1000
				if inspectDuration == 0 {
					inspectDuration = 1 // Avoid division by zero.
				}
				totalCPUs := int64(numNodes * numCPUs)
				throughput := roachtestutil.MetricPoint(float64(numRows) / float64(totalCPUs*inspectDuration))
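				// e.g., 500M rows scanned in 3,600s on 96 CPUs yields
				// 5e8 / (96 * 3600) ≈ 1,447 rows/s/cpu.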

				// Use the summary name (e.g., "checks=1") as part of the metric name.
				metrics = append(metrics, &roachtestutil.AggregatedMetric{
					Name:           fmt.Sprintf("%s_%s_throughput", testName, summary.Name),
					Value:          throughput,
					Unit:           "rows/s/cpu",
					IsHigherBetter: true,
				})
			}

			return metrics, nil
		},
		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
			// The bulkingest workload generates aNum * bNum * cNum rows, so pick
			// aNum such that the total matches numRows. Column a takes sequential
			// ascending values; the payload column is randomized and thus
			// uncorrelated with the primary key (a, b, c).
			bNum := 1000
			cNum := 1000
			aNum := numRows / (bNum * cNum)
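			// e.g., numRows = 500_000_000 with bNum = cNum = 1000 gives aNum = 500.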
			if c.IsLocal() {
				aNum = 100000
				bNum = 1
				cNum = 1
			}
			payloadBytes := 40

			settings := install.MakeClusterSettings()
			c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.CRDBNodes())

			db := c.Conn(ctx, t.L(), 1)
			defer db.Close()

			disableRowCountValidation(t, db)

			// Import bulkingest data without the default index. We'll create custom
			// indexes based on the checks parameter.
			cmdImport := fmt.Sprintf(
				"./cockroach workload fixtures import bulkingest {pgurl:1} --a %d --b %d --c %d --payload-bytes %d --index-b-c-a=false",
				aNum, bNum, cNum, payloadBytes,
			)

			t.L().Printf("Importing bulkingest data")
			c.Run(ctx, option.WithNodes(c.WorkloadNode()), cmdImport)

			// Set up a single histogram registry with metrics for the requested
			// number of checks.
			var metricNames []string
			for i := 1; i <= numChecks; i++ {
				metricNames = append(metricNames, fmt.Sprintf("checks=%d", i))
			}
			exporter := roachtestutil.CreateWorkloadHistogramExporter(t, c)
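			// NB: the registry timeout bounds the maximum latency a histogram
			// can record; pass 2x the test timeout so even an INSPECT run that
			// consumes most of the test is captured without clipping.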
			reg, perfBuf := initInspectHistograms(length*2, t, exporter, metricNames)
			defer roachtestutil.CloseExporter(ctx, exporter, t, c, perfBuf, c.Node(1), "")

			// Helper to tick a specific named histogram.
			tickHistogram := func(name string) {
				reg.Tick(func(tick histogram.Tick) {
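					// Tick visits every histogram in the registry; snapshot
					// only the one being timed.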
					if tick.Name == name {
						_ = tick.Exporter.SnapshotAndWrite(tick.Hist, tick.Now, tick.Elapsed, &tick.Name)
					}
				})
			}

			// Build index lists for each check count.
			type checkConfig struct {
				metricName   string
				indexList    []string
				indexListSQL string
			}
			var configs []checkConfig
			for checks := 1; checks <= numChecks; checks++ {
				var indexList []string
				for i := 0; i < checks; i++ {
					indexList = append(indexList, indexNames[i])
				}
				configs = append(configs, checkConfig{
					metricName:   metricNames[checks-1],
					indexList:    indexList,
					indexListSQL: strings.Join(indexList, ", "),
				})
			}

			// Create the requested number of indexes with explicit names.
			allIndexDefinitions := []struct {
				name       string
				definition string
			}{
				{indexNames[0], "(c, b, a)"},
				{indexNames[1], "(b, a, c)"},
			}
			indexDefinitions := allIndexDefinitions[:numChecks]
			for i, idx := range indexDefinitions {
				indexSQL := fmt.Sprintf("CREATE INDEX %s ON bulkingest.bulkingest %s", idx.name, idx.definition)
				t.L().Printf("Creating index %d/%d: %s", i+1, len(indexDefinitions), indexSQL)
				if _, err := db.Exec(indexSQL); err != nil {
					t.Fatal(err)
				}
			}

			t.L().Printf("Computing table statistics manually")
			if _, err := db.Exec("CREATE STATISTICS stats FROM bulkingest.bulkingest"); err != nil {
				t.Fatal(err)
			}

			// Enable the scrub job so that EXPERIMENTAL SCRUB runs through the
			// jobs system.
			if _, err := db.Exec("SET enable_scrub_job = on"); err != nil {
				t.Fatal(err)
			}

			// Run INSPECT, ticking the corresponding histogram.
			for i, cfg := range configs {
				checks := i + 1
				t.L().Printf("Running INSPECT with %d check(s): %s", checks, cfg.indexListSQL)

				// Tick before starting INSPECT for this specific metric.
				tickHistogram(cfg.metricName)
				before := timeutil.Now()

				// TODO(148365): Update to use INSPECT syntax when SQL is ready.
				scrubSQL := fmt.Sprintf("EXPERIMENTAL SCRUB TABLE bulkingest.bulkingest WITH OPTIONS INDEX (%s)", cfg.indexListSQL)
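				// For checks=2 this expands to:
				//   EXPERIMENTAL SCRUB TABLE bulkingest.bulkingest
				//     WITH OPTIONS INDEX (bulkingest_c_b_a_idx, bulkingest_b_a_c_idx)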
				if _, err := db.Exec(scrubSQL); err != nil {
					t.Fatal(err)
				}

				// Tick after INSPECT completes to capture elapsed time for this
				// specific metric.
				tickHistogram(cfg.metricName)
				duration := timeutil.Since(before)
				t.L().Printf("INSPECT with %d check(s) took %v\n", checks, duration)

				// Query the job progress to get the total check count. Since each
				// span handles all indexes, divide by the number of checks to get
				// the actual span count. This matters for debugging because INSPECT
				// concurrency is span-based, so the ratio of spans to CPUs shows
				// how well INSPECT parallelized the job.
				var jobTotalCheckCount int64
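				// The legacy_progress row in system.job_info holds the job's
				// marshaled Progress proto; decode it to JSON and extract the
				// INSPECT check count.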
				querySQL := `
					SELECT coalesce(
						(crdb_internal.pb_to_json(
							'cockroach.sql.jobs.jobspb.Progress',
							value
						)->'inspect'->>'jobTotalCheckCount')::INT8,
						0
					)
					FROM system.job_info
					WHERE job_id = (
						SELECT job_id
						FROM [SHOW JOBS]
						WHERE job_type = 'INSPECT'
						ORDER BY created DESC
						LIMIT 1
					)
					AND info_key = 'legacy_progress'`
				err := db.QueryRow(querySQL).Scan(&jobTotalCheckCount)
				if err != nil {
					t.L().Printf("Warning: failed to query job total check count: %v", err)
				} else {
					// Each span handles all indexes, so divide by the number of
					// checks to get the span count.
					spanCount := int(jobTotalCheckCount) / checks
					totalCPUs := numNodes * numCPUs
					spansPerCPU := float64(spanCount) / float64(totalCPUs)
					t.L().Printf("INSPECT completed %d checks across %d spans (%.2f spans/CPU with %d total CPUs)\n",
						jobTotalCheckCount, spanCount, spansPerCPU, totalCPUs)
				}
			}
		},
	}
}

// disableRowCountValidation disables automatic row count validation during
// import. This is necessary for INSPECT tests because row count validation
// uses INSPECT under the covers, and we want to control when INSPECT runs.
func disableRowCountValidation(t test.Test, db *gosql.DB) {
	t.Helper()
	t.L().Printf("Disabling automatic row count validation")
	_, err := db.Exec("SET CLUSTER SETTING bulkio.import.row_count_validation.unsafe.mode = 'off'")
	// If we get an error, it's because the cluster setting is considered
	// unsafe, so we need to extract the interlock key from the error.
	if err != nil {
		var pqErr *pq.Error
		if !errors.As(err, &pqErr) {
			t.Fatalf("expected pq.Error, got %T: %v", err, err)
		}
		if !strings.HasPrefix(pqErr.Detail, "key: ") {
			t.Fatalf("expected error detail to start with 'key: ', got: %s", pqErr.Detail)
		}
		interlockKey := strings.TrimPrefix(pqErr.Detail, "key: ")

		// Set the interlock key and retry.
		if _, err := db.Exec("SET unsafe_setting_interlock_key = $1", interlockKey); err != nil {
			t.Fatal(err)
		}
		if _, err := db.Exec("SET CLUSTER SETTING bulkio.import.row_count_validation.unsafe.mode = 'off'"); err != nil {
			t.Fatal(err)
		}
	}
}