Skip to content

Commit 8ac4332

Browse files
committed
kv/bulk: pace bulk SST generation/addition with elastic AC
Various bulk operations end up using SST Batcher to construct an SST, which is can be a CPU-bound operation when a large buffer of keys each need to be compared, compressed and flushed in a tight loop. This adds AC pacing to the method that is called per key so that it can pace these operations as needed. Release note: none. Epic: none.
1 parent acd07c1 commit 8ac4332

File tree

4 files changed

+91
-2
lines changed

4 files changed

+91
-2
lines changed

pkg/kv/bulk/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_library(
55
srcs = [
66
"buffering_adder.go",
77
"bulk_metrics.go",
8+
"cpu_pacer.go",
89
"kv_buf.go",
910
"setting.go",
1011
"sst_adder.go",
@@ -24,6 +25,7 @@ go_library(
2425
"//pkg/settings/cluster",
2526
"//pkg/storage",
2627
"//pkg/storage/enginepb",
28+
"//pkg/util/admission",
2729
"//pkg/util/admission/admissionpb",
2830
"//pkg/util/buildutil",
2931
"//pkg/util/ctxgroup",

pkg/kv/bulk/buffering_adder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func MakeBulkAdder(
153153
writeAtBatchTS: opts.WriteAtBatchTimestamp,
154154
mem: bulkMon.MakeConcurrentBoundAccount(),
155155
limiter: sendLimiter,
156+
pacer: NewCPUPacer(ctx, db, sstBatcherElasticCPUControlEnabled),
156157
},
157158
timestamp: timestamp,
158159
maxBufferLimit: opts.MaxBufferSize,

pkg/kv/bulk/cpu_pacer.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulk
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv"
13+
"github.com/cockroachdb/cockroach/pkg/roachpb"
14+
"github.com/cockroachdb/cockroach/pkg/settings"
15+
"github.com/cockroachdb/cockroach/pkg/util/admission"
16+
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
17+
"github.com/cockroachdb/cockroach/pkg/util/log"
18+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
19+
)
20+
21+
// CPUPacer wraps an admission.Pacer for use in bulk operations where any errors
22+
// pacing can just be logged (at most once a minute).
23+
type CPUPacer struct {
24+
pacer *admission.Pacer
25+
logEvery *log.EveryN
26+
}
27+
28+
func (p *CPUPacer) Pace(ctx context.Context) {
29+
if err := p.pacer.Pace(ctx); err != nil {
30+
if p.logEvery.ShouldLog() {
31+
// TODO(dt): consider just returning this error/eliminating this wrapper,
32+
// and making callers bubble up this error if it is only ever a ctx cancel
33+
// error that the caller should be bubbling up anyway.
34+
log.Warningf(ctx, "failed to pace SST batcher: %v", err)
35+
}
36+
}
37+
}
38+
func (p *CPUPacer) Close() {
39+
p.pacer.Close()
40+
}
41+
42+
var cpuPacerRequestDuration = settings.RegisterDurationSetting(
43+
settings.ApplicationLevel,
44+
"bulkio.elastic_cpu_control.request_duration",
45+
"exeuction time unit to request when pacing CPU requests during various bulk operations",
46+
50*time.Millisecond,
47+
)
48+
49+
// NewCPUPacer creates a new AC pacer for SST batcher. It may return an empty
50+
// Pacer which noops if pacing is disabled or its arguments are nil.
51+
func NewCPUPacer(ctx context.Context, db *kv.DB, setting *settings.BoolSetting) CPUPacer {
52+
if db == nil || db.AdmissionPacerFactory == nil || !setting.Get(db.SettingsValues()) {
53+
log.Infof(ctx, "admission control is not configured to pace bulk ingestion")
54+
return CPUPacer{}
55+
}
56+
tenantID, ok := roachpb.ClientTenantFromContext(ctx)
57+
if !ok {
58+
tenantID = roachpb.SystemTenantID
59+
}
60+
every := log.Every(time.Minute)
61+
return CPUPacer{pacer: db.AdmissionPacerFactory.NewPacer(
62+
cpuPacerRequestDuration.Get(db.SettingsValues()),
63+
admission.WorkInfo{
64+
TenantID: tenantID,
65+
Priority: admissionpb.BulkNormalPri,
66+
CreateTime: timeutil.Now().UnixNano(),
67+
BypassAdmission: false,
68+
}),
69+
logEvery: &every,
70+
}
71+
}

pkg/kv/bulk/sst_batcher.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ var (
7171
"if set, kvserver will compute an accurate stats diff for every addsstable request",
7272
metamorphic.ConstantWithTestBool("computeStatsDiffInStreamBatcher", true),
7373
)
74+
75+
sstBatcherElasticCPUControlEnabled = settings.RegisterBoolSetting(
76+
settings.ApplicationLevel,
77+
"bulkio.ingest.sst_batcher_elastic_control.enabled",
78+
"determines whether the sst batcher integrates with elastic CPU control",
79+
false, // TODO(dt): enable this by default.
80+
)
7481
)
7582

7683
// MakeAndRegisterConcurrencyLimiter makes a concurrency limiter and registers it
@@ -212,6 +219,9 @@ type SSTBatcher struct {
212219
// disallowShadowingBelow is described on kvpb.AddSSTableRequest.
213220
disallowShadowingBelow hlc.Timestamp
214221

222+
// pacer for admission control during SST ingestion
223+
pacer CPUPacer
224+
215225
// skips duplicate keys (iff they are buffered together). This is true when
216226
// used to backfill an inverted index. An array in JSONB with multiple values
217227
// which are the same, will all correspond to the same kv in the inverted
@@ -313,6 +323,7 @@ func MakeSSTBatcher(
313323
mem: mem,
314324
limiter: sendLimiter,
315325
rc: rc,
326+
pacer: NewCPUPacer(ctx, db, sstBatcherElasticCPUControlEnabled),
316327
}
317328
b.mu.lastFlush = timeutil.Now()
318329
b.mu.tracingSpan = tracing.SpanFromContext(ctx)
@@ -352,6 +363,7 @@ func MakeStreamSSTBatcher(
352363
// does not however make sense to scatter that range as the RHS maybe
353364
// non-empty.
354365
disableScatters: true,
366+
pacer: NewCPUPacer(ctx, db, sstBatcherElasticCPUControlEnabled),
355367
}
356368
b.mu.lastFlush = timeutil.Now()
357369
b.mu.tracingSpan = tracing.SpanFromContext(ctx)
@@ -379,6 +391,7 @@ func MakeTestingSSTBatcher(
379391
ingestAll: ingestAll,
380392
mem: mem,
381393
limiter: sendLimiter,
394+
pacer: NewCPUPacer(ctx, db, sstBatcherElasticCPUControlEnabled),
382395
}
383396
b.init(ctx)
384397
return b, nil
@@ -394,7 +407,6 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) {
394407
func (b *SSTBatcher) AddMVCCKeyWithImportEpoch(
395408
ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32,
396409
) error {
397-
398410
mvccVal, err := storage.DecodeMVCCValue(value)
399411
if err != nil {
400412
return err
@@ -411,7 +423,6 @@ func (b *SSTBatcher) AddMVCCKeyWithImportEpoch(
411423
}
412424

413425
func (b *SSTBatcher) AddMVCCKeyLDR(ctx context.Context, key storage.MVCCKey, value []byte) error {
414-
415426
mvccVal, err := storage.DecodeMVCCValue(value)
416427
if err != nil {
417428
return err
@@ -432,6 +443,9 @@ func (b *SSTBatcher) AddMVCCKeyLDR(ctx context.Context, key storage.MVCCKey, val
432443
// keys -- like RESTORE where we want the restored data to look like the backup.
433444
// Keys must be added in order.
434445
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
446+
// Pace based on admission control before adding the key.
447+
b.pacer.Pace(ctx)
448+
435449
if len(b.batch.endKey) > 0 && bytes.Equal(b.batch.endKey, key.Key) {
436450
if b.ingestAll && key.Timestamp.Equal(b.batch.endTimestamp) {
437451
if bytes.Equal(b.batch.endValue, value) {
@@ -924,6 +938,7 @@ func (b *SSTBatcher) Close(ctx context.Context) {
924938
if err := b.syncFlush(); err != nil {
925939
log.Warningf(ctx, "closing with flushes in-progress encountered an error: %v", err)
926940
}
941+
b.pacer.Close()
927942
b.mem.Close(ctx)
928943
}
929944

0 commit comments

Comments
 (0)