Skip to content

Commit 23dfb70

Browse files
committed
sql/inspect: optimize progress updates to avoid unnecessary writes
Previously, INSPECT job progress flushes ran unconditionally at periodic intervals, writing to the jobs table and jobfrontier even when no progress had changed. This change adds conditional checks to both flushProgress and flushCheckpointUpdate so that writes are skipped when nothing has changed. For fraction updates, we now compare the cached check counts against the persisted check counts. For checkpoint updates, we track the raw count of received spans (before merging) to detect new completions.

Release note: None
Informs: #154457
Epic: CRDB-30356
1 parent c5714b6 commit 23dfb70

File tree

2 files changed

+232
-7
lines changed

2 files changed

+232
-7
lines changed

pkg/sql/inspect/progress.go

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ type inspectProgressTracker struct {
4848
cachedProgress *jobspb.Progress
4949
// completedSpans tracks all completed spans with automatic deduplication.
5050
completedSpans roachpb.SpanGroup
51+
// receivedSpanCount tracks the total number of spans received from metadata.
52+
// This is used to determine if we have new spans to checkpoint.
53+
receivedSpanCount int
54+
// lastCheckpointedSpanCount tracks the span count at the last checkpoint write.
55+
lastCheckpointedSpanCount int
5156
}
5257

