diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go
index 7e58e01db389..d50eef6a2912 100644
--- a/pkg/backup/backup_test.go
+++ b/pkg/backup/backup_test.go
@@ -1294,8 +1294,12 @@ func TestRestoreCheckpointing(t *testing.T) {
 	defer jobs.TestingSetProgressThresholds()()
 
 	// totalEntries represents the number of entries to appear in the persisted frontier.
-	totalEntries := 7
-	entriesBeforePause := 4
+	const totalEntries = 7
+	const entriesBeforePause = 4
+	processedSpans := struct {
+		syncutil.Mutex
+		spans roachpb.Spans
+	}{}
 	entriesCount := 0
 	var alreadyPaused atomic.Bool
 	postResumeCount := 0
@@ -1306,7 +1310,7 @@ func TestRestoreCheckpointing(t *testing.T) {
 	knobs := base.TestingKnobs{
 		DistSQL: &execinfra.TestingKnobs{
 			BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
-				RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) error {
+				RunAfterProcessingRestoreSpanEntry: func(_ context.Context, entry *execinfrapb.RestoreSpanEntry) error {
 					// Because the restore processor has several workers that
 					// concurrently send addsstable requests and because all workers will
 					// wait on the lock below, when one flush gets blocked on the
@@ -1318,12 +1322,20 @@ func TestRestoreCheckpointing(t *testing.T) {
 					// checking if the job was paused in each request before it began
 					// waiting for the lock.
 					wasPausedBeforeWaiting := alreadyPaused.Load()
+
 					mu.Lock()
 					defer mu.Unlock()
 					if entriesCount == entriesBeforePause {
 						close(waitForProgress)
 						<-blockDBRestore
+					} else if entriesCount < entriesBeforePause {
+						// We save all spans from before the pause to ensure that they have
+						// been checkpointed and saved in the job progress.
+						processedSpans.Lock()
+						defer processedSpans.Unlock()
+						processedSpans.spans = append(processedSpans.spans, entry.Span)
 					}
+
 					entriesCount++
 					if wasPausedBeforeWaiting {
 						postResumeCount++
@@ -1366,8 +1378,25 @@ func TestRestoreCheckpointing(t *testing.T) {
 	// Pause the job after some progress has been logged.
 	<-waitForProgress
 
-	// To ensure that progress gets persisted, sleep well beyond the test only job update interval.
-	time.Sleep(time.Second)
+	// To ensure that progress has been persisted, we wait until all processed
+	// spans from before the pause are stored in the job progress.
+	testutils.SucceedsSoon(t, func() error {
+		jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID)
+		checkpointedSpans := jobProgress.GetRestore().Checkpoint
+		checkpointedSpanGroup := roachpb.SpanGroup{}
+		for _, span := range checkpointedSpans {
+			checkpointedSpanGroup.Add(span.Span)
+		}
+
+		processedSpans.Lock()
+		defer processedSpans.Unlock()
+		for _, span := range processedSpans.spans {
+			if !checkpointedSpanGroup.Encloses(span) {
+				return errors.Newf("span %s was processed but not saved in job progress yet", span)
+			}
+		}
+		return nil
+	})
 
 	sqlDB.Exec(t, `PAUSE JOB $1`, &jobID)
 	jobutils.WaitForJobToPause(t, sqlDB, jobID)