Skip to content

Commit 7220225

Browse files
craig[bot]andrew-r-thomas
andcommitted
Merge #159090
159090: backup: allow external connections to be used with online restore r=andrew-r-thomas a=andrew-r-thomas Previously, external connections were not supported in online restore due to external connections being unavailable during early boot. This patch materializes external connections into their underlying uris before the restore job accesses external storage. Fixes: #158511 Release note (backup change): external connections can now be used with online restore Co-authored-by: Andrew Thomas <andrew.thomas@cockroachlabs.com>
2 parents 3611b56 + c680174 commit 7220225

File tree

10 files changed

+197
-44
lines changed

10 files changed

+197
-44
lines changed

pkg/backup/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ go_library(
5656
"//pkg/ccl/storageccl",
5757
"//pkg/cloud",
5858
"//pkg/cloud/cloudpb",
59+
"//pkg/cloud/externalconn",
5960
"//pkg/clusterversion",
6061
"//pkg/featureflag",
6162
"//pkg/jobs",

pkg/backup/backuptestutils/testutils.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package backuptestutils
88
import (
99
"context"
1010
gosql "database/sql"
11+
"fmt"
1112
"reflect"
1213
"strings"
1314
"testing"
@@ -48,6 +49,27 @@ const (
4849
// bugs in time-bound iterators. We disable this in race builds, which can
4950
// be too slow.
5051
var smallEngineBlocks = !util.RaceEnabled && metamorphic.ConstantWithTestBool("small-engine-blocks", false)
52+
var externalConnection = metamorphic.ConstantWithTestBool("external-connection", false)
53+
54+
// GetExternalStorageURI metamorphically chooses between baseURI,
55+
// and an external connection pointing to baseURI.
56+
// if external connection is chosen, it will apply `CREATE EXTERNAL CONNECTION` commands to dbs
57+
func GetExternalStorageURI(
58+
t *testing.T, baseURI string, extConnName string, dbs ...*sqlutils.SQLRunner,
59+
) string {
60+
uri := baseURI
61+
if externalConnection {
62+
for _, db := range dbs {
63+
db.Exec(t, fmt.Sprintf(
64+
"CREATE EXTERNAL CONNECTION '%s' AS '%s';",
65+
extConnName,
66+
baseURI,
67+
))
68+
}
69+
uri = "external://" + extConnName
70+
}
71+
return uri
72+
}
5173

5274
// InitManualReplication calls tc.ToggleReplicateQueues(false).
5375
//

pkg/backup/restore_online.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@ import (
99
"bytes"
1010
"context"
1111
"fmt"
12+
"net/url"
1213
"strings"
1314
"sync/atomic"
1415
"time"
1516

1617
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
1718
"github.com/cockroachdb/cockroach/pkg/backup/backupsink"
19+
"github.com/cockroachdb/cockroach/pkg/cloud"
20+
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
1821
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1922
"github.com/cockroachdb/cockroach/pkg/jobs"
2023
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -306,6 +309,9 @@ func sendAddRemoteSSTWorker(
306309
) func(context.Context) error {
307310
return func(ctx context.Context) error {
308311
testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
312+
313+
extConnCache := make(map[string]externalconn.ExternalConnection)
314+
309315
for entry := range restoreSpanEntriesCh {
310316
log.Dev.VInfof(ctx, 1, "starting restore of backed up span %s containing %d files", entry.Span, len(entry.Files))
311317

@@ -342,6 +348,41 @@ func sendAddRemoteSSTWorker(
342348

343349
log.Dev.VInfof(ctx, 1, "restoring span %s of file %s (file span: %s)", restoringSubspan, file.Path, file.BackupFileEntrySpan)
344350
file.BackupFileEntrySpan = restoringSubspan
351+
352+
// swap out any external connections for their underlying uris and revalidate
353+
uri, err := url.Parse(file.Dir.URI)
354+
if err != nil {
355+
return errors.Wrap(err, "invalid URI")
356+
}
357+
if uri.Scheme == externalconn.Scheme {
358+
var ec externalconn.ExternalConnection
359+
var ok bool
360+
if ec, ok = extConnCache[uri.Host]; !ok {
361+
if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, t isql.Txn) error {
362+
ec, err = externalconn.LoadExternalConnection(ctx, uri.Host, t)
363+
if err != nil {
364+
return err
365+
}
366+
extConnCache[uri.Host] = ec
367+
return nil
368+
}); err != nil {
369+
return errors.Wrap(err, "failed to load external connection")
370+
}
371+
}
372+
373+
materialized, err := externalconn.Materialize(ec, uri)
374+
if err != nil {
375+
return errors.Wrap(err, "failed to materialize external connection URI")
376+
}
377+
// revalidate in case the user changed the underlying uri in the external connection
378+
// to a non early boot uri since the job was created
379+
if err := cloud.SchemeSupportsEarlyBoot(materialized.Scheme); err != nil {
380+
return errors.Wrap(err, "backup URI not supported for online restore")
381+
}
382+
383+
file.Dir.URI = materialized.String()
384+
}
385+
345386
if err := sendRemoteAddSSTable(ctx, execCtx, file, entry.ElidedPrefix, fromSystemTenant); err != nil {
346387
return err
347388
}
@@ -1080,3 +1121,31 @@ func getNumOnlineRestoreLinkWorkers(ctx context.Context, execCtx sql.JobExecCont
10801121
numNodes := max(len(sqlInstanceIDs), 1)
10811122
return defaultLinkWorkersPerNode * numNodes, nil
10821123
}
1124+
1125+
// uriCompatibleWithOnlineRestore validates that a uri scheme is supported for early boot.
1126+
// additionally, if an external connection uri is passed, the underlying uri the
1127+
// external connection points to will be loaded from the system table and validated
1128+
func uriCompatibleWithOnlineRestore(ctx context.Context, txn isql.Txn, path string) error {
1129+
uri, err := url.Parse(path)
1130+
if err != nil {
1131+
return errors.Wrap(err, "failed to parse URI for online restore")
1132+
}
1133+
scheme := uri.Scheme
1134+
if scheme == externalconn.Scheme {
1135+
// online restore materializes external connections late and does not support certain schemes,
1136+
// so we need to validate that the underlying uri has a supported scheme
1137+
ec, err := externalconn.LoadExternalConnection(ctx, uri.Host, txn)
1138+
if err != nil {
1139+
return errors.Wrap(err, "failed to load external connection")
1140+
}
1141+
materialized, err := externalconn.Materialize(ec, uri)
1142+
if err != nil {
1143+
return err
1144+
}
1145+
scheme = materialized.Scheme
1146+
}
1147+
if err := cloud.SchemeSupportsEarlyBoot(scheme); err != nil {
1148+
return errors.Wrap(err, "backup URI not supported for online restore")
1149+
}
1150+
return nil
1151+
}

pkg/backup/restore_online_test.go

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"sync/atomic"
1818
"testing"
1919

20+
"github.com/cockroachdb/cockroach/pkg/backup/backuptestutils"
2021
"github.com/cockroachdb/cockroach/pkg/base"
2122
"github.com/cockroachdb/cockroach/pkg/cloud/nodelocal"
2223
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -73,7 +74,7 @@ func TestOnlineRestoreBasic(t *testing.T) {
7374
rtc, rSQLDB, cleanupFnRestored := backupRestoreTestSetupEmpty(t, 1, dir, InitManualReplication, orParams)
7475
defer cleanupFnRestored()
7576

76-
externalStorage := "nodelocal://1/backup"
77+
externalStorage := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/backup", "backup", sqlDB, rSQLDB)
7778

7879
createStmt := `SELECT create_statement FROM [SHOW CREATE TABLE data.bank]`
7980
createStmtRes := sqlDB.QueryStr(t, createStmt)
@@ -124,11 +125,12 @@ func TestOnlineRestoreRecovery(t *testing.T) {
124125

125126
const numAccounts = 1000
126127

127-
externalStorage := "nodelocal://1/backup"
128-
129128
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, orParams)
130129
defer cleanupFn()
131130

131+
trueExternalStorage := "nodelocal://1/backup"
132+
externalStorage := backuptestutils.GetExternalStorageURI(t, trueExternalStorage, "backup", sqlDB)
133+
132134
restoreToPausedDownloadJob := func(t *testing.T, newDBName string) int {
133135
defer func() {
134136
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''")
@@ -169,7 +171,7 @@ func TestOnlineRestoreRecovery(t *testing.T) {
169171
t.Run("delete file", func(t *testing.T) {
170172
dbName := "data_delete"
171173
downloadJobID := restoreToPausedDownloadJob(t, dbName)
172-
corruptBackup(t, sqlDB, tmpDir, externalStorage)
174+
corruptBackup(t, sqlDB, tmpDir, trueExternalStorage)
173175
sqlDB.ExpectErr(t, "no such file or directory", "SELECT count(*) FROM data_delete.bank")
174176
sqlDB.Exec(t, fmt.Sprintf("RESUME JOB %d", downloadJobID))
175177
jobutils.WaitForJobToFail(t, sqlDB, jobspb.JobID(downloadJobID))
@@ -200,7 +202,7 @@ func TestOnlineRestoreRecovery(t *testing.T) {
200202
var blockingJobID int
201203
sqlDB.QueryRow(t, fmt.Sprintf("RESTORE DATABASE data FROM LATEST IN '%s' WITH EXPERIMENTAL COPY, new_db_name=%s, detached", externalStorage, dbName)).Scan(&blockingJobID)
202204
jobutils.WaitForJobToPause(t, sqlDB, jobspb.JobID(blockingJobID))
203-
corruptBackup(t, sqlDB, tmpDir, externalStorage)
205+
corruptBackup(t, sqlDB, tmpDir, trueExternalStorage)
204206
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''")
205207
sqlDB.Exec(t, fmt.Sprintf("RESUME JOB %d", blockingJobID))
206208
jobutils.WaitForJobToFail(t, sqlDB, jobspb.JobID(blockingJobID))
@@ -220,11 +222,12 @@ func TestFullClusterOnlineRestoreRecovery(t *testing.T) {
220222

221223
const numAccounts = 1000
222224

223-
externalStorage := "nodelocal://1/backup"
224-
225225
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, orParams)
226226
defer cleanupFn()
227227

228+
trueExternalStorage := "nodelocal://1/backup"
229+
externalStorage := backuptestutils.GetExternalStorageURI(t, trueExternalStorage, "backup", sqlDB)
230+
228231
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_download'")
229232
sqlDB.Exec(t, fmt.Sprintf("BACKUP INTO '%s'", externalStorage))
230233

@@ -248,7 +251,7 @@ func TestFullClusterOnlineRestoreRecovery(t *testing.T) {
248251
var downloadJobID jobspb.JobID
249252
sqlDB.QueryRow(t, latestDownloadJobIDQuery).Scan(&downloadJobID)
250253
jobutils.WaitForJobToPause(t, sqlDB, downloadJobID)
251-
corruptBackup(t, sqlDB, tmpDir, externalStorage)
254+
corruptBackup(t, sqlDB, tmpDir, trueExternalStorage)
252255
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''")
253256
sqlDB.Exec(t, fmt.Sprintf("RESUME JOB %d", downloadJobID))
254257
jobutils.WaitForJobToFail(t, sqlDB, downloadJobID)
@@ -292,13 +295,17 @@ func TestOnlineRestorePartitioned(t *testing.T) {
292295
)
293296
defer cleanupFn()
294297

295-
sqlDB.Exec(t, `BACKUP DATABASE data INTO ('nodelocal://1/a?COCKROACH_LOCALITY=default',
296-
'nodelocal://1/b?COCKROACH_LOCALITY=dc%3Ddc2',
297-
'nodelocal://1/c?COCKROACH_LOCALITY=dc%3Ddc3')`)
298+
a := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/a", "conn-a", sqlDB) + "?COCKROACH_LOCALITY=default"
299+
b := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/b", "conn-b", sqlDB) + "?COCKROACH_LOCALITY=dc%3Ddc2"
300+
c := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/c", "conn-c", sqlDB) + "?COCKROACH_LOCALITY=dc%3Ddc3"
298301

299-
j := sqlDB.QueryStr(t, `RESTORE DATABASE data FROM LATEST IN ('nodelocal://1/a?COCKROACH_LOCALITY=default',
300-
'nodelocal://1/b?COCKROACH_LOCALITY=dc%3Ddc2',
301-
'nodelocal://1/c?COCKROACH_LOCALITY=dc%3Ddc3') WITH new_db_name='d2', EXPERIMENTAL DEFERRED COPY`)
302+
sqlDB.Exec(t, fmt.Sprintf("BACKUP DATABASE data INTO ('%s', '%s', '%s')", a, b, c))
303+
304+
j := sqlDB.QueryStr(t, fmt.Sprintf(
305+
`RESTORE DATABASE data FROM LATEST IN ('%s', '%s', '%s')
306+
WITH new_db_name='d2', EXPERIMENTAL DEFERRED COPY`,
307+
a, b, c,
308+
))
302309

303310
srv.Servers[0].JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
304311

@@ -321,11 +328,14 @@ func TestOnlineRestoreLinkCheckpoint(t *testing.T) {
321328
orParams,
322329
)
323330
defer cleanupFn()
324-
sqlDB.Exec(t, "BACKUP DATABASE data INTO $1", localFoo)
331+
332+
externalStorage := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/backup", "backup", sqlDB)
333+
334+
sqlDB.Exec(t, "BACKUP DATABASE data INTO $1", externalStorage)
325335
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_publishing_descriptors'")
326336
var jobID jobspb.JobID
327337
stmt := fmt.Sprintf("RESTORE DATABASE data FROM LATEST IN $1 WITH OPTIONS (new_db_name='data2', %s, detached)", onlineImpl(rng))
328-
sqlDB.QueryRow(t, stmt, localFoo).Scan(&jobID)
338+
sqlDB.QueryRow(t, stmt, externalStorage).Scan(&jobID)
329339
jobutils.WaitForJobToPause(t, sqlDB, jobID)
330340

331341
// Set a pauspoint during the link phase which should not get hit because of
@@ -354,18 +364,20 @@ func TestOnlineRestoreStatementResult(t *testing.T) {
354364
)
355365
defer cleanupFn()
356366

367+
externalStorage := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/backup", "backup", sqlDB)
368+
357369
sqlDB.ExecMultiple(
358370
t,
359371
"USE data",
360372
"CREATE TABLE foo (x INT PRIMARY KEY, y INT)",
361373
"INSERT INTO foo VALUES (1, 2)",
362374
)
363-
sqlDB.Exec(t, "BACKUP DATABASE data INTO $1", localFoo)
375+
sqlDB.Exec(t, "BACKUP DATABASE data INTO $1", externalStorage)
364376

365377
rows := sqlDB.Query(
366378
t,
367379
"RESTORE DATABASE data FROM LATEST IN $1 WITH OPTIONS (new_db_name='data2', experimental deferred copy)",
368-
localFoo,
380+
externalStorage,
369381
)
370382
columns, err := rows.Columns()
371383
if err != nil {
@@ -419,7 +431,7 @@ func TestOnlineRestoreWaitForDownload(t *testing.T) {
419431
},
420432
})
421433
defer cleanupFn()
422-
externalStorage := "nodelocal://1/backup"
434+
externalStorage := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/backup", "backup", sqlDB)
423435

424436
sqlDB.Exec(t, fmt.Sprintf("BACKUP INTO '%s'", externalStorage))
425437

@@ -445,8 +457,6 @@ func TestOnlineRestoreTenant(t *testing.T) {
445457

446458
defer nodelocal.ReplaceNodeLocalForTesting(t.TempDir())()
447459

448-
externalStorage := "nodelocal://1/backup"
449-
450460
params := base.TestClusterArgs{ServerArgs: base.TestServerArgs{
451461
Knobs: base.TestingKnobs{
452462
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
@@ -467,6 +477,8 @@ func TestOnlineRestoreTenant(t *testing.T) {
467477
defer cleanupFn()
468478
srv := tc.Server(0)
469479

480+
externalStorage := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/backup", "backup", systemDB)
481+
470482
_ = securitytest.EmbeddedTenantIDs()
471483

472484
_, conn10 := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MustMakeTenantID(10)})
@@ -479,6 +491,9 @@ func TestOnlineRestoreTenant(t *testing.T) {
479491
restoreTC, rSQLDB, cleanupFnRestored := backupRestoreTestSetupEmpty(t, 1, dir, InitManualReplication, params)
480492
defer cleanupFnRestored()
481493

494+
// just using this to run CREATE EXTERNAL CONNECTION on the recovery db if we're using one
495+
_ = backuptestutils.GetExternalStorageURI(t, "nodelocal://1/backup", "backup", rSQLDB)
496+
482497
systemDB.Exec(t, fmt.Sprintf(`BACKUP TENANT 10 INTO '%s'`, externalStorage))
483498

484499
if incremental {
@@ -555,9 +570,15 @@ func TestOnlineRestoreErrors(t *testing.T) {
555570
defer cleanupFnRestored()
556571
rSQLDB.Exec(t, "CREATE DATABASE data")
557572
var (
558-
fullBackup = "nodelocal://1/full-backup"
559-
fullBackupWithRevs = "nodelocal://1/full-backup-with-revs"
560-
incrementalBackupWithRevs = "nodelocal://1/incremental-backup-with-revs"
573+
fullBackup = backuptestutils.GetExternalStorageURI(
574+
t, "nodelocal://1/full-backup", "full-backup", sqlDB, rSQLDB,
575+
)
576+
fullBackupWithRevs = backuptestutils.GetExternalStorageURI(
577+
t, "nodelocal://1/full-backup-with-revs", "full-backup-with-revs", sqlDB, rSQLDB,
578+
)
579+
incrementalBackupWithRevs = backuptestutils.GetExternalStorageURI(
580+
t, "nodelocal://1/incremental-backup-with-revs", "incremental-backup-with-revs", sqlDB, rSQLDB,
581+
)
561582
)
562583
t.Run("full backups with revision history are unsupported", func(t *testing.T) {
563584
var systemTime string
@@ -625,7 +646,7 @@ func TestOnlineRestoreRetryingDownloadRequests(t *testing.T) {
625646
)
626647
defer cleanupFn()
627648

628-
externalStorage := "nodelocal://1/backup"
649+
externalStorage := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/backup", "backup", sqlDB)
629650
sqlDB.Exec(t, fmt.Sprintf("BACKUP INTO '%s'", externalStorage))
630651
sqlDB.Exec(
631652
t,
@@ -684,7 +705,7 @@ func TestOnlineRestoreDownloadRetryReset(t *testing.T) {
684705
)
685706
defer cleanupFn()
686707

687-
externalStorage := "nodelocal://1/backup"
708+
externalStorage := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/backup", "backup", sqlDB)
688709
sqlDB.Exec(t, fmt.Sprintf("BACKUP INTO '%s'", externalStorage))
689710
sqlDB.Exec(
690711
t,
@@ -769,7 +790,7 @@ func TestOnlineRestoreFailScatterNonEmptyRanges(t *testing.T) {
769790
)
770791
defer cleanupFn()
771792

772-
externalStorage := "nodelocal://1/backup"
793+
externalStorage := backuptestutils.GetExternalStorageURI(t, "nodelocal://1/backup", "backup", sqlDB)
773794
sqlDB.Exec(t, fmt.Sprintf("BACKUP INTO '%s'", externalStorage))
774795

775796
var linkJobID jobspb.JobID

pkg/backup/restore_planning.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,6 +1686,15 @@ func doRestorePlan(
16861686

16871687
var fullyResolvedSubdir string
16881688

1689+
if restoreStmt.Options.OnlineImpl() {
1690+
// validate that from uris are allowed in online restore
1691+
for _, path := range from {
1692+
if err := uriCompatibleWithOnlineRestore(ctx, p.InternalSQLTxn(), path); err != nil {
1693+
return err
1694+
}
1695+
}
1696+
}
1697+
16891698
defaultCollectionURI, _, err := backupdest.GetURIsByLocalityKV(from, "")
16901699
if err != nil {
16911700
return err
@@ -1801,13 +1810,6 @@ func doRestorePlan(
18011810
if err != nil {
18021811
return err
18031812
}
1804-
if restoreStmt.Options.OnlineImpl() {
1805-
for _, uri := range defaultURIs {
1806-
if err := cloud.SchemeSupportsEarlyBoot(uri); err != nil {
1807-
return errors.Wrap(err, "backup URI not supported for online restore")
1808-
}
1809-
}
1810-
}
18111813

18121814
defer func() {
18131815
mem.Shrink(ctx, memReserved)

0 commit comments

Comments
 (0)