Skip to content

Commit 3706673

Browse files
authored
Merge pull request #154358 from spilchen/backport25.4-154136
release-25.4: sql/inspect: add progress tracking to INSPECT
2 parents 0da194d + 43323fe commit 3706673

File tree

9 files changed

+552
-33
lines changed

9 files changed

+552
-33
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,7 +1518,13 @@ message HotRangesLoggerProgress {
15181518
}
15191519

15201520
// InspectProgress is the job-level progress record for an INSPECT job,
// expressed as a pair of check counters.
message InspectProgress {
1521+
// NOTE(review): presumably the number of applicable checks computed when
// progress tracking is set up for the job; confirm against progress.go.
int64 job_total_check_count = 1;
1522+
// NOTE(review): presumably the running sum of checks reported as completed
// by the processors; confirm against progress.go.
int64 job_completed_check_count = 2;
1523+
}
15211524

1525+
// InspectProcessorProgress is the per-processor progress payload. The INSPECT
// processor marshals it into the ProgressDetails of a BulkProcessorProgress
// metadata record and pushes it to the coordinator: once for each drop in the
// number of outstanding checks while a span is processed, and once more with
// finished=true (and checks_completed=0) after all of its workers are done.
message InspectProcessorProgress {
1526+
int64 checks_completed = 1; // Number of checks completed since last update.
1527+
bool finished = 2; // Processor finished all checks.
15221528
}
15231529

15241530
message ImportRollbackProgress {}

pkg/sql/inspect/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"inspect_processor.go",
1010
"issue.go",
1111
"log_sink.go",
12+
"progress.go",
1213
"runner.go",
1314
"span_source.go",
1415
"table_sink.go",
@@ -46,10 +47,13 @@ go_library(
4647
"//pkg/util/ctxgroup",
4748
"//pkg/util/hlc",
4849
"//pkg/util/log",
50+
"//pkg/util/stop",
51+
"//pkg/util/syncutil",
4952
"//pkg/util/timeutil",
5053
"//pkg/util/tracing",
5154
"@com_github_cockroachdb_errors//:errors",
5255
"@com_github_cockroachdb_redact//:redact",
56+
"@com_github_gogo_protobuf//types",
5357
],
5458
)
5559

@@ -61,6 +65,7 @@ go_test(
6165
"inspect_processor_test.go",
6266
"issue_test.go",
6367
"main_test.go",
68+
"progress_test.go",
6469
"runner_test.go",
6570
"table_sink_test.go",
6671
],
@@ -71,13 +76,15 @@ go_test(
7176
}),
7277
deps = [
7378
"//pkg/base",
79+
"//pkg/jobs",
7480
"//pkg/jobs/jobspb",
7581
"//pkg/keys",
7682
"//pkg/kv",
7783
"//pkg/kv/kvserver/protectedts",
7884
"//pkg/roachpb",
7985
"//pkg/security/securityassets",
8086
"//pkg/security/securitytest",
87+
"//pkg/security/username",
8188
"//pkg/server",
8289
"//pkg/settings/cluster",
8390
"//pkg/sql",
@@ -99,8 +106,10 @@ go_test(
99106
"//pkg/util/leaktest",
100107
"//pkg/util/log",
101108
"//pkg/util/syncutil",
109+
"//pkg/util/uuid",
102110
"@com_github_cockroachdb_errors//:errors",
103111
"@com_github_cockroachdb_redact//:redact",
112+
"@com_github_gogo_protobuf//types",
104113
"@com_github_stretchr_testify//require",
105114
],
106115
)

pkg/sql/inspect/index_consistency_check_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -518,10 +518,12 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
518518

519519
// Validate job status matches expected outcome
520520
var jobStatus string
521-
r.QueryRow(t, `SELECT status FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobStatus)
521+
var fractionCompleted float64
522+
r.QueryRow(t, `SELECT status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobStatus, &fractionCompleted)
522523

