@@ -1294,8 +1294,12 @@ func TestRestoreCheckpointing(t *testing.T) {
1294
1294
defer jobs .TestingSetProgressThresholds ()()
1295
1295
1296
1296
// totalEntries represents the number of entries to appear in the persisted frontier.
1297
- totalEntries := 7
1298
- entriesBeforePause := 4
1297
+ const totalEntries = 7
1298
+ const entriesBeforePause = 4
1299
+ processedSpans := struct {
1300
+ syncutil.Mutex
1301
+ spans roachpb.Spans
1302
+ }{}
1299
1303
entriesCount := 0
1300
1304
var alreadyPaused atomic.Bool
1301
1305
postResumeCount := 0
@@ -1306,7 +1310,7 @@ func TestRestoreCheckpointing(t *testing.T) {
1306
1310
knobs := base.TestingKnobs {
1307
1311
DistSQL : & execinfra.TestingKnobs {
1308
1312
BackupRestoreTestingKnobs : & sql.BackupRestoreTestingKnobs {
1309
- RunAfterProcessingRestoreSpanEntry : func (_ context.Context , _ * execinfrapb.RestoreSpanEntry ) error {
1313
+ RunAfterProcessingRestoreSpanEntry : func (_ context.Context , entry * execinfrapb.RestoreSpanEntry ) error {
1310
1314
// Because the restore processor has several workers that
1311
1315
// concurrently send addsstable requests and because all workers will
1312
1316
// wait on the lock below, when one flush gets blocked on the
@@ -1318,12 +1322,20 @@ func TestRestoreCheckpointing(t *testing.T) {
1318
1322
// checking if the job was paused in each request before it began
1319
1323
// waiting for the lock.
1320
1324
wasPausedBeforeWaiting := alreadyPaused .Load ()
1325
+
1321
1326
mu .Lock ()
1322
1327
defer mu .Unlock ()
1323
1328
if entriesCount == entriesBeforePause {
1324
1329
close (waitForProgress )
1325
1330
<- blockDBRestore
1331
+ } else if entriesCount < entriesBeforePause {
1332
+ // We save all spans from before the pause to ensure that they have
1333
+ // been checkpointed and saved in the job progress.
1334
+ processedSpans .Lock ()
1335
+ defer processedSpans .Unlock ()
1336
+ processedSpans .spans = append (processedSpans .spans , entry .Span )
1326
1337
}
1338
+
1327
1339
entriesCount ++
1328
1340
if wasPausedBeforeWaiting {
1329
1341
postResumeCount ++
@@ -1366,8 +1378,25 @@ func TestRestoreCheckpointing(t *testing.T) {
1366
1378
// Pause the job after some progress has been logged.
1367
1379
<- waitForProgress
1368
1380
1369
- // To ensure that progress gets persisted, sleep well beyond the test only job update interval.
1370
- time .Sleep (time .Second )
1381
+ // To ensure that progress has been persisted, we wait until all processed
1382
+ // spans from before the pause are stored in the job progress.
1383
+ testutils .SucceedsSoon (t , func () error {
1384
+ jobProgress := jobutils .GetJobProgress (t , sqlDB , jobID )
1385
+ checkpointedSpans := jobProgress .GetRestore ().Checkpoint
1386
+ checkpointedSpanGroup := roachpb.SpanGroup {}
1387
+ for _ , span := range checkpointedSpans {
1388
+ checkpointedSpanGroup .Add (span .Span )
1389
+ }
1390
+
1391
+ processedSpans .Lock ()
1392
+ defer processedSpans .Unlock ()
1393
+ for _ , span := range processedSpans .spans {
1394
+ if ! checkpointedSpanGroup .Encloses (span ) {
1395
+ return errors .Newf ("span %s was processed but not saved in job progress yet" )
1396
+ }
1397
+ }
1398
+ return nil
1399
+ })
1371
1400
1372
1401
sqlDB .Exec (t , `PAUSE JOB $1` , & jobID )
1373
1402
jobutils .WaitForJobToPause (t , sqlDB , jobID )
0 commit comments