Skip to content

Commit 926540b

Browse files
committed
sql/inspect: fix progress tracking bugs causing over 100% completion
Fix two bugs in INSPECT job progress tracking that caused inaccurate completion percentages, sometimes exceeding 100%. The first issue was in inspect_processor.go, where processSpan() checked the loop termination condition (!ok) before accounting for the final completed check, so when the last check finished we exited the loop without reporting it as complete. The second issue was in inspect_job.go, where the total check count was calculated from the PK spans (in setupProgressTrackingAndFilter()) rather than from the partitioned spans actually used by the processors; it is now computed after planning, in initProgressFromPlan(). Because the expected total was much lower than the number of checks actually run over the partitioned spans, we reported completion rates in excess of 100%. Epic: CRDB-30356 Informs: #154457 Release note: none
1 parent 14ff833 commit 926540b

File tree

4 files changed

+165
-27
lines changed

4 files changed

+165
-27
lines changed

pkg/sql/inspect/index_consistency_check_test.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,21 @@ import (
3131

3232
const expectedInspectFoundInconsistencies = "INSPECT found inconsistencies"
3333

34+
// requireCheckCountsMatch verifies that the job's total check count equals its completed check count.
35+
// This is used to verify that progress tracking correctly counted all checks.
36+
func requireCheckCountsMatch(t *testing.T, r *sqlutils.SQLRunner, jobID int64) {
37+
t.Helper()
38+
var totalChecks, completedChecks int64
39+
r.QueryRow(t, `
40+
SELECT
41+
(crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', value)->'inspect'->>'jobTotalCheckCount')::INT,
42+
(crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', value)->'inspect'->>'jobCompletedCheckCount')::INT
43+
FROM system.job_info
44+
WHERE job_id = $1 AND info_key = 'legacy_progress'
45+
`, jobID).Scan(&totalChecks, &completedChecks)
46+
require.Equal(t, totalChecks, completedChecks, "total checks should equal completed checks when job succeeds")
47+
}
48+
3449
// encodeSecondaryIndexEntry encodes row data into a secondary index entry.
3550
// The datums must be ordered according to the table's public columns.
3651
// Returns the encoded index entry, expecting exactly one entry to be produced.
@@ -517,13 +532,15 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
517532
}
518533

519534
// Validate job status matches expected outcome
535+
var jobID int64
520536
var jobStatus string
521537
var fractionCompleted float64
522-
r.QueryRow(t, `SELECT status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobStatus, &fractionCompleted)
538+
r.QueryRow(t, `SELECT job_id, status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobID, &jobStatus, &fractionCompleted)
523539

524540
if tc.expectedErrRegex == "" {
525541
require.Equal(t, "succeeded", jobStatus, "expected job to succeed when no issues found")
526542
require.InEpsilon(t, 1.0, fractionCompleted, 0.01, "progress should reach 100%% on successful completion")
543+
requireCheckCountsMatch(t, r, jobID)
527544
} else {
528545
require.Equal(t, "failed", jobStatus, "expected job to fail when inconsistencies found")
529546
}
@@ -578,9 +595,11 @@ func TestIndexConsistencyWithReservedWordColumns(t *testing.T) {
578595
require.Equal(t, 0, issueLogger.numIssuesFound(), "No issues should be found in happy path test")
579596

580597
// Verify job succeeded and progress reached 100%
598+
var jobID int64
581599
var jobStatus string
582600
var fractionCompleted float64
583-
r.QueryRow(t, `SELECT status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobStatus, &fractionCompleted)
601+
r.QueryRow(t, `SELECT job_id, status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobID, &jobStatus, &fractionCompleted)
584602
require.Equal(t, "succeeded", jobStatus, "INSPECT job should succeed")
585603
require.InEpsilon(t, 1.0, fractionCompleted, 0.01, "progress should reach 100%% on successful completion")
604+
requireCheckCountsMatch(t, r, jobID)
586605
}