523524
if tc.expectedErrRegex == "" {
524525
require.Equal(t, "succeeded", jobStatus, "expected job to succeed when no issues found")
526+
require.InEpsilon(t, 1.0, fractionCompleted, 0.01, "progress should reach 100%% on successful completion")
525527
} else {
526528
require.Equal(t, "failed", jobStatus, "expected job to fail when inconsistencies found")
527529
}
@@ -575,8 +577,10 @@ func TestIndexConsistencyWithReservedWordColumns(t *testing.T) {
575577
require.NoError(t, err, "should succeed on table with reserved word column names")
576578
require.Equal(t, 0, issueLogger.numIssuesFound(), "No issues should be found in happy path test")
577579

578-
// Verify job succeeded
580+
// Verify job succeeded and progress reached 100%
579581
var jobStatus string
580-
r.QueryRow(t, `SELECT status FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobStatus)
582+
var fractionCompleted float64
583+
r.QueryRow(t, `SELECT status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobStatus, &fractionCompleted)
581584
require.Equal(t, "succeeded", jobStatus, "INSPECT job should succeed")
585+
require.InEpsilon(t, 1.0, fractionCompleted, 0.01, "progress should reach 100%% on successful completion")
582586
}

pkg/sql/inspect/inspect_job.go

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1919
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2020
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
21-
"github.com/cockroachdb/cockroach/pkg/sql/isql"
2221
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
2322
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2423
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -67,13 +66,19 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
6766
return err
6867
}
6968

70-
if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan); err != nil {
69+
progressTracker, cleanupProgress, err := c.setupProgressTracking(ctx, execCfg, pkSpans)
70+
if err != nil {
71+
return err
72+
}
73+
defer cleanupProgress()
74+
75+
if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan, progressTracker); err != nil {
7176
return err
7277
}
7378

7479
c.maybeCleanupProtectedTimestamp(ctx, execCfg)
7580

76-
return c.markJobComplete(ctx)
81+
return nil
7782
}
7883

7984
// OnFailOrCancel implements the Resumer interface
@@ -181,19 +186,26 @@ func (c *inspectResumer) planInspectProcessors(
181186
}
182187

183188
// runInspectPlan executes the distributed physical plan for the INSPECT job.
184-
// It sets up a metadata-only DistSQL receiver to collect any execution errors,
185-
// then runs the plan using the provided planning context and evaluation context.
186-
// This function returns any error surfaced via metadata from the processors.
189+
// It sets up a metadata-only DistSQL receiver to collect any execution errors
190+
// and progress updates, then runs the plan using the provided planning context
191+
// and evaluation context. This function returns any error surfaced via metadata
192+
// from the processors.
187193
func (c *inspectResumer) runInspectPlan(
188194
ctx context.Context,
189195
jobExecCtx sql.JobExecContext,
190196
planCtx *sql.PlanningCtx,
191197
plan *sql.PhysicalPlan,
198+
progressTracker *inspectProgressTracker,
192199
) error {
193200
execCfg := jobExecCtx.ExecCfg()
194201

195202
metadataCallbackWriter := sql.NewMetadataOnlyMetadataCallbackWriter(
196-
func(context.Context, *execinfrapb.ProducerMetadata) error { return nil })
203+
func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
204+
if meta.BulkProcessorProgress != nil {
205+
return progressTracker.handleProcessorProgress(ctx, meta)
206+
}
207+
return nil
208+
})
197209

