Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,8 +1301,12 @@ func TestRestoreCheckpointing(t *testing.T) {
defer jobs.TestingSetProgressThresholds()()

// totalEntries represents the number of entries to appear in the persisted frontier.
totalEntries := 7
entriesBeforePause := 4
const totalEntries = 7
const entriesBeforePause = 4
processedSpans := struct {
syncutil.Mutex
spans roachpb.Spans
}{}
entriesCount := 0
var alreadyPaused atomic.Bool
postResumeCount := 0
Expand All @@ -1313,7 +1317,7 @@ func TestRestoreCheckpointing(t *testing.T) {
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) error {
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, entry *execinfrapb.RestoreSpanEntry) error {
// Because the restore processor has several workers that
// concurrently send addsstable requests and because all workers will
// wait on the lock below, when one flush gets blocked on the
Expand All @@ -1325,12 +1329,20 @@ func TestRestoreCheckpointing(t *testing.T) {
// checking if the job was paused in each request before it began
// waiting for the lock.
wasPausedBeforeWaiting := alreadyPaused.Load()

mu.Lock()
defer mu.Unlock()
if entriesCount == entriesBeforePause {
close(waitForProgress)
<-blockDBRestore
} else if entriesCount < entriesBeforePause {
// We save all spans from before the pause to ensure that they have
// been checkpointed and saved in the job progress.
processedSpans.Lock()
defer processedSpans.Unlock()
processedSpans.spans = append(processedSpans.spans, entry.Span)
}

entriesCount++
if wasPausedBeforeWaiting {
postResumeCount++
Expand Down Expand Up @@ -1373,8 +1385,25 @@ func TestRestoreCheckpointing(t *testing.T) {
// Pause the job after some progress has been logged.
<-waitForProgress

// To ensure that progress gets persisted, sleep well beyond the test only job update interval.
time.Sleep(time.Second)
// To ensure that progress has been persisted, we wait until all processed
// spans from before the pause are stored in the job progress.
testutils.SucceedsSoon(t, func() error {
	// Gather every span recorded in the restore checkpoint of the job's
	// persisted progress into a span group so that coverage of each
	// processed span can be checked below.
	jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID)
	checkpointedSpans := jobProgress.GetRestore().Checkpoint
	checkpointedSpanGroup := roachpb.SpanGroup{}
	for _, span := range checkpointedSpans {
		checkpointedSpanGroup.Add(span.Span)
	}

	// processedSpans is appended to by the RunAfterProcessingRestoreSpanEntry
	// testing knob, so it is only read while holding its mutex.
	processedSpans.Lock()
	defer processedSpans.Unlock()
	for _, span := range processedSpans.spans {
		if !checkpointedSpanGroup.Encloses(span) {
			// A non-nil error reports that this span has not been
			// checkpointed yet. Include the span itself so the message
			// identifies which one is still missing.
			return errors.Newf("span %s was processed but not saved in job progress yet", span)
		}
	}
	return nil
})

sqlDB.Exec(t, `PAUSE JOB $1`, &jobID)
jobutils.WaitForJobToPause(t, sqlDB, jobID)
Expand Down