pkg/sql/inspect/inspect_job.go

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,14 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
6262
return err
6363
}
6464

65-
progressTracker, cleanupProgress, remainingSpans, err := c.setupProgressTrackingAndFilter(ctx, execCfg, pkSpans)
65+
progressTracker, completedSpans, cleanupProgress, err := c.setupProgressTracking(ctx, execCfg)
6666
if err != nil {
6767
return err
6868
}
6969
defer cleanupProgress()
7070

71+
remainingSpans := c.filterCompletedSpans(pkSpans, completedSpans)
72+
7173
// If all spans are completed, job is done.
7274
if len(remainingSpans) == 0 {
7375
log.Dev.Infof(ctx, "all spans already completed, INSPECT job finished")
@@ -79,6 +81,13 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
7981
return err
8082
}
8183

84+
// After planning, we have the finalized set of spans to process (adjacent
85+
// spans on the same node are merged). Compute the checks to run and initialize
86+
// progress tracking from the plan.
87+
if err := c.initProgressFromPlan(ctx, execCfg, progressTracker, plan, completedSpans); err != nil {
88+
return err
89+
}
90+
8291
if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan, progressTracker); err != nil {
8392
return err
8493
}
@@ -240,11 +249,11 @@ func (c *inspectResumer) runInspectPlan(
240249
return metadataCallbackWriter.Err()
241250
}
242251

