Skip to content

Commit d5398f1

Browse files
committed
*: treat ac pacer errs as errors
The AC CPU pacer's Pace() only returns a non-nil error in situations where the caller should stop processing paced work, such as when the context is cancelled, so callers of the pacer should generally return that non-nil error. Previously many callers treated an error from Pace() as a mere failure to pace: something to note in a log before continuing to run the (now unpaced) operation. This change replaces that log-and-continue handling of Pace errors across users of the pacer with the more standard 'return err' handling. Release note: none. Epic: none.
1 parent c0934ae commit d5398f1

File tree

9 files changed

+28
-51
lines changed

9 files changed

+28
-51
lines changed

pkg/kv/bulk/cpu_pacer.go

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,6 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1919
)
2020

21-
// CPUPacer wraps an admission.Pacer for use in bulk operations where any errors
22-
// pacing can just be logged (at most once a minute).
23-
type CPUPacer struct {
24-
pacer *admission.Pacer
25-
logEvery *log.EveryN
26-
}
27-
28-
func (p *CPUPacer) Pace(ctx context.Context) {
29-
if err := p.pacer.Pace(ctx); err != nil {
30-
if p.logEvery.ShouldLog() {
31-
// TODO(dt): consider just returning this error/eliminating this wrapper,
32-
// and making callers bubble up this error if it is only ever a ctx cancel
33-
// error that the caller should be bubbling up anyway.
34-
log.Warningf(ctx, "failed to pace SST batcher: %v", err)
35-
}
36-
}
37-
}
38-
func (p *CPUPacer) Close() {
39-
p.pacer.Close()
40-
}
41-
4221
var cpuPacerRequestDuration = settings.RegisterDurationSetting(
4322
settings.ApplicationLevel,
4423
"bulkio.elastic_cpu_control.request_duration",
@@ -48,24 +27,21 @@ var cpuPacerRequestDuration = settings.RegisterDurationSetting(
4827

4928
// NewCPUPacer creates a new AC pacer for SST batcher. It may return an empty
5029
// Pacer which noops if pacing is disabled or its arguments are nil.
51-
func NewCPUPacer(ctx context.Context, db *kv.DB, setting *settings.BoolSetting) CPUPacer {
30+
func NewCPUPacer(ctx context.Context, db *kv.DB, setting *settings.BoolSetting) *admission.Pacer {
5231
if db == nil || db.AdmissionPacerFactory == nil || !setting.Get(db.SettingsValues()) {
5332
log.Infof(ctx, "admission control is not configured to pace bulk ingestion")
54-
return CPUPacer{}
33+
return nil
5534
}
5635
tenantID, ok := roachpb.ClientTenantFromContext(ctx)
5736
if !ok {
5837
tenantID = roachpb.SystemTenantID
5938
}
60-
every := log.Every(time.Minute)
61-
return CPUPacer{pacer: db.AdmissionPacerFactory.NewPacer(
39+
return db.AdmissionPacerFactory.NewPacer(
6240
cpuPacerRequestDuration.Get(db.SettingsValues()),
6341
admission.WorkInfo{
6442
TenantID: tenantID,
6543
Priority: admissionpb.BulkNormalPri,
6644
CreateTime: timeutil.Now().UnixNano(),
6745
BypassAdmission: false,
68-
}),
69-
logEvery: &every,
70-
}
46+
})
7147
}

pkg/kv/bulk/sst_batcher.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2424
"github.com/cockroachdb/cockroach/pkg/storage"
2525
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
26+
"github.com/cockroachdb/cockroach/pkg/util/admission"
2627
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
2728
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
2829
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -220,7 +221,7 @@ type SSTBatcher struct {
220221
disallowShadowingBelow hlc.Timestamp
221222

222223
// pacer for admission control during SST ingestion
223-
pacer CPUPacer
224+
pacer *admission.Pacer
224225

225226
// skips duplicate keys (iff they are buffered together). This is true when
226227
// used to backfill an inverted index. An array in JSONB with multiple values
@@ -444,7 +445,9 @@ func (b *SSTBatcher) AddMVCCKeyLDR(ctx context.Context, key storage.MVCCKey, val
444445
// Keys must be added in order.
445446
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
446447
// Pace based on admission control before adding the key.
447-
b.pacer.Pace(ctx)
448+
if err := b.pacer.Pace(ctx); err != nil {
449+
return err
450+
}
448451

449452
if len(b.batch.endKey) > 0 && bytes.Equal(b.batch.endKey, key.Key) {
450453
if b.ingestAll && key.Timestamp.Equal(b.batch.endTimestamp) {

pkg/kv/kvserver/rangefeed/catchup_scan.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package rangefeed
88
import (
99
"bytes"
1010
"context"
11-
"time"
1211

1312
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1413
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -18,7 +17,6 @@ import (
1817
"github.com/cockroachdb/cockroach/pkg/util/admission"
1918
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
2019
"github.com/cockroachdb/cockroach/pkg/util/hlc"
21-
"github.com/cockroachdb/cockroach/pkg/util/log"
2220
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
2321
"github.com/cockroachdb/errors"
2422
)
@@ -191,7 +189,6 @@ func (i *CatchUpIterator) CatchUpScan(
191189
var meta enginepb.MVCCMetadata
192190
i.SeekGE(storage.MVCCKey{Key: i.span.Key})
193191

194-
every := log.Every(100 * time.Millisecond)
195192
for {
196193
if ok, err := i.Valid(); err != nil {
197194
return err
@@ -200,11 +197,7 @@ func (i *CatchUpIterator) CatchUpScan(
200197
}
201198

202199
if err := i.pacer.Pace(ctx); err != nil {
203-
// We're unable to pace things automatically -- shout loudly
204-
// semi-infrequently but don't fail the rangefeed itself.
205-
if every.ShouldLog() {
206-
log.Errorf(ctx, "automatic pacing: %v", err)
207-
}
200+
return err
208201
}
209202

210203
// Emit any new MVCC range tombstones when their start key is encountered.

pkg/sql/importer/read_import_base.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,9 @@ func runParallelImport(
584584
var numSkipped int64
585585
var count int64
586586
for producer.Scan() {
587-
pacer.Pace(ctx)
587+
if err := pacer.Pace(ctx); err != nil {
588+
return err
589+
}
588590
// Skip rows if needed.
589591
count++
590592
if count <= fileCtx.skip {
@@ -700,7 +702,10 @@ func (p *parallelImporter) importWorker(
700702
for batchIdx, record := range batch.data {
701703
rowNum = batch.startPos + int64(batchIdx)
702704
// Pace the admission control before processing each row.
703-
pacer.Pace(ctx)
705+
if err := pacer.Pace(ctx); err != nil {
706+
return err
707+
}
708+
704709
if err := consumer.FillDatums(ctx, record, rowNum, conv); err != nil {
705710
if err = handleCorruptRow(ctx, fileCtx, err); err != nil {
706711
return err

pkg/sql/importer/read_import_workload.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,9 @@ func (w *WorkloadKVConverter) Worker(
275275
a = a.Truncate()
276276
w.rows.FillBatch(batchIdx, cb, &a)
277277
for rowIdx, numRows := 0, cb.Length(); rowIdx < numRows; rowIdx++ {
278-
pacer.Pace(ctx)
278+
if err := pacer.Pace(ctx); err != nil {
279+
return err
280+
}
279281
for colIdx, col := range cb.ColVecs() {
280282
// TODO(dan): This does a type switch once per-datum. Reduce this to
281283
// a one-time switch per column.

pkg/sql/row/kv_batch_fetcher.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ var defaultKVBatchSize = rowinfra.KeyLimit(metamorphic.ConstantWithTestValue(
5555
1, /* metamorphicValue */
5656
))
5757

58-
var logAdmissionPacerErr = log.Every(100 * time.Millisecond)
59-
6058
// elasticCPUDurationPerLowPriReadResponse controls how many CPU tokens are allotted
6159
// each time we seek admission for response handling during internally submitted
6260
// low priority reads (like row-level TTL selects).
@@ -753,12 +751,7 @@ func (f *txnKVFetcher) maybeAdmitBatchResponse(ctx context.Context, br *kvpb.Bat
753751
// to ensure that they have local plans with a single TableReader
754752
// processor in multi-node clusters.
755753
if err := f.admissionPacer.Pace(ctx); err != nil {
756-
// We're unable to pace things automatically -- shout loudly
757-
// semi-infrequently but don't fail the kv fetcher itself. At
758-
// worst we'd be over-admitting.
759-
if logAdmissionPacerErr.ShouldLog() {
760-
log.Errorf(ctx, "automatic pacing: %v", err)
761-
}
754+
return err
762755
}
763756
} else if f.responseAdmissionQ != nil {
764757
responseAdmission := admission.WorkInfo{

pkg/sql/rowexec/indexbackfiller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,9 @@ func (ib *indexBackfiller) ingestIndexEntries(
345345
for indexBatch := range indexEntryCh {
346346
for _, indexEntry := range indexBatch.indexEntries {
347347
// Pace the admission control before processing each index entry.
348-
pacer.Pace(ctx)
348+
if err := pacer.Pace(ctx); err != nil {
349+
return err
350+
}
349351

350352
// If there is at least one vector index being written, we need to check to see
351353
// if this IndexEntry is going to a vector index and then re-encode it for that

pkg/util/admission/elastic_cpu_work_queue.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func makeElasticCPUWorkQueue(
6666
}
6767

6868
// Admit is called when requesting admission for elastic CPU work.
69+
// Non-nil errors are returned only if the context is canceled.
6970
func (e *ElasticCPUWorkQueue) Admit(
7071
ctx context.Context, duration time.Duration, info WorkInfo,
7172
) (*ElasticCPUWorkHandle, error) {

pkg/util/admission/pacer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ type Pacer struct {
2323
cur *ElasticCPUWorkHandle
2424
}
2525

26-
// Pace is part of the Pacer interface.
26+
// Pace will block as needed to pace work that calls it as configured. It is
27+
// intended to be called in a tight loop, and will attempt to minimize per-call
28+
// overhead. Non-nil errors are returned only if the context is canceled.
2729
func (p *Pacer) Pace(ctx context.Context) error {
2830
if p == nil {
2931
return nil

0 commit comments

Comments
 (0)