5358
// Goroutine management.
@@ -208,6 +213,7 @@ func (t *inspectProgressTracker) updateProgressCache(
208213
// automatic deduplication and merging.
209214
if len(meta.BulkProcessorProgress.CompletedSpans) > 0 {
210215
t.mu.completedSpans.Add(meta.BulkProcessorProgress.CompletedSpans...)
216+
t.mu.receivedSpanCount += len(meta.BulkProcessorProgress.CompletedSpans)
211217
}
212218

213219
// Update check count.
@@ -299,13 +305,22 @@ func (t *inspectProgressTracker) flushProgress(ctx context.Context) error {
299305
cachedInspectProgress = t.mu.cachedProgress.GetInspect()
300306
}()
301307

302-
// Calculate fraction complete based on check counts from cached progress.
303-
var fractionComplete float32
304-
if cachedInspectProgress.JobTotalCheckCount > 0 {
305-
fractionComplete = float32(cachedInspectProgress.JobCompletedCheckCount) / float32(cachedInspectProgress.JobTotalCheckCount)
306-
}
307-
308308
return t.job.NoTxn().Update(ctx, func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
309+
// Only write if any part of the inspect progress has changed.
310+
currentInspectProgress := md.Progress.GetInspect()
311+
if currentInspectProgress != nil &&
312+
currentInspectProgress.JobTotalCheckCount == cachedInspectProgress.JobTotalCheckCount &&
313+
currentInspectProgress.JobCompletedCheckCount == cachedInspectProgress.JobCompletedCheckCount &&
314+
md.Progress.GetProgress() != nil { // Ensure fraction complete has been initialized.
315+
return nil
316+
}
317+
318+
// Calculate fraction complete based on check counts from cached progress.
319+
var fractionComplete float32
320+
if cachedInspectProgress.JobTotalCheckCount > 0 {
321+
fractionComplete = float32(cachedInspectProgress.JobCompletedCheckCount) / float32(cachedInspectProgress.JobTotalCheckCount)
322+
}
323+
309324
newProgress := &jobspb.Progress{
310325
Details: &jobspb.Progress_Inspect{
311326
Inspect: cachedInspectProgress,
@@ -322,19 +337,25 @@ func (t *inspectProgressTracker) flushProgress(ctx context.Context) error {
322337
// flushCheckpointUpdate performs a progress update to include completed spans.
323338
// The completed spans are stored via jobfrontier.
324339
func (t *inspectProgressTracker) flushCheckpointUpdate(ctx context.Context) error {
340+
if !t.hasUncheckpointedSpans() {
341+
return nil
342+
}
343+
325344
var completedSpans []roachpb.Span
345+
var capturedReceivedCount int
326346
func() {
327347
t.mu.Lock()
328348
defer t.mu.Unlock()
329349
completedSpans = t.mu.completedSpans.Slice()
350+
capturedReceivedCount = t.mu.receivedSpanCount
330351
}()
331352

332353
// If no completed spans, nothing to store.
333354
if len(completedSpans) == 0 {
334355
return nil
335356
}
336357

337-
return t.internalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
358+
err := t.internalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
338359
// Create a frontier for the spans (using zero timestamps since INSPECT doesn't need timing).
339360
frontier, err := span.MakeFrontier(completedSpans...)
340361
if err != nil {
@@ -343,6 +364,20 @@ func (t *inspectProgressTracker) flushCheckpointUpdate(ctx context.Context) erro
343364
defer frontier.Release()
344365
return jobfrontier.Store(ctx, txn, t.jobID, inspectCompletedSpansKey, frontier)
345366
})
367+
368+
// If the checkpoint write succeeded, update the last checkpointed span count.
369+
// This prevents unnecessary future writes when no new spans have been completed.
370+
if err == nil {
371+
func() {
372+
t.mu.Lock()
373+
defer t.mu.Unlock()
374+
if capturedReceivedCount > t.mu.lastCheckpointedSpanCount {
375+
t.mu.lastCheckpointedSpanCount = capturedReceivedCount
376+
}
377+
}()
378+
}
379+
380+
return err
346381
}
347382

348383
// getCachedCheckCounts returns the current cached check counts (total and completed).
@@ -363,6 +398,14 @@ func (t *inspectProgressTracker) getCachedCheckCounts() (totalChecks int64, comp
363398
return inspectProgress.JobTotalCheckCount, inspectProgress.JobCompletedCheckCount
364399
}
365400

401+
// hasUncheckpointedSpans returns true if there are completed spans that have not been
402+
// checkpointed yet. This is used to determine if a checkpoint flush is needed.
403+
func (t *inspectProgressTracker) hasUncheckpointedSpans() bool {
404+
t.mu.Lock()
405+
defer t.mu.Unlock()
406+
return t.mu.receivedSpanCount > t.mu.lastCheckpointedSpanCount
407+
}
408+
366409
// countApplicableChecks determines how many checks will actually run across all spans.
367410
// This provides accurate progress calculation by only counting checks that apply to each span.
368411
func countApplicableChecks(

pkg/sql/inspect/progress_test.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ package inspect
77

88
import (
99
"context"
10+
"math"
1011
"testing"
12+
"time"
1113

1214
"github.com/cockroachdb/cockroach/pkg/base"
1315
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -17,6 +19,7 @@ import (
1719
"github.com/cockroachdb/cockroach/pkg/security/username"
1820
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
1921
"github.com/cockroachdb/cockroach/pkg/sql/isql"
22+
"github.com/cockroachdb/cockroach/pkg/testutils"
2023
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
2124
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2225
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -437,3 +440,182 @@ func createProcessorProgressUpdate(
437440

438441
return meta, nil
439442
}
443+
444+
func TestInspectProgressTracker_ProgressFlushConditions(t *testing.T) {
445+
defer leaktest.AfterTest(t)()
446+
defer log.Scope(t).Close(t)
447+
448+
ctx, s, _, _, cleanup := setupProgressTestInfra(t)
449+
defer cleanup()
450+
451+
const totalChecks = 1000
452+
453+
testCases := []struct {
454+
name string
455+
setupFunc func(t *testing.T, tracker *inspectProgressTracker, job *jobs.Job)
456+
expectedFraction float32
457+
expectUncheckpointedSpans bool
458+
}{
459+
{
460+
name: "initial state - no progress updates",
461+
setupFunc: func(t *testing.T, tracker *inspectProgressTracker, job *jobs.Job) {
462+
require.NoError(t, tracker.initJobProgress(ctx, totalChecks, 0))
463+
},
464+
expectedFraction: 0.0,
465+
expectUncheckpointedSpans: false,
466+
},
467+
{
468+
name: "check count updates without spans",
469+
setupFunc: func(t *testing.T, tracker *inspectProgressTracker, job *jobs.Job) {
470+
require.NoError(t, tracker.initJobProgress(ctx, totalChecks, 0))
471+
472+
// Send progress updates with check counts but no spans.
473+
meta, err := createProcessorProgressUpdate(100, false, nil)
474+
require.NoError(t, err)
475+
_, err = tracker.updateProgressCache(meta)
476+
require.NoError(t, err)
477+
},
478+
expectedFraction: 0.1,
479+
expectUncheckpointedSpans: false,
480+
},
481+
{
482+
name: "span updates trigger checkpoint need",
483+
setupFunc: func(t *testing.T, tracker *inspectProgressTracker, job *jobs.Job) {
484+
require.NoError(t, tracker.initJobProgress(ctx, totalChecks, 0))
485+
486+
// Send progress with completed spans.
487+
spans := []roachpb.Span{{Key: roachpb.Key("a"), EndKey: roachpb.Key("m")}}
488+
meta, err := createProcessorProgressUpdate(50, false, spans)
489+
require.NoError(t, err)
490+
_, err = tracker.updateProgressCache(meta)
491+
require.NoError(t, err)
492+
},
493+
expectedFraction: 0.05,
494+
expectUncheckpointedSpans: true,
495+
},
496+
{
497+
name: "automatic flush clears uncheckpointed state",
498+
setupFunc: func(t *testing.T, tracker *inspectProgressTracker, job *jobs.Job) {
499+
require.NoError(t, tracker.initJobProgress(ctx, totalChecks, 0))
500+
501+
// Send progress with completed spans.
502+
spans := []roachpb.Span{{Key: roachpb.Key("a"), EndKey: roachpb.Key("m")}}
503+
meta, err := createProcessorProgressUpdate(100, false, spans)
504+
require.NoError(t, err)
505+
_, err = tracker.updateProgressCache(meta)
506+
require.NoError(t, err)
507+
508+
// Wait for automatic checkpoint flush.
509+
testutils.SucceedsSoon(t, func() error {
510+
if tracker.hasUncheckpointedSpans() {
511+
return errors.New("still has uncheckpointed spans")
512+
}
513+
return nil
514+
})
515+
},
516+
expectedFraction: 0.1,
517+
expectUncheckpointedSpans: false,
518+
},
519+
{
520+
name: "multiple span updates merge and checkpoint",
521+
setupFunc: func(t *testing.T, tracker *inspectProgressTracker, job *jobs.Job) {
522+
require.NoError(t, tracker.initJobProgress(ctx, totalChecks, 0))
523+
524+
// Send multiple progress updates with different spans.
525+
spans1 := []roachpb.Span{{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}}
526+
meta1, err := createProcessorProgressUpdate(100, false, spans1)
527+
require.NoError(t, err)
528+
_, err = tracker.updateProgressCache(meta1)
529+
require.NoError(t, err)
530+
531+
spans2 := []roachpb.Span{{Key: roachpb.Key("d"), EndKey: roachpb.Key("g")}}
532+
meta2, err := createProcessorProgressUpdate(100, false, spans2)
533+
require.NoError(t, err)
534+
_, err = tracker.updateProgressCache(meta2)
535+
require.NoError(t, err)
536+
537+
// Wait for checkpoint to complete.
538+
testutils.SucceedsSoon(t, func() error {
539+
if tracker.hasUncheckpointedSpans() {
540+
return errors.New("still has uncheckpointed spans")
541+
}
542+
return nil
543+
})
544+
},
545+
expectedFraction: 0.2,
546+
expectUncheckpointedSpans: false,
547+
},
548+
{
549+
name: "immediate flush on drained processor",
550+
setupFunc: func(t *testing.T, tracker *inspectProgressTracker, job *jobs.Job) {
551+
require.NoError(t, tracker.initJobProgress(ctx, totalChecks, 0))
552+
553+
// Send progress with drained=true to trigger immediate flush.
554+
spans := []roachpb.Span{{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}}
555+
meta, err := createProcessorProgressUpdate(500, true, spans)
556+
require.NoError(t, err)
557+
require.NoError(t, tracker.handleProgressUpdate(ctx, meta))
558+
559+
// Immediate flush should have happened, so no uncheckpointed spans.
560+
require.False(t, tracker.hasUncheckpointedSpans())
561+
},
562+
expectedFraction: 0.5,
563+
expectUncheckpointedSpans: false,
564+
},
565+
}
566+
567+
for _, tc := range testCases {
568+
t.Run(tc.name, func(t *testing.T) {
569+
// Create a fresh job for each test case.
570+
record := jobs.Record{
571+
Details: jobspb.InspectDetails{},
572+
Progress: jobspb.InspectProgress{},
573+
Username: username.TestUserName(),
574+
}
575+
576+
freshJob, err := s.JobRegistry().(*jobs.Registry).CreateJobWithTxn(ctx, record, s.JobRegistry().(*jobs.Registry).MakeJobID(), nil)
577+
require.NoError(t, err)
578+
579+
freshTracker := newInspectProgressTracker(freshJob, &s.ClusterSettings().SV, s.InternalDB().(isql.DB))
580+
defer freshTracker.terminateTracker()
581+
582+
// Override intervals for faster testing.
583+
const fastCheckpointInterval = 10 * time.Millisecond
584+
const fastFractionInterval = 5 * time.Millisecond
585+
freshTracker.checkpointInterval = func() time.Duration { return fastCheckpointInterval }
586+
freshTracker.fractionInterval = func() time.Duration { return fastFractionInterval }
587+
588+
// Run the test setup.
589+
tc.setupFunc(t, freshTracker, freshJob)
590+
591+
// Verify uncheckpointed spans state.
592+
require.Equal(t, tc.expectUncheckpointedSpans, freshTracker.hasUncheckpointedSpans(),
593+
"unexpected uncheckpointed spans state")
594+
595+
// Verify fraction complete.
596+
progress := freshJob.Progress()
597+
fractionCompleted, ok := progress.Progress.(*jobspb.Progress_FractionCompleted)
598+
require.True(t, ok, "progress should be FractionCompleted type")
599+
if tc.expectedFraction == 0.0 {
600+
// For zero expected fraction, check immediately.
601+
require.Equal(t, tc.expectedFraction, fractionCompleted.FractionCompleted)
602+
} else {
603+
// For non-zero expected fraction, wait for async flush to complete.
604+
testutils.SucceedsSoon(t, func() error {
605+
progress = freshJob.Progress()
606+
fractionCompleted, ok = progress.Progress.(*jobspb.Progress_FractionCompleted)
607+
if !ok {
608+
return errors.New("progress should be FractionCompleted type")
609+
}
610+
// Check if fraction is within epsilon (1% tolerance).
611+
const epsilon = 0.01
612+
if math.Abs(float64(fractionCompleted.FractionCompleted-tc.expectedFraction)) > epsilon {
613+
return errors.Newf("fraction complete not within epsilon: expected %f ± %f, got %f",
614+
tc.expectedFraction, epsilon, fractionCompleted.FractionCompleted)
615+
}
616+
return nil
617+
})
618+
}
619+
})
620+
}
621+
}

0 commit comments

Comments
 (0)