Commit 9cf8de2

sql/inspect: add checkpointing to INSPECT jobs
Previously, INSPECT jobs did not support checkpointing, so a job restart reprocessed all data from the beginning. This change adds span-based checkpointing so that jobs can resume from their last completed spans. Progress is tracked using jobfrontier to store the completed spans. On restart, the resumer loads the completed spans and filters them out of the remaining work.

Closes #148297

Release note: None

Epic: CRDB-30356
1 parent df939a7 commit 9cf8de2

5 files changed: +723 −176 lines changed
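Before the per-file diffs, here is a minimal standalone sketch of the resume-time filtering the commit message describes. It is not the commit's code; it only reuses roachpb.SpanGroup, which the new filterCompletedSpans helper below also relies on, to show how any span fully enclosed by the checkpointed set is skipped on restart. The filterCompleted name and the example keys are illustrative.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// filterCompleted is a hypothetical stand-in for the resumer's filtering step:
// spans recorded as complete in the checkpoint are collected into a SpanGroup,
// and any span the group fully encloses is dropped from this run's work.
func filterCompleted(all, completed []roachpb.Span) []roachpb.Span {
	if len(completed) == 0 {
		return all
	}
	var done roachpb.SpanGroup
	done.Add(completed...)

	var remaining []roachpb.Span
	for _, sp := range all {
		// A span only partially covered by the checkpoint is kept and
		// reprocessed in full.
		if !done.Encloses(sp) {
			remaining = append(remaining, sp)
		}
	}
	return remaining
}

func main() {
	all := []roachpb.Span{
		{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
		{Key: roachpb.Key("c"), EndKey: roachpb.Key("f")},
	}
	// Pretend the previous run checkpointed [a, c) before restarting.
	completed := []roachpb.Span{{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}}
	fmt.Println(filterCompleted(all, completed)) // prints only the remaining span, [c, f)
}

This matches the Encloses check in the diff: fully covered spans are skipped, partially covered ones are redone from scratch.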

pkg/sql/inspect/BUILD.bazel

Lines changed: 4 additions & 1 deletion
@@ -18,6 +18,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/jobs",
+        "//pkg/jobs/jobfrontier",
         "//pkg/jobs/jobspb",
         "//pkg/keys",
         "//pkg/kv/kvserver/protectedts/ptpb",
@@ -47,7 +48,8 @@ go_library(
         "//pkg/util/ctxgroup",
         "//pkg/util/hlc",
         "//pkg/util/log",
-        "//pkg/util/stop",
+        "//pkg/util/protoutil",
+        "//pkg/util/span",
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "//pkg/util/tracing",
@@ -77,6 +79,7 @@ go_test(
     deps = [
         "//pkg/base",
         "//pkg/jobs",
+        "//pkg/jobs/jobfrontier",
         "//pkg/jobs/jobspb",
         "//pkg/keys",
         "//pkg/kv",

pkg/sql/inspect/inspect_job.go

Lines changed: 61 additions & 28 deletions
@@ -60,17 +60,22 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
 		return err
 	}
 
-	// TODO(149460): add a goroutine that will replan the job on topology changes
-	plan, planCtx, err := c.planInspectProcessors(ctx, jobExecCtx, pkSpans)
+	progressTracker, cleanupProgress, remainingSpans, err := c.setupProgressTrackingAndFilter(ctx, execCfg, pkSpans)
 	if err != nil {
 		return err
 	}
+	defer cleanupProgress()
+
+	// If all spans are completed, job is done.
+	if len(remainingSpans) == 0 {
+		log.Dev.Infof(ctx, "all spans already completed, INSPECT job finished")
+		return nil
+	}
 
-	progressTracker, cleanupProgress, err := c.setupProgressTracking(ctx, execCfg, pkSpans)
+	plan, planCtx, err := c.planInspectProcessors(ctx, jobExecCtx, remainingSpans)
 	if err != nil {
 		return err
 	}
-	defer cleanupProgress()
 
 	if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan, progressTracker); err != nil {
 		return err
@@ -202,7 +207,7 @@ func (c *inspectResumer) runInspectPlan(
 	metadataCallbackWriter := sql.NewMetadataOnlyMetadataCallbackWriter(
 		func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
 			if meta.BulkProcessorProgress != nil {
-				return progressTracker.handleProcessorProgress(ctx, meta)
+				return progressTracker.handleProgressUpdate(ctx, meta)
 			}
 			return nil
 		})
@@ -228,43 +233,71 @@ func (c *inspectResumer) runInspectPlan(
 	return metadataCallbackWriter.Err()
 }
 
-// setupProgressTracking initializes and starts progress tracking for the INSPECT job.
-// It calculates the total number of applicable checks, initializes progress to 0%, and starts
-// the background update goroutine. Returns the progress tracker and cleanup function.
-func (c *inspectResumer) setupProgressTracking(
+// setupProgressTrackingAndFilter initializes progress tracking and returns
+// it, along with a cleanup function and the remaining spans to process.
+func (c *inspectResumer) setupProgressTrackingAndFilter(
 	ctx context.Context, execCfg *sql.ExecutorConfig, pkSpans []roachpb.Span,
-) (*inspectProgressTracker, func(), error) {
-	fractionInterval := fractionUpdateInterval.Get(&execCfg.Settings.SV)
-	details := c.job.Details().(jobspb.InspectDetails)
+) (*inspectProgressTracker, func(), []roachpb.Span, error) {
+	// Create and initialize the tracker. We use the completed spans from the job
+	// (if any) to filter out the spans we need to process in this run of the job.
+	progressTracker := newInspectProgressTracker(
+		c.job,
+		&execCfg.Settings.SV,
+		execCfg.InternalDB,
+	)
+	completedSpans, err := progressTracker.initTracker(ctx)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	remainingSpans := c.filterCompletedSpans(pkSpans, completedSpans)
 
-	// Build applicability checkers for progress tracking
-	applicabilityCheckers, err := buildApplicabilityCheckers(details)
+	applicabilityCheckers, err := buildApplicabilityCheckers(c.job.Details().(jobspb.InspectDetails))
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
 
-	// Calculate total applicable checks: only count checks that will actually run
+	// Calculate total applicable checks on ALL spans (not just remaining ones).
+	// This ensures consistent progress calculation across job restarts.
 	totalCheckCount, err := countApplicableChecks(pkSpans, applicabilityCheckers, execCfg.Codec)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
+	}
+	completedCheckCount, err := countApplicableChecks(completedSpans, applicabilityCheckers, execCfg.Codec)
+	if err != nil {
+		return nil, nil, nil, err
 	}
 
-	// Create and initialize progress tracker
-	progressTracker := newInspectProgressTracker(c.job, fractionInterval)
-	if err := progressTracker.initJobProgress(ctx, totalCheckCount); err != nil {
-		progressTracker.terminateTracker(ctx)
-		return nil, nil, err
+	if err := progressTracker.initJobProgress(ctx, totalCheckCount, completedCheckCount); err != nil {
+		return nil, nil, nil, err
+	}
+	cleanup := func() {
+		progressTracker.terminateTracker()
 	}
 
-	// Start background progress updates
-	progressTracker.startBackgroundUpdates(ctx)
+	return progressTracker, cleanup, remainingSpans, nil
+}
 
-	// Return cleanup function
-	cleanup := func() {
-		progressTracker.terminateTracker(ctx)
+// filterCompletedSpans removes spans that are already completed from the list to process.
+func (c *inspectResumer) filterCompletedSpans(
+	allSpans []roachpb.Span, completedSpans []roachpb.Span,
+) []roachpb.Span {
+	if len(completedSpans) == 0 {
+		return allSpans
+	}
+
+	completedGroup := roachpb.SpanGroup{}
+	completedGroup.Add(completedSpans...)
+
+	var remainingSpans []roachpb.Span
+	for _, span := range allSpans {
+		// Check if this span is fully contained in completed spans.
+		// We need to check if the entire span is covered by completed spans.
+		if !completedGroup.Encloses(span) {
+			remainingSpans = append(remainingSpans, span)
+		}
 	}
 
-	return progressTracker, cleanup, nil
+	return remainingSpans
 }
 
 // buildApplicabilityCheckers creates lightweight applicability checkers from InspectDetails.
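A note on the two counts passed to initJobProgress above: totalCheckCount is computed over all spans while completedCheckCount covers the already-checkpointed ones, which suggests the tracker seeds its progress fraction with work finished before the restart. The helper below is a hypothetical illustration of that arithmetic under that assumption; it is not the tracker's actual code, and progressFraction is an invented name.

package main

import "fmt"

// progressFraction sketches how a seeded fraction could be derived: checks
// completed before the restart and checks completed in this run share the
// same denominator, so the reported fraction does not reset to zero.
func progressFraction(totalChecks, completedBefore, completedThisRun int64) float32 {
	if totalChecks == 0 {
		return 1
	}
	return float32(completedBefore+completedThisRun) / float32(totalChecks)
}

func main() {
	// E.g. 10 applicable checks overall, 4 done before the restart:
	// progress resumes at 40% and reaches 70% after 3 more, not 30%.
	fmt.Println(progressFraction(10, 4, 0)) // 0.4
	fmt.Println(progressFraction(10, 4, 3)) // 0.7
}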

pkg/sql/inspect/inspect_processor.go

Lines changed: 44 additions & 6 deletions
@@ -45,6 +45,14 @@ var (
 		10*time.Second,
 		settings.DurationWithMinimum(1*time.Millisecond),
 	)
+
+	checkpointInterval = settings.RegisterDurationSetting(
+		settings.ApplicationLevel,
+		"sql.inspect.checkpoint_interval",
+		"the amount of time between INSPECT job checkpoint updates",
+		30*time.Second,
+		settings.DurationWithMinimum(1*time.Millisecond),
+	)
 )
 
 type inspectCheckFactory func(asOf hlc.Timestamp) inspectCheck
@@ -211,7 +219,7 @@ func (p *inspectProcessor) processSpan(
 ) (err error) {
 	asOfToUse := p.getTimestampForSpan()
 
-	// Only create checks that apply to this span
+	// Only create checks that apply to this span.
 	var checks []inspectCheck
 	for _, factory := range p.checkFactories {
 		check := factory(asOfToUse)
@@ -224,11 +232,6 @@ func (p *inspectProcessor) processSpan(
 		}
 	}
 
-	// If no checks apply to this span, there's nothing to do
-	if len(checks) == 0 {
-		return nil
-	}
-
 	runner := inspectRunner{
 		checks: checks,
 		logger: p.logger,
@@ -264,6 +267,11 @@ func (p *inspectProcessor) processSpan(
 		}
 	}
 
+	// Report span completion for checkpointing.
+	if err := p.sendSpanCompletionProgress(ctx, output, span, false /* finished */); err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -305,6 +313,36 @@ func (p *inspectProcessor) getTimestampForSpan() hlc.Timestamp {
 	return p.spec.InspectDetails.AsOf
 }
 
+// sendSpanCompletionProgress sends progress indicating a span has been completed.
+// This is used for checkpointing to track which spans are done.
+func (p *inspectProcessor) sendSpanCompletionProgress(
+	ctx context.Context, output execinfra.RowReceiver, completedSpan roachpb.Span, finished bool,
+) error {
+	progressMsg := &jobspb.InspectProcessorProgress{
+		ChecksCompleted: 0, // No additional checks completed, just marking span done
+		Finished:        finished,
+	}
+
+	progressAny, err := pbtypes.MarshalAny(progressMsg)
+	if err != nil {
+		return errors.Wrapf(err, "unable to marshal inspect processor progress")
+	}
+
+	meta := &execinfrapb.ProducerMetadata{
+		BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
+			CompletedSpans:  []roachpb.Span{completedSpan}, // Mark this span as completed
+			ProgressDetails: *progressAny,
+			NodeID:          p.flowCtx.NodeID.SQLInstanceID(),
+			FlowID:          p.flowCtx.ID,
+			ProcessorID:     p.processorID,
+			Drained:         finished,
+		},
+	}
+
+	output.Push(nil, meta)
+	return nil
+}
+
 // newInspectProcessor constructs a new inspectProcessor from the given InspectSpec.
 // It parses the spec to generate a set of inspectCheck factories, sets up the span source,
 // and wires in logging and concurrency controls.
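The new sql.inspect.checkpoint_interval setting is registered above, but the code that consumes it lives outside this file. The sketch below shows, under the assumption that the progress tracker batches span completions and flushes them on a timer, how such an interval typically gates checkpoint writes. checkpointLoop and flushCheckpoint are invented names, and the real job persists spans via jobfrontier rather than printing them.

package main

import (
	"context"
	"fmt"
	"time"
)

// flushCheckpoint stands in for persisting completed spans to the job's
// progress storage (the real job records them via jobfrontier).
func flushCheckpoint(ctx context.Context, completed []string) {
	fmt.Printf("checkpointing %d completed span(s)\n", len(completed))
}

// checkpointLoop batches span-completion updates and flushes them at most
// once per interval, plus a final flush when the updates channel closes.
func checkpointLoop(ctx context.Context, interval time.Duration, updates <-chan string) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var pending []string
	for {
		select {
		case sp, ok := <-updates:
			if !ok {
				if len(pending) > 0 {
					flushCheckpoint(ctx, pending)
				}
				return
			}
			pending = append(pending, sp)
		case <-ticker.C:
			if len(pending) > 0 {
				flushCheckpoint(ctx, pending)
				pending = nil
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	updates := make(chan string, 2)
	updates <- "span-1"
	updates <- "span-2"
	close(updates)
	// 30s mirrors the setting's default; any duration works for the sketch.
	checkpointLoop(context.Background(), 30*time.Second, updates)
}

Batching on an interval keeps checkpoint writes cheap relative to per-span updates while bounding how much work a restart can lose to roughly one interval's worth of spans.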