243-
// setupProgressTrackingAndFilter initializes progress tracking and returns
244-
// it, along with a cleanup function and the remaining spans to process.
245-
func (c *inspectResumer) setupProgressTrackingAndFilter(
246-
ctx context.Context, execCfg *sql.ExecutorConfig, pkSpans []roachpb.Span,
247-
) (*inspectProgressTracker, func(), []roachpb.Span, error) {
252+
// setupProgressTracking initializes progress tracking and returns
253+
// it, along with the job's previously completed spans and a cleanup function.
254+
func (c *inspectResumer) setupProgressTracking(
255+
ctx context.Context, execCfg *sql.ExecutorConfig,
256+
) (*inspectProgressTracker, []roachpb.Span, func(), error) {
248257
// Create and initialize the tracker. We use the completed spans from the job
249258
// (if any) to filter out the spans we need to process in this run of the job.
250259
progressTracker := newInspectProgressTracker(
@@ -256,32 +265,53 @@ func (c *inspectResumer) setupProgressTrackingAndFilter(
256265
if err != nil {
257266
return nil, nil, nil, err
258267
}
259-
remainingSpans := c.filterCompletedSpans(pkSpans, completedSpans)
268+
269+
cleanup := func() {
270+
progressTracker.terminateTracker()
271+
}
272+
273+
return progressTracker, completedSpans, cleanup, nil
274+
}
275+
276+
// initProgressFromPlan initializes job progress based on the actual partitioned spans
277+
// that will be processed.
278+
func (c *inspectResumer) initProgressFromPlan(
279+
ctx context.Context,
280+
execCfg *sql.ExecutorConfig,
281+
progressTracker *inspectProgressTracker,
282+
plan *sql.PhysicalPlan,
283+
completedPartitionedSpans []roachpb.Span,
284+
) error {
285+
// Extract all spans from the plan processors.
286+
var remainingPartitionedSpans []roachpb.Span
287+
for i := range plan.Processors {
288+
if plan.Processors[i].Spec.Core.Inspect != nil {
289+
remainingPartitionedSpans = append(remainingPartitionedSpans, plan.Processors[i].Spec.Core.Inspect.Spans...)
290+
}
291+
}
260292

261293
applicabilityCheckers, err := buildApplicabilityCheckers(c.job.Details().(jobspb.InspectDetails))
262294
if err != nil {
263-
return nil, nil, nil, err
295+
return err
264296
}
265297

266298
// Calculate total applicable checks on ALL spans (not just remaining ones).
267299
// This ensures consistent progress calculation across job restarts.
268-
totalCheckCount, err := countApplicableChecks(pkSpans, applicabilityCheckers, execCfg.Codec)
300+
completedCheckCount, err := countApplicableChecks(completedPartitionedSpans, applicabilityCheckers, execCfg.Codec)
269301
if err != nil {
270-
return nil, nil, nil, err
302+
return err
271303
}
272-
completedCheckCount, err := countApplicableChecks(completedSpans, applicabilityCheckers, execCfg.Codec)
304+
remainingCheckCount, err := countApplicableChecks(remainingPartitionedSpans, applicabilityCheckers, execCfg.Codec)
273305
if err != nil {
274-
return nil, nil, nil, err
306+
return err
275307
}
276308

277-
if err := progressTracker.initJobProgress(ctx, totalCheckCount, completedCheckCount); err != nil {
278-
return nil, nil, nil, err
279-
}
280-
cleanup := func() {
281-
progressTracker.terminateTracker()
282-
}
309+
totalCheckCount := completedCheckCount + remainingCheckCount
310+
311+
log.Dev.Infof(ctx, "INSPECT progress init: %d partitioned spans, %d total checks (%d remaining + %d completed)",
312+
len(remainingPartitionedSpans), totalCheckCount, remainingCheckCount, completedCheckCount)
283313

284-
return progressTracker, cleanup, remainingSpans, nil
314+
return progressTracker.initJobProgress(ctx, totalCheckCount, completedCheckCount)
285315
}
286316

287317
// filterCompletedSpans removes spans that are already completed from the list to process.

pkg/sql/inspect/inspect_job_test.go

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,12 @@ func TestInspectJobImplicitTxnSemantics(t *testing.T) {
129129
}()
130130

131131
// Wait for the job to finish.
132+
var jobID int64
132133
var status string
133134
var fractionCompleted float64
134135
testutils.SucceedsSoon(t, func() error {
135-
row := db.QueryRow(`SELECT status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`)
136-
if err := row.Scan(&status, &fractionCompleted); err != nil {
136+
row := db.QueryRow(`SELECT job_id, status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`)
137+
if err := row.Scan(&jobID, &status, &fractionCompleted); err != nil {
137138
return err
138139
}
139140
if status == "succeeded" || status == "failed" {
@@ -304,3 +305,90 @@ func TestInspectJobProtectedTimestamp(t *testing.T) {
304305
})
305306
}
306307
}
308+
309+
// TestInspectProgressWithMultiRangeTable is a regression test for the bug where
310+
// INSPECT progress could exceed 100% when a table had many ranges. This test
311+
// creates a multi-node cluster with a multi-range table and verifies that:
312+
// 1. The progress never exceeds 100%
313+
// 2. The final progress is exactly 100% when the job completes
314+
// 3. The total check count is based on partitioned spans, not PK spans
315+
func TestInspectProgressWithMultiRangeTable(t *testing.T) {
316+
defer leaktest.AfterTest(t)()
317+
defer log.Scope(t).Close(t)
318+
skip.UnderShort(t)
319+
320+
ctx := context.Background()
321+
const numNodes = 3
322+
tc := serverutils.StartCluster(t, numNodes, base.TestClusterArgs{})
323+
defer tc.Stopper().Stop(ctx)
324+
325+
db := tc.ServerConn(0)
326+
runner := sqlutils.MakeSQLRunner(db)
327+
328+
// Create a table and split it into multiple ranges.
329+
runner.Exec(t, `
330+
CREATE DATABASE testdb;
331+
USE testdb;
332+
CREATE TABLE multi_range_table (
333+
id INT PRIMARY KEY,
334+
val1 INT,
335+
val2 STRING,
336+
INDEX idx_val1 (val1),
337+
INDEX idx_val2 (val2)
338+
);
339+
`)
340+
341+
// Insert data to create multiple ranges. We'll insert enough data and then
342+
// manually split to ensure multiple ranges.
343+
runner.Exec(t, `INSERT INTO multi_range_table SELECT i, i*2, md5(i::STRING) FROM generate_series(1, 1000) AS g(i)`)
344+
for i := 100; i <= 900; i += 100 {
345+
runner.Exec(t, `ALTER TABLE multi_range_table SPLIT AT VALUES ($1)`, i)
346+
}
347+
runner.Exec(t, `ALTER TABLE multi_range_table SCATTER`)
348+
349+
// Wait for scatter to distribute ranges across nodes.
350+
var rangeCount, nodeCount int
351+
testutils.SucceedsSoon(t, func() error {
352+
// Count total ranges and verify distribution.
353+
runner.QueryRow(t, `
354+
WITH r AS (SHOW RANGES FROM TABLE multi_range_table WITH DETAILS)
355+
SELECT count(DISTINCT range_id), count(DISTINCT lease_holder)
356+
FROM r
357+
`).Scan(&rangeCount, &nodeCount)
358+
359+
if rangeCount <= 5 {
360+
return errors.Newf("waiting for splits: only %d ranges", rangeCount)
361+
}
362+
363+
if nodeCount < 2 {
364+
return errors.Newf("waiting for scatter to multiple nodes: ranges only on %d node(s)", nodeCount)
365+
}
366+
367+
return nil
368+
})
369+
370+
// Start the INSPECT job.
371+
// TODO(148365): Run INSPECT instead of SCRUB.
372+
t.Logf("Starting INSPECT job on table with %d ranges distributed across %d nodes", rangeCount, nodeCount)
373+
_, err := db.Exec(`
374+
SET enable_scrub_job = true;
375+
COMMIT;
376+
EXPERIMENTAL SCRUB TABLE multi_range_table`)
377+
require.NoError(t, err)
378+
379+
var jobID int64
380+
var status string
381+
var fractionCompleted float64
382+
runner.QueryRow(t, `
383+
SELECT job_id, status, fraction_completed
384+
FROM [SHOW JOBS]
385+
WHERE job_type = 'INSPECT'
386+
ORDER BY created DESC
387+
LIMIT 1
388+
`).Scan(&jobID, &status, &fractionCompleted)
389+
t.Logf("Job %d: status=%s, fraction_completed=%.4f", jobID, status, fractionCompleted)
390+
391+
require.Equal(t, "succeeded", status, "INSPECT job should succeed")
392+
require.InEpsilon(t, 1.0, fractionCompleted, 0.01,
393+
"progress should be ~100%% at completion, got %.2f%%", fractionCompleted*100)
394+
}

pkg/sql/inspect/inspect_processor.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (p *inspectProcessor) Run(ctx context.Context, output execinfra.RowReceiver
114114
// Each span is read from a buffered channel and passed to processSpan.
115115
// The function blocks until all spans are processed or an error occurs.
116116
func (p *inspectProcessor) runInspect(ctx context.Context, output execinfra.RowReceiver) error {
117-
log.Dev.Infof(ctx, "INSPECT processor started processorID=%d concurrency=%d", p.processorID, p.concurrency)
117+
log.Dev.Infof(ctx, "INSPECT processor started processorID=%d concurrency=%d spans=%d", p.processorID, p.concurrency, len(p.spec.Spans))
118118

119119
group := ctxgroup.WithContext(ctx)
120120

@@ -262,9 +262,6 @@ func (p *inspectProcessor) processSpan(
262262
if stepErr != nil {
263263
return stepErr
264264
}
265-
if !ok {
266-
break
267-
}
268265

269266
// Check if any inspections have completed (when the count decreases).
270267
currentCheckCount := runner.CheckCount()
@@ -275,6 +272,10 @@ func (p *inspectProcessor) processSpan(
275272
return err
276273
}
277274
}
275+
276+
if !ok {
277+
break
278+
}
278279
}
279280

280281
// Report span completion for checkpointing.

0 commit comments

Comments
 (0)