Skip to content

Commit bdbdbf1

Browse files
craig[bot]msbutler
andcommitted
Merge #143674
143674: backup: add experimental copy option to restore r=jeffswenson a=msbutler This patch adds the `experimental copy` option to restore to conduct an online restore with a blocking download job-- i.e. the job will only complete once the download completes. An async download job will not be spun up. Epic: none Release note: this patch adds the new `experimental copy` to restore which runs online restore, but waits to publish the tables until all data is downloaded. Co-authored-by: Michael Butler <[email protected]>
2 parents 70cf580 + 1565634 commit bdbdbf1

File tree

14 files changed

+142
-63
lines changed

14 files changed

+142
-63
lines changed

docs/generated/sql/bnf/restore_options.bnf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ restore_options ::=
1818
| 'UNSAFE_RESTORE_INCOMPATIBLE_VERSION'
1919
| 'EXECUTION' 'LOCALITY' '=' string_or_placeholder
2020
| 'EXPERIMENTAL' 'DEFERRED' 'COPY'
21+
| 'EXPERIMENTAL' 'COPY'
2122
| 'REMOVE_REGIONS'

docs/generated/sql/bnf/stmt_block.bnf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2882,6 +2882,7 @@ restore_options ::=
28822882
| 'UNSAFE_RESTORE_INCOMPATIBLE_VERSION'
28832883
| 'EXECUTION' 'LOCALITY' '=' string_or_placeholder
28842884
| 'EXPERIMENTAL' 'DEFERRED' 'COPY'
2885+
| 'EXPERIMENTAL' 'COPY'
28852886
| 'REMOVE_REGIONS'
28862887

28872888
scrub_option_list ::=

pkg/backup/backup_cloud_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func TestOnlineRestoreS3(t *testing.T) {
139139
_, rSQLDB, cleanupFnRestored := backupRestoreTestSetupEmpty(t, 1, "", InitManualReplication, params)
140140
defer cleanupFnRestored()
141141

142-
bankOnlineRestore(t, rSQLDB, numAccounts, externalStorage)
142+
bankOnlineRestore(t, rSQLDB, numAccounts, externalStorage, false)
143143
}
144144

145145
// TestBackupRestoreGoogleCloudStorage hits the real GCS and so could

