Skip to content

Commit 6dfa19b

Browse files
committed
sql/rowexec: pace index backfill processing with elastic AC
Add admission control pacing to index backfill operations in the buildIndexEntries loop. Each index entry processing goroutine creates its own pacer for CPU load management, similar to the existing pacing in import and bulk SST operations. Release note: None Epic: none
1 parent b927a51 commit 6dfa19b

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

pkg/sql/rowexec/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ go_library(
4646
"//pkg/jobs/jobspb",
4747
"//pkg/keys",
4848
"//pkg/kv",
49+
"//pkg/kv/bulk",
4950
"//pkg/kv/kvclient/kvstreamer",
5051
"//pkg/kv/kvpb",
5152
"//pkg/kv/kvserver/kvserverbase",

pkg/sql/rowexec/indexbackfiller.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"time"
1111

12+
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
1213
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1314
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -78,6 +79,13 @@ var indexBackfillIngestConcurrency = settings.RegisterIntSetting(
7879
settings.PositiveInt, /* validateFn */
7980
)
8081

82+
var indexBackfillElasticCPUControlEnabled = settings.RegisterBoolSetting(
83+
settings.ApplicationLevel,
84+
"bulkio.index_backfill.elastic_control.enabled",
85+
"determines whether index backfill operations integrate with elastic CPU control",
86+
false, // TODO(dt): enable this by default after more benchmarking.
87+
)
88+
8189
func newIndexBackfiller(
8290
ctx context.Context,
8391
flowCtx *execinfra.FlowCtx,
@@ -329,9 +337,16 @@ func (ib *indexBackfiller) ingestIndexEntries(
329337
g.GoCtx(func(ctx context.Context) error {
330338
defer close(stopProgress)
331339

340+
// Create a pacer for admission control for index entry processing.
341+
pacer := bulk.NewCPUPacer(ctx, ib.flowCtx.Cfg.DB.KV(), indexBackfillElasticCPUControlEnabled)
342+
defer pacer.Close()
343+
332344
var vectorInputEntry rowenc.IndexEntry
333345
for indexBatch := range indexEntryCh {
334346
for _, indexEntry := range indexBatch.indexEntries {
347+
// Pace the admission control before processing each index entry.
348+
pacer.Pace(ctx)
349+
335350
// If there is at least one vector index being written, we need to check to see
336351
// if this IndexEntry is going to a vector index and then re-encode it for that
337352
// index if so.

0 commit comments

Comments
 (0)