Skip to content

Commit 7a28a7b

Browse files
craig[bot]kev-cao
andcommitted
Merge #144516
144516: backup: integrate hot loop AC into backup compactions r=msbutler a=kev-cao Fixes: #143903 Release note: None Co-authored-by: Kevin Cao <[email protected]>
2 parents c194f4a + e7b5293 commit 7a28a7b

File tree

4 files changed

+42
-26
lines changed

4 files changed

+42
-26
lines changed

pkg/backup/backup_processor.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -444,23 +444,9 @@ func runBackupProcessor(
444444

445445
readTime := spec.BackupEndTime.GoTime()
446446

447-
// Passing a nil pacer is effectively a noop if CPU control is disabled.
448-
var pacer *admission.Pacer = nil
449-
if fileSSTSinkElasticCPUControlEnabled.Get(&clusterSettings.SV) {
450-
tenantID, ok := roachpb.ClientTenantFromContext(ctx)
451-
if !ok {
452-
tenantID = roachpb.SystemTenantID
453-
}
454-
pacer = flowCtx.Cfg.AdmissionPacerFactory.NewPacer(
455-
100*time.Millisecond,
456-
admission.WorkInfo{
457-
TenantID: tenantID,
458-
Priority: admissionpb.BulkNormalPri,
459-
CreateTime: timeutil.Now().UnixNano(),
460-
BypassAdmission: false,
461-
},
462-
)
463-
}
447+
pacer := newBackupPacer(
448+
ctx, flowCtx.Cfg.AdmissionPacerFactory, clusterSettings,
449+
)
464450
// It is safe to close a nil pacer.
465451
defer pacer.Close()
466452

@@ -774,6 +760,30 @@ func logClose(ctx context.Context, c io.Closer, desc string) {
774760
}
775761
}
776762

763+
// newBackupPacer creates a new AC pacer for backup. It may return nil if CPU
764+
// control is disabled, which is effectively a noop.
765+
func newBackupPacer(
766+
ctx context.Context, factory admission.PacerFactory, settings *cluster.Settings,
767+
) *admission.Pacer {
768+
var pacer *admission.Pacer
769+
if fileSSTSinkElasticCPUControlEnabled.Get(&settings.SV) {
770+
tenantID, ok := roachpb.ClientTenantFromContext(ctx)
771+
if !ok {
772+
tenantID = roachpb.SystemTenantID
773+
}
774+
pacer = factory.NewPacer(
775+
100*time.Millisecond,
776+
admission.WorkInfo{
777+
TenantID: tenantID,
778+
Priority: admissionpb.BulkNormalPri,
779+
CreateTime: timeutil.Now().UnixNano(),
780+
BypassAdmission: false,
781+
},
782+
)
783+
}
784+
return pacer
785+
}
786+
777787
func init() {
778788
rowexec.NewBackupDataProcessor = newBackupDataProcessor
779789
}

pkg/backup/backupsink/sst_sink_key_writer.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/roachpb"
1616
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
1717
"github.com/cockroachdb/cockroach/pkg/storage"
18-
"github.com/cockroachdb/cockroach/pkg/util/admission"
1918
"github.com/cockroachdb/cockroach/pkg/util/log"
2019
"github.com/cockroachdb/errors"
2120
)
@@ -32,14 +31,12 @@ type SSTSinkKeyWriter struct {
3231
targetFileSize int64
3332
}
3433

35-
func MakeSSTSinkKeyWriter(
36-
conf SSTSinkConf, dest cloud.ExternalStorage, pacer *admission.Pacer,
37-
) (*SSTSinkKeyWriter, error) {
34+
func MakeSSTSinkKeyWriter(conf SSTSinkConf, dest cloud.ExternalStorage) (*SSTSinkKeyWriter, error) {
3835
if conf.ElideMode == execinfrapb.ElidePrefix_None {
3936
return nil, errors.New("KeyWriter does not support ElidePrefix_None")
4037
}
4138
return &SSTSinkKeyWriter{
42-
FileSSTSink: *MakeFileSSTSink(conf, dest, pacer),
39+
FileSSTSink: *MakeFileSSTSink(conf, dest, nil),
4340
targetFileSize: targetFileSize.Get(conf.Settings),
4441
}, nil
4542
}

pkg/backup/backupsink/sst_sink_key_writer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ func sstSinkKeyWriterTestSetup(
485485
t *testing.T, settings *cluster.Settings, elideMode execinfrapb.ElidePrefix,
486486
) (*SSTSinkKeyWriter, cloud.ExternalStorage) {
487487
conf, store := sinkTestSetup(t, settings, elideMode)
488-
sink, err := MakeSSTSinkKeyWriter(conf, store, nil /* pacer */)
488+
sink, err := MakeSSTSinkKeyWriter(conf, store)
489489
require.NoError(t, err)
490490
return sink, store
491491
}

pkg/backup/compaction_processor.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
2626
"github.com/cockroachdb/cockroach/pkg/sql/types"
2727
"github.com/cockroachdb/cockroach/pkg/storage"
28+
"github.com/cockroachdb/cockroach/pkg/util/admission"
2829
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
2930
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3031
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -251,7 +252,7 @@ func (p *compactBackupsProcessor) processSpanEntries(
251252
Settings: &execCfg.Settings.SV,
252253
ElideMode: p.spec.ElideMode,
253254
}
254-
sink, err := backupsink.MakeSSTSinkKeyWriter(sinkConf, store, nil)
255+
sink, err := backupsink.MakeSSTSinkKeyWriter(sinkConf, store)
255256
if err != nil {
256257
return err
257258
}
@@ -261,6 +262,11 @@ func (p *compactBackupsProcessor) processSpanEntries(
261262
logClose(ctx, sink, "SST sink")
262263
}
263264
}()
265+
pacer := newBackupPacer(
266+
ctx, p.FlowCtx.Cfg.AdmissionPacerFactory, p.FlowCtx.Cfg.Settings,
267+
)
268+
// It is safe to close a nil pacer.
269+
defer pacer.Close()
264270

265271
for {
266272
select {
@@ -280,7 +286,7 @@ func (p *compactBackupsProcessor) processSpanEntries(
280286
return errors.Wrap(err, "opening SSTs")
281287
}
282288

283-
if err := compactSpanEntry(ctx, sstIter, sink); err != nil {
289+
if err := compactSpanEntry(ctx, sstIter, sink, pacer); err != nil {
284290
return errors.Wrap(err, "compacting span entry")
285291
}
286292
}
@@ -385,7 +391,7 @@ func openSSTs(
385391
}
386392

387393
func compactSpanEntry(
388-
ctx context.Context, sstIter mergedSST, sink *backupsink.SSTSinkKeyWriter,
394+
ctx context.Context, sstIter mergedSST, sink *backupsink.SSTSinkKeyWriter, pacer *admission.Pacer,
389395
) error {
390396
defer sstIter.cleanup()
391397
entry := sstIter.entry
@@ -404,6 +410,9 @@ func compactSpanEntry(
404410
scratch = append(scratch, prefix...)
405411
iter := sstIter.iter
406412
for iter.SeekGE(trimmedStart); ; iter.NextKey() {
413+
if err := pacer.Pace(ctx); err != nil {
414+
return err
415+
}
407416
var key storage.MVCCKey
408417
if ok, err := iter.Valid(); err != nil {
409418
return err

0 commit comments

Comments
 (0)