pkg/backup/restore_job.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -334,11 +334,11 @@ func restore(
334334
defer introducedSpanFrontier.Release()
335335

336336
targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV)
337-
if details.ExperimentalOnline {
337+
if details.OnlineImpl() {
338338
targetSize = targetOnlineRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV)
339339
}
340340
maxFileCount := maxFileCount.Get(&execCtx.ExecCfg().Settings.SV)
341-
if details.ExperimentalOnline {
341+
if details.OnlineImpl() {
342342
// Online Restore does not need to limit the number of files per restore
343343
// span entry as the files are never opened when processing the span. The
344344
// span is only used to create split points.
@@ -433,7 +433,7 @@ func restore(
433433
}
434434

435435
progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
436-
if !details.ExperimentalOnline {
436+
if !details.OnlineImpl() {
437437
// Online restore tracks progress by pinging requestFinishedCh instead
438438
generativeCheckpointLoop := func(ctx context.Context) error {
439439
defer close(requestFinishedCh)
@@ -479,7 +479,7 @@ func restore(
479479
}
480480

481481
resumeClusterVersion := execCtx.ExecCfg().Settings.Version.ActiveVersion(restoreCtx).Version
482-
if clusterversion.V24_3.Version().LessEq(resumeClusterVersion) && !details.ExperimentalOnline {
482+
if clusterversion.V24_3.Version().LessEq(resumeClusterVersion) && !details.OnlineImpl() {
483483
tasks = append(tasks, countCompletedProcLoop)
484484
}
485485

@@ -506,7 +506,7 @@ func restore(
506506
tasks = append(tasks, tracingAggLoop)
507507

508508
runRestore := func(ctx context.Context) error {
509-
if details.ExperimentalOnline {
509+
if details.OnlineImpl() {
510510
log.Warningf(ctx, "EXPERIMENTAL ONLINE RESTORE being used")
511511
approxRows, approxDataSize, err := sendAddRemoteSSTs(
512512
ctx,
@@ -997,7 +997,7 @@ func createImportingDescriptors(
997997
}
998998

999999
if backedUpDescriptorWithInProgressImportInto(desc) {
1000-
if details.ExperimentalOnline && !epochBasedInProgressImport(desc) {
1000+
if details.OnlineImpl() && !epochBasedInProgressImport(desc) {
10011001
return nil, nil, nil, errors.Newf("table %s (id %d) in restoring backup has an in-progress import, but online restore cannot be run on a table with an in progress import", desc.GetName(), desc.GetID())
10021002
}
10031003
} else {
@@ -1048,19 +1048,19 @@ func createImportingDescriptors(
10481048

10491049
// We get the spans of the restoring tables _as they appear in the backup_,
10501050
// that is, in the 'old' keyspace, before we reassign the table IDs.
1051-
preRestoreSpans, err := spansForAllRestoreTableIndexes(backupCodec, preRestoreTables, nil, details.SchemaOnly, details.ExperimentalOnline)
1051+
preRestoreSpans, err := spansForAllRestoreTableIndexes(backupCodec, preRestoreTables, nil, details.SchemaOnly, details.OnlineImpl())
10521052
if err != nil {
10531053
return nil, nil, nil, err
10541054
}
1055-
postRestoreSpans, err := spansForAllRestoreTableIndexes(backupCodec, postRestoreTables, nil, details.SchemaOnly, details.ExperimentalOnline)
1055+
postRestoreSpans, err := spansForAllRestoreTableIndexes(backupCodec, postRestoreTables, nil, details.SchemaOnly, details.OnlineImpl())
10561056
if err != nil {
10571057
return nil, nil, nil, err
10581058
}
10591059
var verifySpans []roachpb.Span
10601060
if details.VerifyData {
10611061
// verifySpans contains the spans that should be read and checksum'd during a
10621062
// verify_backup_table_data RESTORE
1063-
verifySpans, err = spansForAllRestoreTableIndexes(backupCodec, postRestoreTables, nil, false, details.ExperimentalOnline)
1063+
verifySpans, err = spansForAllRestoreTableIndexes(backupCodec, postRestoreTables, nil, false, details.OnlineImpl())
10641064
if err != nil {
10651065
return nil, nil, nil, err
10661066
}
@@ -1769,7 +1769,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
17691769
if err := p.ExecCfg().JobRegistry.CheckPausepoint("restore.before_do_download_files"); err != nil {
17701770
return err
17711771
}
1772-
return r.doDownloadFiles(ctx, p)
1772+
return r.doDownloadFiles(ctx, p, details.DownloadSpans)
17731773
}
17741774

17751775
if err := p.ExecCfg().JobRegistry.CheckPausepoint("restore.before_load_descriptors_from_backup"); err != nil {
@@ -1866,9 +1866,6 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
18661866
return err
18671867
}
18681868
}
1869-
if err := r.maybeWriteDownloadJob(ctx, p.ExecCfg(), preData, mainData); err != nil {
1870-
return err
1871-
}
18721869
emitRestoreJobEvent(ctx, p, jobs.StateSucceeded, r.job)
18731870
return nil
18741871
}
@@ -1897,6 +1894,10 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
18971894

18981895
var resTotal roachpb.RowCount
18991896

1897+
// TODO(msbutler): add a progress field to skip running a restore flow if it
1898+
// has already complete. This ensures we skip the link phase if we resume an
1899+
// online restore job that blocks on the download job.
1900+
19001901
if !preData.isEmpty() {
19011902
res, err := restoreWithRetry(
19021903
ctx,
@@ -1981,6 +1982,15 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
19811982
if err := insertStats(ctx, r.job, p.ExecCfg(), remappedStats); err != nil {
19821983
return errors.Wrap(err, "inserting table statistics")
19831984
}
1985+
if details.ExperimentalCopy {
1986+
downloadSpans, err := getDownloadSpans(p.ExecCfg().Codec, preData, mainData)
1987+
if err != nil {
1988+
return err
1989+
}
1990+
if err := r.doDownloadFiles(ctx, p, downloadSpans); err != nil {
1991+
return err
1992+
}
1993+
}
19841994

19851995
publishDescriptors := func(ctx context.Context, txn descs.Txn) (err error) {
19861996
return r.publishDescriptors(
@@ -2058,7 +2068,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
20582068
if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job); err != nil {
20592069
log.Errorf(ctx, "failed to release protected timestamp: %v", err)
20602070
}
2061-
if !details.ExperimentalOnline {
2071+
if !details.OnlineImpl() {
20622072
r.notifyStatsRefresherOfNewTables()
20632073
}
20642074

@@ -2169,7 +2179,7 @@ func (r *restoreResumer) ReportResults(ctx context.Context, resultsCh chan<- tre
21692179
return ctx.Err()
21702180
case resultsCh <- func() tree.Datums {
21712181
details := r.job.Details().(jobspb.RestoreDetails)
2172-
if details.ExperimentalOnline {
2182+
if details.OnlineImpl() {
21732183
return tree.Datums{
21742184
tree.NewDInt(tree.DInt(r.job.ID())),
21752185
tree.NewDInt(tree.DInt(len(details.TableDescs))),
@@ -2440,7 +2450,7 @@ func (r *restoreResumer) publishDescriptors(
24402450
return err
24412451
}
24422452

2443-
if details.ExperimentalOnline && epochBasedInProgressImport(desc) {
2453+
if details.OnlineImpl() && epochBasedInProgressImport(desc) {
24442454
if err := createImportRollbackJob(ctx,
24452455
r.execCfg.JobRegistry, txn, r.job.Payload().UsernameProto.Decode(), mutTable,
24462456
); err != nil {
@@ -2481,7 +2491,7 @@ func (r *restoreResumer) publishDescriptors(
24812491
b := txn.KV().NewBatch()
24822492
if err := all.ForEachDescriptor(func(desc catalog.Descriptor) error {
24832493
d := desc.(catalog.MutableDescriptor)
2484-
if details.ExperimentalOnline && epochBasedInProgressImport(desc) {
2494+
if details.OnlineImpl() && epochBasedInProgressImport(desc) {
24852495
log.Infof(ctx, "table %q (%d) with in-progress IMPORT remaining offline", desc.GetName(), desc.GetID())
24862496
} else {
24872497
d.SetPublic()
@@ -2520,7 +2530,7 @@ func (r *restoreResumer) publishDescriptors(
25202530
details.SchemaDescs = newSchemas
25212531
details.DatabaseDescs = newDBs
25222532
details.FunctionDescs = newFunctions
2523-
if details.ExperimentalOnline {
2533+
if details.OnlineImpl() {
25242534
details.PostDownloadTableAutoStatsSettings = tableAutoStatsSettings
25252535
}
25262536
if err := r.job.WithTxn(txn).SetDetails(ctx, details); err != nil {

pkg/backup/restore_online.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1818
"github.com/cockroachdb/cockroach/pkg/jobs"
1919
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
20+
"github.com/cockroachdb/cockroach/pkg/keys"
2021
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2122
"github.com/cockroachdb/cockroach/pkg/roachpb"
2223
"github.com/cockroachdb/cockroach/pkg/security/username"
@@ -566,25 +567,18 @@ func sendDownloadSpan(ctx context.Context, execCtx sql.JobExecContext, spans roa
566567
return nil
567568
}
568569

569-
func (r *restoreResumer) maybeWriteDownloadJob(
570-
ctx context.Context,
571-
execConfig *sql.ExecutorConfig,
572-
preRestoreData *restorationDataBase,
573-
mainRestoreData *mainRestorationData,
574-
) error {
575-
details := r.job.Details().(jobspb.RestoreDetails)
576-
if !details.ExperimentalOnline {
577-
return nil
578-
}
570+
func getDownloadSpans(
571+
codec keys.SQLCodec, preRestoreData *restorationDataBase, mainRestoreData *mainRestorationData,
572+
) (roachpb.Spans, error) {
579573
rekey := mainRestoreData.getRekeys()
580574
rekey = append(rekey, preRestoreData.getRekeys()...)
581575

582576
tenantRekey := mainRestoreData.getTenantRekeys()
583577
tenantRekey = append(tenantRekey, preRestoreData.getTenantRekeys()...)
584-
kr, err := MakeKeyRewriterFromRekeys(execConfig.Codec, rekey, tenantRekey,
578+
kr, err := MakeKeyRewriterFromRekeys(codec, rekey, tenantRekey,
585579
false /* restoreTenantFromStream */)
586580
if err != nil {
587-
return errors.Wrap(err, "creating key rewriter from rekeys")
581+
return nil, errors.Wrap(err, "creating key rewriter from rekeys")
588582
}
589583
downloadSpans := mainRestoreData.getSpans()
590584

@@ -599,9 +593,27 @@ func (r *restoreResumer) maybeWriteDownloadJob(
599593
var err error
600594
downloadSpans[i], err = rewriteSpan(kr, downloadSpans[i].Clone(), execinfrapb.ElidePrefix_None)
601595
if err != nil {
602-
return err
596+
return nil, err
603597
}
604598
}
599+
return downloadSpans, nil
600+
}
601+
602+
func (r *restoreResumer) maybeWriteDownloadJob(
603+
ctx context.Context,
604+
execConfig *sql.ExecutorConfig,
605+
preRestoreData *restorationDataBase,
606+
mainRestoreData *mainRestorationData,
607+
) error {
608+
details := r.job.Details().(jobspb.RestoreDetails)
609+
if !details.ExperimentalOnline {
610+
return nil
611+
}
612+
613+
downloadSpans, err := getDownloadSpans(execConfig.Codec, preRestoreData, mainRestoreData)
614+
if err != nil {
615+
return errors.Wrap(err, "failed to get download spans")
616+
}
605617

606618
log.Infof(ctx, "creating job to track downloads in %d spans", len(downloadSpans))
607619
downloadJobRecord := jobs.Record{
@@ -721,13 +733,15 @@ func getRemainingExternalFileBytes(
721733
return remaining, nil
722734
}
723735

724-
func (r *restoreResumer) doDownloadFiles(ctx context.Context, execCtx sql.JobExecContext) error {
736+
func (r *restoreResumer) doDownloadFiles(
737+
ctx context.Context, execCtx sql.JobExecContext, downloadSpans roachpb.Spans,
738+
) error {
725739
details := r.job.Details().(jobspb.RestoreDetails)
726740

727741
grp := ctxgroup.WithContext(ctx)
728742
completionPoller := make(chan struct{})
729743

730-
grp.GoCtx(r.sendDownloadWorker(execCtx, details.DownloadSpans, completionPoller))
744+
grp.GoCtx(r.sendDownloadWorker(execCtx, downloadSpans, completionPoller))
731745
grp.GoCtx(func(ctx context.Context) error {
732746
return r.waitForDownloadToComplete(ctx, execCtx, details, completionPoller)
733747
})

pkg/backup/restore_online_test.go

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -58,33 +58,37 @@ func TestOnlineRestoreBasic(t *testing.T) {
5858
createStmtRes := sqlDB.QueryStr(t, createStmt)
5959

6060
testutils.RunTrueAndFalse(t, "incremental", func(t *testing.T, incremental bool) {
61-
sqlDB.Exec(t, fmt.Sprintf("BACKUP DATABASE data INTO '%s'", externalStorage))
61+
testutils.RunTrueAndFalse(t, "blocking download", func(t *testing.T, blockingDownload bool) {
62+
sqlDB.Exec(t, fmt.Sprintf("BACKUP DATABASE data INTO '%s'", externalStorage))
6263

63-
if incremental {
64-
sqlDB.Exec(t, "UPDATE data.bank SET balance = balance+123 where true;")
65-
sqlDB.Exec(t, fmt.Sprintf("BACKUP DATABASE data INTO LATEST IN '%s'", externalStorage))
66-
}
64+
if incremental {
65+
sqlDB.Exec(t, "UPDATE data.bank SET balance = balance+123 where true;")
66+
sqlDB.Exec(t, fmt.Sprintf("BACKUP DATABASE data INTO LATEST IN '%s'", externalStorage))
67+
}
6768

68-
var preRestoreTs float64
69-
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&preRestoreTs)
69+
var preRestoreTs float64
70+
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&preRestoreTs)
7071

71-
bankOnlineRestore(t, rSQLDB, numAccounts, externalStorage)
72+
bankOnlineRestore(t, rSQLDB, numAccounts, externalStorage, blockingDownload)
7273

73-
fpSrc, err := fingerprintutils.FingerprintDatabase(ctx, tc.Conns[0], "data", fingerprintutils.Stripped())
74-
require.NoError(t, err)
75-
fpDst, err := fingerprintutils.FingerprintDatabase(ctx, rtc.Conns[0], "data", fingerprintutils.Stripped())
76-
require.NoError(t, err)
77-
require.NoError(t, fingerprintutils.CompareDatabaseFingerprints(fpSrc, fpDst))
74+
fpSrc, err := fingerprintutils.FingerprintDatabase(ctx, tc.Conns[0], "data", fingerprintutils.Stripped())
75+
require.NoError(t, err)
76+
fpDst, err := fingerprintutils.FingerprintDatabase(ctx, rtc.Conns[0], "data", fingerprintutils.Stripped())
77+
require.NoError(t, err)
78+
require.NoError(t, fingerprintutils.CompareDatabaseFingerprints(fpSrc, fpDst))
7879

79-
assertMVCCOnlineRestore(t, rSQLDB, preRestoreTs)
80-
assertOnlineRestoreWithRekeying(t, sqlDB, rSQLDB)
80+
assertMVCCOnlineRestore(t, rSQLDB, preRestoreTs)
81+
assertOnlineRestoreWithRekeying(t, sqlDB, rSQLDB)
8182

82-
waitForLatestDownloadJobToSucceed(t, rSQLDB)
83+
if !blockingDownload {
84+
waitForLatestDownloadJobToSucceed(t, rSQLDB)
85+
}
8386

84-
rSQLDB.CheckQueryResults(t, createStmt, createStmtRes)
85-
sqlDB.CheckQueryResults(t, jobutils.GetExternalBytesForConnectedTenant, [][]string{{"0"}})
87+
rSQLDB.CheckQueryResults(t, createStmt, createStmtRes)
88+
sqlDB.CheckQueryResults(t, jobutils.GetExternalBytesForConnectedTenant, [][]string{{"0"}})
8689

87-
rSQLDB.Exec(t, "DROP DATABASE data CASCADE")
90+
rSQLDB.Exec(t, "DROP DATABASE data CASCADE")
91+
})
8892
})
8993

9094
}
@@ -365,13 +369,21 @@ func TestOnlineRestoreErrors(t *testing.T) {
365369
}
366370

367371
func bankOnlineRestore(
368-
t *testing.T, sqlDB *sqlutils.SQLRunner, numAccounts int, externalStorage string,
372+
t *testing.T,
373+
sqlDB *sqlutils.SQLRunner,
374+
numAccounts int,
375+
externalStorage string,
376+
blockingDownload bool,
369377
) {
370378
// Create a table in the default database to force table id rewriting.
371379
sqlDB.Exec(t, "CREATE TABLE IF NOT EXISTS foo (i INT PRIMARY KEY, s STRING);")
372380

373381
sqlDB.Exec(t, "CREATE DATABASE data")
374-
sqlDB.Exec(t, fmt.Sprintf("RESTORE TABLE data.bank FROM LATEST IN '%s' WITH EXPERIMENTAL DEFERRED COPY", externalStorage))
382+
stmt := fmt.Sprintf("RESTORE TABLE data.bank FROM LATEST IN '%s' WITH EXPERIMENTAL DEFERRED COPY", externalStorage)
383+
if blockingDownload {
384+
stmt = fmt.Sprintf("RESTORE TABLE data.bank FROM LATEST IN '%s' WITH EXPERIMENTAL COPY", externalStorage)
385+
}
386+
sqlDB.Exec(t, stmt)
375387

376388
var restoreRowCount int
377389
sqlDB.QueryRow(t, "SELECT count(*) FROM data.bank").Scan(&restoreRowCount)

0 commit comments

Comments
 (0)