Skip to content

Commit 1d11d14

Browse files
committed
admission: move some grant coordinators to their own source file
Specifically ElasticCPUGrantCoordinator and StoreGrantCoordinators are moved. And there is some tiny renaming and addition of commentary. This is a precursor to simplifying how StoreGrantCoordinators is implemented, since it doesn't use most of the functionality in GrantCoordinator (as it doesn't need to manage multiple granters). Epic: none Release note: None
1 parent cad9544 commit 1d11d14

File tree

7 files changed

+471
-434
lines changed

7 files changed

+471
-434
lines changed

pkg/server/server.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -600,11 +600,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
600600
storesForRACv2,
601601
admissionKnobs,
602602
)
603-
db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork)
604-
db.AdmissionPacerFactory = gcoords.Elastic
603+
db.SQLKVResponseAdmissionQ = gcoords.RegularCPU.GetWorkQueue(admission.SQLKVResponseWork)
604+
db.AdmissionPacerFactory = gcoords.ElasticCPU
605605
goschedstats.RegisterSettings(st)
606606
if goschedstats.Supported {
607-
cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad)
607+
cbID := goschedstats.RegisterRunnableCountCallback(gcoords.RegularCPU.CPULoad)
608608
stopper.AddCloser(stop.CloserFn(func() {
609609
goschedstats.UnregisterRunnableCountCallback(cbID)
610610
}))
@@ -618,12 +618,12 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
618618
kvAdmissionController kvadmission.Controller
619619
racHandles kvflowcontrol.ReplicationAdmissionHandles
620620
}
621-
admissionControl.schedulerLatencyListener = gcoords.Elastic.SchedulerLatencyListener
621+
admissionControl.schedulerLatencyListener = gcoords.ElasticCPU.SchedulerLatencyListener
622622
admissionControl.racHandles = kvserver.MakeRACHandles(stores)
623623
admissionControl.kvAdmissionController = kvadmission.MakeController(
624624
nodeIDContainer,
625-
gcoords.Regular.GetWorkQueue(admission.KVWork),
626-
gcoords.Elastic,
625+
gcoords.RegularCPU.GetWorkQueue(admission.KVWork),
626+
gcoords.ElasticCPU,
627627
gcoords.Stores,
628628
admissionControl.racHandles,
629629
cfg.Settings,
@@ -973,8 +973,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
973973
txnMetrics,
974974
stores,
975975
cfg.ClusterIDContainer,
976-
gcoords.Regular.GetWorkQueue(admission.KVWork),
977-
gcoords.Elastic,
976+
gcoords.RegularCPU.GetWorkQueue(admission.KVWork),
977+
gcoords.ElasticCPU,
978978
gcoords.Stores,
979979
tenantUsage,
980980
tenantSettingsWatcher,
@@ -1165,7 +1165,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
11651165
externalStorage: externalStorage,
11661166
externalStorageFromURI: externalStorageFromURI,
11671167
isMeta1Leaseholder: node.stores.IsMeta1Leaseholder,
1168-
sqlSQLResponseAdmissionQ: gcoords.Regular.GetWorkQueue(admission.SQLSQLResponseWork),
1168+
sqlSQLResponseAdmissionQ: gcoords.RegularCPU.GetWorkQueue(admission.SQLSQLResponseWork),
11691169
spanConfigKVAccessor: spanConfig.kvAccessorForTenantRecords,
11701170
kvStoresIterator: kvserver.MakeStoresIterator(node.stores),
11711171
inspectzServer: inspectzServer,
@@ -1202,7 +1202,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
12021202
tenantUsageServer: tenantUsage,
12031203
monitorAndMetrics: sqlMonitorAndMetrics,
12041204
settingsStorage: settingsWriter,
1205-
admissionPacerFactory: gcoords.Elastic,
1205+
admissionPacerFactory: gcoords.ElasticCPU,
12061206
rangeDescIteratorFactory: rangedesc.NewIteratorFactory(db),
12071207
tenantCapabilitiesReader: sql.MakeSystemTenantOnly[tenantcapabilities.Reader](tenantCapabilitiesWatcher),
12081208
})

