Skip to content

Commit d7cdba0

Browse files
craig[bot]sumeerbhola
andcommitted
Merge #150333
150333: admission: move some grant coordinators to their own source file r=wenyihu6 a=sumeerbhola Specifically ElasticCPUGrantCoordinator and StoreGrantCoordinators are moved. And there is some tiny renaming and addition of commentary. This is a precursor to simplifying how StoreGrantCoordinators is implemented, since it doesn't use most of the functionality in GrantCoordinator (as it doesn't need to manange multiple granters). Epic: none Release note: None Co-authored-by: sumeerbhola <[email protected]>
2 parents 9272e28 + 1d11d14 commit d7cdba0

File tree

7 files changed

+471
-434
lines changed

7 files changed

+471
-434
lines changed

pkg/server/server.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -602,11 +602,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
602602
storesForRACv2,
603603
admissionKnobs,
604604
)
605-
db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork)
606-
db.AdmissionPacerFactory = gcoords.Elastic
605+
db.SQLKVResponseAdmissionQ = gcoords.RegularCPU.GetWorkQueue(admission.SQLKVResponseWork)
606+
db.AdmissionPacerFactory = gcoords.ElasticCPU
607607
goschedstats.RegisterSettings(st)
608608
if goschedstats.Supported {
609-
cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad)
609+
cbID := goschedstats.RegisterRunnableCountCallback(gcoords.RegularCPU.CPULoad)
610610
stopper.AddCloser(stop.CloserFn(func() {
611611
goschedstats.UnregisterRunnableCountCallback(cbID)
612612
}))
@@ -620,12 +620,12 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
620620
kvAdmissionController kvadmission.Controller
621621
racHandles kvflowcontrol.ReplicationAdmissionHandles
622622
}
623-
admissionControl.schedulerLatencyListener = gcoords.Elastic.SchedulerLatencyListener
623+
admissionControl.schedulerLatencyListener = gcoords.ElasticCPU.SchedulerLatencyListener
624624
admissionControl.racHandles = kvserver.MakeRACHandles(stores)
625625
admissionControl.kvAdmissionController = kvadmission.MakeController(
626626
nodeIDContainer,
627-
gcoords.Regular.GetWorkQueue(admission.KVWork),
628-
gcoords.Elastic,
627+
gcoords.RegularCPU.GetWorkQueue(admission.KVWork),
628+
gcoords.ElasticCPU,
629629
gcoords.Stores,
630630
admissionControl.racHandles,
631631
cfg.Settings,
@@ -984,8 +984,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
984984
txnMetrics,
985985
stores,
986986
cfg.ClusterIDContainer,
987-
gcoords.Regular.GetWorkQueue(admission.KVWork),
988-
gcoords.Elastic,
987+
gcoords.RegularCPU.GetWorkQueue(admission.KVWork),
988+
gcoords.ElasticCPU,
989989
gcoords.Stores,
990990
tenantUsage,
991991
tenantSettingsWatcher,
@@ -1176,7 +1176,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
11761176
externalStorage: externalStorage,
11771177
externalStorageFromURI: externalStorageFromURI,
11781178
isMeta1Leaseholder: node.stores.IsMeta1Leaseholder,
1179-
sqlSQLResponseAdmissionQ: gcoords.Regular.GetWorkQueue(admission.SQLSQLResponseWork),
1179+
sqlSQLResponseAdmissionQ: gcoords.RegularCPU.GetWorkQueue(admission.SQLSQLResponseWork),
11801180
spanConfigKVAccessor: spanConfig.kvAccessorForTenantRecords,
11811181
kvStoresIterator: kvserver.MakeStoresIterator(node.stores),
11821182
inspectzServer: inspectzServer,
@@ -1213,7 +1213,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
12131213
tenantUsageServer: tenantUsage,
12141214
monitorAndMetrics: sqlMonitorAndMetrics,
12151215
settingsStorage: settingsWriter,
1216-
admissionPacerFactory: gcoords.Elastic,
1216+
admissionPacerFactory: gcoords.ElasticCPU,
12171217
rangeDescIteratorFactory: rangedesc.NewIteratorFactory(db),
12181218
tenantCapabilitiesReader: sql.MakeSystemTenantOnly[tenantcapabilities.Reader](tenantCapabilitiesWatcher),
12191219
})

