Skip to content

Commit 298633d

Browse files
craig[bot]kev-caomsbutler
committed
153065: restore: scale number of OR link workers to number of nodes r=msbutler a=kev-cao This patch teaches online restore to scale the number of link workers to the number of nodes in the cluster instead of statically setting it to 32. In the new behavior, the default value for `backup.restore.online_worker_count` is 0, which sets the number of link workers to two times the number of nodes. Epic: CRDB-48786 Fixes: #146584 153380: sql: fix node panic on SIGINT during CHECK EXTERNAL CONNECTION r=jeffswenson a=kev-cao `CHECK EXTERNAL CONNECTION` defers a close of the `rows` channel as part of its execution. It also closes that same channel in its `Close` method. Under normal execution, if `CHECK EXTERNAL CONNECTION` is allowed to complete, that `rows` channel is set to `nil`, so `Close` skips the attempt to close the channel. However, if a `SIGINT` is sent during the execution to cancel the query, `rows` is never set to `nil` and `Close` will attempt to close a closed channel, causing a node panic. Epic: None Release note (bug fix): Fixed a bug introduced in v25.1.0 that would cause a node panic if a `SIGINT` signal was sent during the execution of a `CHECK EXTERNAL CONNECTION` command. 153470: backupdest: attempt to close all stores in MakeBackupDestinationStores cleanupFn r=kev-cao a=msbutler Epic: none Release note: none Co-authored-by: Kevin Cao <[email protected]> Co-authored-by: Michael Butler <[email protected]>
4 parents 6525c6a + 922c9f7 + 8285422 + c4fdf87 commit 298633d

File tree

8 files changed

+81
-100
lines changed

8 files changed

+81
-100
lines changed