pkg/util/admission/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_library(
55
srcs = [
66
"admission.go",
77
"disk_bandwidth.go",
8+
"elastic_cpu_grant_coordinator.go",
89
"elastic_cpu_granter.go",
910
"elastic_cpu_work_handle.go",
1011
"elastic_cpu_work_queue.go",
@@ -16,6 +17,7 @@ go_library(
1617
"scheduler_latency_listener.go",
1718
"sequencer.go",
1819
"snapshot_queue.go",
20+
"store_grant_coordinator.go",
1921
"store_token_estimation.go",
2022
"testing_knobs.go",
2123
"tokens_linear_model.go",
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package admission
7+
8+
import (
9+
"time"
10+
11+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
12+
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
13+
"github.com/cockroachdb/cockroach/pkg/util/log"
14+
"github.com/cockroachdb/cockroach/pkg/util/metric"
15+
)
16+
17+
func makeElasticCPUGrantCoordinator(
18+
ambientCtx log.AmbientContext, st *cluster.Settings, registry *metric.Registry,
19+
) *ElasticCPUGrantCoordinator {
20+
schedulerLatencyListenerMetrics := makeSchedulerLatencyListenerMetrics()
21+
registry.AddMetricStruct(schedulerLatencyListenerMetrics)
22+
elasticCPUGranterMetrics := makeElasticCPUGranterMetrics()
23+
registry.AddMetricStruct(elasticCPUGranterMetrics)
24+
25+
elasticWorkQueueMetrics := makeWorkQueueMetrics("elastic-cpu", registry,
26+
admissionpb.BulkNormalPri, admissionpb.NormalPri)
27+
28+
elasticCPUGranter := newElasticCPUGranter(ambientCtx, st, elasticCPUGranterMetrics)
29+
schedulerLatencyListener := newSchedulerLatencyListener(ambientCtx, st, schedulerLatencyListenerMetrics, elasticCPUGranter)
30+
31+
elasticCPUInternalWorkQueue := &WorkQueue{}
32+
initWorkQueue(elasticCPUInternalWorkQueue, ambientCtx, KVWork, "kv-elastic-cpu-queue", elasticCPUGranter, st,
33+
elasticWorkQueueMetrics,
34+
workQueueOptions{usesTokens: true}, nil /* knobs */) // will be closed by the embedding *ElasticCPUWorkQueue
35+
elasticCPUWorkQueue := makeElasticCPUWorkQueue(st, elasticCPUInternalWorkQueue, elasticCPUGranter, elasticCPUGranterMetrics)
36+
elasticCPUGrantCoordinator := newElasticCPUGrantCoordinator(elasticCPUGranter, elasticCPUWorkQueue, schedulerLatencyListener)
37+
elasticCPUGranter.setRequester(elasticCPUInternalWorkQueue)
38+
schedulerLatencyListener.setCoord(elasticCPUGrantCoordinator)
39+
return elasticCPUGrantCoordinator
40+
}
41+
42+
// ElasticCPUGrantCoordinator coordinates grants for elastic CPU tokens; it has
43+
// a single granter-requester pair. Since it's used for elastic CPU work, and
44+
// the total allotment of CPU available for such work is reduced before getting
45+
// close to CPU saturation (we observe 1ms+ p99 scheduling latencies when
46+
// running at 65% utilization on 8vCPU machines, which is enough to affect
47+
// foreground latencies), we don't want it to serve as a gatekeeper for
48+
// SQL-level admission. All this informs why it's structured as a separate grant
49+
// coordinator.
50+
//
51+
// TODO(irfansharif): Ideally we wouldn't use this separate
52+
// ElasticCPUGrantCoordinator and just make this part of the one GrantCoordinator
53+
// above but given we're dealing with a different workClass (elasticWorkClass)
54+
// but for an existing WorkKind (KVWork), and not all APIs on the grant
55+
// coordinator currently segment across the two, it was easier to copy over some
56+
// of the mediating code instead (grant chains also don't apply in this scheme).
57+
// Try to do something better here and revisit the existing abstractions; see
58+
// github.com/cockroachdb/cockroach/pull/86638#pullrequestreview-1084437330.
59+
type ElasticCPUGrantCoordinator struct {
60+
SchedulerLatencyListener SchedulerLatencyListener
61+
ElasticCPUWorkQueue *ElasticCPUWorkQueue
62+
elasticCPUGranter *elasticCPUGranter
63+
}
64+
65+
func newElasticCPUGrantCoordinator(
66+
elasticCPUGranter *elasticCPUGranter,
67+
elasticCPUWorkQueue *ElasticCPUWorkQueue,
68+
listener *schedulerLatencyListener,
69+
) *ElasticCPUGrantCoordinator {
70+
return &ElasticCPUGrantCoordinator{
71+
elasticCPUGranter: elasticCPUGranter,
72+
ElasticCPUWorkQueue: elasticCPUWorkQueue,
73+
SchedulerLatencyListener: listener,
74+
}
75+
}
76+
77+
func (e *ElasticCPUGrantCoordinator) close() {
78+
e.ElasticCPUWorkQueue.close()
79+
}
80+
81+
// tryGrant is used to attempt to grant to waiting requests.
82+
func (e *ElasticCPUGrantCoordinator) tryGrant() {
83+
e.elasticCPUGranter.tryGrant()
84+
}
85+
86+
// NewPacer implements the PacerMaker interface.
87+
func (e *ElasticCPUGrantCoordinator) NewPacer(unit time.Duration, wi WorkInfo) *Pacer {
88+
if e == nil {
89+
return nil
90+
}
91+
return &Pacer{
92+
unit: unit,
93+
wi: wi,
94+
wq: e.ElasticCPUWorkQueue,
95+
}
96+
}

0 commit comments

Comments
 (0)