Skip to content

Commit 35f2786

Browse files
committed
backup: implement online restore in distsql flow
This implements online restore using the normal restore distsql flow, rather than the bespoke online restore paths used in its initial prototype. This is behind a setting for now, but eventually could be the basis for extending online restore -- now unified with regular restore -- to support ingesting rather than linking some data (such as revision-history layers). Release note: none. Epic: none.
1 parent 9eef0da commit 35f2786

13 files changed

+504
-66
lines changed

pkg/backup/bench_covering_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ func BenchmarkRestoreEntryCover(b *testing.B) {
109109
nil,
110110
filter,
111111
&inclusiveEndKeyComparator{},
112-
spanCh)
112+
spanCh,
113+
false, /* useLink */
114+
)
113115
})
114116

115117
var cov []execinfrapb.RestoreSpanEntry

pkg/backup/compaction_dist.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ func runCompactionPlan(
9494
filter,
9595
fsc,
9696
spanCh,
97+
false, /* useLink */
9798
), "generateAndSendImportSpans")
9899
}
99100
dsp := execCtx.DistSQLPlanner()

pkg/backup/compaction_processor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ func (p *compactBackupsProcessor) runCompactBackups(ctx context.Context) error {
213213
filter,
214214
fsc,
215215
entryCh,
216+
false, /* useLink */
216217
), "generate and send import spans")
217218
}
218219

pkg/backup/generative_split_and_scatter_processor.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ func runGenerativeSplitAndScatter(
461461
) error {
462462
log.Dev.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes",
463463
spec.NumEntries, spec.ChunkSize, spec.NumNodes)
464+
464465
g := ctxgroup.WithContext(ctx)
465466

466467
chunkSplitAndScatterWorkers := len(chunkSplitAndScatterers)
@@ -512,6 +513,7 @@ func runGenerativeSplitAndScatter(
512513
filter,
513514
fsc,
514515
restoreSpanEntriesCh,
516+
spec.UseLink,
515517
), "generating and sending import spans")
516518
})
517519

pkg/backup/restore_data_processor.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3232
"github.com/cockroachdb/cockroach/pkg/sql/types"
3333
"github.com/cockroachdb/cockroach/pkg/storage"
34+
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
3435
bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
3536
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
3637
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -420,6 +421,27 @@ func (rd *restoreDataProcessor) runRestoreWorkers(
420421
ctx, sp := tracing.ChildSpan(ctx, "restore.processRestoreSpanEntry")
421422
defer sp.Finish()
422423

424+
// Check if the first file has UseLink set. If so, link all files
425+
// in this entry via LinkExternalSSTable instead of ingesting.
426+
// TODO(dt): support entries where some layers are link and some are not.
427+
if entry.Files[0].UseLink {
428+
for _, file := range entry.Files {
429+
if !file.UseLink {
430+
return done, errors.AssertionFailedf("invalid non-linked file in linking restore")
431+
}
432+
}
433+
summary, err := rd.linkFiles(ctx, kr, entry)
434+
if err != nil {
435+
return done, errors.Wrap(err, "linking files")
436+
}
437+
select {
438+
case rd.progCh <- makeProgressUpdate(summary, entry, rd.spec.PKIDs, rd.spec.RestoreTime):
439+
case <-ctx.Done():
440+
return done, errors.Wrap(ctx.Err(), "sending progress update")
441+
}
442+
return done, nil
443+
}
444+
423445
var res *resumeEntry
424446
for {
425447
sstIter, res, err = rd.openSSTs(ctx, entry, res)
@@ -454,6 +476,134 @@ func (rd *restoreDataProcessor) runRestoreWorkers(
454476
})
455477
}
456478

