Commit fd30fd3

Merge pull request #154760 from cockroachdb/blathers/backport-release-25.3-154654
release-25.3: restore: deflake TestRestoreCheckpointing
2 parents 9569f6f + 35e69eb

1 file changed: pkg/backup/backup_test.go (34 additions, 5 deletions)

@@ -1294,8 +1294,12 @@ func TestRestoreCheckpointing(t *testing.T) {
 	defer jobs.TestingSetProgressThresholds()()
 
 	// totalEntries represents the number of entries to appear in the persisted frontier.
-	totalEntries := 7
-	entriesBeforePause := 4
+	const totalEntries = 7
+	const entriesBeforePause = 4
+	processedSpans := struct {
+		syncutil.Mutex
+		spans roachpb.Spans
+	}{}
 	entriesCount := 0
 	var alreadyPaused atomic.Bool
 	postResumeCount := 0
@@ -1306,7 +1310,7 @@ func TestRestoreCheckpointing(t *testing.T) {
 	knobs := base.TestingKnobs{
 		DistSQL: &execinfra.TestingKnobs{
 			BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
-				RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) error {
+				RunAfterProcessingRestoreSpanEntry: func(_ context.Context, entry *execinfrapb.RestoreSpanEntry) error {
 					// Because the restore processor has several workers that
 					// concurrently send addsstable requests and because all workers will
 					// wait on the lock below, when one flush gets blocked on the
@@ -1318,12 +1322,20 @@ func TestRestoreCheckpointing(t *testing.T) {
 					// checking if the job was paused in each request before it began
 					// waiting for the lock.
 					wasPausedBeforeWaiting := alreadyPaused.Load()
+
 					mu.Lock()
 					defer mu.Unlock()
 					if entriesCount == entriesBeforePause {
 						close(waitForProgress)
 						<-blockDBRestore
+					} else if entriesCount < entriesBeforePause {
+						// We save all spans from before the pause to ensure that they have
+						// been checkpointed and saved in the job progress.
+						processedSpans.Lock()
+						defer processedSpans.Unlock()
+						processedSpans.spans = append(processedSpans.spans, entry.Span)
 					}
+
 					entriesCount++
 					if wasPausedBeforeWaiting {
 						postResumeCount++
@@ -1366,8 +1378,25 @@ func TestRestoreCheckpointing(t *testing.T) {
 	// Pause the job after some progress has been logged.
 	<-waitForProgress
 
-	// To ensure that progress gets persisted, sleep well beyond the test only job update interval.
-	time.Sleep(time.Second)
+	// To ensure that progress has been persisted, we wait until all processed
+	// spans from before the pause are stored in the job progress.
+	testutils.SucceedsSoon(t, func() error {
+		jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID)
+		checkpointedSpans := jobProgress.GetRestore().Checkpoint
+		checkpointedSpanGroup := roachpb.SpanGroup{}
+		for _, span := range checkpointedSpans {
+			checkpointedSpanGroup.Add(span.Span)
+		}
+
+		processedSpans.Lock()
+		defer processedSpans.Unlock()
+		for _, span := range processedSpans.spans {
+			if !checkpointedSpanGroup.Encloses(span) {
+				return errors.Newf("span %s was processed but not saved in job progress yet", span)
+			}
+		}
+		return nil
+	})
 
 	sqlDB.Exec(t, `PAUSE JOB $1`, &jobID)
 	jobutils.WaitForJobToPause(t, sqlDB, jobID)
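The processedSpans variable added in the first hunk uses a common Go idiom: embedding a mutex in an anonymous struct so the slice and the lock that guards it are declared together. Below is a minimal standalone sketch of that idiom, with the standard library's sync.Mutex standing in for CockroachDB's syncutil.Mutex and all names chosen for illustration only.

// Sketch: a slice guarded by an embedded mutex, mirroring the
// processedSpans struct in the diff. sync.Mutex stands in for
// CockroachDB's syncutil.Mutex; the names are illustrative.
package main

import (
	"fmt"
	"sync"
)

func main() {
	processed := struct {
		sync.Mutex
		items []string
	}{}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Each worker appends under the embedded lock, much as the
			// test records spans from concurrent restore workers.
			processed.Lock()
			defer processed.Unlock()
			processed.items = append(processed.items, fmt.Sprintf("span-%d", i))
		}(i)
	}
	wg.Wait()
	fmt.Printf("recorded %d items\n", len(processed.items))
}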

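The deflake itself swaps a fixed time.Sleep for testutils.SucceedsSoon, which retries an assertion until it passes or the test times out. The sketch below is a simplified, hypothetical stand-in for that helper (the real one lives in CockroachDB's testutils package); succeedsSoon and the "persisted" condition are invented here to show the shape of the pattern.

// Sketch: retry a condition until it holds instead of sleeping for a
// fixed interval. succeedsSoon is a simplified stand-in for
// testutils.SucceedsSoon; the condition in main is illustrative.
package main

import (
	"errors"
	"fmt"
	"time"
)

func succeedsSoon(timeout time.Duration, fn func() error) error {
	deadline := time.Now().Add(timeout)
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("condition never held: %w", err)
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	start := time.Now()
	err := succeedsSoon(5*time.Second, func() error {
		// Stand-in for "all processed spans appear in the job progress".
		if time.Since(start) < 50*time.Millisecond {
			return errors.New("progress not persisted yet")
		}
		return nil
	})
	fmt.Println("err:", err)
}

Polling on the persisted state removes the race between the job's progress-update interval and a fixed one-second sleep, which is what made the original test flaky.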