Skip to content

Commit 21f5b4f

Browse files
craig[bot]dt
andcommitted
Merge #150703
150703: bulk,importer,rowexec: add CPU AC to bulk ops r=dt a=dt This adds the option to enable a CPU-based pacer to a few loops that have the potential to become tight, CPU-bound loops during bulk operations, namely: * SST Batcher's Add method, which is called in a tight loop after sorting a buffered pool of KVs * index backfiller's key generation loop, which is a tight loop over a buffer of read rows * IMPORT's row -> datum reader loop and datum -> KV loops. Each of these is controlled by its own setting; all of them default to off for now, pending more production-scale testing and benchmarking. Co-authored-by: David Taylor <[email protected]>
2 parents ef049f8 + 6dfa19b commit 21f5b4f

File tree

9 files changed

+137
-2
lines changed

9 files changed

+137
-2
lines changed

pkg/kv/bulk/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_library(
55
srcs = [
66
"buffering_adder.go",
77
"bulk_metrics.go",
8+
"cpu_pacer.go",
89
"kv_buf.go",
910
"setting.go",
1011
"sst_adder.go",
@@ -24,6 +25,7 @@ go_library(
2425
"//pkg/settings/cluster",
2526
"//pkg/storage",
2627
"//pkg/storage/enginepb",
28+
"//pkg/util/admission",
2729
"//pkg/util/admission/admissionpb",
2830
"//pkg/util/buildutil",
2931
"//pkg/util/ctxgroup",

pkg/kv/bulk/buffering_adder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func MakeBulkAdder(
153153
writeAtBatchTS: opts.WriteAtBatchTimestamp,
154154
mem: bulkMon.MakeConcurrentBoundAccount(),
155155
limiter: sendLimiter,
156+
pacer: NewCPUPacer(ctx, db, sstBatcherElasticCPUControlEnabled),
156157
},
157158
timestamp: timestamp,
158159
maxBufferLimit: opts.MaxBufferSize,

pkg/kv/bulk/cpu_pacer.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulk
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv"
13+
"github.com/cockroachdb/cockroach/pkg/roachpb"
14+
"github.com/cockroachdb/cockroach/pkg/settings"
15+
"github.com/cockroachdb/cockroach/pkg/util/admission"
16+
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
17+
"github.com/cockroachdb/cockroach/pkg/util/log"
18+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
19+
)
20+
21+
// CPUPacer wraps an admission.Pacer for use in bulk operations where any errors
22+
// pacing can just be logged (at most once a minute).
23+
type CPUPacer struct {
24+
pacer *admission.Pacer
25+
logEvery *log.EveryN
26+
}
27+
28+
func (p *CPUPacer) Pace(ctx context.Context) {
29+
if err := p.pacer.Pace(ctx); err != nil {
30+
if p.logEvery.ShouldLog() {
31+
// TODO(dt): consider just returning this error/eliminating this wrapper,
32+
// and making callers bubble up this error if it is only ever a ctx cancel
33+
// error that the caller should be bubbling up anyway.
34+
log.Warningf(ctx, "failed to pace SST batcher: %v", err)
35+
}
36+
}
37+
}
38+
func (p *CPUPacer) Close() {
39+
p.pacer.Close()
40+
}
41+
42+
var cpuPacerRequestDuration = settings.RegisterDurationSetting(
43+
settings.ApplicationLevel,
44+
"bulkio.elastic_cpu_control.request_duration",
45+
"exeuction time unit to request when pacing CPU requests during various bulk operations",
46+
50*time.Millisecond,
47+
)
48+
49+
// NewCPUPacer creates a new AC pacer for SST batcher. It may return an empty
50+
// Pacer which noops if pacing is disabled or its arguments are nil.
51+
func NewCPUPacer(ctx context.Context, db *kv.DB, setting *settings.BoolSetting) CPUPacer {
52+
if db == nil || db.AdmissionPacerFactory == nil || !setting.Get(db.SettingsValues()) {
53+
log.Infof(ctx, "admission control is not configured to pace bulk ingestion")
54+
return CPUPacer{}
55+
}
56+
tenantID, ok := roachpb.ClientTenantFromContext(ctx)
57+
if !ok {
58+
tenantID = roachpb.SystemTenantID
59+
}
60+
every := log.Every(time.Minute)
61+
return CPUPacer{pacer: db.AdmissionPacerFactory.NewPacer(
62+
cpuPacerRequestDuration.Get(db.SettingsValues()),
63+
admission.WorkInfo{
64+
TenantID: tenantID,
65+
Priority: admissionpb.BulkNormalPri,
66+
CreateTime: timeutil.Now().UnixNano(),
67+
BypassAdmission: false,
68+
}),
69+
logEvery: &every,
70+
}
71+
}

pkg/kv/bulk/sst_batcher.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ var (
7171
"if set, kvserver will compute an accurate stats diff for every addsstable request",
7272
metamorphic.ConstantWithTestBool("computeStatsDiffInStreamBatcher", true),
7373
)
74+
75+
// sstBatcherElasticCPUControlEnabled is the boolean setting passed to
// NewCPUPacer when constructing SST batchers; it decides whether the batcher
// paces itself through elastic CPU control. It defaults to false.
sstBatcherElasticCPUControlEnabled = settings.RegisterBoolSetting(
	settings.ApplicationLevel,
	"bulkio.ingest.sst_batcher_elastic_control.enabled",
	"determines whether the sst batcher integrates with elastic CPU control",
	false, // TODO(dt): enable this by default.
)
7481
)
7582

7683
// MakeAndRegisterConcurrencyLimiter makes a concurrency limiter and registers it
@@ -212,6 +219,9 @@ type SSTBatcher struct {
212219
// disallowShadowingBelow is described on kvpb.AddSSTableRequest.
213220
disallowShadowingBelow hlc.Timestamp
214221

222+
// pacer for admission control during SST ingestion
223+
pacer CPUPacer
224+
215225
// skips duplicate keys (iff they are buffered together). This is true when
216226
// used to backfill an inverted index. An array in JSONB with multiple values
217227
// which are the same, will all correspond to the same kv in the inverted
@@ -313,6 +323,7 @@ func MakeSSTBatcher(
313323
mem: mem,
314324
limiter: sendLimiter,
315325
rc: rc,
326+
pacer: NewCPUPacer(ctx, db, sstBatcherElasticCPUControlEnabled),
316327
}
317328
b.mu.lastFlush = timeutil.Now()
318329
b.mu.tracingSpan = tracing.SpanFromContext(ctx)
@@ -352,6 +363,7 @@ func MakeStreamSSTBatcher(
352363
// does not however make sense to scatter that range as the RHS maybe
353364
// non-empty.
354365
disableScatters: true,
366+
pacer: NewCPUPacer(ctx, db, sstBatcherElasticCPUControlEnabled),
355367
}
356368
b.mu.lastFlush = timeutil.Now()
357369
b.mu.tracingSpan = tracing.SpanFromContext(ctx)
@@ -379,6 +391,7 @@ func MakeTestingSSTBatcher(
379391
ingestAll: ingestAll,
380392
mem: mem,
381393
limiter: sendLimiter,
394+
pacer: NewCPUPacer(ctx, db, sstBatcherElasticCPUControlEnabled),
382395
}
383396
b.init(ctx)
384397
return b, nil
@@ -394,7 +407,6 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) {
394407
func (b *SSTBatcher) AddMVCCKeyWithImportEpoch(
395408
ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32,
396409
) error {
397-
398410
mvccVal, err := storage.DecodeMVCCValue(value)
399411
if err != nil {
400412
return err
@@ -411,7 +423,6 @@ func (b *SSTBatcher) AddMVCCKeyWithImportEpoch(
411423
}
412424

413425
func (b *SSTBatcher) AddMVCCKeyLDR(ctx context.Context, key storage.MVCCKey, value []byte) error {
414-
415426
mvccVal, err := storage.DecodeMVCCValue(value)
416427
if err != nil {
417428
return err
@@ -432,6 +443,9 @@ func (b *SSTBatcher) AddMVCCKeyLDR(ctx context.Context, key storage.MVCCKey, val
432443
// keys -- like RESTORE where we want the restored data to look like the backup.
433444
// Keys must be added in order.
434445
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
446+
// Pace based on admission control before adding the key.
447+
b.pacer.Pace(ctx)
448+
435449
if len(b.batch.endKey) > 0 && bytes.Equal(b.batch.endKey, key.Key) {
436450
if b.ingestAll && key.Timestamp.Equal(b.batch.endTimestamp) {
437451
if bytes.Equal(b.batch.endValue, value) {
@@ -924,6 +938,7 @@ func (b *SSTBatcher) Close(ctx context.Context) {
924938
if err := b.syncFlush(); err != nil {
925939
log.Warningf(ctx, "closing with flushes in-progress encountered an error: %v", err)
926940
}
941+
b.pacer.Close()
927942
b.mem.Close(ctx)
928943
}
929944

pkg/sql/importer/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ go_library(
3535
"//pkg/jobs/jobspb",
3636
"//pkg/jobs/jobsprofiler",
3737
"//pkg/kv",
38+
"//pkg/kv/bulk",
3839
"//pkg/kv/kvpb",
3940
"//pkg/kv/kvserver/kvserverbase",
4041
"//pkg/roachpb",

pkg/sql/importer/read_import_base.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/cloud"
2222
"github.com/cockroachdb/cockroach/pkg/crosscluster"
2323
"github.com/cockroachdb/cockroach/pkg/kv"
24+
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
2425
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2526
"github.com/cockroachdb/cockroach/pkg/roachpb"
2627
"github.com/cockroachdb/cockroach/pkg/security/username"
28+
"github.com/cockroachdb/cockroach/pkg/settings"
2729
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
2830
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
2931
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
@@ -44,6 +46,13 @@ import (
4446
"github.com/cockroachdb/errors"
4547
)
4648

49+
var importElasticCPUControlEnabled = settings.RegisterBoolSetting(
50+
settings.ApplicationLevel,
51+
"bulkio.import.elastic_control.enabled",
52+
"determines whether import operations integrate with elastic CPU control",
53+
false, // TODO(dt): enable this by default after more benchmarking.
54+
)
55+
4756
func runImport(
4857
ctx context.Context,
4958
flowCtx *execinfra.FlowCtx,
@@ -567,9 +576,15 @@ func runParallelImport(
567576
var span *tracing.Span
568577
ctx, span = tracing.ChildSpan(ctx, "import-file-to-rows")
569578
defer span.Finish()
579+
580+
// Create a pacer for admission control for the producer.
581+
pacer := bulk.NewCPUPacer(ctx, importCtx.db, importElasticCPUControlEnabled)
582+
defer pacer.Close()
583+
570584
var numSkipped int64
571585
var count int64
572586
for producer.Scan() {
587+
pacer.Pace(ctx)
573588
// Skip rows if needed.
574589
count++
575590
if count <= fileCtx.skip {
@@ -660,6 +675,10 @@ func (p *parallelImporter) importWorker(
660675
fileCtx *importFileContext,
661676
minEmitted []int64,
662677
) error {
678+
// Create a pacer for admission control for this worker.
679+
pacer := bulk.NewCPUPacer(ctx, importCtx.db, importElasticCPUControlEnabled)
680+
defer pacer.Close()
681+
663682
conv, err := makeDatumConverter(ctx, importCtx, fileCtx, importCtx.db)
664683
if err != nil {
665684
return err
@@ -680,6 +699,8 @@ func (p *parallelImporter) importWorker(
680699
conv.KvBatch.Progress = batch.progress
681700
for batchIdx, record := range batch.data {
682701
rowNum = batch.startPos + int64(batchIdx)
702+
// Pace the admission control before processing each row.
703+
pacer.Pace(ctx)
683704
if err := consumer.FillDatums(ctx, record, rowNum, conv); err != nil {
684705
if err = handleCorruptRow(ctx, fileCtx, err); err != nil {
685706
return err

pkg/sql/importer/read_import_workload.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/cloud"
1818
"github.com/cockroachdb/cockroach/pkg/col/coldata"
1919
"github.com/cockroachdb/cockroach/pkg/kv"
20+
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
2021
"github.com/cockroachdb/cockroach/pkg/roachpb"
2122
"github.com/cockroachdb/cockroach/pkg/security/username"
2223
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -245,6 +246,12 @@ func NewWorkloadKVConverter(
245246
func (w *WorkloadKVConverter) Worker(
246247
ctx context.Context, evalCtx *eval.Context, semaCtx *tree.SemaContext,
247248
) error {
249+
// Workload needs to pace itself explicitly since it manages its own workers
250+
// and loops rather than using the "runParallelImport" helper which the other
251+
// formats use and which has pacing built-in.
252+
pacer := bulk.NewCPUPacer(ctx, w.db, importElasticCPUControlEnabled)
253+
defer pacer.Close()
254+
248255
conv, err := row.NewDatumRowConverter(
249256
ctx, semaCtx, w.tableDesc, nil, /* targetColNames */
250257
evalCtx, w.kvCh, nil /* seqChunkProvider */, nil /* metrics */, w.db,
@@ -268,6 +275,7 @@ func (w *WorkloadKVConverter) Worker(
268275
a = a.Truncate()
269276
w.rows.FillBatch(batchIdx, cb, &a)
270277
for rowIdx, numRows := 0, cb.Length(); rowIdx < numRows; rowIdx++ {
278+
pacer.Pace(ctx)
271279
for colIdx, col := range cb.ColVecs() {
272280
// TODO(dan): This does a type switch once per-datum. Reduce this to
273281
// a one-time switch per column.

pkg/sql/rowexec/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ go_library(
4646
"//pkg/jobs/jobspb",
4747
"//pkg/keys",
4848
"//pkg/kv",
49+
"//pkg/kv/bulk",
4950
"//pkg/kv/kvclient/kvstreamer",
5051
"//pkg/kv/kvpb",
5152
"//pkg/kv/kvserver/kvserverbase",

pkg/sql/rowexec/indexbackfiller.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"time"
1111

12+
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
1213
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1314
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -78,6 +79,13 @@ var indexBackfillIngestConcurrency = settings.RegisterIntSetting(
7879
settings.PositiveInt, /* validateFn */
7980
)
8081

82+
var indexBackfillElasticCPUControlEnabled = settings.RegisterBoolSetting(
83+
settings.ApplicationLevel,
84+
"bulkio.index_backfill.elastic_control.enabled",
85+
"determines whether index backfill operations integrate with elastic CPU control",
86+
false, // TODO(dt): enable this by default after more benchmarking.
87+
)
88+
8189
func newIndexBackfiller(
8290
ctx context.Context,
8391
flowCtx *execinfra.FlowCtx,
@@ -329,9 +337,16 @@ func (ib *indexBackfiller) ingestIndexEntries(
329337
g.GoCtx(func(ctx context.Context) error {
330338
defer close(stopProgress)
331339

340+
// Create a pacer for admission control for index entry processing.
341+
pacer := bulk.NewCPUPacer(ctx, ib.flowCtx.Cfg.DB.KV(), indexBackfillElasticCPUControlEnabled)
342+
defer pacer.Close()
343+
332344
var vectorInputEntry rowenc.IndexEntry
333345
for indexBatch := range indexEntryCh {
334346
for _, indexEntry := range indexBatch.indexEntries {
347+
// Pace the admission control before processing each index entry.
348+
pacer.Pace(ctx)
349+
335350
// If there is at least one vector index being written, we need to check to see
336351
// if this IndexEntry is going to a vector index and then re-encode it for that
337352
// index if so.

0 commit comments

Comments
 (0)