Skip to content

Commit 299240d

Browse files
craig[bot]spilchenZhouXing19
committed
154136: sql/inspect: add progress tracking to INSPECT r=spilchen a=spilchen Previously, an INSPECT job did not maintain any kind of progress tracking. This commit adds it. Progress is tracked at the granularity of checks × spans. Note: a check does not always apply to every span, so we only count the spans that apply to each check. The processors report the checks they have completed (per span) back to the coordinator. A goroutine on the coordinator periodically (every 10 seconds) writes the accumulated progress to the job data. Informs #148297 Release note: none Epic: CRDB-30356 154255: sql/import: pass deep copy of BulkOpSummary to progress r=ZhouXing19 a=ZhouXing19 Fixes: #153480 Prior to this change, the `BulkOpSummary` map was passed directly from `accumulatedBulkSummary` to `prog.Summary` (as introduced in #152745). This caused a `concurrent map iteration and map write` panic because the map was being updated via `metaFn` while simultaneously being marshaled into a protobuf during `jobs.Update` calls from `FractionProgressed`. This change creates a deep copy of the map when assigning from `accumulatedBulkSummary` to `prog.Summary`, eliminating the concurrency issue by ensuring each goroutine operates on a separate map instance. Release Notes: None Co-authored-by: Matt Spilchen <[email protected]> Co-authored-by: ZhouXing19 <[email protected]>
3 parents 5215082 + ae5bba6 + 82f2321 commit 299240d

File tree

12 files changed

+569
-34
lines changed

12 files changed

+569
-34
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1538,7 +1538,13 @@ message HotRangesLoggerProgress {
15381538
}
15391539

15401540
message InspectProgress {
1541+
int64 job_total_check_count = 1;
1542+
int64 job_completed_check_count = 2;
1543+
}
15411544

1545+
message InspectProcessorProgress {
1546+
int64 checks_completed = 1; // Number of checks completed since last update.
1547+
bool finished = 2; // Processor finished all checks.
15421548
}
15431549

15441550
message ImportRollbackProgress {}

pkg/kv/kvpb/api.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2133,6 +2133,21 @@ func (b *BulkOpSummary) Add(other BulkOpSummary) {
21332133
}
21342134
}
21352135

2136+
// DeepCopy returns a deep copy of the original BulkOpSummary.
2137+
func (b *BulkOpSummary) DeepCopy() BulkOpSummary {
2138+
cpy := BulkOpSummary{
2139+
DataSize: b.DataSize,
2140+
SSTDataSize: b.SSTDataSize,
2141+
}
2142+
if b.EntryCounts != nil {
2143+
cpy.EntryCounts = make(map[uint64]int64, len(b.EntryCounts))
2144+
for k, v := range b.EntryCounts {
2145+
cpy.EntryCounts[k] = v
2146+
}
2147+
}
2148+
return cpy
2149+
}
2150+
21362151
// MustSetValue is like SetValue, except it resets the enum and panics if the
21372152
// provided value is not a valid variant type.
21382153
func (e *RangeFeedEvent) MustSetValue(value interface{}) {

pkg/kv/kvpb/api.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1935,6 +1935,7 @@ message BulkOpSummary {
19351935
// generation logic is also available in the BulkOpSummaryID helper. It does
19361936
// not take MVCC range tombstones into account.
19371937
map<uint64, int64> entry_counts = 5;
1938+
// Please update BulkOpSummary.DeepCopy() if new fields are added.
19381939
}
19391940

19401941
// ExportResponse is the response to an Export() operation.

pkg/sql/importer/import_processor_planning.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func distImport(
176176
}
177177

178178
accumulatedBulkSummary.Lock()
179-
prog.Summary = accumulatedBulkSummary.BulkOpSummary
179+
prog.Summary = accumulatedBulkSummary.BulkOpSummary.DeepCopy()
180180
accumulatedBulkSummary.Unlock()
181181
return overall / float32(len(from))
182182
},

pkg/sql/inspect/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"inspect_processor.go",
1010
"issue.go",
1111
"log_sink.go",
12+
"progress.go",
1213
"runner.go",
1314
"span_source.go",
1415
"table_sink.go",
@@ -46,10 +47,13 @@ go_library(
4647
"//pkg/util/ctxgroup",
4748
"//pkg/util/hlc",
4849
"//pkg/util/log",
50+
"//pkg/util/stop",
51+
"//pkg/util/syncutil",
4952
"//pkg/util/timeutil",
5053
"//pkg/util/tracing",
5154
"@com_github_cockroachdb_errors//:errors",
5255
"@com_github_cockroachdb_redact//:redact",
56+
"@com_github_gogo_protobuf//types",
5357
],
5458
)
5559