198210
distSQLReceiver := sql.MakeDistSQLReceiver(
199211
ctx,
@@ -216,18 +228,62 @@ func (c *inspectResumer) runInspectPlan(
216228
return metadataCallbackWriter.Err()
217229
}
218230

219-
func (c *inspectResumer) markJobComplete(ctx context.Context) error {
220-
// TODO(148297): add fine-grained progress reporting
221-
return c.job.NoTxn().Update(ctx,
222-
func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
223-
progress := md.Progress
224-
progress.Progress = &jobspb.Progress_FractionCompleted{
225-
FractionCompleted: 1,
226-
}
227-
ju.UpdateProgress(progress)
228-
return nil
229-
},
230-
)
231+
// setupProgressTracking initializes and starts progress tracking for the INSPECT job.
232+
// It calculates the total number of applicable checks, initializes progress to 0%, and starts
233+
// the background update goroutine. Returns the progress tracker and cleanup function.
234+
func (c *inspectResumer) setupProgressTracking(
235+
ctx context.Context, execCfg *sql.ExecutorConfig, pkSpans []roachpb.Span,
236+
) (*inspectProgressTracker, func(), error) {
237+
fractionInterval := fractionUpdateInterval.Get(&execCfg.Settings.SV)
238+
details := c.job.Details().(jobspb.InspectDetails)
239+
240+
// Build applicability checkers for progress tracking
241+
applicabilityCheckers, err := buildApplicabilityCheckers(details)
242+
if err != nil {
243+
return nil, nil, err
244+
}
245+
246+
// Calculate total applicable checks: only count checks that will actually run
247+
totalCheckCount, err := countApplicableChecks(pkSpans, applicabilityCheckers, execCfg.Codec)
248+
if err != nil {
249+
return nil, nil, err
250+
}
251+
252+
// Create and initialize progress tracker
253+
progressTracker := newInspectProgressTracker(c.job, fractionInterval)
254+
if err := progressTracker.initJobProgress(ctx, totalCheckCount); err != nil {
255+
progressTracker.terminateTracker(ctx)
256+
return nil, nil, err
257+
}
258+
259+
// Start background progress updates
260+
progressTracker.startBackgroundUpdates(ctx)
261+
262+
// Return cleanup function
263+
cleanup := func() {
264+
progressTracker.terminateTracker(ctx)
265+
}
266+
267+
return progressTracker, cleanup, nil
268+
}
269+
270+
// buildApplicabilityCheckers creates lightweight applicability checkers from InspectDetails.
271+
// These are used only for progress calculation and don't require the full check machinery.
272+
func buildApplicabilityCheckers(
273+
details jobspb.InspectDetails,
274+
) ([]inspectCheckApplicability, error) {
275+
checkers := make([]inspectCheckApplicability, 0, len(details.Checks))
276+
for _, specCheck := range details.Checks {
277+
switch specCheck.Type {
278+
case jobspb.InspectCheckIndexConsistency:
279+
checkers = append(checkers, &indexConsistencyCheckApplicability{
280+
tableID: specCheck.TableID,
281+
})
282+
default:
283+
return nil, errors.AssertionFailedf("unsupported inspect check type: %v", specCheck.Type)
284+
}
285+
}
286+
return checkers, nil
231287
}
232288

233289
// maybeProtectTimestamp creates a protected timestamp record for the AsOf

pkg/sql/inspect/inspect_processor.go

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package inspect
88
import (
99
"context"
1010
"runtime"
11+
"time"
1112

1213
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1314
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -24,6 +25,7 @@ import (
2425
"github.com/cockroachdb/cockroach/pkg/util/log"
2526
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2627
"github.com/cockroachdb/errors"
28+
pbtypes "github.com/gogo/protobuf/types"
2729
)
2830

2931
var (
@@ -35,6 +37,14 @@ var (
3537
0,
3638
settings.NonNegativeInt,
3739
)
40+
41+
fractionUpdateInterval = settings.RegisterDurationSetting(
42+
settings.ApplicationLevel,
43+
"sql.inspect.fraction_update_interval",
44+
"the amount of time between INSPECT job percent complete progress updates",
45+
10*time.Second,
46+
settings.DurationWithMinimum(1*time.Millisecond),
47+
)
3848
)
3949

4050
type inspectCheckFactory func(asOf hlc.Timestamp) inspectCheck
@@ -116,7 +126,7 @@ func (p *inspectProcessor) runInspect(ctx context.Context, output execinfra.RowR
116126
// Channel is closed, no more spans to process.
117127
return nil
118128
}
119-
if err := p.processSpan(ctx, span, workerIndex); err != nil {
129+
if err := p.processSpan(ctx, span, workerIndex, output); err != nil {
120130
// On error, return it. ctxgroup will cancel all other goroutines.
121131
return err
122132
}
@@ -150,6 +160,12 @@ func (p *inspectProcessor) runInspect(ctx context.Context, output execinfra.RowR
150160
if err := group.Wait(); err != nil {
151161
return err
152162
}
163+
164+
// Send final completion message to indicate this processor is finished
165+
if err := p.sendInspectProgress(ctx, output, 0, true /* finished */); err != nil {
166+
return err
167+
}
168+
153169
log.Dev.Infof(ctx, "INSPECT processor completed processorID=%d issuesFound=%t", p.processorID, p.logger.hasIssues())
154170
if p.logger.hasIssues() {
155171
return pgerror.Newf(pgcode.DataException, "INSPECT found inconsistencies")
@@ -188,9 +204,10 @@ func getInspectLogger(flowCtx *execinfra.FlowCtx, jobID jobspb.JobID) inspectLog
188204

189205
// processSpan executes all configured inspect checks against a single span.
190206
// It instantiates a fresh set of checks from the configured factories and uses
191-
// an inspectRunner to drive their execution.
207+
// an inspectRunner to drive their execution. After processing the span, it
208+
// sends progress metadata to the coordinator.
192209
func (p *inspectProcessor) processSpan(
193-
ctx context.Context, span roachpb.Span, workerIndex int,
210+
ctx context.Context, span roachpb.Span, workerIndex int, output execinfra.RowReceiver,
194211
) (err error) {
195212
asOfToUse := p.getTimestampForSpan()
196213

@@ -222,6 +239,10 @@ func (p *inspectProcessor) processSpan(
222239
}
223240
}()
224241

242+
// Keep track of completed checks for progress updates.
243+
initialCheckCount := len(checks)
244+
lastSeenCheckCount := initialCheckCount
245+
225246
// Process all checks on the given span.
226247
for {
227248
ok, stepErr := runner.Step(ctx, p.cfg, span, workerIndex)
@@ -231,7 +252,46 @@ func (p *inspectProcessor) processSpan(
231252
if !ok {
232253
break
233254
}
255+
256+
// Check if any inspections have completed (when the count decreases).
257+
currentCheckCount := runner.CheckCount()
258+
if currentCheckCount < lastSeenCheckCount {
259+
checksCompleted := lastSeenCheckCount - currentCheckCount
260+
lastSeenCheckCount = currentCheckCount
261+
if err := p.sendInspectProgress(ctx, output, int64(checksCompleted), false /* finished */); err != nil {
262+
return err
263+
}
264+
}
234265
}
266+
267+
return nil
268+
}
269+
270+
// sendInspectProgress marshals and sends inspect processor progress via BulkProcessorProgress.
271+
func (p *inspectProcessor) sendInspectProgress(
272+
ctx context.Context, output execinfra.RowReceiver, checksCompleted int64, finished bool,
273+
) error {
274+
progressMsg := &jobspb.InspectProcessorProgress{
275+
ChecksCompleted: checksCompleted,
276+
Finished: finished,
277+
}
278+
279+
progressAny, err := pbtypes.MarshalAny(progressMsg)
280+
if err != nil {
281+
return errors.Wrapf(err, "unable to marshal inspect processor progress")
282+
}
283+
284+
meta := &execinfrapb.ProducerMetadata{
285+
BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
286+
ProgressDetails: *progressAny,
287+
NodeID: p.flowCtx.NodeID.SQLInstanceID(),
288+
FlowID: p.flowCtx.ID,
289+
ProcessorID: p.processorID,
290+
Drained: finished,
291+
},
292+
}
293+
294+
output.Push(nil, meta)
235295
return nil
236296
}
237297

0 commit comments

Comments
 (0)