Skip to content

Commit 287999c

Browse files
craig[bot]fqazispilchen
committed
152112: sql/schemachanger: improve performance of TestBackupSuccess r=fqazi a=fqazi Previously, the successful backup tests would for each stage of a schema change would run the full schema change, taking a backup at the target stage. This patch modifies the TestBackupSuccess variants of the schema changer test to start the server once, take all the backups and then restore the stages we should be testing. Testing show around a ~30% improvement with this change in execution time. Fixes: #152076 Release note: None 154525: sql/inspect: add job metrics for INSPECT jobs r=spilchen a=spilchen Implements three metrics for INSPECT jobs: - jobs.inspect.runs: Total number of INSPECT job executions - jobs.inspect.runs_with_issues: Count of jobs that found inconsistencies - jobs.inspect.issues_found: Aggregate count of issues found across all processors Closes #153884 Epic: CRDB-30356 Release note: none Co-authored-by: Faizan Qazi <[email protected]> Co-authored-by: Matt Spilchen <[email protected]>
3 parents 24cce8b + 3af68bd + c89e646 commit 287999c

File tree

13 files changed

+537
-131
lines changed

13 files changed

+537
-131
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5290,6 +5290,14 @@ layers:
52905290
unit: COUNT
52915291
aggregation: AVG
52925292
derivative: NON_NEGATIVE_DERIVATIVE
5293+
- name: jobs.inspect.issues_found
5294+
exported_name: jobs_inspect_issues_found
5295+
description: Total count of issues found by INSPECT jobs
5296+
y_axis_label: Issues
5297+
type: COUNTER
5298+
unit: COUNT
5299+
aggregation: AVG
5300+
derivative: NON_NEGATIVE_DERIVATIVE
52935301
- name: jobs.inspect.protected_age_sec
52945302
exported_name: jobs_inspect_protected_age_sec
52955303
labeled_name: 'jobs.protected_age_sec{type: inspect}'
@@ -5335,6 +5343,22 @@ layers:
53355343
unit: COUNT
53365344
aggregation: AVG
53375345
derivative: NON_NEGATIVE_DERIVATIVE
5346+
- name: jobs.inspect.runs
5347+
exported_name: jobs_inspect_runs
5348+
description: Number of INSPECT jobs executed
5349+
y_axis_label: Jobs
5350+
type: COUNTER
5351+
unit: COUNT
5352+
aggregation: AVG
5353+
derivative: NON_NEGATIVE_DERIVATIVE
5354+
- name: jobs.inspect.runs_with_issues
5355+
exported_name: jobs_inspect_runs_with_issues
5356+
description: Number of INSPECT jobs that found at least one issue
5357+
y_axis_label: Jobs
5358+
type: COUNTER
5359+
unit: COUNT
5360+
aggregation: AVG
5361+
derivative: NON_NEGATIVE_DERIVATIVE
53385362
- name: jobs.key_visualizer.currently_idle
53395363
exported_name: jobs_key_visualizer_currently_idle
53405364
labeled_name: 'jobs{type: key_visualizer, status: currently_idle}'

pkg/ccl/schemachangerccl/multiregion_testcluster_factory.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,8 @@ func (f MultiRegionTestClusterFactory) WithSchemaLockDisabled() sctest.TestServe
5656
return f
5757
}
5858

