Skip to content

Commit c89e646

Browse files
committed
sql/inspect: add job metrics for INSPECT jobs
Implements three metrics for INSPECT jobs: - jobs.inspect.runs: Total number of INSPECT job executions - jobs.inspect.runs_with_issues: Count of jobs that found inconsistencies - jobs.inspect.issues_found: Aggregate count of issues found across all processors Closes #153884 Epic: CRDB-30356 Release note: none
1 parent 564109b commit c89e646

File tree

8 files changed

+235
-2
lines changed

8 files changed

+235
-2
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5290,6 +5290,14 @@ layers:
52905290
unit: COUNT
52915291
aggregation: AVG
52925292
derivative: NON_NEGATIVE_DERIVATIVE
5293+
- name: jobs.inspect.issues_found
5294+
exported_name: jobs_inspect_issues_found
5295+
description: Total count of issues found by INSPECT jobs
5296+
y_axis_label: Issues
5297+
type: COUNTER
5298+
unit: COUNT
5299+
aggregation: AVG
5300+
derivative: NON_NEGATIVE_DERIVATIVE
52935301
- name: jobs.inspect.protected_age_sec
52945302
exported_name: jobs_inspect_protected_age_sec
52955303
labeled_name: 'jobs.protected_age_sec{type: inspect}'
@@ -5335,6 +5343,22 @@ layers:
53355343
unit: COUNT
53365344
aggregation: AVG
53375345
derivative: NON_NEGATIVE_DERIVATIVE
5346+
- name: jobs.inspect.runs
5347+
exported_name: jobs_inspect_runs
5348+
description: Number of INSPECT jobs executed
5349+
y_axis_label: Jobs
5350+
type: COUNTER
5351+
unit: COUNT
5352+
aggregation: AVG
5353+
derivative: NON_NEGATIVE_DERIVATIVE
5354+
- name: jobs.inspect.runs_with_issues
5355+
exported_name: jobs_inspect_runs_with_issues
5356+
description: Number of INSPECT jobs that found at least one issue
5357+
y_axis_label: Jobs
5358+
type: COUNTER
5359+
unit: COUNT
5360+
aggregation: AVG
5361+
derivative: NON_NEGATIVE_DERIVATIVE
53385362
- name: jobs.key_visualizer.currently_idle
53395363
exported_name: jobs_key_visualizer_currently_idle
53405364
labeled_name: 'jobs{type: key_visualizer, status: currently_idle}'