pkg/backup/backupdest/backup_destination.go

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -557,8 +557,6 @@ func ResolveBackupManifests(
557557
mem *mon.BoundAccount,
558558
defaultCollectionURI string,
559559
collectionURIs []string,
560-
baseStores []cloud.ExternalStorage,
561-
incStores []cloud.ExternalStorage,
562560
mkStore cloud.ExternalStorageFromURIFactory,
563561
resolvedSubdir string,
564562
fullyResolvedBaseDirectory []string,
@@ -590,7 +588,7 @@ func ResolveBackupManifests(
590588

591589
if !ReadBackupIndexEnabled.Get(&execCfg.Settings.SV) || !exists || isCustomIncLocation {
592590
return legacyResolveBackupManifests(
593-
ctx, execCfg, mem, defaultCollectionURI, baseStores, incStores, mkStore,
591+
ctx, execCfg, mem, defaultCollectionURI, mkStore,
594592
resolvedSubdir, fullyResolvedBaseDirectory, fullyResolvedIncrementalsDirectory,
595593
endTime, encryption, kmsEnv, user, includeSkipped, includeCompacted,
596594
)
@@ -609,8 +607,6 @@ func legacyResolveBackupManifests(
609607
execCfg *sql.ExecutorConfig,
610608
mem *mon.BoundAccount,
611609
defaultCollectionURI string,
612-
baseStores []cloud.ExternalStorage,
613-
incStores []cloud.ExternalStorage,
614610
mkStore cloud.ExternalStorageFromURIFactory,
615611
resolvedSubdir string,
616612
fullyResolvedBaseDirectory []string,
@@ -637,6 +633,27 @@ func legacyResolveBackupManifests(
637633
mem.Shrink(ctx, ownedMemSize)
638634
}
639635
}()
636+
637+
baseStores, baseCleanupFn, err := MakeBackupDestinationStores(ctx, user, mkStore, fullyResolvedBaseDirectory)
638+
if err != nil {
639+
return nil, nil, nil, 0, err
640+
}
641+
defer func() {
642+
if err := baseCleanupFn(); err != nil {
643+
log.Dev.Warningf(ctx, "failed to close base store: %+v", err)
644+
}
645+
}()
646+
647+
incStores, incCleanupFn, err := MakeBackupDestinationStores(ctx, user, mkStore, fullyResolvedIncrementalsDirectory)
648+
if err != nil {
649+
return nil, nil, nil, 0, err
650+
}
651+
defer func() {
652+
if err := incCleanupFn(); err != nil {
653+
log.Dev.Warningf(ctx, "failed to close inc store: %+v", err)
654+
}
655+
}()
656+
640657
baseManifest, memSize, err := backupinfo.ReadBackupManifestFromStore(ctx, mem, baseStores[0], fullyResolvedBaseDirectory[0],
641658
encryption, kmsEnv)
642659
if err != nil {
@@ -646,11 +663,6 @@ func legacyResolveBackupManifests(
646663

647664
var incrementalBackups []string
648665
if len(incStores) > 0 {
649-
rootStore, err := mkStore(ctx, defaultCollectionURI, user)
650-
if err != nil {
651-
return nil, nil, nil, 0, err
652-
}
653-
defer rootStore.Close()
654666
incrementalBackups, err = LegacyFindPriorBackups(ctx, incStores[0], includeManifest)
655667
if err != nil {
656668
return nil, nil, nil, 0, err
@@ -771,23 +783,15 @@ func indexedResolveBackupManifests(
771783
ctx, sp := tracing.ChildSpan(ctx, "backupdest.ResolveBackupManifestsWithIndexes")
772784
defer sp.Finish()
773785

774-
rootStores := make([]cloud.ExternalStorage, 0, len(collectionURIs))
786+
rootStores, rootCleanupFn, err := MakeBackupDestinationStores(ctx, user, mkStore, collectionURIs)
787+
if err != nil {
788+
return nil, nil, nil, 0, err
789+
}
775790
defer func() {
776-
for _, store := range rootStores {
777-
if store != nil {
778-
if err := store.Close(); err != nil {
779-
log.Dev.Errorf(ctx, "error closing store: %v", err)
780-
}
781-
}
791+
if err := rootCleanupFn(); err != nil {
792+
log.Dev.Warningf(ctx, "failed to close collection store: %s", err)
782793
}
783794
}()
784-
for _, uri := range collectionURIs {
785-
store, err := mkStore(ctx, uri, user)
786-
if err != nil {
787-
return nil, nil, nil, 0, err
788-
}
789-
rootStores = append(rootStores, store)
790-
}
791795

792796
indexes, err := GetBackupTreeIndexMetadata(
793797
ctx, rootStores[0], resolvedSubdir,

pkg/backup/backupdest/backup_destination_test.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -396,31 +396,13 @@ func TestResolveBackupManifests(t *testing.T) {
396396
})
397397
require.NoError(t, err)
398398

399-
baseStores, err := util.MapE(fullyResolvedBaseDirs, func(uri string) (cloud.ExternalStorage, error) {
400-
return execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, uri, username.RootUserName())
401-
})
402-
require.NoError(t, err)
403-
for i := range baseStores {
404-
defer baseStores[i].Close()
405-
}
406-
407-
incStores, err := util.MapE(fullyResolvedIncDirs, func(uri string) (cloud.ExternalStorage, error) {
408-
return execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, uri, username.RootUserName())
409-
})
410-
require.NoError(t, err)
411-
for i := range incStores {
412-
defer incStores[i].Close()
413-
}
414-
415399
t.Run("resolve backup manifests with latest AOST", func(t *testing.T) {
416400
uris, manifests, locality, memSize, err := ResolveBackupManifests(
417401
ctx,
418402
&execCfg,
419403
&mem,
420404
collections[0],
421405
collections,
422-
baseStores,
423-
incStores,
424406
execCfg.DistSQLSrv.ExternalStorageFromURI,
425407
fullSubdir,
426408
fullyResolvedBaseDirs,
@@ -448,8 +430,6 @@ func TestResolveBackupManifests(t *testing.T) {
448430
&mem,
449431
collections[0],
450432
collections,
451-
baseStores,
452-
incStores,
453433
execCfg.DistSQLSrv.ExternalStorageFromURI,
454434
fullSubdir,
455435
fullyResolvedBaseDirs,

pkg/backup/backupdest/incrementals.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,23 +216,24 @@ func MakeBackupDestinationStores(
216216
mkStore cloud.ExternalStorageFromURIFactory,
217217
destinationDirs []string,
218218
) ([]cloud.ExternalStorage, func() error, error) {
219-
incStores := make([]cloud.ExternalStorage, len(destinationDirs))
219+
stores := make([]cloud.ExternalStorage, len(destinationDirs))
220220
for i := range destinationDirs {
221221
store, err := mkStore(ctx, destinationDirs[i], user)
222222
if err != nil {
223223
return nil, nil, errors.Wrapf(err, "failed to open backup storage location")
224224
}
225-
incStores[i] = store
225+
stores[i] = store
226226
}
227227

228-
return incStores, func() error {
229-
// Close all the incremental stores in the returned cleanup function.
230-
for _, store := range incStores {
228+
return stores, func() error {
229+
// Close all the stores in the returned cleanup function.
230+
var combinedErr error
231+
for _, store := range stores {
231232
if err := store.Close(); err != nil {
232-
return err
233+
combinedErr = errors.CombineErrors(combinedErr, err)
233234
}
234235
}
235-
return nil
236+
return combinedErr
236237
}, nil
237238
}
238239

pkg/backup/compaction_job.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -745,17 +745,6 @@ func getBackupChain(
745745
log.Dev.Warningf(ctx, "failed to cleanup base backup stores: %+v", err)
746746
}
747747
}()
748-
incStores, incCleanup, err := backupdest.MakeBackupDestinationStores(
749-
ctx, user, mkStore, resolvedIncDirs,
750-
)
751-
if err != nil {
752-
return nil, nil, nil, nil, err
753-
}
754-
defer func() {
755-
if err := incCleanup(); err != nil {
756-
log.Dev.Warningf(ctx, "failed to cleanup incremental backup stores: %+v", err)
757-
}
758-
}()
759748
baseEncryptionInfo := encryptionOpts
760749
if encryptionOpts != nil && !encryptionOpts.HasKey() {
761750
baseEncryptionInfo, err = backupencryption.GetEncryptionFromBaseStore(
@@ -770,7 +759,7 @@ func getBackupChain(
770759
defer mem.Close(ctx)
771760

772761
_, manifests, localityInfo, memReserved, err := backupdest.ResolveBackupManifests(
773-
ctx, execCfg, &mem, defaultCollectionURI, dest.To, baseStores, incStores, mkStore, resolvedSubdir,
762+
ctx, execCfg, &mem, defaultCollectionURI, dest.To, mkStore, resolvedSubdir,
774763
resolvedBaseDirs, resolvedIncDirs, endTime, baseEncryptionInfo, kmsEnv,
775764
user, false /*includeSkipped */, true /*includeCompacted */, len(dest.IncrementalStorage) > 0,
776765
)

pkg/backup/restore_online.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@ import (
4646
"github.com/cockroachdb/errors"
4747
)
4848

49-
var onlineRestoreLinkWorkers = settings.RegisterByteSizeSetting(
49+
const defaultLinkWorkersPerNode = 2
50+
51+
var onlineRestoreLinkWorkers = settings.RegisterIntSetting(
5052
settings.ApplicationLevel,
5153
"backup.restore.online_worker_count",
52-
"workers to use for online restore link phase",
53-
32,
54-
settings.PositiveInt,
54+
"total workers to use for online restore link phase (defaults to 2x number of nodes if set to 0)",
55+
0,
56+
settings.NonNegativeInt,
5557
)
5658

5759
var onlineRestoreLayerLimit = settings.RegisterIntSetting(
@@ -83,7 +85,10 @@ func splitAndScatter(
8385

8486
log.Dev.Infof(ctx, "splitting and scattering spans")
8587

86-
workers := int(onlineRestoreLinkWorkers.Get(&execCtx.ExecCfg().Settings.SV))
88+
workers, err := getNumOnlineRestoreLinkWorkers(ctx, execCtx)
89+
if err != nil {
90+
return err
91+
}
8792
toScatter := make(chan execinfrapb.RestoreSpanEntry, 1)
8893
toSplit := make(chan execinfrapb.RestoreSpanEntry, workers)
8994

@@ -270,7 +275,10 @@ func linkExternalFiles(
270275

271276
log.Dev.Infof(ctx, "ingesting remote files")
272277

273-
workers := int(onlineRestoreLinkWorkers.Get(&execCtx.ExecCfg().Settings.SV))
278+
workers, err := getNumOnlineRestoreLinkWorkers(ctx, execCtx)
279+
if err != nil {
280+
return 0, 0, err
281+
}
274282

275283
grp := ctxgroup.WithContext(ctx)
276284
ch := make(chan execinfrapb.RestoreSpanEntry, workers)
@@ -1039,3 +1047,20 @@ func (r *restoreResumer) maybeCleanupFailedOnlineRestore(
10391047

10401048
return unstickRestoreSpans(ctx, p.ExecCfg(), details.DownloadSpans)
10411049
}
1050+
1051+
// getNumOnlineRestoreLinkWorkers returns the total number of workers to use for
1052+
// the link phase of an online restore.
1053+
func getNumOnlineRestoreLinkWorkers(ctx context.Context, execCtx sql.JobExecContext) (int, error) {
1054+
if workers := onlineRestoreLinkWorkers.Get(&execCtx.ExecCfg().Settings.SV); workers > 0 {
1055+
return int(workers), nil
1056+
}
1057+
// All nodes are used in a restore
1058+
_, sqlInstanceIDs, err := execCtx.ExecCfg().DistSQLPlanner.SetupAllNodesPlanning(
1059+
ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg(),
1060+
)
1061+
if err != nil {
1062+
return 0, err
1063+
}
1064+
numNodes := min(len(sqlInstanceIDs), 1)
1065+
return defaultLinkWorkersPerNode * numNodes, nil
1066+
}

pkg/backup/restore_planning.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1738,18 +1738,7 @@ func doRestorePlan(
17381738
}
17391739
defer func() {
17401740
if err := cleanupFn(); err != nil {
1741-
log.Dev.Warningf(ctx, "failed to close incremental store: %+v", err)
1742-
}
1743-
}()
1744-
1745-
incStores, cleanupFn, err := backupdest.MakeBackupDestinationStores(ctx, p.User(), mkStore,
1746-
fullyResolvedIncrementalsDirectory)
1747-
if err != nil {
1748-
return err
1749-
}
1750-
defer func() {
1751-
if err := cleanupFn(); err != nil {
1752-
log.Dev.Warningf(ctx, "failed to close incremental store: %+v", err)
1741+
log.Dev.Warningf(ctx, "failed to close base store: %+v", err)
17531742
}
17541743
}()
17551744

@@ -1805,7 +1794,7 @@ func doRestorePlan(
18051794
// directories, return the URIs and manifests of all backup layers in all
18061795
// localities. Incrementals will be searched for automatically.
18071796
defaultURIs, mainBackupManifests, localityInfo, memReserved, err := backupdest.ResolveBackupManifests(
1808-
ctx, p.ExecCfg(), &mem, defaultCollectionURI, from, baseStores, incStores, mkStore,
1797+
ctx, p.ExecCfg(), &mem, defaultCollectionURI, from, mkStore,
18091798
fullyResolvedSubdir, fullyResolvedBaseDirectory, fullyResolvedIncrementalsDirectory, endTime,
18101799
encryption, &kmsEnv, p.User(), false, includeCompacted, len(incFrom) > 0,
18111800
)

pkg/backup/show.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -346,20 +346,10 @@ you must pass the 'encryption_info_dir' parameter that points to the directory o
346346
info.enc = encryption
347347

348348
mkStore := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI
349-
incStores, cleanupFn, err := backupdest.MakeBackupDestinationStores(ctx, p.User(), mkStore,
350-
fullyResolvedIncrementalsDirectory)
351-
if err != nil {
352-
return err
353-
}
354-
defer func() {
355-
if err := cleanupFn(); err != nil {
356-
log.Dev.Warningf(ctx, "failed to close incremental store: %+v", err)
357-
}
358-
}()
359349

360350
info.defaultURIs, info.manifests, info.localityInfo, memReserved,
361351
err = backupdest.ResolveBackupManifests(
362-
ctx, p.ExecCfg(), &mem, defaultCollectionURI, dest, baseStores, incStores, mkStore, subdir,
352+
ctx, p.ExecCfg(), &mem, defaultCollectionURI, dest, mkStore, subdir,
363353
fullyResolvedDest, fullyResolvedIncrementalsDirectory, hlc.Timestamp{},
364354
encryption, &kmsEnv, p.User(), true /* includeSkipped */, true, /* includeCompacted */
365355
len(explicitIncPaths) > 0,

pkg/sql/check_external_connection.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
1414
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
1515
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
16+
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
1617
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
1718
"github.com/cockroachdb/cockroach/pkg/util/tracing"
1819
"github.com/cockroachdb/errors"
@@ -26,6 +27,7 @@ func (p *planner) CheckExternalConnection(
2627

2728
type checkExternalConnectionNode struct {
2829
zeroInputPlanNode
30+
execGrp ctxgroup.Group
2931
node *tree.CheckExternalConnection
3032
loc string
3133
params CloudCheckParams
@@ -101,7 +103,9 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
101103
return nil
102104
})
103105

104-
go func() {
106+
grp := ctxgroup.WithContext(ctx)
107+
n.execGrp = grp
108+
grp.GoCtx(func(ctx context.Context) error {
105109
recv := MakeDistSQLReceiver(
106110
ctx,
107111
rowWriter,
@@ -117,7 +121,9 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
117121
// Copy the eval.Context, as dsp.Run() might change it.
118122
evalCtxCopy := params.extendedEvalCtx.Context.Copy()
119123
dsp.Run(ctx, planCtx, nil, plan, recv, evalCtxCopy, nil /* finishedSetupFn */)
120-
}()
124+
return nil
125+
})
126+
121127
return nil
122128
}
123129

@@ -130,7 +136,6 @@ func (n *checkExternalConnectionNode) Next(params runParams) (bool, error) {
130136
return false, params.ctx.Err()
131137
case row, more := <-n.rows:
132138
if !more {
133-
n.rows = nil
134139
return false, nil
135140
}
136141
n.row = row
@@ -143,10 +148,8 @@ func (n *checkExternalConnectionNode) Values() tree.Datums {
143148
}
144149

145150
func (n *checkExternalConnectionNode) Close(_ context.Context) {
146-
if n.rows != nil {
147-
close(n.rows)
148-
n.rows = nil
149-
}
151+
_ = n.execGrp.Wait()
152+
n.rows = nil
150153
}
151154

152155
func (n *checkExternalConnectionNode) parseParams(params runParams) error {

0 commit comments

Comments
 (0)