
Commit 8385dd7

craig[bot] and dt committed

162100: backup: implement online restore in distsql flow r=dt a=dt

This implements online restore using the normal restore distsql flow, rather than the bespoke online restore paths used in its initial prototype. This is behind a setting for now, but eventually could be the basis for extending online restore -- now unified with regular restore -- to support ingesting rather than linking some data (such as revision-history layers).

Release note: none.

Epic: none.

Co-authored-by: David Taylor <davidt@davidt.io>
2 parents e4700ff + 35f2786 commit 8385dd7

13 files changed: +504 −66 lines
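Editor's note: the commit message says the new behavior is "behind a setting," and the restore_job.go hunks below read it as onlineRestoreUseDistFlow. The registration itself is not among the hunks shown here, so the following is only a sketch of how such a boolean cluster setting is typically declared in CockroachDB; the setting key, description, and default are assumptions, and only the variable name appears in this diff.

// Assumes: import "github.com/cockroachdb/cockroach/pkg/settings"
// Hypothetical registration -- key, description, and default are guesses.
var onlineRestoreUseDistFlow = settings.RegisterBoolSetting(
    settings.ApplicationLevel,
    "backup.restore.online_restore_dist_flow.enabled", // hypothetical key
    "if true, online restore runs through the normal restore distsql flow, "+
        "linking files from the restore data processors instead of the coordinator",
    false, // hypothetical default: keep the bespoke coordinator path
)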

pkg/backup/bench_covering_test.go

Lines changed: 3 additions & 1 deletion

@@ -109,7 +109,9 @@ func BenchmarkRestoreEntryCover(b *testing.B) {
             nil,
             filter,
             &inclusiveEndKeyComparator{},
-            spanCh)
+            spanCh,
+            false, /* useLink */
+        )
     })

     var cov []execinfrapb.RestoreSpanEntry
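Editor's note: the same trailing false /* useLink */ argument is threaded through every generateAndSendImportSpans caller in the next three files, while the split-and-scatter processor passes spec.UseLink. As a rough illustration of the pattern (a toy sketch, not CockroachDB code; every name below is invented), the span generator takes a trailing useLink flag and stamps it onto each file entry it emits, so downstream restore workers know to link rather than ingest:

package main

import "fmt"

type fileEntry struct {
    Path    string
    UseLink bool
}

type restoreSpanEntry struct{ Files []fileEntry }

// generateSpans stamps useLink onto every emitted file entry, mirroring how
// the real generator would propagate the flag to restore data processors.
func generateSpans(paths []string, spanCh chan<- restoreSpanEntry, useLink bool) {
    defer close(spanCh)
    for _, p := range paths {
        spanCh <- restoreSpanEntry{Files: []fileEntry{{Path: p, UseLink: useLink}}}
    }
}

func main() {
    spanCh := make(chan restoreSpanEntry)
    go generateSpans([]string{"1.sst", "2.sst"}, spanCh, true /* useLink */)
    for e := range spanCh {
        fmt.Println(e.Files[0].Path, "useLink:", e.Files[0].UseLink)
    }
}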

pkg/backup/compaction_dist.go

Lines changed: 1 addition & 0 deletions

@@ -96,6 +96,7 @@ func runCompactionPlan(
         filter,
         fsc,
         spanCh,
+        false, /* useLink */
     ), "generateAndSendImportSpans")
 }
 dsp := execCtx.DistSQLPlanner()

pkg/backup/compaction_processor.go

Lines changed: 1 addition & 0 deletions

@@ -205,6 +205,7 @@ func (p *compactBackupsProcessor) runCompactBackups(ctx context.Context) error {
         filter,
         fsc,
         entryCh,
+        false, /* useLink */
     ), "generate and send import spans")
 }

pkg/backup/generative_split_and_scatter_processor.go

Lines changed: 2 additions & 0 deletions

@@ -461,6 +461,7 @@ func runGenerativeSplitAndScatter(
 ) error {
     log.Dev.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes",
         spec.NumEntries, spec.ChunkSize, spec.NumNodes)
+
     g := ctxgroup.WithContext(ctx)

     chunkSplitAndScatterWorkers := len(chunkSplitAndScatterers)

@@ -512,6 +513,7 @@
         filter,
         fsc,
         restoreSpanEntriesCh,
+        spec.UseLink,
     ), "generating and sending import spans")
 })

pkg/backup/restore_data_processor.go

Lines changed: 150 additions & 0 deletions

@@ -31,6 +31,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
     "github.com/cockroachdb/cockroach/pkg/sql/types"
     "github.com/cockroachdb/cockroach/pkg/storage"
+    "github.com/cockroachdb/cockroach/pkg/storage/enginepb"
     bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
     "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -420,6 +421,27 @@ func (rd *restoreDataProcessor) runRestoreWorkers(
     ctx, sp := tracing.ChildSpan(ctx, "restore.processRestoreSpanEntry")
     defer sp.Finish()

+    // Check if the first file has UseLink set. If so, link all files
+    // in this entry via LinkExternalSSTable instead of ingesting.
+    // TODO(dt): support entries where some layers are link and some are not.
+    if entry.Files[0].UseLink {
+        for _, file := range entry.Files {
+            if !file.UseLink {
+                return done, errors.AssertionFailedf("invalid non-linked file in linking restore")
+            }
+        }
+        summary, err := rd.linkFiles(ctx, kr, entry)
+        if err != nil {
+            return done, errors.Wrap(err, "linking files")
+        }
+        select {
+        case rd.progCh <- makeProgressUpdate(summary, entry, rd.spec.PKIDs, rd.spec.RestoreTime):
+        case <-ctx.Done():
+            return done, errors.Wrap(ctx.Err(), "sending progress update")
+        }
+        return done, nil
+    }
+
     var res *resumeEntry
     for {
         sstIter, res, err = rd.openSSTs(ctx, entry, res)
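Editor's note: the branch above links the whole entry and then reports progress with a select that also watches the context, so a canceled flow never blocks on the progress channel. A minimal standalone sketch of that send-or-abort idiom (toy types, not the real progress structs):

package main

import (
    "context"
    "fmt"
)

// sendProgress blocks on the channel send but gives up cleanly if the
// flow's context is canceled first, mirroring the select in the diff.
func sendProgress(ctx context.Context, progCh chan<- string, update string) error {
    select {
    case progCh <- update:
        return nil
    case <-ctx.Done():
        return fmt.Errorf("sending progress update: %w", ctx.Err())
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    progCh := make(chan string, 1)                          // buffered: send succeeds
    fmt.Println(sendProgress(ctx, progCh, "linked 1 file")) // <nil>
    cancel()
    // Canceled context and no receiver: the select takes the ctx.Done() arm.
    fmt.Println(sendProgress(ctx, make(chan string), "late update"))
}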
@@ -454,6 +476,134 @@ func (rd *restoreDataProcessor) runRestoreWorkers(
     })
 }

+// linkFiles links all files in the entry via LinkExternalSSTable instead of
+// downloading and ingesting them. This is used for online restore when the
+// UseLink flag is set on files.
+func (rd *restoreDataProcessor) linkFiles(
+    ctx context.Context, kr *KeyRewriter, entry execinfrapb.RestoreSpanEntry,
+) (kvpb.BulkOpSummary, error) {
+    ctx, sp := tracing.ChildSpan(ctx, "restore.linkFiles")
+    defer sp.Finish()
+
+    var summary kvpb.BulkOpSummary
+    kvDB := rd.FlowCtx.Cfg.DB.KV()
+
+    if err := assertCommonPrefix(entry.Span, entry.ElidedPrefix); err != nil {
+        return summary, err
+    }
+
+    var currentLayer int32
+    for i := range entry.Files {
+        file := &entry.Files[i]
+        if file.Layer < currentLayer {
+            return summary, errors.AssertionFailedf("files not sorted by layer")
+        }
+        currentLayer = file.Layer
+        if file.HasRangeKeys {
+            return summary, errors.Wrapf(permanentRestoreError, "online restore of range keys not supported")
+        }
+        if err := assertCommonPrefix(file.BackupFileEntrySpan, entry.ElidedPrefix); err != nil {
+            return summary, err
+        }
+
+        restoringSubspan := file.BackupFileEntrySpan.Intersect(entry.Span)
+        if !restoringSubspan.Valid() {
+            return summary, errors.AssertionFailedf("file %s with span %s has no overlap with restore span %s",
+                file.Path,
+                file.BackupFileEntrySpan,
+                entry.Span,
+            )
+        }
+
+        var rewriteErr error
+        restoringSubspan, rewriteErr = rewriteSpan(kr, restoringSubspan.Clone(), entry.ElidedPrefix)
+        if rewriteErr != nil {
+            return summary, rewriteErr
+        }
+
+        log.VEventf(ctx, 2, "linking file %s (file span: %s) to span %s", file.Path, file.BackupFileEntrySpan, restoringSubspan)
+
+        counts := file.BackupFileEntryCounts
+        fileSize := file.ApproximatePhysicalSize
+        if fileSize == 0 {
+            fileSize = uint64(counts.DataSize)
+        }
+        if fileSize == 0 {
+            fileSize = 16 << 20 // guess
+        }
+
+        // The synthetic prefix is computed from the REWRITTEN span key, not the
+        // original file span key. This matches the old online restore path which
+        // overwrites file.BackupFileEntrySpan before computing the prefix.
+        syntheticPrefix, err := backupsink.ElidedPrefix(restoringSubspan.Key, entry.ElidedPrefix)
+        if err != nil {
+            return summary, err
+        }
+
+        fileStats := &enginepb.MVCCStats{
+            ContainsEstimates: 1,
+            KeyBytes:          counts.DataSize / 2,
+            ValBytes:          counts.DataSize / 2,
+            LiveBytes:         counts.DataSize,
+            KeyCount:          counts.Rows + counts.IndexEntries,
+            LiveCount:         counts.Rows + counts.IndexEntries,
+        }
+
+        var batchTimestamp hlc.Timestamp
+        if writeAtBatchTS(ctx, restoringSubspan, kr.fromSystemTenant) {
+            batchTimestamp = kvDB.Clock().Now()
+        }
+
+        loc := kvpb.LinkExternalSSTableRequest_ExternalFile{
+            Locator:                 file.Dir.URI,
+            Path:                    file.Path,
+            ApproximatePhysicalSize: fileSize,
+            BackingFileSize:         file.BackingFileSize,
+            SyntheticPrefix:         syntheticPrefix,
+            UseSyntheticSuffix:      batchTimestamp.IsSet(),
+            MVCCStats:               fileStats,
+        }
+
+        if err := kvDB.LinkExternalSSTable(ctx, restoringSubspan, loc, batchTimestamp); err != nil {
+            return summary, errors.Wrap(err, "linking external SSTable")
+        }
+
+        // Call testing knob after each file link, matching the old online restore path.
+        if restoreKnobs, ok := rd.FlowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
+            if restoreKnobs.AfterAddRemoteSST != nil {
+                if err := restoreKnobs.AfterAddRemoteSST(); err != nil {
+                    return summary, err
+                }
+            }
+        }
+
+        // Use ApproximatePhysicalSize for DataSize to match the coordinator path.
+        summary.DataSize += int64(fileSize)
+
+        // Populate EntryCounts so that countRows() can compute row counts for
+        // progress tracking. We need to use a key that's in pkIDs for rows to be
+        // counted correctly. Pick any key from pkIDs (the specific key doesn't
+        // matter since we're just summing totals).
+        if counts.Rows > 0 || counts.IndexEntries > 0 {
+            if summary.EntryCounts == nil {
+                summary.EntryCounts = make(map[uint64]int64)
+            }
+            // Use a pkID key for rows so countRows counts them as rows.
+            for pkID := range rd.spec.PKIDs {
+                summary.EntryCounts[pkID] += counts.Rows
+                break // only need one key
+            }
+            // Use key 0 for index entries (won't be in pkIDs, so counted as index entries).
+            if counts.IndexEntries > 0 {
+                summary.EntryCounts[0] += counts.IndexEntries
+            }
+        }
+    }
+
+    rd.progressMade = true
+    return summary, nil
+}
+
 var backupFileReadError = errors.New("error reading backup file")

 func (rd *restoreDataProcessor) processRestoreSpanEntry(
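Editor's note: since linking skips reading file contents, linkFiles can only estimate MVCC stats from the backup's entry counts: DataSize is split evenly between key and value bytes, everything is counted as live, and ContainsEstimates marks the numbers as approximate so later stats recomputation can correct them. A standalone paraphrase of that arithmetic (plain Go types here, not the real enginepb ones):

package main

import "fmt"

type estStats struct {
    ContainsEstimates int64
    KeyBytes          int64
    ValBytes          int64
    LiveBytes         int64
    KeyCount          int64
    LiveCount         int64
}

// estimateStats mirrors the arithmetic in linkFiles: split the logical data
// size evenly between keys and values, treat all of it as live, and count
// each row and index entry as one live key.
func estimateStats(dataSize, rows, indexEntries int64) estStats {
    return estStats{
        ContainsEstimates: 1,
        KeyBytes:          dataSize / 2,
        ValBytes:          dataSize / 2,
        LiveBytes:         dataSize,
        KeyCount:          rows + indexEntries,
        LiveCount:         rows + indexEntries,
    }
}

func main() {
    // e.g. a file covering 64 MiB of logical data, 1000 rows, 200 index entries:
    fmt.Printf("%+v\n", estimateStats(64<<20, 1000, 200))
}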

pkg/backup/restore_job.go

Lines changed: 88 additions & 23 deletions

@@ -404,6 +404,10 @@ func restore(
     job := resumer.job
     details := job.Details().(jobspb.RestoreDetails)

+    // resolvedURIs holds the main URIs after resolving any external:// aliases.
+    // This is set in the OnlineImpl block below and used by runRestore.
+    resolvedURIs := details.URIs
+
     if details.OnlineImpl() {
         var linkPhaseComplete bool
         if err := execCtx.ExecCfg().InternalDB.Txn(restoreCtx, func(ctx context.Context, txn isql.Txn) error {
@@ -417,6 +421,18 @@
         if linkPhaseComplete {
             return emptyRowCount, nil
         }
+
+        // If any URIs utilize external:// aliases, we need to resolve the alias
+        // to its underlying URI before feeding it into online restore. This
+        // applies to the main URIs, the locality info URIs, and the backup
+        // manifest Dir fields.
+        var err error
+        resolvedURIs, backupLocalityInfo, err = resolveExternalStorageURIs(
+            restoreCtx, execCtx, details.URIs, backupLocalityInfo, backupManifests,
+        )
+        if err != nil {
+            return emptyRowCount, errors.Wrap(err, "resolving external storage URIs for online restore")
+        }
     }

     // If we've already migrated some of the system tables we're about to
510526
filter,
511527
fsc,
512528
spanCh,
529+
false, /* useLink */
513530
), "generate and send import spans")
514531
}
515532

@@ -553,9 +570,15 @@ func restore(
553570
tasks = append(tasks, jobProgressLoop)
554571
}
555572

573+
// Check if online restore should use the distributed flow with file linking
574+
// instead of the simpler sendAddRemoteSSTs loop.
575+
useDistFlow := onlineRestoreUseDistFlow.Get(&execCtx.ExecCfg().Settings.SV)
576+
556577
progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
557-
if !details.OnlineImpl() {
558-
// Online restore tracks progress by pinging requestFinishedCh instead
578+
// Start the progress checkpoint loop if this is a traditional restore OR
579+
// an online restore using the distributed flow (which also sends progress
580+
// updates via progCh).
581+
if !details.OnlineImpl() || useDistFlow {
559582
generativeCheckpointLoop := func(ctx context.Context) error {
560583
defer close(requestFinishedCh)
561584
for progress := range progCh {
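Editor's note: with useDistFlow, online restore now shares the same checkpoint loop as traditional restore: processors emit on progCh, and the loop closes requestFinishedCh so downstream waiters unblock once updates stop. A toy sketch of that drain-then-close shape (invented types, not the real channels):

package main

import "fmt"

func main() {
    progCh := make(chan int)
    requestFinishedCh := make(chan struct{})

    // Checkpoint loop: drain progress updates, then signal completion,
    // mirroring the defer close(requestFinishedCh) in the diff above.
    go func() {
        defer close(requestFinishedCh)
        for progress := range progCh {
            fmt.Println("checkpointing progress:", progress)
        }
    }()

    for i := 1; i <= 3; i++ {
        progCh <- i
    }
    close(progCh)
    <-requestFinishedCh // all progress handled
}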
@@ -599,7 +622,10 @@
 }

     resumeClusterVersion := execCtx.ExecCfg().Settings.Version.ActiveVersion(restoreCtx).Version
-    if clusterversion.V24_3.Version().LessEq(resumeClusterVersion) && !details.OnlineImpl() {
+    // Start the countCompletedProcLoop if this is a traditional restore OR
+    // an online restore using the distributed flow (which also sends processor
+    // completion signals via procCompleteCh).
+    if clusterversion.V24_3.Version().LessEq(resumeClusterVersion) && (!details.OnlineImpl() || useDistFlow) {
         tasks = append(tasks, countCompletedProcLoop)
     }

@@ -628,39 +654,78 @@
     runRestore := func(ctx context.Context) error {
         if details.OnlineImpl() {
             log.Dev.Warningf(ctx, "EXPERIMENTAL ONLINE RESTORE being used")
-            approxRows, approxDataSize, err := sendAddRemoteSSTs(
-                ctx,
-                execCtx,
-                job,
-                dataToRestore,
-                encryption,
-                details.URIs,
-                backupLocalityInfo,
-                requestFinishedCh,
-                tracingAggCh,
-                genSpan,
-            )
-            progressTracker.mu.Lock()
-            defer progressTracker.mu.Unlock()
-            // During the link phase of online restore, we do not update stats
-            // progress as job occurs. We merely reuse the `progressTracker.mu.res`
-            // var to reduce the number of local vars floating around in `restore`.
-            progressTracker.mu.res = roachpb.RowCount{Rows: approxRows, DataSize: approxDataSize}
-            return errors.Wrap(err, "sending remote AddSSTable requests")
+
+            // Use the bespoke online restore path that directly splits and links from
+            // the coordinator without a distSQL restore flow.
+            if !useDistFlow {
+                approxRows, approxDataSize, err := sendAddRemoteSSTs(
+                    ctx,
+                    execCtx,
+                    job,
+                    dataToRestore,
+                    encryption,
+                    resolvedURIs,
+                    backupLocalityInfo,
+                    requestFinishedCh,
+                    tracingAggCh,
+                    genSpan,
+                )
+                progressTracker.mu.Lock()
+                defer progressTracker.mu.Unlock()
+                // During the link phase of online restore, we do not update stats
+                // progress as job occurs. We merely reuse the `progressTracker.mu.res`
+                // var to reduce the number of local vars floating around in `restore`.
+                progressTracker.mu.res = roachpb.RowCount{Rows: approxRows, DataSize: approxDataSize}
+                return errors.Wrap(err, "sending remote AddSSTable requests")
+            }
+
+            // If we did not switch to the bespoke online restore path, we'll proceed
+            // to the normal restore distSQL flow and let its split and scatter
+            // processor direct the restore data processors to link files rather than
+            // ingest them. But we do need to pre-split at the top-level logical spans
+            // that are being restored, as stored in DownloadSpan, as these are what
+            // we will clear via kv/pebble excises if we fail at any point after we
+            // start linking files.
+            //
+            // TODO(dt): we should record in persisted progress when we are ready to
+            // enter the linking/ingesting phase, i.e. *after* we make these splits,
+            // so any failures prior to it can skip cleanup, as that cleanup could
+            // fail if we failed prior to making these splits.
+            var prevSplit roachpb.Key
+            for _, span := range details.DownloadSpans {
+                if !span.Key.Equal(prevSplit) {
+                    if err := execCtx.ExecCfg().DB.AdminSplit(ctx, span.Key, hlc.MaxTimestamp); err != nil {
+                        return errors.Wrapf(err, "pre-splitting at key %s", span.Key)
+                    }
+                }
+                if err := execCtx.ExecCfg().DB.AdminSplit(ctx, span.EndKey, hlc.MaxTimestamp); err != nil {
+                    return errors.Wrapf(err, "pre-splitting at key %s", span.EndKey)
+                }
+                prevSplit = span.EndKey
+            }
+        }
+
+        // Use the distributed restore flow. If this is an online restore (with
+        // useDistFlow, since we returned above otherwise), link files instead of
+        // ingesting them.
+        useLink := details.OnlineImpl()
+        if useLink {
+            log.Dev.Infof(ctx, "online restore using distributed flow with file linking")
         }
         md := restoreJobMetadata{
             jobID:                job.ID(),
             dataToRestore:        dataToRestore,
             restoreTime:          endTime,
             encryption:           encryption,
             kmsEnv:               kmsEnv,
-            uris:                 details.URIs,
+            uris:                 resolvedURIs,
             backupLocalityInfo:   backupLocalityInfo,
             spanFilter:           filter,
             numImportSpans:       numImportSpans,
             execLocality:         details.ExecutionLocality,
             exclusiveEndKeys:     fsc.isExclusive(),
             resumeClusterVersion: resumeClusterVersion,
+            useLink:              useLink,
         }
         return errors.Wrap(distRestore(
             ctx,
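Editor's note: the pre-split loop above avoids issuing a duplicate AdminSplit when consecutive DownloadSpans abut, since a span's Key then equals the previous span's EndKey, which was already split. A toy sketch of that deduplication (string keys standing in for roachpb.Key):

package main

import "fmt"

type span struct{ Key, EndKey string }

// presplit splits at each span boundary, skipping a span's start key when it
// matches the previous span's end key, which was already split.
func presplit(spans []span, adminSplit func(key string)) {
    var prevSplit string
    for _, sp := range spans {
        if sp.Key != prevSplit {
            adminSplit(sp.Key)
        }
        adminSplit(sp.EndKey)
        prevSplit = sp.EndKey
    }
}

func main() {
    spans := []span{{"a", "c"}, {"c", "f"}, {"g", "k"}}
    presplit(spans, func(k string) { fmt.Println("split at", k) })
    // Splits at a, c, f, g, k -- the shared boundary "c" only once.
}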
