Skip to content

Commit b927a51

Browse files
committed
sql/importer: pace import processing with elastic AC
Add admission control pacing to IMPORT operations in both the file-to-rows and rows-to-datums pipelines. Each goroutine creates its own pacer for CPU load management.

Epic: none

Release note: None
1 parent 8ac4332 commit b927a51

File tree

3 files changed

+30
-0
lines changed

3 files changed

+30
-0
lines changed

pkg/sql/importer/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ go_library(
3535
"//pkg/jobs/jobspb",
3636
"//pkg/jobs/jobsprofiler",
3737
"//pkg/kv",
38+
"//pkg/kv/bulk",
3839
"//pkg/kv/kvpb",
3940
"//pkg/kv/kvserver/kvserverbase",
4041
"//pkg/roachpb",

pkg/sql/importer/read_import_base.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/cloud"
2222
"github.com/cockroachdb/cockroach/pkg/crosscluster"
2323
"github.com/cockroachdb/cockroach/pkg/kv"
24+
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
2425
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2526
"github.com/cockroachdb/cockroach/pkg/roachpb"
2627
"github.com/cockroachdb/cockroach/pkg/security/username"
28+
"github.com/cockroachdb/cockroach/pkg/settings"
2729
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
2830
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
2931
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
@@ -44,6 +46,13 @@ import (
4446
"github.com/cockroachdb/errors"
4547
)
4648

49+
var importElasticCPUControlEnabled = settings.RegisterBoolSetting(
50+
settings.ApplicationLevel,
51+
"bulkio.import.elastic_control.enabled",
52+
"determines whether import operations integrate with elastic CPU control",
53+
false, // TODO(dt): enable this by default after more benchmarking.
54+
)
55+
4756
func runImport(
4857
ctx context.Context,
4958
flowCtx *execinfra.FlowCtx,
@@ -567,9 +576,15 @@ func runParallelImport(
567576
var span *tracing.Span
568577
ctx, span = tracing.ChildSpan(ctx, "import-file-to-rows")
569578
defer span.Finish()
579+
580+
// Create a pacer for admission control for the producer.
581+
pacer := bulk.NewCPUPacer(ctx, importCtx.db, importElasticCPUControlEnabled)
582+
defer pacer.Close()
583+
570584
var numSkipped int64
571585
var count int64
572586
for producer.Scan() {
587+
pacer.Pace(ctx)
573588
// Skip rows if needed.
574589
count++
575590
if count <= fileCtx.skip {
@@ -660,6 +675,10 @@ func (p *parallelImporter) importWorker(
660675
fileCtx *importFileContext,
661676
minEmitted []int64,
662677
) error {
678+
// Create a pacer for admission control for this worker.
679+
pacer := bulk.NewCPUPacer(ctx, importCtx.db, importElasticCPUControlEnabled)
680+
defer pacer.Close()
681+
663682
conv, err := makeDatumConverter(ctx, importCtx, fileCtx, importCtx.db)
664683
if err != nil {
665684
return err
@@ -680,6 +699,8 @@ func (p *parallelImporter) importWorker(
680699
conv.KvBatch.Progress = batch.progress
681700
for batchIdx, record := range batch.data {
682701
rowNum = batch.startPos + int64(batchIdx)
702+
// Pace the admission control before processing each row.
703+
pacer.Pace(ctx)
683704
if err := consumer.FillDatums(ctx, record, rowNum, conv); err != nil {
684705
if err = handleCorruptRow(ctx, fileCtx, err); err != nil {
685706
return err

pkg/sql/importer/read_import_workload.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/cloud"
1818
"github.com/cockroachdb/cockroach/pkg/col/coldata"
1919
"github.com/cockroachdb/cockroach/pkg/kv"
20+
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
2021
"github.com/cockroachdb/cockroach/pkg/roachpb"
2122
"github.com/cockroachdb/cockroach/pkg/security/username"
2223
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -245,6 +246,12 @@ func NewWorkloadKVConverter(
245246
func (w *WorkloadKVConverter) Worker(
246247
ctx context.Context, evalCtx *eval.Context, semaCtx *tree.SemaContext,
247248
) error {
249+
// Workload needs to pace itself explicitly since it manages its own workers
250+
// and loops rather than using the "runParallelImport" helper which the other
251+
// formats use and which has pacing built-in.
252+
pacer := bulk.NewCPUPacer(ctx, w.db, importElasticCPUControlEnabled)
253+
defer pacer.Close()
254+
248255
conv, err := row.NewDatumRowConverter(
249256
ctx, semaCtx, w.tableDesc, nil, /* targetColNames */
250257
evalCtx, w.kvCh, nil /* seqChunkProvider */, nil /* metrics */, w.db,
@@ -268,6 +275,7 @@ func (w *WorkloadKVConverter) Worker(
268275
a = a.Truncate()
269276
w.rows.FillBatch(batchIdx, cb, &a)
270277
for rowIdx, numRows := 0, cb.Length(); rowIdx < numRows; rowIdx++ {
278+
pacer.Pace(ctx)
271279
for colIdx, col := range cb.ColVecs() {
272280
// TODO(dan): This does a type switch once per-datum. Reduce this to
273281
// a one-time switch per column.

0 commit comments

Comments (0)