Skip to content

Commit 8b39b69

Browse files
committed
sql/inspect: add checkpointing to INSPECT jobs
Previously, INSPECT jobs did not support checkpointing, meaning job restarts would reprocess all data from the beginning. This change adds span based checkpointing to allow jobs to resume from their last completed spans. Progress is tracked using jobfrontier to store completed spans. On job restart, the resumer loads completed spans and filters them from the work to be done. Closes #148297 Release note: None Epic: CRDB-30356
1 parent 2a1771e commit 8b39b69

File tree

5 files changed

+723
-175
lines changed

5 files changed

+723
-175
lines changed

pkg/sql/inspect/BUILD.bazel

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ go_library(
1818
visibility = ["//visibility:public"],
1919
deps = [
2020
"//pkg/jobs",
21+
"//pkg/jobs/jobfrontier",
2122
"//pkg/jobs/jobspb",
2223
"//pkg/keys",
2324
"//pkg/kv/kvserver/protectedts/ptpb",
@@ -47,7 +48,8 @@ go_library(
4748
"//pkg/util/ctxgroup",
4849
"//pkg/util/hlc",
4950
"//pkg/util/log",
50-
"//pkg/util/stop",
51+
"//pkg/util/protoutil",
52+
"//pkg/util/span",
5153
"//pkg/util/syncutil",
5254
"//pkg/util/timeutil",
5355
"//pkg/util/tracing",
@@ -77,6 +79,7 @@ go_test(
7779
deps = [
7880
"//pkg/base",
7981
"//pkg/jobs",
82+
"//pkg/jobs/jobfrontier",
8083
"//pkg/jobs/jobspb",
8184
"//pkg/keys",
8285
"//pkg/kv",

pkg/sql/inspect/inspect_job.go

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,22 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
6060
return err
6161
}
6262

63-
plan, planCtx, err := c.planInspectProcessors(ctx, jobExecCtx, pkSpans)
63+
progressTracker, cleanupProgress, remainingSpans, err := c.setupProgressTrackingAndFilter(ctx, execCfg, pkSpans)
6464
if err != nil {
6565
return err
6666
}
67+
defer cleanupProgress()
68+
69+
// If all spans are completed, job is done.
70+
if len(remainingSpans) == 0 {
71+
log.Dev.Infof(ctx, "all spans already completed, INSPECT job finished")
72+
return nil
73+
}
6774

68-
progressTracker, cleanupProgress, err := c.setupProgressTracking(ctx, execCfg, pkSpans)
75+
plan, planCtx, err := c.planInspectProcessors(ctx, jobExecCtx, remainingSpans)
6976
if err != nil {
7077
return err
7178
}
72-
defer cleanupProgress()
7379

7480
if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan, progressTracker); err != nil {
7581
return err
@@ -201,7 +207,7 @@ func (c *inspectResumer) runInspectPlan(
201207
metadataCallbackWriter := sql.NewMetadataOnlyMetadataCallbackWriter(
202208
func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
203209
if meta.BulkProcessorProgress != nil {
204-
return progressTracker.handleProcessorProgress(ctx, meta)
210+
return progressTracker.handleProgressUpdate(ctx, meta)
205211
}
206212
return nil
207213
})
@@ -227,43 +233,71 @@ func (c *inspectResumer) runInspectPlan(
227233
return metadataCallbackWriter.Err()
228234
}
229235

230-
// setupProgressTracking initializes and starts progress tracking for the INSPECT job.
231-
// It calculates the total number of applicable checks, initializes progress to 0%, and starts
232-
// the background update goroutine. Returns the progress tracker and cleanup function.
233-
func (c *inspectResumer) setupProgressTracking(
236+
// setupProgressTrackingAndFilter initializes progress tracking and returns
237+
// it, along with a cleanup function and the remaining spans to process.
238+
func (c *inspectResumer) setupProgressTrackingAndFilter(
234239
ctx context.Context, execCfg *sql.ExecutorConfig, pkSpans []roachpb.Span,
235-
) (*inspectProgressTracker, func(), error) {
236-
fractionInterval := fractionUpdateInterval.Get(&execCfg.Settings.SV)
237-
details := c.job.Details().(jobspb.InspectDetails)
240+
) (*inspectProgressTracker, func(), []roachpb.Span, error) {
241+
// Create and initialize the tracker. We use the completed spans from the job
242+
// (if any) to filter out the spans we need to process in this run of the job.
243+
progressTracker := newInspectProgressTracker(
244+
c.job,
245+
&execCfg.Settings.SV,
246+
execCfg.InternalDB,
247+
)
248+
completedSpans, err := progressTracker.initTracker(ctx)
249+
if err != nil {
250+
return nil, nil, nil, err
251+
}
252+
remainingSpans := c.filterCompletedSpans(pkSpans, completedSpans)
238253

239-
// Build applicability checkers for progress tracking
240-
applicabilityCheckers, err := buildApplicabilityCheckers(details)
254+
applicabilityCheckers, err := buildApplicabilityCheckers(c.job.Details().(jobspb.InspectDetails))
241255
if err != nil {
242-
return nil, nil, err
256+
return nil, nil, nil, err
243257
}
244258

245-
// Calculate total applicable checks: only count checks that will actually run
259+
// Calculate total applicable checks on ALL spans (not just remaining ones)
260+
// This ensures consistent progress calculation across job restarts.
246261
totalCheckCount, err := countApplicableChecks(pkSpans, applicabilityCheckers, execCfg.Codec)
247262
if err != nil {
248-
return nil, nil, err
263+
return nil, nil, nil, err
264+
}
265+
completedCheckCount, err := countApplicableChecks(completedSpans, applicabilityCheckers, execCfg.Codec)
266+
if err != nil {
267+
return nil, nil, nil, err
249268
}
250269

251-
// Create and initialize progress tracker
252-
progressTracker := newInspectProgressTracker(c.job, fractionInterval)
253-
if err := progressTracker.initJobProgress(ctx, totalCheckCount); err != nil {
254-
progressTracker.terminateTracker(ctx)
255-
return nil, nil, err
270+
if err := progressTracker.initJobProgress(ctx, totalCheckCount, completedCheckCount); err != nil {
271+
return nil, nil, nil, err
272+
}
273+
cleanup := func() {
274+
progressTracker.terminateTracker()
256275
}
257276

258-
// Start background progress updates
259-
progressTracker.startBackgroundUpdates(ctx)
277+
return progressTracker, cleanup, remainingSpans, nil
278+
}
260279

261-
// Return cleanup function
262-
cleanup := func() {
263-
progressTracker.terminateTracker(ctx)
280+
// filterCompletedSpans removes spans that are already completed from the list to process.
281+
func (c *inspectResumer) filterCompletedSpans(
282+
allSpans []roachpb.Span, completedSpans []roachpb.Span,
283+
) []roachpb.Span {
284+
if len(completedSpans) == 0 {
285+
return allSpans
286+
}
287+
288+
completedGroup := roachpb.SpanGroup{}
289+
completedGroup.Add(completedSpans...)
290+
291+
var remainingSpans []roachpb.Span
292+
for _, span := range allSpans {
293+
// Check if this span is fully contained in completed spans.
294+
// We need to check if the entire span is covered by completed spans.
295+
if !completedGroup.Encloses(span) {
296+
remainingSpans = append(remainingSpans, span)
297+
}
264298
}
265299

266-
return progressTracker, cleanup, nil
300+
return remainingSpans
267301
}
268302

269303
// buildApplicabilityCheckers creates lightweight applicability checkers from InspectDetails.

pkg/sql/inspect/inspect_processor.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,14 @@ var (
4545
10*time.Second,
4646
settings.DurationWithMinimum(1*time.Millisecond),
4747
)
48+
49+
checkpointInterval = settings.RegisterDurationSetting(
50+
settings.ApplicationLevel,
51+
"sql.inspect.checkpoint_interval",
52+
"the amount of time between INSPECT job checkpoint updates",
53+
30*time.Second,
54+
settings.DurationWithMinimum(1*time.Millisecond),
55+
)
4856
)
4957

5058
type inspectCheckFactory func(asOf hlc.Timestamp) inspectCheck
@@ -211,7 +219,7 @@ func (p *inspectProcessor) processSpan(
211219
) (err error) {
212220
asOfToUse := p.getTimestampForSpan()
213221

214-
// Only create checks that apply to this span
222+
// Only create checks that apply to this span.
215223
var checks []inspectCheck
216224
for _, factory := range p.checkFactories {
217225
check := factory(asOfToUse)
@@ -224,11 +232,6 @@ func (p *inspectProcessor) processSpan(
224232
}
225233
}
226234

227-
// If no checks apply to this span, there's nothing to do
228-
if len(checks) == 0 {
229-
return nil
230-
}
231-
232235
runner := inspectRunner{
233236
checks: checks,
234237
logger: p.logger,
@@ -264,6 +267,11 @@ func (p *inspectProcessor) processSpan(
264267
}
265268
}
266269

270+
// Report span completion for checkpointing.
271+
if err := p.sendSpanCompletionProgress(ctx, output, span, false /* finished */); err != nil {
272+
return err
273+
}
274+
267275
return nil
268276
}
269277

@@ -305,6 +313,36 @@ func (p *inspectProcessor) getTimestampForSpan() hlc.Timestamp {
305313
return p.spec.InspectDetails.AsOf
306314
}
307315

316+
// sendSpanCompletionProgress sends progress indicating a span has been completed.
317+
// This is used for checkpointing to track which spans are done.
318+
func (p *inspectProcessor) sendSpanCompletionProgress(
319+
ctx context.Context, output execinfra.RowReceiver, completedSpan roachpb.Span, finished bool,
320+
) error {
321+
progressMsg := &jobspb.InspectProcessorProgress{
322+
ChecksCompleted: 0, // No additional checks completed, just marking span done
323+
Finished: finished,
324+
}
325+
326+
progressAny, err := pbtypes.MarshalAny(progressMsg)
327+
if err != nil {
328+
return errors.Wrapf(err, "unable to marshal inspect processor progress")
329+
}
330+
331+
meta := &execinfrapb.ProducerMetadata{
332+
BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
333+
CompletedSpans: []roachpb.Span{completedSpan}, // Mark this span as completed
334+
ProgressDetails: *progressAny,
335+
NodeID: p.flowCtx.NodeID.SQLInstanceID(),
336+
FlowID: p.flowCtx.ID,
337+
ProcessorID: p.processorID,
338+
Drained: finished,
339+
},
340+
}
341+
342+
output.Push(nil, meta)
343+
return nil
344+
}
345+
308346
// newInspectProcessor constructs a new inspectProcessor from the given InspectSpec.
309347
// It parses the spec to generate a set of inspectCheck factories, sets up the span source,
310348
// and wires in logging and concurrency controls.

0 commit comments

Comments
 (0)