pkg/util/admission/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_library(
55
srcs = [
66
"admission.go",
77
"disk_bandwidth.go",
8+
"elastic_cpu_grant_coordinator.go",
89
"elastic_cpu_granter.go",
910
"elastic_cpu_work_handle.go",
1011
"elastic_cpu_work_queue.go",
@@ -16,6 +17,7 @@ go_library(
1617
"scheduler_latency_listener.go",
1718
"sequencer.go",
1819
"snapshot_queue.go",
20+
"store_grant_coordinator.go",
1921
"store_token_estimation.go",
2022
"testing_knobs.go",
2123
"tokens_linear_model.go",
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package admission
7+
8+
import (
9+
"time"
10+
11+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
12+
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
13+
"github.com/cockroachdb/cockroach/pkg/util/log"
14+
"github.com/cockroachdb/cockroach/pkg/util/metric"
15+
)
16+
17+
func makeElasticCPUGrantCoordinator(
18+
ambientCtx log.AmbientContext, st *cluster.Settings, registry *metric.Registry,
19+
) *ElasticCPUGrantCoordinator {
20+
schedulerLatencyListenerMetrics := makeSchedulerLatencyListenerMetrics()
21+
registry.AddMetricStruct(schedulerLatencyListenerMetrics)
22+
elasticCPUGranterMetrics := makeElasticCPUGranterMetrics()
23+
registry.AddMetricStruct(elasticCPUGranterMetrics)
24+
25+
elasticWorkQueueMetrics := makeWorkQueueMetrics("elastic-cpu", registry,
26+
admissionpb.BulkNormalPri, admissionpb.NormalPri)
27+
28+
elasticCPUGranter := newElasticCPUGranter(ambientCtx, st, elasticCPUGranterMetrics)
29+
schedulerLatencyListener := newSchedulerLatencyListener(ambientCtx, st, schedulerLatencyListenerMetrics, elasticCPUGranter)
30+
31+
elasticCPUInternalWorkQueue := &WorkQueue{}
32+
initWorkQueue(elasticCPUInternalWorkQueue, ambientCtx, KVWork, "kv-elastic-cpu-queue", elasticCPUGranter, st,
33+
elasticWorkQueueMetrics,
34+
workQueueOptions{usesTokens: true}, nil /* knobs */) // will be closed by the embedding *ElasticCPUWorkQueue
35+
elasticCPUWorkQueue := makeElasticCPUWorkQueue(st, elasticCPUInternalWorkQueue, elasticCPUGranter, elasticCPUGranterMetrics)
36+
elasticCPUGrantCoordinator := newElasticCPUGrantCoordinator(elasticCPUGranter, elasticCPUWorkQueue, schedulerLatencyListener)
37+
elasticCPUGranter.setRequester(elasticCPUInternalWorkQueue)
38+
schedulerLatencyListener.setCoord(elasticCPUGrantCoordinator)
39+
return elasticCPUGrantCoordinator
40+
}
41+
42+
// ElasticCPUGrantCoordinator coordinates grants for elastic CPU tokens, it has
43+
// a single granter-requester pair. Since it's used for elastic CPU work, and
44+
// the total allotment of CPU available for such work is reduced before getting
45+
// close to CPU saturation (we observe 1ms+ p99 scheduling latencies when
46+
// running at 65% utilization on 8vCPU machines, which is enough to affect
47+
// foreground latencies), we don't want it to serve as a gatekeeper for
48+
// SQL-level admission. All this informs why its structured as a separate grant
49+
// coordinator.
50+
//
51+
// TODO(irfansharif): Ideally we wouldn't use this separate
52+
// ElasticGrantCoordinator and just make this part of the one GrantCoordinator
53+
// above but given we're dealing with a different workClass (elasticWorkClass)
54+
// but for an existing WorkKind (KVWork), and not all APIs on the grant
55+
// coordinator currently segment across the two, it was easier to copy over some
56+
// of the mediating code instead (grant chains also don't apply in this scheme).
57+
// Try to do something better here and revisit the existing abstractions; see
58+
// github.com/cockroachdb/cockroach/pull/86638#pullrequestreview-1084437330.
59+
type ElasticCPUGrantCoordinator struct {
60+
SchedulerLatencyListener SchedulerLatencyListener
61+
ElasticCPUWorkQueue *ElasticCPUWorkQueue
62+
elasticCPUGranter *elasticCPUGranter
63+
}
64+
65+
func newElasticCPUGrantCoordinator(
66+
elasticCPUGranter *elasticCPUGranter,
67+
elasticCPUWorkQueue *ElasticCPUWorkQueue,
68+
listener *schedulerLatencyListener,
69+
) *ElasticCPUGrantCoordinator {
70+
return &ElasticCPUGrantCoordinator{
71+
elasticCPUGranter: elasticCPUGranter,
72+
ElasticCPUWorkQueue: elasticCPUWorkQueue,
73+
SchedulerLatencyListener: listener,
74+
}
75+
}
76+
77+
func (e *ElasticCPUGrantCoordinator) close() {
78+
e.ElasticCPUWorkQueue.close()
79+
}
80+
81+
// tryGrant is used to attempt to grant to waiting requests.
82+
func (e *ElasticCPUGrantCoordinator) tryGrant() {
83+
e.elasticCPUGranter.tryGrant()
84+
}
85+
86+
// NewPacer implements the PacerMaker interface.
87+
func (e *ElasticCPUGrantCoordinator) NewPacer(unit time.Duration, wi WorkInfo) *Pacer {
88+
if e == nil {
89+
return nil
90+
}
91+
return &Pacer{
92+
unit: unit,
93+
wi: wi,
94+
wq: e.ElasticCPUWorkQueue,
95+
}
96+
}

0 commit comments

Comments
 (0)