Skip to content

Commit 87018f9

Browse files
authored
Merge pull request #154701 from spilchen/backport25.4-154276
release-25.4: sql/inspect: add checkpointing tracking to INSPECT
2 parents 1c66a3a + 9cf8de2 commit 87018f9

File tree

5 files changed

+723
-176
lines changed

5 files changed

+723
-176
lines changed

pkg/sql/inspect/BUILD.bazel

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ go_library(
1818
visibility = ["//visibility:public"],
1919
deps = [
2020
"//pkg/jobs",
21+
"//pkg/jobs/jobfrontier",
2122
"//pkg/jobs/jobspb",
2223
"//pkg/keys",
2324
"//pkg/kv/kvserver/protectedts/ptpb",
@@ -47,7 +48,8 @@ go_library(
4748
"//pkg/util/ctxgroup",
4849
"//pkg/util/hlc",
4950
"//pkg/util/log",
50-
"//pkg/util/stop",
51+
"//pkg/util/protoutil",
52+
"//pkg/util/span",
5153
"//pkg/util/syncutil",
5254
"//pkg/util/timeutil",
5355
"//pkg/util/tracing",
@@ -77,6 +79,7 @@ go_test(
7779
deps = [
7880
"//pkg/base",
7981
"//pkg/jobs",
82+
"//pkg/jobs/jobfrontier",
8083
"//pkg/jobs/jobspb",
8184
"//pkg/keys",
8285
"//pkg/kv",

pkg/sql/inspect/inspect_job.go

Lines changed: 61 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -60,17 +60,22 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
6060
return err
6161
}
6262

63-
// TODO(149460): add a goroutine that will replan the job on topology changes
64-
plan, planCtx, err := c.planInspectProcessors(ctx, jobExecCtx, pkSpans)
63+
progressTracker, cleanupProgress, remainingSpans, err := c.setupProgressTrackingAndFilter(ctx, execCfg, pkSpans)
6564
if err != nil {
6665
return err
6766
}
67+
defer cleanupProgress()
68+
69+
// If all spans are completed, job is done.
70+
if len(remainingSpans) == 0 {
71+
log.Dev.Infof(ctx, "all spans already completed, INSPECT job finished")
72+
return nil
73+
}
6874

69-
progressTracker, cleanupProgress, err := c.setupProgressTracking(ctx, execCfg, pkSpans)
75+
plan, planCtx, err := c.planInspectProcessors(ctx, jobExecCtx, remainingSpans)
7076
if err != nil {
7177
return err
7278
}
73-
defer cleanupProgress()
7479

7580
if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan, progressTracker); err != nil {
7681
return err
@@ -202,7 +207,7 @@ func (c *inspectResumer) runInspectPlan(
202207
metadataCallbackWriter := sql.NewMetadataOnlyMetadataCallbackWriter(
203208
func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
204209
if meta.BulkProcessorProgress != nil {
205-
return progressTracker.handleProcessorProgress(ctx, meta)
210+
return progressTracker.handleProgressUpdate(ctx, meta)
206211
}
207212
return nil
208213
})
@@ -228,43 +233,71 @@ func (c *inspectResumer) runInspectPlan(
228233
return metadataCallbackWriter.Err()
229234
}
230235

231-
// setupProgressTracking initializes and starts progress tracking for the INSPECT job.
232-
// It calculates the total number of applicable checks, initializes progress to 0%, and starts
233-
// the background update goroutine. Returns the progress tracker and cleanup function.
234-
func (c *inspectResumer) setupProgressTracking(
236+
// setupProgressTrackingAndFilter initializes progress tracking and returns
237+
// it, along with a cleanup function and the remaining spans to process.
238+
func (c *inspectResumer) setupProgressTrackingAndFilter(
235239
ctx context.Context, execCfg *sql.ExecutorConfig, pkSpans []roachpb.Span,
236-
) (*inspectProgressTracker, func(), error) {
237-
fractionInterval := fractionUpdateInterval.Get(&execCfg.Settings.SV)
238-
details := c.job.Details().(jobspb.InspectDetails)
240+
) (*inspectProgressTracker, func(), []roachpb.Span, error) {
241+
// Create and initialize the tracker. We use the completed spans from the job
242+
// (if any) to filter out the spans we need to process in this run of the job.
243+
progressTracker := newInspectProgressTracker(
244+
c.job,
245+
&execCfg.Settings.SV,
246+
execCfg.InternalDB,
247+
)
248+
completedSpans, err := progressTracker.initTracker(ctx)
249+
if err != nil {
250+
return nil, nil, nil, err
251+
}
252+
remainingSpans := c.filterCompletedSpans(pkSpans, completedSpans)
239253

240-
// Build applicability checkers for progress tracking
241-
applicabilityCheckers, err := buildApplicabilityCheckers(details)
254+
applicabilityCheckers, err := buildApplicabilityCheckers(c.job.Details().(jobspb.InspectDetails))
242255
if err != nil {
243-
return nil, nil, err
256+
return nil, nil, nil, err
244257
}
245258

246-
// Calculate total applicable checks: only count checks that will actually run
259+
// Calculate total applicable checks on ALL spans (not just remaining ones)
260+
// This ensures consistent progress calculation across job restarts.
247261
totalCheckCount, err := countApplicableChecks(pkSpans, applicabilityCheckers, execCfg.Codec)
248262
if err != nil {
249-
return nil, nil, err
263+
return nil, nil, nil, err
264+
}
265+
completedCheckCount, err := countApplicableChecks(completedSpans, applicabilityCheckers, execCfg.Codec)
266+
if err != nil {
267+
return nil, nil, nil, err
250268
}
251269

252-
// Create and initialize progress tracker
253-
progressTracker := newInspectProgressTracker(c.job, fractionInterval)
254-
if err := progressTracker.initJobProgress(ctx, totalCheckCount); err != nil {
255-
progressTracker.terminateTracker(ctx)
256-
return nil, nil, err
270+
if err := progressTracker.initJobProgress(ctx, totalCheckCount, completedCheckCount); err != nil {
271+
return nil, nil, nil, err
272+
}
273+
cleanup := func() {
274+
progressTracker.terminateTracker()
257275
}
258276

259-
// Start background progress updates
260-
progressTracker.startBackgroundUpdates(ctx)
277+
return progressTracker, cleanup, remainingSpans, nil
278+
}
261279

262-
// Return cleanup function
263-
cleanup := func() {
264-
progressTracker.terminateTracker(ctx)
280+
// filterCompletedSpans removes spans that are already completed from the list to process.
281+
func (c *inspectResumer) filterCompletedSpans(
282+
allSpans []roachpb.Span, completedSpans []roachpb.Span,
283+
) []roachpb.Span {
284+
if len(completedSpans) == 0 {
285+
return allSpans
286+
}
287+
288+
completedGroup := roachpb.SpanGroup{}
289+
completedGroup.Add(completedSpans...)
290+
291+
var remainingSpans []roachpb.Span
292+
for _, span := range allSpans {
293+
// Check if this span is fully contained in completed spans.
294+
// We need to check if the entire span is covered by completed spans.
295+
if !completedGroup.Encloses(span) {
296+
remainingSpans = append(remainingSpans, span)
297+
}
265298
}
266299

267-
return progressTracker, cleanup, nil
300+
return remainingSpans
268301
}
269302

270303
// buildApplicabilityCheckers creates lightweight applicability checkers from InspectDetails.

pkg/sql/inspect/inspect_processor.go

Lines changed: 44 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,14 @@ var (
4545
10*time.Second,
4646
settings.DurationWithMinimum(1*time.Millisecond),
4747
)
48+
49+
checkpointInterval = settings.RegisterDurationSetting(
50+
settings.ApplicationLevel,
51+
"sql.inspect.checkpoint_interval",
52+
"the amount of time between INSPECT job checkpoint updates",
53+
30*time.Second,
54+
settings.DurationWithMinimum(1*time.Millisecond),
55+
)
4856
)
4957

5058
type inspectCheckFactory func(asOf hlc.Timestamp) inspectCheck
@@ -211,7 +219,7 @@ func (p *inspectProcessor) processSpan(
211219
) (err error) {
212220
asOfToUse := p.getTimestampForSpan()
213221

214-
// Only create checks that apply to this span
222+
// Only create checks that apply to this span.
215223
var checks []inspectCheck
216224
for _, factory := range p.checkFactories {
217225
check := factory(asOfToUse)
@@ -224,11 +232,6 @@ func (p *inspectProcessor) processSpan(
224232
}
225233
}
226234

227-
// If no checks apply to this span, there's nothing to do
228-
if len(checks) == 0 {
229-
return nil
230-
}
231-
232235
runner := inspectRunner{
233236
checks: checks,
234237
logger: p.logger,
@@ -264,6 +267,11 @@ func (p *inspectProcessor) processSpan(
264267
}
265268
}
266269

270+
// Report span completion for checkpointing.
271+
if err := p.sendSpanCompletionProgress(ctx, output, span, false /* finished */); err != nil {
272+
return err
273+
}
274+
267275
return nil
268276
}
269277

@@ -305,6 +313,36 @@ func (p *inspectProcessor) getTimestampForSpan() hlc.Timestamp {
305313
return p.spec.InspectDetails.AsOf
306314
}
307315

316+
// sendSpanCompletionProgress sends progress indicating a span has been completed.
317+
// This is used for checkpointing to track which spans are done.
318+
func (p *inspectProcessor) sendSpanCompletionProgress(
319+
ctx context.Context, output execinfra.RowReceiver, completedSpan roachpb.Span, finished bool,
320+
) error {
321+
progressMsg := &jobspb.InspectProcessorProgress{
322+
ChecksCompleted: 0, // No additional checks completed, just marking span done
323+
Finished: finished,
324+
}
325+
326+
progressAny, err := pbtypes.MarshalAny(progressMsg)
327+
if err != nil {
328+
return errors.Wrapf(err, "unable to marshal inspect processor progress")
329+
}
330+
331+
meta := &execinfrapb.ProducerMetadata{
332+
BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
333+
CompletedSpans: []roachpb.Span{completedSpan}, // Mark this span as completed
334+
ProgressDetails: *progressAny,
335+
NodeID: p.flowCtx.NodeID.SQLInstanceID(),
336+
FlowID: p.flowCtx.ID,
337+
ProcessorID: p.processorID,
338+
Drained: finished,
339+
},
340+
}
341+
342+
output.Push(nil, meta)
343+
return nil
344+
}
345+
308346
// newInspectProcessor constructs a new inspectProcessor from the given InspectSpec.
309347
// It parses the spec to generate a set of inspectCheck factories, sets up the span source,
310348
// and wires in logging and concurrency controls.

0 commit comments

Comments
 (0)