Commit fbef5ab

admission: move yielding into ElasticCPUWorkHandle

ElasticCPUWorkHandle.OverLimit is expected to be called in a tight loop, so
yielding there is conceptually the right place. More importantly, this will
allow in the future for KV work that is not holding latches to also yield.

As part of this change, elastic work that does not wish to wait in admission
control queues (due to cluster settings) is now accounted for in the elastic
tokens, and in the admission.elastic_cpu_bypassed.utilization metric. One
side-effect of this accounting is that work that needs to wait in admission
queues may have fewer tokens available to it, and may wait longer. This is
considered acceptable since:

- Elastic work that bypasses queueing is still elastic work, and our
  overarching goal is to reduce impact to foreground work.
- Due to the default-on use of runtime.Yield, all elastic work yields, which
  allows the system to run at higher elastic CPU utilization without
  impacting the latency of foreground work.

Epic: none
Release note: None

17 files changed, +123 −110 lines
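For illustration, the sketch below shows the tight-loop shape the first paragraph of the message describes. It is a hedged sketch, not code from this commit: processOneKey is a hypothetical unit of work, while ElasticCPUWorkHandleFromContext and OverLimit are the real APIs touched in the diffs below (OverLimit is nil-safe, per elastic_cpu_work_handle.go).

package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/admission"
)

// processOneKey is a hypothetical stand-in for one iteration of elastic work,
// e.g. processing a single KV pair during an export.
func processOneKey(k string) {}

func doElasticWork(ctx context.Context, keys []string) {
	// The handle is attached to the context at admission time; it is nil if
	// the work was not admitted via the elastic CPU queue, in which case
	// OverLimit simply returns false.
	h := admission.ElasticCPUWorkHandleFromContext(ctx)
	for _, k := range keys {
		// OverLimit is cheap enough to call per iteration. With this commit
		// it also yields the CPU internally (when the handle's yield flag is
		// set from admission.elastic_cpu.yield.enabled), so even
		// latch-holding KV work can cede the CPU to foreground work.
		if overLimit, _ := h.OverLimit(); overLimit {
			return // illustrative; real callers typically re-admit or error out
		}
		processOneKey(k)
	}
}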

pkg/backup/datadriven_test.go (1 addition & 3 deletions)

@@ -27,7 +27,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
-	"github.com/cockroachdb/cockroach/pkg/kv/bulk"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
@@ -200,8 +199,7 @@ func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error {
 	// and overload the host. Generally overload would mean bulk work, which only
 	// uses strictly spare capacity, gets starved, but these tests expect it to
 	// still run (just slowly, along with everything else).
-	bulk.YieldIfNoPacer.Override(context.Background(), &settings.SV, false)
-	admission.YieldInPacer.Override(context.Background(), &settings.SV, false)
+	admission.YieldForElasticCPU.Override(context.Background(), &settings.SV, false)
 	params.ServerArgs.Settings = settings

 	clusterSize := cfg.nodes

pkg/kv/bulk/cpu_pacer.go (10 additions & 18 deletions)

@@ -25,26 +25,18 @@ var cpuPacerRequestDuration = settings.RegisterDurationSetting(
 	50*time.Millisecond,
 )

-// YieldIfNoPacer is exported so it can be overridden in tests.
-var YieldIfNoPacer = settings.RegisterBoolSetting(
-	settings.ApplicationLevel,
-	"bulkio.elastic_cpu_control.always_yield.enabled",
-	"if true, yield the CPU as needed even when time-based elastic pacing is not enabled",
-	true,
-)
-
-// NewCPUPacer creates a new AC pacer for SST batcher. It may return an empty
-// Pacer which noops if pacing is disabled or its arguments are nil.
+// NewCPUPacer creates a new AC pacer for SST batcher. It will return an empty
+// Pacer which noops if db or db.AdmissionPacerFactory is nil.
+//
+// The setting specifies whether waiting in the elastic admission control queue
+// is enabled. If disabled, the CPU consumed will be accounted for in
+// admission control, but pacing will not wait in admission control.
 func NewCPUPacer(ctx context.Context, db *kv.DB, setting *settings.BoolSetting) *admission.Pacer {
-	if db == nil || db.AdmissionPacerFactory == nil || !setting.Get(db.SettingsValues()) {
-		log.Dev.Infof(ctx, "admission control is not configured to pace bulk ingestion")
-
-		if db != nil && YieldIfNoPacer.Get(db.SettingsValues()) {
-			// Return a Pacer that just yields.
-			return &admission.Pacer{Yield: true}
-		}
+	if db == nil || db.AdmissionPacerFactory == nil {
+		log.Dev.Infof(ctx, "admission control is not configured to pace this bulk work")
 		return nil
 	}
+	bypassACQueue := !setting.Get(db.SettingsValues())
 	tenantID, ok := roachpb.ClientTenantFromContext(ctx)
 	if !ok {
 		tenantID = roachpb.SystemTenantID
@@ -55,6 +47,6 @@ func NewCPUPacer(ctx context.Context, db *kv.DB, setting *settings.BoolSetting)
 		TenantID:   tenantID,
 		Priority:   admissionpb.BulkNormalPri,
 		CreateTime: timeutil.Now().UnixNano(),
-		BypassAdmission: false,
+		BypassAdmission: bypassACQueue,
 	})
 }
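For context, a hedged sketch of how bulk code would consume this pacer. It assumes the existing admission.Pacer API (Pace and Close, both of which I believe are nil-safe); pacingSetting and ingestBatch are hypothetical stand-ins, not names from this commit.

package bulkexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/bulk"
	"github.com/cockroachdb/cockroach/pkg/settings"
)

// pacingSetting is a hypothetical BoolSetting: when false, NewCPUPacer now
// sets BypassAdmission, so the CPU is accounted for but the work never queues.
var pacingSetting *settings.BoolSetting

// ingestBatch is a hypothetical stand-in for one unit of bulk work.
func ingestBatch(b []byte) {}

func paceBulkLoop(ctx context.Context, db *kv.DB, batches [][]byte) error {
	// NewCPUPacer returns nil when db or the pacer factory is unset; the
	// calls below are assumed no-ops on a nil pacer.
	pacer := bulk.NewCPUPacer(ctx, db, pacingSetting)
	defer pacer.Close()
	for _, b := range batches {
		// Each call deducts elastic CPU tokens for the preceding unit of
		// work and may wait in the elastic AC queue (unless bypassed).
		if err := pacer.Pace(ctx); err != nil {
			return err
		}
		ingestBatch(b)
	}
	return nil
}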

pkg/kv/kvserver/kvadmission/kvadmission.go (1 addition & 1 deletion)

@@ -396,7 +396,7 @@ func (n *controllerImpl) AdmitKVWork(
 		// reads, so in some sense, the integration is incomplete. This is
 		// probably harmless.
 		elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit(
-			ctx, admitDuration, admissionInfo,
+			ctx, admitDuration, admissionInfo, false,
 		)
 		if err != nil {
 			return Handle{}, err

pkg/server/testserver.go (2 additions & 5 deletions)

@@ -26,7 +26,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
-	"github.com/cockroachdb/cockroach/pkg/kv/bulk"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvprober"
@@ -1486,8 +1485,7 @@ func (ts *testServer) StartSharedProcessTenant(

 	// Disable yield AC for tenant servers in tests, for the same reason as the
 	// system tenant (see comment in serverutils.NewServer).
-	admission.YieldInPacer.Override(ctx, &sqlServer.cfg.Settings.SV, false)
-	bulk.YieldIfNoPacer.Override(ctx, &sqlServer.cfg.Settings.SV, false)
+	admission.YieldForElasticCPU.Override(ctx, &sqlServer.cfg.Settings.SV, false)

 	hts := &httpTestServer{}
 	hts.t.authentication = sqlServerWrapper.authentication
@@ -1879,8 +1877,7 @@ func (ts *testServer) StartTenant(

 	// Disable yield AC for tenant servers in tests, for the same reason as the
 	// system tenant (see comment in serverutils.NewServer).
-	admission.YieldInPacer.Override(ctx, &st.SV, false)
-	bulk.YieldIfNoPacer.Override(ctx, &st.SV, false)
+	admission.YieldForElasticCPU.Override(ctx, &st.SV, false)

 	hts := &httpTestServer{}
 	hts.t.authentication = sw.authentication

pkg/testutils/serverutils/BUILD.bazel (0 additions & 1 deletion)

@@ -24,7 +24,6 @@ go_library(
         "//pkg/config/zonepb",
         "//pkg/keys",
         "//pkg/kv",
-        "//pkg/kv/bulk",
         "//pkg/kv/kvpb",
         "//pkg/kv/kvprober",
         "//pkg/kv/kvserver/liveness/livenesspb",

pkg/testutils/serverutils/test_server_shim.go (1 addition & 3 deletions)

@@ -22,7 +22,6 @@ import (

 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv"
-	"github.com/cockroachdb/cockroach/pkg/kv/bulk"
 	"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilitiespb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security/securitytest"
@@ -427,8 +426,7 @@ func NewServer(params base.TestServerArgs) (TestServerInterface, error) {
 	// system components/rangefeeds/etc of several testservers/tenants is enough
 	// to have a perpetually non-empty runnable queue in CIs for 10+ minutes. As
 	// such, we have to disable yield AC if we want background work to run at all.
-	admission.YieldInPacer.Override(context.Background(), &srv.(TestServerInterfaceRaw).ClusterSettings().SV, false)
-	bulk.YieldIfNoPacer.Override(context.Background(), &srv.(TestServerInterfaceRaw).ClusterSettings().SV, false)
+	admission.YieldForElasticCPU.Override(context.Background(), &srv.(TestServerInterfaceRaw).ClusterSettings().SV, false)
 	srv = wrapTestServer(srv.(TestServerInterfaceRaw), tcfg)
 	return srv.(TestServerInterface), nil
 }

pkg/util/admission/BUILD.bazel (1 addition & 0 deletions)

@@ -24,6 +24,7 @@ go_library(
        "testing_knobs.go",
        "tokens_linear_model.go",
        "work_queue.go",
+       "yield.go",
    ],
    importpath = "github.com/cockroachdb/cockroach/pkg/util/admission",
    visibility = ["//visibility:public"],

pkg/util/admission/elastic_cpu_grant_coordinator.go (4 additions & 4 deletions)

@@ -84,10 +84,10 @@ func (e *ElasticCPUGrantCoordinator) tryGrant() {
 	e.elasticCPUGranter.tryGrant()
 }

-// YieldInPacer is exported so it can be overridden in tests.
-var YieldInPacer = settings.RegisterBoolSetting(
+// YieldForElasticCPU is exported so it can be overridden in tests.
+var YieldForElasticCPU = settings.RegisterBoolSetting(
 	settings.ApplicationLevel,
-	"admission.elastic_cpu.yield_in_pacer.enabled",
+	"admission.elastic_cpu.yield.enabled",
 	"when true, time-based elastic CPU pacing additionally yields CPU as-needed according to the scheduler",
 	true,
 )
@@ -101,6 +101,6 @@ func (e *ElasticCPUGrantCoordinator) NewPacer(unit time.Duration, wi WorkInfo) *Pacer {
 		unit:  unit,
 		wi:    wi,
 		wq:    e.ElasticCPUWorkQueue,
-		Yield: YieldInPacer.Get(&e.ElasticCPUWorkQueue.settings.SV),
+		yield: YieldForElasticCPU.Get(&e.ElasticCPUWorkQueue.settings.SV),
 	}
 }

pkg/util/admission/elastic_cpu_granter.go (35 additions & 8 deletions)

@@ -8,6 +8,7 @@ package admission
 import (
 	"context"
 	"runtime"
+	"sync/atomic"
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -250,14 +251,26 @@ func (e *elasticCPUGranter) computeUtilizationMetric() {
 	currentCumAcquiredNanos := e.metrics.AcquiredNanos.Count()
 	currentCumUsedNanos := currentCumAcquiredNanos - currentCumReturnedNanos

+	intervalUsedFracFunc := func(used int64) float64 {
+		return float64(used) /
+			(float64(e.metrics.MaxAvailableNanos.Count()) * elasticCPUUtilizationMetricInterval.Seconds())
+	}
 	if e.metrics.lastCumUsedNanos != 0 {
 		intervalUsedNanos := currentCumUsedNanos - e.metrics.lastCumUsedNanos
-		intervalUsedPercent := float64(intervalUsedNanos) /
-			(float64(e.metrics.MaxAvailableNanos.Count()) * elasticCPUUtilizationMetricInterval.Seconds())
-		e.metrics.Utilization.Update(intervalUsedPercent)
-		e.metrics.lastCumUsedNanos = currentCumUsedNanos
+		intervalUsedFrac := intervalUsedFracFunc(intervalUsedNanos)
+		e.metrics.Utilization.Update(intervalUsedFrac)
 	}
 	e.metrics.lastCumUsedNanos = currentCumUsedNanos
+
+	// Compute the utilization of work that bypassed admission.
+	curBypassedNanos := e.metrics.bypassedAdmissionCumNanos.Load()
+	if curBypassedNanos != 0 {
+		intervalBypassedNanos := curBypassedNanos - e.metrics.lastBypassedAdmissionCumNanos
+		intervalBypassedFrac := intervalUsedFracFunc(intervalBypassedNanos)
+		e.metrics.BypassedAdmissionUtilization.Update(intervalBypassedFrac)
+
+	}
+	e.metrics.lastBypassedAdmissionCumNanos = curBypassedNanos
 }

 // TODO(irfansharif): Provide separate enums for different elastic CPU token
@@ -324,6 +337,13 @@ var ( // granter-side metrics (some of these have parallels on the requester side)
 		Unit:        metric.Unit_PERCENT,
 	}

+	elasticCPUGranterBypassedUtilization = metric.Metadata{
+		Name:        "admission.elastic_cpu_bypassed.utilization",
+		Help:        "CPU utilization by elastic work that bypassed admission",
+		Measurement: "CPU Time",
+		Unit:        metric.Unit_PERCENT,
+	}
+
 	elasticCPUGranterUtilizationLimit = metric.Metadata{
 		Name: "admission.elastic_cpu.utilization_limit",
 		Help: "Utilization limit set for the elastic CPU work",
@@ -344,9 +364,15 @@ type elasticCPUGranterMetrics struct {
 	NanosExhaustedDuration *metric.Counter
 	OverLimitDuration      metric.IHistogram

-	Utilization *metric.GaugeFloat64 // updated every elasticCPUUtilizationMetricInterval, using fields below
+	Utilization                  *metric.GaugeFloat64 // updated every elasticCPUUtilizationMetricInterval, using fields below
+	BypassedAdmissionUtilization *metric.GaugeFloat64
+
 	everyInterval    util.EveryN[crtime.Mono]
 	lastCumUsedNanos int64
+
+	// Used for computing BypassedAdmissionUtilization.
+	bypassedAdmissionCumNanos     atomic.Int64
+	lastBypassedAdmissionCumNanos int64
 }

 const elasticCPUUtilizationMetricInterval = 10 * time.Second
@@ -365,9 +391,10 @@ func makeElasticCPUGranterMetrics() *elasticCPUGranterMetrics {
 			Duration:     base.DefaultHistogramWindowInterval(),
 			BucketConfig: metric.IOLatencyBuckets,
 		}),
-		Utilization:      metric.NewGaugeFloat64(elasticCPUGranterUtilization),
-		UtilizationLimit: metric.NewGaugeFloat64(elasticCPUGranterUtilizationLimit),
-		everyInterval:    util.EveryMono(elasticCPUUtilizationMetricInterval),
+		Utilization:                  metric.NewGaugeFloat64(elasticCPUGranterUtilization),
+		BypassedAdmissionUtilization: metric.NewGaugeFloat64(elasticCPUGranterBypassedUtilization),
+		UtilizationLimit:             metric.NewGaugeFloat64(elasticCPUGranterUtilizationLimit),
+		everyInterval:                util.EveryMono(elasticCPUUtilizationMetricInterval),
 	}

 	metrics.MaxAvailableNanos.Inc(int64(runtime.GOMAXPROCS(0)) * time.Second.Nanoseconds())
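To make the metric arithmetic above concrete, here is a small worked example under assumed numbers (GOMAXPROCS=8, so the granter registers 8s of available CPU per wall second via the MaxAvailableNanos.Inc above, and the 10s elasticCPUUtilizationMetricInterval). The program below is illustrative only:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed: GOMAXPROCS=8 means 8s of CPU time available per second.
	maxAvailableNanos := int64(8) * time.Second.Nanoseconds()
	// Suppose elastic work consumed 4s of CPU over the 10s interval.
	intervalUsedNanos := 4 * time.Second.Nanoseconds()
	// Same formula as intervalUsedFracFunc in the diff: used CPU divided by
	// (available CPU per second * interval length in seconds).
	frac := float64(intervalUsedNanos) /
		(float64(maxAvailableNanos) * (10 * time.Second).Seconds())
	fmt.Printf("utilization = %.2f\n", frac) // 0.05: elastic work used 5% of machine CPU
}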

pkg/util/admission/elastic_cpu_work_handle.go (18 additions & 4 deletions)

@@ -30,6 +30,10 @@ type ElasticCPUWorkHandle struct {
 	// duration to count against what it was allotted, but we still want to
 	// track to deduct an appropriate number of granter tokens.
 	preWork time.Duration
+	// yield decides whether OverLimit will call runtime.Yield.
+	yield bool
+	// bypassedAdmission is true if the allotment happened without waiting.
+	bypassedAdmission bool

 	// This handle is used in tight loops that are sensitive to per-iteration
 	// overhead (checking against the running time too can have an effect). To
@@ -49,9 +53,10 @@
 }

 func newElasticCPUWorkHandle(
-	tenantID roachpb.TenantID, allotted time.Duration,
+	tenantID roachpb.TenantID, allotted time.Duration, yield bool, bypassedAdmission bool,
 ) *ElasticCPUWorkHandle {
-	h := &ElasticCPUWorkHandle{tenantID: tenantID, allotted: allotted}
+	h := &ElasticCPUWorkHandle{
+		tenantID: tenantID, allotted: allotted, yield: yield, bypassedAdmission: bypassedAdmission}
 	h.cpuStart = grunning.Time()
 	return h
 }
@@ -85,6 +90,13 @@ func (h *ElasticCPUWorkHandle) runningTime() time.Duration {
 	return grunning.Elapsed(h.cpuStart, grunning.Time())
 }

+// SetYield sets the behavior of whether to call runtime.Yield. It should only
+// be used when it is not possible to set the value correctly at construction
+// time.
+func (h *ElasticCPUWorkHandle) SetYield(yield bool) {
+	h.yield = yield
+}
+
 // OverLimit is used to check whether we're over the allotted elastic CPU
 // tokens. If StartTimer was invoked, we start measuring on-CPU time only after
 // the invocation. It also returns the total time difference between how long we
@@ -103,7 +115,9 @@ func (h *ElasticCPUWorkHandle) OverLimit() (overLimit bool, difference time.Duration) {
 	if h == nil { // not applicable
 		return false, time.Duration(0)
 	}
-
+	if h.yield {
+		runtimeYield()
+	}
 	// What we're effectively doing is just:
 	//
 	//   runningTime := h.runningTime()
@@ -190,7 +204,7 @@ func ElasticCPUWorkHandleFromContext(ctx context.Context) *ElasticCPUWorkHandle
 // TestingNewElasticCPUHandle exports the ElasticCPUWorkHandle constructor for
 // testing purposes.
 func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle {
-	return newElasticCPUWorkHandle(roachpb.SystemTenantID, 420*time.Hour) // use a very high allotment
+	return newElasticCPUWorkHandle(roachpb.SystemTenantID, 420*time.Hour, false, false) // use a very high allotment
 }

 // TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle
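A hedged sketch of exercising the new yield flag through the exported testing constructor above; the test body is illustrative, not code from this commit:

package admission_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/util/admission"
)

func TestElasticHandleYield(t *testing.T) {
	// TestingNewElasticCPUHandle constructs a handle with a 420h allotment
	// and, per this commit, yield=false and bypassedAdmission=false.
	h := admission.TestingNewElasticCPUHandle()
	// SetYield flips yielding on after construction, for callers that cannot
	// know the right value up front.
	h.SetYield(true)
	// With such a large allotment the handle should be nowhere near its
	// limit; this call now also yields the CPU since the flag is set.
	if overLimit, _ := h.OverLimit(); overLimit {
		t.Fatal("handle unexpectedly over its elastic CPU limit")
	}
}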