479+
// linkFiles links all files in the entry via LinkExternalSSTable instead of
480+
// downloading and ingesting them. This is used for online restore when the
481+
// UseLink flag is set on files.
482+
func (rd *restoreDataProcessor) linkFiles(
483+
ctx context.Context, kr *KeyRewriter, entry execinfrapb.RestoreSpanEntry,
484+
) (kvpb.BulkOpSummary, error) {
485+
ctx, sp := tracing.ChildSpan(ctx, "restore.linkFiles")
486+
defer sp.Finish()
487+
488+
var summary kvpb.BulkOpSummary
489+
kvDB := rd.FlowCtx.Cfg.DB.KV()
490+
491+
if err := assertCommonPrefix(entry.Span, entry.ElidedPrefix); err != nil {
492+
return summary, err
493+
}
494+
495+
var currentLayer int32
496+
for i := range entry.Files {
497+
file := &entry.Files[i]
498+
if file.Layer < currentLayer {
499+
return summary, errors.AssertionFailedf("files not sorted by layer")
500+
}
501+
currentLayer = file.Layer
502+
if file.HasRangeKeys {
503+
return summary, errors.Wrapf(permanentRestoreError, "online restore of range keys not supported")
504+
}
505+
if err := assertCommonPrefix(file.BackupFileEntrySpan, entry.ElidedPrefix); err != nil {
506+
return summary, err
507+
}
508+
509+
restoringSubspan := file.BackupFileEntrySpan.Intersect(entry.Span)
510+
if !restoringSubspan.Valid() {
511+
return summary, errors.AssertionFailedf("file %s with span %s has no overlap with restore span %s",
512+
file.Path,
513+
file.BackupFileEntrySpan,
514+
entry.Span,
515+
)
516+
}
517+
518+
var rewriteErr error
519+
restoringSubspan, rewriteErr = rewriteSpan(kr, restoringSubspan.Clone(), entry.ElidedPrefix)
520+
if rewriteErr != nil {
521+
return summary, rewriteErr
522+
}
523+
524+
log.VEventf(ctx, 2, "linking file %s (file span: %s) to span %s", file.Path, file.BackupFileEntrySpan, restoringSubspan)
525+
526+
counts := file.BackupFileEntryCounts
527+
fileSize := file.ApproximatePhysicalSize
528+
if fileSize == 0 {
529+
fileSize = uint64(counts.DataSize)
530+
}
531+
if fileSize == 0 {
532+
fileSize = 16 << 20 // guess
533+
}
534+
535+
// The synthetic prefix is computed from the REWRITTEN span key, not the
536+
// original file span key. This matches the old online restore path which
537+
// overwrites file.BackupFileEntrySpan before computing the prefix.
538+
syntheticPrefix, err := backupsink.ElidedPrefix(restoringSubspan.Key, entry.ElidedPrefix)
539+
if err != nil {
540+
return summary, err
541+
}
542+
543+
fileStats := &enginepb.MVCCStats{
544+
ContainsEstimates: 1,
545+
KeyBytes: counts.DataSize / 2,
546+
ValBytes: counts.DataSize / 2,
547+
LiveBytes: counts.DataSize,
548+
KeyCount: counts.Rows + counts.IndexEntries,
549+
LiveCount: counts.Rows + counts.IndexEntries,
550+
}
551+
552+
var batchTimestamp hlc.Timestamp
553+
if writeAtBatchTS(ctx, restoringSubspan, kr.fromSystemTenant) {
554+
batchTimestamp = kvDB.Clock().Now()
555+
}
556+
557+
loc := kvpb.LinkExternalSSTableRequest_ExternalFile{
558+
Locator: file.Dir.URI,
559+
Path: file.Path,
560+
ApproximatePhysicalSize: fileSize,
561+
BackingFileSize: file.BackingFileSize,
562+
SyntheticPrefix: syntheticPrefix,
563+
UseSyntheticSuffix: batchTimestamp.IsSet(),
564+
MVCCStats: fileStats,
565+
}
566+
567+
if err := kvDB.LinkExternalSSTable(ctx, restoringSubspan, loc, batchTimestamp); err != nil {
568+
return summary, errors.Wrap(err, "linking external SSTable")
569+
}
570+
571+
// Call testing knob after each file link, matching the old online restore path.
572+
if restoreKnobs, ok := rd.FlowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
573+
if restoreKnobs.AfterAddRemoteSST != nil {
574+
if err := restoreKnobs.AfterAddRemoteSST(); err != nil {
575+
return summary, err
576+
}
577+
}
578+
}
579+
580+
// Use ApproximatePhysicalSize for DataSize to match the coordinator path.
581+
summary.DataSize += int64(fileSize)
582+
583+
// Populate EntryCounts so that countRows() can compute row counts for
584+
// progress tracking. We need to use a key that's in pkIDs for rows to be
585+
// counted correctly. Pick any key from pkIDs (the specific key doesn't
586+
// matter since we're just summing totals).
587+
if counts.Rows > 0 || counts.IndexEntries > 0 {
588+
if summary.EntryCounts == nil {
589+
summary.EntryCounts = make(map[uint64]int64)
590+
}
591+
// Use a pkID key for rows so countRows counts them as rows.
592+
for pkID := range rd.spec.PKIDs {
593+
summary.EntryCounts[pkID] += counts.Rows
594+
break // only need one key
595+
}
596+
// Use key 0 for index entries (won't be in pkIDs, so counted as index entries).
597+
if counts.IndexEntries > 0 {
598+
summary.EntryCounts[0] += counts.IndexEntries
599+
}
600+
}
601+
}
602+
603+
rd.progressMade = true
604+
return summary, nil
605+
}
606+
457607
var backupFileReadError = errors.New("error reading backup file")
458608