59-
// Run implements the sctest.TestServerFactory interface.
60-
func (f MultiRegionTestClusterFactory) Run(
61-
ctx context.Context, t *testing.T, fn func(_ serverutils.TestServerInterface, _ *gosql.DB),
62-
) {
59+
// Start implements the sctest.TestServerFactory interface.
60+
func (f MultiRegionTestClusterFactory) Start(ctx context.Context, t *testing.T) sctest.TestServer {
6361
const numServers = 3
6462
knobs := base.TestingKnobs{
6563
SQLEvalContext: &eval.TestingKnobs{
@@ -83,6 +81,19 @@ func (f MultiRegionTestClusterFactory) Run(
8381
}
8482
sql.CreateTableWithSchemaLocked.Override(ctx, &st.SV, !f.schemaLockedDisabled)
8583
c, db, _ := multiregionccltestutils.TestingCreateMultiRegionCluster(t, numServers, knobs, multiregionccltestutils.WithSettings(st))
86-
defer c.Stopper().Stop(ctx)
87-
fn(c.Server(0), db)
84+
return sctest.TestServer{
85+
Server: c.Server(0),
86+
DB: db,
87+
Stopper: func(t *testing.T) {
88+
c.Stopper().Stop(ctx)
89+
},
90+
}
91+
}
92+
93+
func (f MultiRegionTestClusterFactory) Run(
94+
ctx context.Context, t *testing.T, fn func(s serverutils.TestServerInterface, tdb *gosql.DB),
95+
) {
96+
s := f.Start(ctx, t)
97+
defer s.Stopper(t)
98+
fn(s.Server, s.DB)
8899
}

pkg/jobs/metrics.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type Metrics struct {
4242
Changefeed metric.Struct
4343
StreamIngest metric.Struct
4444
Backup metric.Struct
45+
Inspect metric.Struct
4546

4647
// AdoptIterations counts the number of adopt loops executed by Registry.
4748
AdoptIterations *metric.Counter
@@ -402,6 +403,9 @@ func (m *Metrics) init(histogramWindowInterval time.Duration, lookup *cidr.Looku
402403
if MakeBackupMetricsHook != nil {
403404
m.Backup = MakeBackupMetricsHook(histogramWindowInterval)
404405
}
406+
if MakeInspectMetricsHook != nil {
407+
m.Inspect = MakeInspectMetricsHook(histogramWindowInterval)
408+
}
405409

406410
m.AdoptIterations = metric.NewCounter(metaAdoptIterations)
407411
m.ClaimedJobs = metric.NewCounter(metaClaimedJobs)
@@ -482,6 +486,9 @@ var MakeRowLevelTTLMetricsHook func(time.Duration) metric.Struct
482486
// MakeBackupMetricsHook allows for registration of backup metrics.
483487
var MakeBackupMetricsHook func(time.Duration) metric.Struct
484488

489+
// MakeInspectMetricsHook allows for registration of inspect metrics.
490+
var MakeInspectMetricsHook func(time.Duration) metric.Struct
491+
485492
// JobTelemetryMetrics is a telemetry metrics for individual job types.
486493
type JobTelemetryMetrics struct {
487494
Successful telemetry.Counter

pkg/sql/inspect/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go_library(
66
"index_consistency_check.go",
77
"inspect_job.go",
88
"inspect_logger.go",
9+
"inspect_metrics.go",
910
"inspect_processor.go",
1011
"issue.go",
1112
"log_sink.go",
@@ -48,6 +49,7 @@ go_library(
4849
"//pkg/util/ctxgroup",
4950
"//pkg/util/hlc",
5051
"//pkg/util/log",
52+
"//pkg/util/metric",
5153
"//pkg/util/protoutil",
5254
"//pkg/util/span",
5355
"//pkg/util/syncutil",
@@ -64,6 +66,7 @@ go_test(
6466
srcs = [
6567
"index_consistency_check_test.go",
6668
"inspect_job_test.go",
69+
"inspect_metrics_test.go",
6770
"inspect_processor_test.go",
6871
"issue_test.go",
6972
"main_test.go",

pkg/sql/inspect/inspect_job.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
3838
jobExecCtx := execCtx.(sql.JobExecContext)
3939
execCfg := jobExecCtx.ExecCfg()
4040

41+
execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics).Runs.Inc(1)
42+
4143
if err := c.maybeRunOnJobStartHook(execCfg); err != nil {
4244
return err
4345
}
@@ -93,6 +95,11 @@ func (c *inspectResumer) OnFailOrCancel(
9395
jobExecCtx := execCtx.(sql.JobExecContext)
9496
execCfg := jobExecCtx.ExecCfg()
9597
c.maybeCleanupProtectedTimestamp(ctx, execCfg)
98+
99+
// Record RunsWithIssues metric if the job failed due to finding inconsistencies.
100+
if jobErr != nil && errors.Is(jobErr, errInspectFoundInconsistencies) {
101+
execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics).RunsWithIssues.Inc(1)
102+
}
96103
return nil
97104
}
98105

pkg/sql/inspect/inspect_logger.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ package inspect
77

88
import (
99
"context"
10+
"sync/atomic"
1011

12+
"github.com/cockroachdb/cockroach/pkg/util/metric"
1113
"github.com/cockroachdb/errors"
1214
)
1315

@@ -45,3 +47,23 @@ func (l inspectLoggers) hasIssues() bool {
4547
}
4648
return false
4749
}
50+
51+
// metricsLogger increments metrics when issues are logged.
52+
type metricsLogger struct {
53+
foundIssue atomic.Bool
54+
issuesFoundCtr *metric.Counter
55+
}
56+
57+
var _ inspectLogger = &metricsLogger{}
58+
59+
// logIssue implements the inspectLogger interface.
60+
func (m *metricsLogger) logIssue(ctx context.Context, issue *inspectIssue) error {
61+
m.foundIssue.Store(true)
62+
m.issuesFoundCtr.Inc(1)
63+
return nil
64+
}
65+
66+
// hasIssues implements the inspectLogger interface.
67+
func (m *metricsLogger) hasIssues() bool {
68+
return m.foundIssue.Load()
69+
}

pkg/sql/inspect/inspect_metrics.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package inspect
7+
8+
import (
9+
"time"
10+
11+
"github.com/cockroachdb/cockroach/pkg/jobs"
12+
"github.com/cockroachdb/cockroach/pkg/util/metric"
13+
)
14+
15+
// InspectMetrics holds metrics for INSPECT job operations.
16+
type InspectMetrics struct {
17+
Runs *metric.Counter
18+
RunsWithIssues *metric.Counter
19+
IssuesFound *metric.Counter
20+
}
21+
22+
var _ metric.Struct = (*InspectMetrics)(nil)
23+
24+
// MetricStruct implements the metric.Struct interface.
25+
func (InspectMetrics) MetricStruct() {}
26+
27+
var (
28+
metaInspectRuns = metric.Metadata{
29+
Name: "jobs.inspect.runs",
30+
Help: "Number of INSPECT jobs executed",
31+
Measurement: "Jobs",
32+
Unit: metric.Unit_COUNT,
33+
}
34+
metaInspectRunsWithIssues = metric.Metadata{
35+
Name: "jobs.inspect.runs_with_issues",
36+
Help: "Number of INSPECT jobs that found at least one issue",
37+
Measurement: "Jobs",
38+
Unit: metric.Unit_COUNT,
39+
}
40+
metaInspectIssuesFound = metric.Metadata{
41+
Name: "jobs.inspect.issues_found",
42+
Help: "Total count of issues found by INSPECT jobs",
43+
Measurement: "Issues",
44+
Unit: metric.Unit_COUNT,
45+
}
46+
)
47+
48+
// MakeInspectMetrics instantiates the metrics for INSPECT jobs.
49+
func MakeInspectMetrics(histogramWindow time.Duration) metric.Struct {
50+
return &InspectMetrics{
51+
Runs: metric.NewCounter(metaInspectRuns),
52+
RunsWithIssues: metric.NewCounter(metaInspectRunsWithIssues),
53+
IssuesFound: metric.NewCounter(metaInspectIssuesFound),
54+
}
55+
}
56+
57+
func init() {
58+
jobs.MakeInspectMetricsHook = MakeInspectMetrics
59+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package inspect
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/base"
13+
"github.com/cockroachdb/cockroach/pkg/sql"
14+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
16+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
17+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
18+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
19+
"github.com/cockroachdb/cockroach/pkg/util/log"
20+
"github.com/stretchr/testify/require"
21+
)
22+
23+
// TestInspectMetrics verifies that INSPECT job metrics are correctly updated
24+
// when jobs run.
25+
func TestInspectMetrics(t *testing.T) {
26+
defer leaktest.AfterTest(t)()
27+
defer log.Scope(t).Close(t)
28+
29+
ctx := context.Background()
30+
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
31+
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
32+
})
33+
defer s.Stopper().Stop(ctx)
34+
35+
runner := sqlutils.MakeSQLRunner(db)
36+
runner.Exec(t, `
37+
CREATE DATABASE db;
38+
SET enable_scrub_job = true;
39+
CREATE TABLE db.t (
40+
id INT PRIMARY KEY,
41+
val INT
42+
);
43+
CREATE INDEX i1 on db.t (val);
44+
INSERT INTO db.t VALUES (1, 2), (2, 3);
45+
`)
46+
47+
execCfg := s.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig)
48+
metrics := execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics)
49+
50+
initialRuns := metrics.Runs.Count()
51+
initialRunsWithIssues := metrics.RunsWithIssues.Count()
52+
initialIssuesFound := metrics.IssuesFound.Count()
53+
54+
// First run: no corruption, should succeed without issues
55+
runner.Exec(t, "EXPERIMENTAL SCRUB TABLE db.t")
56+
require.Equal(t, initialRuns+1, metrics.Runs.Count(), "Runs counter should increment")
57+
require.Equal(t, initialRunsWithIssues, metrics.RunsWithIssues.Count(), "RunsWithIssues should not increment")
58+
require.Equal(t, initialIssuesFound, metrics.IssuesFound.Count(), "IssuesFound should not increment")
59+
60+
// Create corruption: delete a secondary index entry for row (1, 2)
61+
// This creates a "missing_secondary_index_entry" issue - the primary key exists
62+
// but the corresponding secondary index entry is missing.
63+
kvDB := s.DB()
64+
codec := s.Codec()
65+
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, "db", "t")
66+
secIndex := tableDesc.PublicNonPrimaryIndexes()[0] // i1 index on (val)
67+
row := []tree.Datum{
68+
tree.NewDInt(1), // id
69+
tree.NewDInt(2), // val
70+
}
71+
err := deleteSecondaryIndexEntry(ctx, row, kvDB, tableDesc, secIndex)
72+
require.NoError(t, err)
73+
74+
// Second run: with corruption, should detect the missing index entry.
75+
_, err = db.Exec("EXPERIMENTAL SCRUB TABLE db.t")
76+
require.Error(t, err, "INSPECT should fail when corruption is detected")
77+
require.Contains(t, err.Error(), "INSPECT found inconsistencies")
78+
require.Equal(t, initialRuns+2, metrics.Runs.Count(),
79+
"Runs counter should increment for each job execution")
80+
require.Equal(t, initialRunsWithIssues+1, metrics.RunsWithIssues.Count(),
81+
"RunsWithIssues should increment when issues are found")
82+
require.Equal(t, initialIssuesFound+1, metrics.IssuesFound.Count(),
83+
"IssuesFound should increment when issues are detected")
84+
85+
// Third run: on a different clean table to verify RunsWithIssues doesn't increment again.
86+
runner.Exec(t, `
87+
CREATE TABLE db.t2 (
88+
id INT PRIMARY KEY,
89+
val INT
90+
);
91+
CREATE INDEX i2 on db.t2 (val);
92+
INSERT INTO db.t2 VALUES (1, 2), (2, 3);
93+
`)
94+
runner.Exec(t, "EXPERIMENTAL SCRUB TABLE db.t2")
95+
require.Equal(t, initialRuns+3, metrics.Runs.Count(),
96+
"Runs counter should increment for third job execution")
97+
require.Equal(t, initialRunsWithIssues+1, metrics.RunsWithIssues.Count(),
98+
"RunsWithIssues should NOT increment for successful job")
99+
require.Equal(t, initialIssuesFound+1, metrics.IssuesFound.Count(),
100+
"IssuesFound should NOT increment for successful job")
101+
}

pkg/sql/inspect/inspect_processor.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ import (
2929
)
3030

3131
var (
32+
// errInspectFoundInconsistencies is a sentinel error used to mark errors
33+
// returned when INSPECT jobs find data inconsistencies.
34+
errInspectFoundInconsistencies = pgerror.New(pgcode.DataException, "INSPECT found inconsistencies")
35+
3236
processorConcurrencyOverride = settings.RegisterIntSetting(
3337
settings.ApplicationLevel,
3438
"sql.inspect.processor_concurrency",
@@ -176,7 +180,7 @@ func (p *inspectProcessor) runInspect(ctx context.Context, output execinfra.RowR
176180

177181
log.Dev.Infof(ctx, "INSPECT processor completed processorID=%d issuesFound=%t", p.processorID, p.logger.hasIssues())
178182
if p.logger.hasIssues() {
179-
return pgerror.Newf(pgcode.DataException, "INSPECT found inconsistencies")
183+
return errInspectFoundInconsistencies
180184
}
181185
return nil
182186
}
@@ -194,15 +198,21 @@ func getProcessorConcurrency(flowCtx *execinfra.FlowCtx) int {
194198

195199
// getInspectLogger returns a logger for the inspect processor.
196200
func getInspectLogger(flowCtx *execinfra.FlowCtx, jobID jobspb.JobID) inspectLogger {
201+
execCfg := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
202+
metrics := execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics)
203+
197204
loggers := inspectLoggers{
198205
&logSink{},
199206
&tableSink{
200207
db: flowCtx.Cfg.DB,
201208
jobID: jobID,
202209
},
210+
&metricsLogger{
211+
issuesFoundCtr: metrics.IssuesFound,
212+
},
203213
}
204214

205-
knobs := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig).InspectTestingKnobs
215+
knobs := execCfg.InspectTestingKnobs
206216
if knobs != nil && knobs.InspectIssueLogger != nil {
207217
loggers = append(loggers, knobs.InspectIssueLogger.(inspectLogger))
208218
}

0 commit comments

Comments
 (0)