pkg/jobs/metrics.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type Metrics struct {
4242
Changefeed metric.Struct
4343
StreamIngest metric.Struct
4444
Backup metric.Struct
45+
Inspect metric.Struct
4546

4647
// AdoptIterations counts the number of adopt loops executed by Registry.
4748
AdoptIterations *metric.Counter
@@ -402,6 +403,9 @@ func (m *Metrics) init(histogramWindowInterval time.Duration, lookup *cidr.Looku
402403
if MakeBackupMetricsHook != nil {
403404
m.Backup = MakeBackupMetricsHook(histogramWindowInterval)
404405
}
406+
if MakeInspectMetricsHook != nil {
407+
m.Inspect = MakeInspectMetricsHook(histogramWindowInterval)
408+
}
405409

406410
m.AdoptIterations = metric.NewCounter(metaAdoptIterations)
407411
m.ClaimedJobs = metric.NewCounter(metaClaimedJobs)
@@ -482,6 +486,9 @@ var MakeRowLevelTTLMetricsHook func(time.Duration) metric.Struct
482486
// MakeBackupMetricsHook allows for registration of backup metrics.
483487
var MakeBackupMetricsHook func(time.Duration) metric.Struct
484488

489+
// MakeInspectMetricsHook allows for registration of inspect metrics.
490+
var MakeInspectMetricsHook func(time.Duration) metric.Struct
491+
485492
// JobTelemetryMetrics is a telemetry metrics for individual job types.
486493
type JobTelemetryMetrics struct {
487494
Successful telemetry.Counter

pkg/sql/inspect/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go_library(
66
"index_consistency_check.go",
77
"inspect_job.go",
88
"inspect_logger.go",
9+
"inspect_metrics.go",
910
"inspect_processor.go",
1011
"issue.go",
1112
"log_sink.go",
@@ -48,6 +49,7 @@ go_library(
4849
"//pkg/util/ctxgroup",
4950
"//pkg/util/hlc",
5051
"//pkg/util/log",
52+
"//pkg/util/metric",
5153
"//pkg/util/protoutil",
5254
"//pkg/util/span",
5355
"//pkg/util/syncutil",
@@ -64,6 +66,7 @@ go_test(
6466
srcs = [
6567
"index_consistency_check_test.go",
6668
"inspect_job_test.go",
69+
"inspect_metrics_test.go",
6770
"inspect_processor_test.go",
6871
"issue_test.go",
6972
"main_test.go",

pkg/sql/inspect/inspect_job.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
3838
jobExecCtx := execCtx.(sql.JobExecContext)
3939
execCfg := jobExecCtx.ExecCfg()
4040

41+
execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics).Runs.Inc(1)
42+
4143
if err := c.maybeRunOnJobStartHook(execCfg); err != nil {
4244
return err
4345
}
@@ -93,6 +95,11 @@ func (c *inspectResumer) OnFailOrCancel(
9395
jobExecCtx := execCtx.(sql.JobExecContext)
9496
execCfg := jobExecCtx.ExecCfg()
9597
c.maybeCleanupProtectedTimestamp(ctx, execCfg)
98+
99+
// Record RunsWithIssues metric if the job failed due to finding inconsistencies.
100+
if jobErr != nil && errors.Is(jobErr, errInspectFoundInconsistencies) {
101+
execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics).RunsWithIssues.Inc(1)
102+
}
96103
return nil
97104
}
98105

pkg/sql/inspect/inspect_logger.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ package inspect
77

88
import (
99
"context"
10+
"sync/atomic"
1011

12+
"github.com/cockroachdb/cockroach/pkg/util/metric"
1113
"github.com/cockroachdb/errors"
1214
)
1315

@@ -45,3 +47,23 @@ func (l inspectLoggers) hasIssues() bool {
4547
}
4648
return false
4749
}
50+
51+
// metricsLogger increments metrics when issues are logged.
52+
type metricsLogger struct {
53+
foundIssue atomic.Bool
54+
issuesFoundCtr *metric.Counter
55+
}
56+
57+
var _ inspectLogger = &metricsLogger{}
58+
59+
// logIssue implements the inspectLogger interface.
60+
func (m *metricsLogger) logIssue(ctx context.Context, issue *inspectIssue) error {
61+
m.foundIssue.Store(true)
62+
m.issuesFoundCtr.Inc(1)
63+
return nil
64+
}
65+
66+
// hasIssues implements the inspectLogger interface.
67+
func (m *metricsLogger) hasIssues() bool {
68+
return m.foundIssue.Load()
69+
}

pkg/sql/inspect/inspect_metrics.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package inspect
7+
8+
import (
9+
"time"
10+
11+
"github.com/cockroachdb/cockroach/pkg/jobs"
12+
"github.com/cockroachdb/cockroach/pkg/util/metric"
13+
)
14+
15+
// InspectMetrics holds metrics for INSPECT job operations.
16+
type InspectMetrics struct {
17+
Runs *metric.Counter
18+
RunsWithIssues *metric.Counter
19+
IssuesFound *metric.Counter
20+
}
21+
22+
var _ metric.Struct = (*InspectMetrics)(nil)
23+
24+
// MetricStruct implements the metric.Struct interface.
25+
func (InspectMetrics) MetricStruct() {}
26+
27+
var (
28+
metaInspectRuns = metric.Metadata{
29+
Name: "jobs.inspect.runs",
30+
Help: "Number of INSPECT jobs executed",
31+
Measurement: "Jobs",
32+
Unit: metric.Unit_COUNT,
33+
}
34+
metaInspectRunsWithIssues = metric.Metadata{
35+
Name: "jobs.inspect.runs_with_issues",
36+
Help: "Number of INSPECT jobs that found at least one issue",
37+
Measurement: "Jobs",
38+
Unit: metric.Unit_COUNT,
39+
}
40+
metaInspectIssuesFound = metric.Metadata{
41+
Name: "jobs.inspect.issues_found",
42+
Help: "Total count of issues found by INSPECT jobs",
43+
Measurement: "Issues",
44+
Unit: metric.Unit_COUNT,
45+
}
46+
)
47+
48+
// MakeInspectMetrics instantiates the metrics for INSPECT jobs.
49+
func MakeInspectMetrics(histogramWindow time.Duration) metric.Struct {
50+
return &InspectMetrics{
51+
Runs: metric.NewCounter(metaInspectRuns),
52+
RunsWithIssues: metric.NewCounter(metaInspectRunsWithIssues),
53+
IssuesFound: metric.NewCounter(metaInspectIssuesFound),
54+
}
55+
}
56+
57+
func init() {
58+
jobs.MakeInspectMetricsHook = MakeInspectMetrics
59+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package inspect
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/base"
13+
"github.com/cockroachdb/cockroach/pkg/sql"
14+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
16+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
17+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
18+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
19+
"github.com/cockroachdb/cockroach/pkg/util/log"
20+
"github.com/stretchr/testify/require"
21+
)
22+
23+
// TestInspectMetrics verifies that INSPECT job metrics are correctly updated
24+
// when jobs run.
25+
func TestInspectMetrics(t *testing.T) {
26+
defer leaktest.AfterTest(t)()
27+
defer log.Scope(t).Close(t)
28+
29+
ctx := context.Background()
30+
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
31+
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
32+
})
33+
defer s.Stopper().Stop(ctx)
34+
35+
runner := sqlutils.MakeSQLRunner(db)
36+
runner.Exec(t, `
37+
CREATE DATABASE db;
38+
SET enable_scrub_job = true;
39+
CREATE TABLE db.t (
40+
id INT PRIMARY KEY,
41+
val INT
42+
);
43+
CREATE INDEX i1 on db.t (val);
44+
INSERT INTO db.t VALUES (1, 2), (2, 3);
45+
`)
46+
47+
execCfg := s.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig)
48+
metrics := execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics)
49+
50+
initialRuns := metrics.Runs.Count()
51+
initialRunsWithIssues := metrics.RunsWithIssues.Count()
52+
initialIssuesFound := metrics.IssuesFound.Count()
53+
54+
// First run: no corruption, should succeed without issues
55+
runner.Exec(t, "EXPERIMENTAL SCRUB TABLE db.t")
56+
require.Equal(t, initialRuns+1, metrics.Runs.Count(), "Runs counter should increment")
57+
require.Equal(t, initialRunsWithIssues, metrics.RunsWithIssues.Count(), "RunsWithIssues should not increment")
58+
require.Equal(t, initialIssuesFound, metrics.IssuesFound.Count(), "IssuesFound should not increment")
59+
60+
// Create corruption: delete a secondary index entry for row (1, 2)
61+
// This creates a "missing_secondary_index_entry" issue - the primary key exists
62+
// but the corresponding secondary index entry is missing.
63+
kvDB := s.DB()
64+
codec := s.Codec()
65+
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, "db", "t")
66+
secIndex := tableDesc.PublicNonPrimaryIndexes()[0] // i1 index on (val)
67+
row := []tree.Datum{
68+
tree.NewDInt(1), // id
69+
tree.NewDInt(2), // val
70+
}
71+
err := deleteSecondaryIndexEntry(ctx, row, kvDB, tableDesc, secIndex)
72+
require.NoError(t, err)
73+
74+
// Second run: with corruption, should detect the missing index entry.
75+
_, err = db.Exec("EXPERIMENTAL SCRUB TABLE db.t")
76+
require.Error(t, err, "INSPECT should fail when corruption is detected")
77+
require.Contains(t, err.Error(), "INSPECT found inconsistencies")
78+
require.Equal(t, initialRuns+2, metrics.Runs.Count(),
79+
"Runs counter should increment for each job execution")
80+
require.Equal(t, initialRunsWithIssues+1, metrics.RunsWithIssues.Count(),
81+
"RunsWithIssues should increment when issues are found")
82+
require.Equal(t, initialIssuesFound+1, metrics.IssuesFound.Count(),
83+
"IssuesFound should increment when issues are detected")
84+
85+
// Third run: on a different clean table to verify RunsWithIssues doesn't increment again.
86+
runner.Exec(t, `
87+
CREATE TABLE db.t2 (
88+
id INT PRIMARY KEY,
89+
val INT
90+
);
91+
CREATE INDEX i2 on db.t2 (val);
92+
INSERT INTO db.t2 VALUES (1, 2), (2, 3);
93+
`)
94+
runner.Exec(t, "EXPERIMENTAL SCRUB TABLE db.t2")
95+
require.Equal(t, initialRuns+3, metrics.Runs.Count(),
96+
"Runs counter should increment for third job execution")
97+
require.Equal(t, initialRunsWithIssues+1, metrics.RunsWithIssues.Count(),
98+
"RunsWithIssues should NOT increment for successful job")
99+
require.Equal(t, initialIssuesFound+1, metrics.IssuesFound.Count(),
100+
"IssuesFound should NOT increment for successful job")
101+
}

