From 35e69ebf369d9a78cd928ed9b0bc6ca576ef93c9 Mon Sep 17 00:00:00 2001 From: Kevin Cao <39608887+kev-cao@users.noreply.github.com> Date: Thu, 2 Oct 2025 18:30:37 +0000 Subject: [PATCH] restore: deflake TestRestoreCheckpointing The current `TestRestoreCheckpointing` waits for job progress to be updated by sleeping for a duration greater than the checkpoint interval. While this works the vast majority of the time, if the test cluster is overloaded, it is not sufficient and we can end up resuming the job before the progress was updated. This results in more spans being processed than expected. This commit updates the test to instead check the contents of the job progress checkpoint and wait until the checkpoint contains all processed spans from before the pause. Fixes: #153848 Release note: None --- pkg/backup/backup_test.go | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index 7e58e01db389..d50eef6a2912 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -1294,8 +1294,12 @@ func TestRestoreCheckpointing(t *testing.T) { defer jobs.TestingSetProgressThresholds()() // totalEntries represents the number of entries to appear in the persisted frontier. - totalEntries := 7 - entriesBeforePause := 4 + const totalEntries = 7 + const entriesBeforePause = 4 + processedSpans := struct { + syncutil.Mutex + spans roachpb.Spans + }{} entriesCount := 0 var alreadyPaused atomic.Bool postResumeCount := 0 @@ -1306,7 +1310,7 @@ func TestRestoreCheckpointing(t *testing.T) { knobs := base.TestingKnobs{ DistSQL: &execinfra.TestingKnobs{ BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ - RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) error { + RunAfterProcessingRestoreSpanEntry: func(_ context.Context, entry *execinfrapb.RestoreSpanEntry) error { // Because the restore processor has several workers that // concurrently send addsstable requests and because all workers will // wait on the lock below, when one flush gets blocked on the @@ -1318,12 +1322,20 @@ func TestRestoreCheckpointing(t *testing.T) { // checking if the job was paused in each request before it began // waiting for the lock. wasPausedBeforeWaiting := alreadyPaused.Load() + mu.Lock() defer mu.Unlock() if entriesCount == entriesBeforePause { close(waitForProgress) <-blockDBRestore + } else if entriesCount < entriesBeforePause { + // We save all spans from before the pause to ensure that they have + // been checkpointed and saved in the job progress. + processedSpans.Lock() + defer processedSpans.Unlock() + processedSpans.spans = append(processedSpans.spans, entry.Span) } + entriesCount++ if wasPausedBeforeWaiting { postResumeCount++ @@ -1366,8 +1378,25 @@ func TestRestoreCheckpointing(t *testing.T) { // Pause the job after some progress has been logged. <-waitForProgress - // To ensure that progress gets persisted, sleep well beyond the test only job update interval. - time.Sleep(time.Second) + // To ensure that progress has been persisted, we wait until all processed + // spans from before the pause are stored in the job progress. + testutils.SucceedsSoon(t, func() error { + jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID) + checkpointedSpans := jobProgress.GetRestore().Checkpoint + checkpointedSpanGroup := roachpb.SpanGroup{} + for _, span := range checkpointedSpans { + checkpointedSpanGroup.Add(span.Span) + } + + processedSpans.Lock() + defer processedSpans.Unlock() + for _, span := range processedSpans.spans { + if !checkpointedSpanGroup.Encloses(span) { + return errors.Newf("span %s was processed but not saved in job progress yet") + } + } + return nil + }) sqlDB.Exec(t, `PAUSE JOB $1`, &jobID) jobutils.WaitForJobToPause(t, sqlDB, jobID)