459609
func (rd *restoreDataProcessor) processRestoreSpanEntry(

pkg/backup/restore_job.go

Lines changed: 88 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,10 @@ func restore(
404404
job := resumer.job
405405
details := job.Details().(jobspb.RestoreDetails)
406406

407+
// resolvedURIs holds the main URIs after resolving any external:// aliases.
408+
// This is set in the OnlineImpl block below and used by runRestore.
409+
resolvedURIs := details.URIs
410+
407411
if details.OnlineImpl() {
408412
var linkPhaseComplete bool
409413
if err := execCtx.ExecCfg().InternalDB.Txn(restoreCtx, func(ctx context.Context, txn isql.Txn) error {
@@ -417,6 +421,18 @@ func restore(
417421
if linkPhaseComplete {
418422
return emptyRowCount, nil
419423
}
424+
425+
// If any URIs utilize external:// aliases, we need to resolve the alias
426+
// to its underlying URI before feeding it into online restore. This
427+
// applies to the main URIs, the locality info URIs, and the backup
428+
// manifest Dir fields.
429+
var err error
430+
resolvedURIs, backupLocalityInfo, err = resolveExternalStorageURIs(
431+
restoreCtx, execCtx, details.URIs, backupLocalityInfo, backupManifests,
432+
)
433+
if err != nil {
434+
return emptyRowCount, errors.Wrap(err, "resolving external storage URIs for online restore")
435+
}
420436
}
421437

422438
// If we've already migrated some of the system tables we're about to
@@ -510,6 +526,7 @@ func restore(
510526
filter,
511527
fsc,
512528
spanCh,
529+
false, /* useLink */
513530
), "generate and send import spans")
514531
}
515532

@@ -553,9 +570,15 @@ func restore(
553570
tasks = append(tasks, jobProgressLoop)
554571
}
555572

573+
// Check if online restore should use the distributed flow with file linking
574+
// instead of the simpler sendAddRemoteSSTs loop.
575+
useDistFlow := onlineRestoreUseDistFlow.Get(&execCtx.ExecCfg().Settings.SV)
576+
556577
progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
557-
if !details.OnlineImpl() {
558-
// Online restore tracks progress by pinging requestFinishedCh instead
578+
// Start the progress checkpoint loop if this is a traditional restore OR
579+
// an online restore using the distributed flow (which also sends progress
580+
// updates via progCh).
581+
if !details.OnlineImpl() || useDistFlow {
559582
generativeCheckpointLoop := func(ctx context.Context) error {
560583
defer close(requestFinishedCh)
561584
for progress := range progCh {
@@ -599,7 +622,10 @@ func restore(
599622
}
600623

601624
resumeClusterVersion := execCtx.ExecCfg().Settings.Version.ActiveVersion(restoreCtx).Version
602-
if clusterversion.V24_3.Version().LessEq(resumeClusterVersion) && !details.OnlineImpl() {
625+
// Start the countCompletedProcLoop if this is a traditional restore OR
626+
// an online restore using the distributed flow (which also sends processor
627+
// completion signals via procCompleteCh).
628+
if clusterversion.V24_3.Version().LessEq(resumeClusterVersion) && (!details.OnlineImpl() || useDistFlow) {
603629
tasks = append(tasks, countCompletedProcLoop)
604630
}
605631

@@ -628,39 +654,78 @@ func restore(
628654
runRestore := func(ctx context.Context) error {
629655
if details.OnlineImpl() {
630656
log.Dev.Warningf(ctx, "EXPERIMENTAL ONLINE RESTORE being used")
631-
approxRows, approxDataSize, err := sendAddRemoteSSTs(
632-
ctx,
633-
execCtx,
634-
job,
635-
dataToRestore,
636-
encryption,
637-
details.URIs,
638-
backupLocalityInfo,
639-
requestFinishedCh,
640-
tracingAggCh,
641-
genSpan,
642-
)
643-
progressTracker.mu.Lock()
644-
defer progressTracker.mu.Unlock()
645-
// During the link phase of online restore, we do not update stats
646-
// progress as job occurs. We merely reuse the `progressTracker.mu.res`
647-
// var to reduce the number of local vars floating around in `restore`.
648-
progressTracker.mu.res = roachpb.RowCount{Rows: approxRows, DataSize: approxDataSize}
649-
return errors.Wrap(err, "sending remote AddSSTable requests")
657+
658+
// Use the bespoke online restore path that directly splits and links from
659+
// the coordinator without a distSQL restore flow.
660+
if !useDistFlow {
661+
approxRows, approxDataSize, err := sendAddRemoteSSTs(
662+
ctx,
663+
execCtx,
664+
job,
665+
dataToRestore,
666+
encryption,
667+
resolvedURIs,
668+
backupLocalityInfo,
669+
requestFinishedCh,
670+
tracingAggCh,
671+
genSpan,
672+
)
673+
progressTracker.mu.Lock()
674+
defer progressTracker.mu.Unlock()
675+
// During the link phase of online restore, we do not update stats
676+
// progress as job occurs. We merely reuse the `progressTracker.mu.res`
677+
// var to reduce the number of local vars floating around in `restore`.
678+
progressTracker.mu.res = roachpb.RowCount{Rows: approxRows, DataSize: approxDataSize}
679+
return errors.Wrap(err, "sending remote AddSSTable requests")
680+
}
681+
682+
// If we did not switch to the bespoke online restore path, we'll proceed
683+
// to the normal restore distSQL flow and let its split and scatter
684+
// processor direct the restore data processors to link files rather than
685+
// ingest them. But we do need to pre-split at the top-level logical spans
686+
// that are being restored, as stored in DownloadSpan, as these are what
687+
// we will clear via kv/pebble excises if we fail at any point after we
688+
// start linking files.
689+
//
690+
// TODO(dt): we should record in persisted progress when we are ready to
691+
// enter the linking/ingesting phase, i.e. *after* we make these splits,
692+
// so any failures prior to it can skip cleanup, as that cleanup could
693+
// fail if we failed prior to making these splits.
694+
var prevSplit roachpb.Key
695+
for _, span := range details.DownloadSpans {
696+
if !span.Key.Equal(prevSplit) {
697+
if err := execCtx.ExecCfg().DB.AdminSplit(ctx, span.Key, hlc.MaxTimestamp); err != nil {
698+
return errors.Wrapf(err, "pre-splitting at key %s", span.Key)
699+
}
700+
}
701+
if err := execCtx.ExecCfg().DB.AdminSplit(ctx, span.EndKey, hlc.MaxTimestamp); err != nil {
702+
return errors.Wrapf(err, "pre-splitting at key %s", span.EndKey)
703+
}
704+
prevSplit = span.EndKey
705+
}
706+
}
707+
708+
// Use the distributed restore flow. If this is an online restore (with
709+
// useDistFlow, since we returned above otherwise), link files instead of
710+
// ingesting them.
711+
useLink := details.OnlineImpl()
712+
if useLink {
713+
log.Dev.Infof(ctx, "online restore using distributed flow with file linking")
650714
}
651715
md := restoreJobMetadata{
652716
jobID: job.ID(),
653717
dataToRestore: dataToRestore,
654718
restoreTime: endTime,
655719
encryption: encryption,
656720
kmsEnv: kmsEnv,
657-
uris: details.URIs,
721+
uris: resolvedURIs,
658722
backupLocalityInfo: backupLocalityInfo,
659723
spanFilter: filter,
660724
numImportSpans: numImportSpans,
661725
execLocality: details.ExecutionLocality,
662726
exclusiveEndKeys: fsc.isExclusive(),
663727
resumeClusterVersion: resumeClusterVersion,
728+
useLink: useLink,
664729
}
665730
return errors.Wrap(distRestore(
666731
ctx,

0 commit comments

Comments
 (0)