|
| 1 | +// Copyright 2025 The Cockroach Authors. |
| 2 | +// |
| 3 | +// Use of this software is governed by the CockroachDB Software License |
| 4 | +// included in the /LICENSE file. |
| 5 | + |
| 6 | +package tests |
| 7 | + |
| 8 | +import ( |
| 9 | + "context" |
| 10 | + "fmt" |
| 11 | + "time" |
| 12 | + |
| 13 | + "github.com/cockroachdb/cockroach/pkg/cmd/roachprod/grafana" |
| 14 | + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" |
| 15 | + roachtestgrafana "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/grafana" |
| 16 | + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" |
| 17 | + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" |
| 18 | + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" |
| 19 | + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" |
| 20 | + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" |
| 21 | + "github.com/cockroachdb/cockroach/pkg/roachprod/install" |
| 22 | + "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" |
| 23 | + "github.com/cockroachdb/cockroach/pkg/util/timeutil" |
| 24 | + "github.com/cockroachdb/cockroach/pkg/workload/histogram" |
| 25 | +) |
| 26 | + |
| 27 | +func registerInspectAdmissionControl(r registry.Registry) { |
| 28 | + r.Add(makeInspectAdmissionControlTest(r, 4, 8, 25_000_000, 5*time.Hour)) |
| 29 | +} |
| 30 | + |
| 31 | +// makeInspectAdmissionControlTest creates a test that sets up a CRDB cluster, loads it |
| 32 | +// up with bulkingest data, and sets up a foreground read-only workload. It then |
| 33 | +// runs INSPECT twice: once with the default low QoS priority and once with |
| 34 | +// normal priority, to measure the impact on foreground latency. |
| 35 | +// |
| 36 | +// The test ensures sufficient ranges are created so that INSPECT work is |
| 37 | +// well-distributed across the CPUs. |
| 38 | +func makeInspectAdmissionControlTest( |
| 39 | + r registry.Registry, numCRDBNodes, numCPUs, numRows int, timeout time.Duration, |
| 40 | +) registry.TestSpec { |
| 41 | + // totalNodes includes CRDB nodes + 1 workload node |
| 42 | + totalNodes := numCRDBNodes + 1 |
| 43 | + |
| 44 | + return registry.TestSpec{ |
| 45 | + Name: fmt.Sprintf("inspect/admission-control/nodes=%d/cpu=%d/rows=%d", numCRDBNodes, numCPUs, numRows), |
| 46 | + Timeout: timeout, |
| 47 | + Owner: registry.OwnerSQLFoundations, |
| 48 | + Benchmark: true, |
| 49 | + CompatibleClouds: registry.AllExceptAWS, |
| 50 | + Suites: registry.Suites(registry.Weekly), |
| 51 | + Cluster: r.MakeClusterSpec(totalNodes, spec.CPU(numCPUs), spec.WorkloadNode()), |
| 52 | + Leases: registry.MetamorphicLeases, |
| 53 | + PostProcessPerfMetrics: func(test string, histogram *roachtestutil.HistogramMetric) (roachtestutil.AggregatedPerfMetrics, error) { |
| 54 | + metrics := roachtestutil.AggregatedPerfMetrics{} |
| 55 | + |
| 56 | + for _, summary := range histogram.Summaries { |
| 57 | + totalElapsed := summary.TotalElapsed |
| 58 | + if totalElapsed == 0 { |
| 59 | + continue |
| 60 | + } |
| 61 | + |
| 62 | + // Calculate throughput in rows/sec per CPU. |
| 63 | + // TotalElapsed is in milliseconds, convert to seconds. |
| 64 | + inspectDuration := totalElapsed / 1000 |
| 65 | + if inspectDuration == 0 { |
| 66 | + inspectDuration = 1 // Avoid division by zero. |
| 67 | + } |
| 68 | + totalCPUs := int64(numCRDBNodes * numCPUs) |
| 69 | + throughput := roachtestutil.MetricPoint(float64(numRows) / float64(totalCPUs*inspectDuration)) |
| 70 | + |
| 71 | + // Use the summary name (e.g., "admission_control_enabled") as part of the metric name. |
| 72 | + metrics = append(metrics, &roachtestutil.AggregatedMetric{ |
| 73 | + Name: fmt.Sprintf("%s_%s_throughput", test, summary.Name), |
| 74 | + Value: throughput, |
| 75 | + Unit: "rows/s/cpu", |
| 76 | + IsHigherBetter: true, |
| 77 | + }) |
| 78 | + } |
| 79 | + |
| 80 | + return metrics, nil |
| 81 | + }, |
| 82 | + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { |
| 83 | + // Adjust for local testing |
| 84 | + rowsToImport := numRows |
| 85 | + targetRanges := numCRDBNodes * numCPUs * 2 |
| 86 | + if c.IsLocal() { |
| 87 | + rowsToImport = 100_000 |
| 88 | + targetRanges = 4 |
| 89 | + } |
| 90 | + |
| 91 | + // Calculate bulkingest parameters to achieve target row count |
| 92 | + // We'll use b × c = 1000, so a = numRows / 1000 |
| 93 | + bNum := 1000 |
| 94 | + cNum := 1000 |
| 95 | + aNum := rowsToImport / (bNum * cNum) |
| 96 | + |
| 97 | + // For local, use simpler values |
| 98 | + if c.IsLocal() { |
| 99 | + aNum = 100000 |
| 100 | + bNum = 1 |
| 101 | + cNum = 1 |
| 102 | + } |
| 103 | + |
| 104 | + payloadBytes := 40 |
| 105 | + |
| 106 | + c.Start( |
| 107 | + ctx, t.L(), option.NewStartOpts(option.NoBackupSchedule), |
| 108 | + install.MakeClusterSettings(), c.CRDBNodes(), |
| 109 | + ) |
| 110 | + |
| 111 | + { |
| 112 | + promCfg := &prometheus.Config{} |
| 113 | + promCfg.WithPrometheusNode(c.WorkloadNode().InstallNodes()[0]) |
| 114 | + promCfg.WithNodeExporter(c.All().InstallNodes()) |
| 115 | + promCfg.WithCluster(c.CRDBNodes().InstallNodes()) |
| 116 | + promCfg.WithGrafanaDashboardJSON(roachtestgrafana.SnapshotAdmissionControlGrafanaJSON) |
| 117 | + promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, prometheus.MakeWorkloadScrapeConfig("workload", |
| 118 | + "/", makeWorkloadScrapeNodes(c.WorkloadNode().InstallNodes()[0], []workloadInstance{ |
| 119 | + {nodes: c.WorkloadNode()}, |
| 120 | + }))) |
| 121 | + _, cleanupFunc := setupPrometheusForRoachtest(ctx, t, c, promCfg, []workloadInstance{{nodes: c.WorkloadNode()}}) |
| 122 | + defer cleanupFunc() |
| 123 | + } |
| 124 | + |
| 125 | + baselineDuration, err := time.ParseDuration(roachtestutil.IfLocal(c, "30s", "5m")) |
| 126 | + if err != nil { |
| 127 | + t.Fatal(err) |
| 128 | + } |
| 129 | + |
| 130 | + // Set up histogram tracking for throughput metrics |
| 131 | + metricNames := []string{"calibration", "admission_control_enabled", "admission_control_disabled"} |
| 132 | + exporter := roachtestutil.CreateWorkloadHistogramExporter(t, c) |
| 133 | + reg, perfBuf := initInspectHistograms(timeout*2, t, exporter, metricNames) |
| 134 | + defer roachtestutil.CloseExporter(ctx, exporter, t, c, perfBuf, c.Node(1), "") |
| 135 | + |
| 136 | + // Helper to tick a specific named histogram |
| 137 | + tickHistogram := func(name string) { |
| 138 | + reg.Tick(func(tick histogram.Tick) { |
| 139 | + if tick.Name == name { |
| 140 | + _ = tick.Exporter.SnapshotAndWrite(tick.Hist, tick.Now, tick.Elapsed, &tick.Name) |
| 141 | + } |
| 142 | + }) |
| 143 | + } |
| 144 | + |
| 145 | + db := c.Conn(ctx, t.L(), 1) |
| 146 | + defer db.Close() |
| 147 | + |
| 148 | + // Helper to add Grafana annotations. |
| 149 | + addAnnotation := func(text string, startTime time.Time) { |
| 150 | + err := c.AddGrafanaAnnotation(ctx, t.L(), grafana.AddAnnotationRequest{ |
| 151 | + Text: text, |
| 152 | + StartTime: startTime.UnixMilli(), |
| 153 | + EndTime: timeutil.Now().UnixMilli(), |
| 154 | + }) |
| 155 | + if err != nil { |
| 156 | + t.L().Printf("failed to add grafana annotation: %v", err) |
| 157 | + } |
| 158 | + } |
| 159 | + |
| 160 | + // Helper to run INSPECT with timing, metrics, and logging. |
| 161 | + runInspect := func(description, histogramName, annotationText string) time.Duration { |
| 162 | + t.Status("running " + description) |
| 163 | + tickHistogram(histogramName) |
| 164 | + startTime := timeutil.Now() |
| 165 | + if _, err := db.ExecContext(ctx, "SET enable_inspect_command = true"); err != nil { |
| 166 | + t.Fatal(err) |
| 167 | + } |
| 168 | + if _, err := db.ExecContext(ctx, "INSPECT TABLE bulkingest.bulkingest"); err != nil { |
| 169 | + t.Fatal(err) |
| 170 | + } |
| 171 | + tickHistogram(histogramName) |
| 172 | + duration := timeutil.Since(startTime) |
| 173 | + addAnnotation(annotationText, startTime) |
| 174 | + |
| 175 | + totalCPUs := numCRDBNodes * numCPUs |
| 176 | + throughput := float64(rowsToImport) / float64(totalCPUs) / duration.Seconds() |
| 177 | + t.L().Printf("%s completed in %v (%.2f rows/s/cpu)\n", description, duration, throughput) |
| 178 | + |
| 179 | + return duration |
| 180 | + } |
| 181 | + |
| 182 | + if !t.SkipInit() { |
| 183 | + t.Status("importing bulkingest dataset") |
| 184 | + |
| 185 | + // Disable automatic row count validation during import |
| 186 | + disableRowCountValidation(t, db) |
| 187 | + |
| 188 | + // Import bulkingest data with the default secondary index |
| 189 | + cmdImport := fmt.Sprintf( |
| 190 | + "./cockroach workload fixtures import bulkingest {pgurl:1} --a %d --b %d --c %d --payload-bytes %d", |
| 191 | + aNum, bNum, cNum, payloadBytes, |
| 192 | + ) |
| 193 | + c.Run(ctx, option.WithNodes(c.WorkloadNode()), cmdImport) |
| 194 | + |
| 195 | + // Split the table into the target number of ranges |
| 196 | + t.Status(fmt.Sprintf("splitting table into ~%d ranges", targetRanges)) |
| 197 | + splitSQL := fmt.Sprintf("ALTER TABLE bulkingest.bulkingest SPLIT AT SELECT (i * %d) // %d FROM generate_series(1, %d-1) AS i", |
| 198 | + aNum, targetRanges, targetRanges) |
| 199 | + if _, err := db.Exec(splitSQL); err != nil { |
| 200 | + t.Fatal(err) |
| 201 | + } |
| 202 | + |
| 203 | + // Scatter the ranges to ensure even distribution before INSPECT. |
| 204 | + t.Status("scattering ranges across cluster") |
| 205 | + if _, err := db.Exec("ALTER TABLE bulkingest.bulkingest SCATTER"); err != nil { |
| 206 | + t.Fatal(err) |
| 207 | + } |
| 208 | + |
| 209 | + // Initialize kv workload for foreground traffic |
| 210 | + t.Status("initializing kv workload") |
| 211 | + splits := roachtestutil.IfLocal(c, " --splits=3", " --splits=100") |
| 212 | + c.Run(ctx, option.WithNodes(c.WorkloadNode()), "./cockroach workload init kv"+splits+" {pgurl:1}") |
| 213 | + } |
| 214 | + |
| 215 | + // Run calibration INSPECT without workload to determine how long the |
| 216 | + // workload should run. |
| 217 | + calibrationDuration := runInspect( |
| 218 | + "calibration INSPECT (no workload, admission control enabled)", |
| 219 | + "calibration", |
| 220 | + "INSPECT (AC, no load)", |
| 221 | + ) |
| 222 | + |
| 223 | + // Estimate total workload duration: |
| 224 | + // - INSPECT with admission control (~9.5× calibrationDuration) |
| 225 | + // - INSPECT without admission control (~0.5× calibrationDuration) |
| 226 | + // - baseline overhead before and in between runs (2× baselineDuration) |
| 227 | + // Add a 25% safety buffer. |
| 228 | + estimatedDuration := baselineDuration*2 + calibrationDuration*10 |
| 229 | + workloadDuration := time.Duration(float64(estimatedDuration) * 1.25) |
| 230 | + t.L().Printf("Setting workload duration to %v (based on calibration: %v)\n", workloadDuration, calibrationDuration) |
| 231 | + |
| 232 | + // Run a read-only kv workload to match INSPECT’s read-only behavior. |
| 233 | + // This avoids write pressure, keeps the dataset static, and ensures |
| 234 | + // comparisons reflect load differences, not data drift. |
| 235 | + t.Status(fmt.Sprintf("starting read-only kv workload for %v", workloadDuration)) |
| 236 | + m := c.NewDeprecatedMonitor(ctx, c.CRDBNodes()) |
| 237 | + |
| 238 | + m.Go(func(ctx context.Context) error { |
| 239 | + concurrency := roachtestutil.IfLocal(c, "8", "256") |
| 240 | + c.Run(ctx, option.WithNodes(c.WorkloadNode()), |
| 241 | + fmt.Sprintf("./cockroach workload run kv --read-percent=100 --duration=%s --concurrency=%s {pgurl%s}", |
| 242 | + workloadDuration, concurrency, c.CRDBNodes()), |
| 243 | + ) |
| 244 | + return nil |
| 245 | + }) |
| 246 | + |
| 247 | + t.Status(fmt.Sprintf("waiting %v for workload to run before starting INSPECT", baselineDuration)) |
| 248 | + time.Sleep(baselineDuration) |
| 249 | + |
| 250 | + _ = runInspect( |
| 251 | + "INSPECT under load with admission control enabled", |
| 252 | + "admission_control_enabled", |
| 253 | + "INSPECT (AC, load)", |
| 254 | + ) |
| 255 | + |
| 256 | + t.Status(fmt.Sprintf("waiting %v between INSPECTs", baselineDuration)) |
| 257 | + time.Sleep(baselineDuration) |
| 258 | + |
| 259 | + // Disable admission control for second INSPECT. This should have more of an |
| 260 | + // impact to the foreground workload. |
| 261 | + t.Status("disabling admission control for INSPECT") |
| 262 | + if _, err := db.ExecContext(ctx, "SET CLUSTER SETTING sql.inspect.admission_control.enabled = false"); err != nil { |
| 263 | + t.Fatal(err) |
| 264 | + } |
| 265 | + |
| 266 | + _ = runInspect( |
| 267 | + "INSPECT under load with admission control disabled", |
| 268 | + "admission_control_disabled", |
| 269 | + "INSPECT (no AC, load)", |
| 270 | + ) |
| 271 | + |
| 272 | + // Wait for workload to complete. |
| 273 | + m.Wait() |
| 274 | + }, |
| 275 | + } |
| 276 | +} |
0 commit comments