@@ -61,6 +65,7 @@ go_test(
6165
"inspect_processor_test.go",
6266
"issue_test.go",
6367
"main_test.go",
68+
"progress_test.go",
6469
"runner_test.go",
6570
"table_sink_test.go",
6671
],
@@ -71,13 +76,15 @@ go_test(
7176
}),
7277
deps = [
7378
"//pkg/base",
79+
"//pkg/jobs",
7480
"//pkg/jobs/jobspb",
7581
"//pkg/keys",
7682
"//pkg/kv",
7783
"//pkg/kv/kvserver/protectedts",
7884
"//pkg/roachpb",
7985
"//pkg/security/securityassets",
8086
"//pkg/security/securitytest",
87+
"//pkg/security/username",
8188
"//pkg/server",
8289
"//pkg/settings/cluster",
8390
"//pkg/sql",
@@ -99,8 +106,10 @@ go_test(
99106
"//pkg/util/leaktest",
100107
"//pkg/util/log",
101108
"//pkg/util/syncutil",
109+
"//pkg/util/uuid",
102110
"@com_github_cockroachdb_errors//:errors",
103111
"@com_github_cockroachdb_redact//:redact",
112+
"@com_github_gogo_protobuf//types",
104113
"@com_github_stretchr_testify//require",
105114
],
106115
)

pkg/sql/inspect/index_consistency_check_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -518,10 +518,12 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
518518

519519
// Validate job status matches expected outcome
520520
var jobStatus string
521-
r.QueryRow(t, `SELECT status FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobStatus)
521+
var fractionCompleted float64
522+
r.QueryRow(t, `SELECT status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobStatus, &fractionCompleted)
522523

523524
if tc.expectedErrRegex == "" {
524525
require.Equal(t, "succeeded", jobStatus, "expected job to succeed when no issues found")
526+
require.InEpsilon(t, 1.0, fractionCompleted, 0.01, "progress should reach 100%% on successful completion")
525527
} else {
526528
require.Equal(t, "failed", jobStatus, "expected job to fail when inconsistencies found")
527529
}
@@ -575,8 +577,10 @@ func TestIndexConsistencyWithReservedWordColumns(t *testing.T) {
575577
require.NoError(t, err, "should succeed on table with reserved word column names")
576578
require.Equal(t, 0, issueLogger.numIssuesFound(), "No issues should be found in happy path test")
577579

578-
// Verify job succeeded
580+
// Verify job succeeded and progress reached 100%
579581
var jobStatus string
580-
r.QueryRow(t, `SELECT status FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobStatus)
582+
var fractionCompleted float64
583+
r.QueryRow(t, `SELECT status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobStatus, &fractionCompleted)
581584
require.Equal(t, "succeeded", jobStatus, "INSPECT job should succeed")
585+
require.InEpsilon(t, 1.0, fractionCompleted, 0.01, "progress should reach 100%% on successful completion")
582586
}

pkg/sql/inspect/inspect_job.go

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1919
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2020
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
21-
"github.com/cockroachdb/cockroach/pkg/sql/isql"
2221
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
2322
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2423
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -67,13 +66,19 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
6766
return err
6867
}
6968

70-
if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan); err != nil {
69+
progressTracker, cleanupProgress, err := c.setupProgressTracking(ctx, execCfg, pkSpans)
70+
if err != nil {
71+
return err
72+
}
73+
defer cleanupProgress()
74+
75+
if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan, progressTracker); err != nil {
7176
return err
7277
}
7378

7479
c.maybeCleanupProtectedTimestamp(ctx, execCfg)
7580

76-
return c.markJobComplete(ctx)
81+
return nil
7782
}
7883

7984
// OnFailOrCancel implements the Resumer interface
@@ -181,19 +186,26 @@ func (c *inspectResumer) planInspectProcessors(
181186
}
182187