pkg/sql/inspect/inspect_processor.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ import (
2929
)
3030

3131
var (
32+
// errInspectFoundInconsistencies is a sentinel error used to mark errors
33+
// returned when INSPECT jobs find data inconsistencies.
34+
errInspectFoundInconsistencies = pgerror.New(pgcode.DataException, "INSPECT found inconsistencies")
35+
3236
processorConcurrencyOverride = settings.RegisterIntSetting(
3337
settings.ApplicationLevel,
3438
"sql.inspect.processor_concurrency",
@@ -176,7 +180,7 @@ func (p *inspectProcessor) runInspect(ctx context.Context, output execinfra.RowR
176180

177181
log.Dev.Infof(ctx, "INSPECT processor completed processorID=%d issuesFound=%t", p.processorID, p.logger.hasIssues())
178182
if p.logger.hasIssues() {
179-
return pgerror.Newf(pgcode.DataException, "INSPECT found inconsistencies")
183+
return errInspectFoundInconsistencies
180184
}
181185
return nil
182186
}
@@ -194,15 +198,21 @@ func getProcessorConcurrency(flowCtx *execinfra.FlowCtx) int {
194198

195199
// getInspectLogger returns a logger for the inspect processor.
196200
func getInspectLogger(flowCtx *execinfra.FlowCtx, jobID jobspb.JobID) inspectLogger {
201+
execCfg := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
202+
metrics := execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics)
203+
197204
loggers := inspectLoggers{
198205
&logSink{},
199206
&tableSink{
200207
db: flowCtx.Cfg.DB,
201208
jobID: jobID,
202209
},
210+
&metricsLogger{
211+
issuesFoundCtr: metrics.IssuesFound,
212+
},
203213
}
204214

205-
knobs := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig).InspectTestingKnobs
215+
knobs := execCfg.InspectTestingKnobs
206216
if knobs != nil && knobs.InspectIssueLogger != nil {
207217
loggers = append(loggers, knobs.InspectIssueLogger.(inspectLogger))
208218
}

0 commit comments

Comments
 (0)