183188
// runInspectPlan executes the distributed physical plan for the INSPECT job.
184-
// It sets up a metadata-only DistSQL receiver to collect any execution errors,
185-
// then runs the plan using the provided planning context and evaluation context.
186-
// This function returns any error surfaced via metadata from the processors.
189+
// It sets up a metadata-only DistSQL receiver to collect any execution errors
190+
// and progress updates, then runs the plan using the provided planning context
191+
// and evaluation context. This function returns any error surfaced via metadata
192+
// from the processors.
187193
func (c *inspectResumer) runInspectPlan(
188194
ctx context.Context,
189195
jobExecCtx sql.JobExecContext,
190196
planCtx *sql.PlanningCtx,
191197
plan *sql.PhysicalPlan,
198+
progressTracker *inspectProgressTracker,
192199
) error {
193200
execCfg := jobExecCtx.ExecCfg()
194201

195202
metadataCallbackWriter := sql.NewMetadataOnlyMetadataCallbackWriter(
196-
func(context.Context, *execinfrapb.ProducerMetadata) error { return nil })
203+
func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
204+
if meta.BulkProcessorProgress != nil {
205+
return progressTracker.handleProcessorProgress(ctx, meta)
206+
}
207+
return nil
208+
})
197209

198210
distSQLReceiver := sql.MakeDistSQLReceiver(
199211
ctx,
@@ -216,18 +228,62 @@ func (c *inspectResumer) runInspectPlan(
216228
return metadataCallbackWriter.Err()
217229
}
218230

219-
func (c *inspectResumer) markJobComplete(ctx context.Context) error {
220-
// TODO(148297): add fine-grained progress reporting
221-
return c.job.NoTxn().Update(ctx,
222-
func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
223-
progress := md.Progress
224-
progress.Progress = &jobspb.Progress_FractionCompleted{
225-
FractionCompleted: 1,
226-
}
227-
ju.UpdateProgress(progress)
228-
return nil
229-
},
230-
)
231+
// setupProgressTracking initializes and starts progress tracking for the INSPECT job.
232+
// It calculates the total number of applicable checks, initializes progress to 0%, and starts
233+
// the background update goroutine. Returns the progress tracker and cleanup function.
234+
func (c *inspectResumer) setupProgressTracking(
235+
ctx context.Context, execCfg *sql.ExecutorConfig, pkSpans []roachpb.Span,
236+
) (*inspectProgressTracker, func(), error) {
237+
fractionInterval := fractionUpdateInterval.Get(&execCfg.Settings.SV)
238+
details := c.job.Details().(jobspb.InspectDetails)
239+
240+
// Build applicability checkers for progress tracking
241+
applicabilityCheckers, err := buildApplicabilityCheckers(details)
242+
if err != nil {
243+
return nil, nil, err
244+
}
245+
246+
// Calculate total applicable checks: only count checks that will actually run
247+
totalCheckCount, err := countApplicableChecks(pkSpans, applicabilityCheckers, execCfg.Codec)
248+
if err != nil {
249+
return nil, nil, err
250+
}
251+
252+
// Create and initialize progress tracker
253+
progressTracker := newInspectProgressTracker(c.job, fractionInterval)
254+
if err := progressTracker.initJobProgress(ctx, totalCheckCount); err != nil {
255+
progressTracker.terminateTracker(ctx)
256+
return nil, nil, err
257+
}
258+
259+
// Start background progress updates
260+
progressTracker.startBackgroundUpdates(ctx)
261+
262+
// Return cleanup function
263+
cleanup := func() {
264+
progressTracker.terminateTracker(ctx)
265+
}
266+
267+
return progressTracker, cleanup, nil
268+
}
269+
270+
// buildApplicabilityCheckers creates lightweight applicability checkers from InspectDetails.
271+
// These are used only for progress calculation and don't require the full check machinery.
272+
func buildApplicabilityCheckers(
273+
details jobspb.InspectDetails,
274+
) ([]inspectCheckApplicability, error) {
275+
checkers := make([]inspectCheckApplicability, 0, len(details.Checks))
276+
for _, specCheck := range details.Checks {
277+
switch specCheck.Type {
278+
case jobspb.InspectCheckIndexConsistency:
279+
checkers = append(checkers, &indexConsistencyCheckApplicability{
280+
tableID: specCheck.TableID,
281+
})
282+
default:
283+
return nil, errors.AssertionFailedf("unsupported inspect check type: %v", specCheck.Type)
284+
}
285+
}
286+
return checkers, nil
231287
}
232288

233289
// maybeProtectTimestamp creates a protected timestamp record for the AsOf

pkg/sql/inspect/inspect_processor.go

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package inspect
88
import (
99
"context"
1010
"runtime"
11+
"time"
1112

1213
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1314
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -24,6 +25,7 @@ import (
2425
"github.com/cockroachdb/cockroach/pkg/util/log"
2526
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2627
"github.com/cockroachdb/errors"
28+
pbtypes "github.com/gogo/protobuf/types"
2729
)
2830

2931
var (
@@ -35,6 +37,14 @@ var (
3537
0,
3638
settings.NonNegativeInt,
3739
)
40+
41+
fractionUpdateInterval = settings.RegisterDurationSetting(
42+
settings.ApplicationLevel,
43+
"sql.inspect.fraction_update_interval",
44+
"the amount of time between INSPECT job percent complete progress updates",
45+
10*time.Second,
46+
settings.DurationWithMinimum(1*time.Millisecond),
47+
)
3848
)
3949

4050
type inspectCheckFactory func(asOf hlc.Timestamp) inspectCheck
@@ -116,7 +126,7 @@ func (p *inspectProcessor) runInspect(ctx context.Context, output execinfra.RowR
116126
// Channel is closed, no more spans to process.
117127
return nil
118128
}
119-
if err := p.processSpan(ctx, span, workerIndex); err != nil {
129+
if err := p.processSpan(ctx, span, workerIndex, output); err != nil {
120130
// On error, return it. ctxgroup will cancel all other goroutines.
121131
return err
122132
}
@@ -150,6 +160,12 @@ func (p *inspectProcessor) runInspect(ctx context.Context, output execinfra.RowR
150160
if err := group.Wait(); err != nil {
151161
return err
152162
}
163+
164+
// Send final completion message to indicate this processor is finished
165+
if err := p.sendInspectProgress(ctx, output, 0, true /* finished */); err != nil {
166+
return err
167+
}
168+
153169
log.Dev.Infof(ctx, "INSPECT processor completed processorID=%d issuesFound=%t", p.processorID, p.logger.hasIssues())
154170
if p.logger.hasIssues() {
155171
return pgerror.Newf(pgcode.DataException, "INSPECT found inconsistencies")
@@ -188,9 +204,10 @@ func getInspectLogger(flowCtx *execinfra.FlowCtx, jobID jobspb.JobID) inspectLog
188204

189205
// processSpan executes all configured inspect checks against a single span.
190206
// It instantiates a fresh set of checks from the configured factories and uses
191-
// an inspectRunner to drive their execution.
207+
// an inspectRunner to drive their execution. After processing the span, it
208+
// sends progress metadata to the coordinator.
192209
func (p *inspectProcessor) processSpan(
193-
ctx context.Context, span roachpb.Span, workerIndex int,
210+
ctx context.Context, span roachpb.Span, workerIndex int, output execinfra.RowReceiver,
194211
) (err error) {
195212
asOfToUse := p.getTimestampForSpan()
196213

@@ -222,6 +239,10 @@ func (p *inspectProcessor) processSpan(
222239
}
223240
}()
224241

242+
// Keep track of completed checks for progress updates.
243+
initialCheckCount := len(checks)
244+
lastSeenCheckCount := initialCheckCount
245+
225246
// Process all checks on the given span.
226247
for {
227248
ok, stepErr := runner.Step(ctx, p.cfg, span, workerIndex)
@@ -231,7 +252,46 @@ func (p *inspectProcessor) processSpan(
231252
if !ok {
232253
break
233254
}
255+
256+
// Check if any inspections have completed (when the count decreases).
257+
currentCheckCount := runner.CheckCount()
258+
if currentCheckCount < lastSeenCheckCount {
259+
checksCompleted := lastSeenCheckCount - currentCheckCount
260+
lastSeenCheckCount = currentCheckCount
261+
if err := p.sendInspectProgress(ctx, output, int64(checksCompleted), false /* finished */); err != nil {
262+
return err
263+
}
264+
}
234265
}
266+
267+
return nil
268+
}
269+
270+
// sendInspectProgress marshals and sends inspect processor progress via BulkProcessorProgress.
271+
func (p *inspectProcessor) sendInspectProgress(
272+
ctx context.Context, output execinfra.RowReceiver, checksCompleted int64, finished bool,
273+
) error {
274+
progressMsg := &jobspb.InspectProcessorProgress{
275+
ChecksCompleted: checksCompleted,
276+
Finished: finished,
277+
}
278+
279+
progressAny, err := pbtypes.MarshalAny(progressMsg)
280+
if err != nil {
281+
return errors.Wrapf(err, "unable to marshal inspect processor progress")
282+
}
283+
284+
meta := &execinfrapb.ProducerMetadata{
285+
BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
286+
ProgressDetails: *progressAny,
287+
NodeID: p.flowCtx.NodeID.SQLInstanceID(),
288+
FlowID: p.flowCtx.ID,
289+
ProcessorID: p.processorID,
290+
Drained: finished,
291+
},
292+
}
293+
294+
output.Push(nil, meta)
235295
return nil
236296
}
237297

0 commit comments

Comments
 (0)