Skip to content

Commit 314400c

Browse files
committed
admission: add getterQualification to granter/requester interfaces
This is in preparation for the CPU time token scheme, where the WorkQueue will distinguish between burstable and non-burstable tenants. This is just an interface change, with no behavioral change. Informs #153591 Epic: none Release note: None
1 parent 7d65e38 commit 314400c

10 files changed

+62
-41
lines changed

pkg/util/admission/admission.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,28 @@ import (
129129
"github.com/cockroachdb/pebble"
130130
)
131131

132+
// burstQualification is an optional behavior of certain WorkQueues (which
133+
// implement requester), that differentiate between tenants that are qualified
134+
// to burst (in their token consumption) and those that are not. This is a
135+
// dynamic attribute of a tenant, based on token consumption history
136+
// maintained in the WorkQueue. The ordering of tenants is also affected in
137+
// that burstable tenants are ordered before non-burstable tenants.
138+
type burstQualification uint8
139+
140+
const (
141+
canBurst burstQualification = iota
142+
noBurst
143+
numBurstQualifications
144+
)
145+
132146
// requester is an interface implemented by an object that orders admission
133147
// work for a particular WorkKind. See WorkQueue for the implementation of
134148
// requester.
135149
type requester interface {
136150
// hasWaitingRequests returns whether there are any waiting/queued requests
137-
// of this WorkKind.
138-
hasWaitingRequests() bool
151+
// of this WorkKind, and when true, the qualification of the highest
152+
// importance getter.
153+
hasWaitingRequests() (bool, burstQualification)
139154
// granted is called by a granter to grant admission to a single queued
140155
// request. It returns > 0 if the grant was accepted, else returns 0. A
141156
// grant may not be accepted if the grant raced with request cancellation
@@ -155,12 +170,15 @@ type granter interface {
155170
grantKind() grantKind
156171
// tryGet is used by a requester to get slots/tokens for a piece of work
157172
// that has encountered no waiting/queued work. This is the fast path that
158-
// avoids queueing in the requester.
173+
// avoids queueing in the requester. The optional parameter
174+
// burstQualification identifies the qualification of the getter, which is
175+
// useful for certain granters.
159176
//
160177
// REQUIRES: count > 0. count == 1 for slots.
161-
tryGet(count int64) (granted bool)
178+
tryGet(getterQual burstQualification, count int64) (granted bool)
162179
// returnGrant is called for:
163180
// - returning slots after use.
181+
// - returning tokens after use, if all the granted tokens were not used.
164182
// - returning either slots or tokens when the grant raced with the work
165183
// being canceled, and the grantee did not end up doing any work.
166184
//
@@ -197,7 +215,9 @@ type granter interface {
197215
// the grantee after its goroutine runs and notices that it has been granted
198216
// a slot/tokens. This provides a natural throttling that reduces grant
199217
// bursts by taking into immediate account the capability of the goroutine
200-
// scheduler to schedule such work.
218+
// scheduler to schedule such work. Grant chains are only used for the CPU
219+
// resource in the hybrid slot/token scheme, where slots are used for KVWork
220+
// and tokens for SQLKVResponseWork and SQLSQLResponseWork.
201221
//
202222
// In an experiment, using such grant chains reduced burstiness of grants by
203223
// 5x and shifted ~2s of latency (at p99) from the scheduler into admission

pkg/util/admission/elastic_cpu_granter.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (e *elasticCPUGranter) grantKind() grantKind {
146146
}
147147

148148
// tryGet implements granter.
149-
func (e *elasticCPUGranter) tryGet(count int64) (granted bool) {
149+
func (e *elasticCPUGranter) tryGet(_ burstQualification, count int64) (granted bool) {
150150
e.mu.Lock()
151151
defer e.mu.Unlock()
152152

@@ -182,7 +182,7 @@ func (e *elasticCPUGranter) continueGrantChain(grantChainID) {
182182

183183
// tryGrant is used to attempt to grant to waiting requests.
184184
func (e *elasticCPUGranter) tryGrant() {
185-
for e.requester.hasWaitingRequests() && e.tryGet(1) {
185+
for e.hasWaitingRequests() && e.tryGet(canBurst /*arbitrary*/, 1) {
186186
tokens := e.requester.granted(noGrantChain)
187187
if tokens == 0 {
188188
e.returnGrantWithoutGrantingElsewhere(1)
@@ -235,7 +235,8 @@ func (e *elasticCPUGranter) getUtilizationLimit() float64 {
235235

236236
// hasWaitingRequests is part of the elasticCPULimiter interface.
237237
func (e *elasticCPUGranter) hasWaitingRequests() bool {
238-
return e.requester.hasWaitingRequests()
238+
hasWaiting, _ := e.requester.hasWaitingRequests()
239+
return hasWaiting
239240
}
240241

241242
// computeUtilizationMetric is part of the elasticCPULimiter interface.

pkg/util/admission/elastic_cpu_granter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func TestElasticCPUGranter(t *testing.T) {
152152
return ""
153153

154154
case "try-get":
155-
granted := elasticCPUGranter.tryGet(duration.Nanoseconds())
155+
granted := elasticCPUGranter.tryGet(canBurst /*arbitrary*/, duration.Nanoseconds())
156156
return fmt.Sprintf("granted: %t\n", granted)
157157

158158
case "return-grant":
@@ -189,13 +189,13 @@ type testElasticCPURequester struct {
189189

190190
var _ requester = &testElasticCPURequester{}
191191

192-
func (t *testElasticCPURequester) hasWaitingRequests() bool {
192+
func (t *testElasticCPURequester) hasWaitingRequests() (bool, burstQualification) {
193193
var padding string
194194
if t.buf.Len() > 0 {
195195
padding = " "
196196
}
197197
t.buf.WriteString(fmt.Sprintf("%shas-waiting=%t ", padding, t.numWaitingRequests > 0))
198-
return t.numWaitingRequests > 0
198+
return t.numWaitingRequests > 0, canBurst /*arbitrary*/
199199
}
200200

201201
func (t *testElasticCPURequester) granted(grantChainID grantChainID) int64 {

pkg/util/admission/elastic_cpu_work_queue_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (t *testElasticCPUGranter) grantKind() grantKind {
130130
return token
131131
}
132132

133-
func (t *testElasticCPUGranter) tryGet(count int64) (granted bool) {
133+
func (t *testElasticCPUGranter) tryGet(_ burstQualification, count int64) (granted bool) {
134134
panic("unimplemented")
135135
}
136136

@@ -175,7 +175,7 @@ func (t *testElasticCPUInternalWorkQueue) adjustTenantUsed(
175175
}
176176
}
177177

178-
func (t *testElasticCPUInternalWorkQueue) hasWaitingRequests() bool {
178+
func (t *testElasticCPUInternalWorkQueue) hasWaitingRequests() (bool, burstQualification) {
179179
panic("unimplemented")
180180
}
181181

pkg/util/admission/granter.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (sg *slotGranter) grantKind() grantKind {
5353
}
5454

5555
// tryGet implements granter.
56-
func (sg *slotGranter) tryGet(count int64) bool {
56+
func (sg *slotGranter) tryGet(_ burstQualification, count int64) bool {
5757
return sg.coord.tryGet(sg.workKind, count, 0 /*arbitrary*/)
5858
}
5959

@@ -122,7 +122,8 @@ func (sg *slotGranter) continueGrantChain(grantChainID grantChainID) {
122122

123123
// requesterHasWaitingRequests implements granterWithLockedCalls.
124124
func (sg *slotGranter) requesterHasWaitingRequests() bool {
125-
return sg.requester.hasWaitingRequests()
125+
hasWaiting, _ := sg.requester.hasWaitingRequests()
126+
return hasWaiting
126127
}
127128

128129
// tryGrantLocked implements granterWithLockedCalls.
@@ -193,7 +194,7 @@ func (tg *tokenGranter) grantKind() grantKind {
193194
}
194195

195196
// tryGet implements granter.
196-
func (tg *tokenGranter) tryGet(count int64) bool {
197+
func (tg *tokenGranter) tryGet(_ burstQualification, count int64) bool {
197198
return tg.coord.tryGet(tg.workKind, count, 0 /*arbitrary*/)
198199
}
199200

@@ -239,7 +240,8 @@ func (tg *tokenGranter) continueGrantChain(grantChainID grantChainID) {
239240

240241
// requesterHasWaitingRequests implements granterWithLockedCalls.
241242
func (tg *tokenGranter) requesterHasWaitingRequests() bool {
242-
return tg.requester.hasWaitingRequests()
243+
hasWaiting, _ := tg.requester.hasWaitingRequests()
244+
return hasWaiting
243245
}
244246

245247
// tryGrantLocked implements granterWithLockedCalls.
@@ -340,7 +342,7 @@ func (cg *kvStoreTokenChildGranter) grantKind() grantKind {
340342
}
341343

342344
// tryGet implements granter.
343-
func (cg *kvStoreTokenChildGranter) tryGet(count int64) bool {
345+
func (cg *kvStoreTokenChildGranter) tryGet(_ burstQualification, count int64) bool {
344346
return cg.parent.tryGet(cg.workType, count)
345347
}
346348

@@ -604,14 +606,6 @@ func (sg *kvStoreTokenGranter) subtractTokensLockedForWorkClass(
604606
}
605607
}
606608

607-
// requesterHasWaitingRequests returns whether some requester associated with
608-
// the granter has waiting requests. Used by storeGrantCoordinator.
609-
func (sg *kvStoreTokenGranter) requesterHasWaitingRequests() bool {
610-
return sg.regularRequester.hasWaitingRequests() ||
611-
sg.elasticRequester.hasWaitingRequests() ||
612-
sg.snapshotRequester.hasWaitingRequests()
613-
}
614-
615609
func (sg *kvStoreTokenGranter) tryGrant() {
616610
sg.mu.Lock()
617611
defer sg.mu.Unlock()
@@ -620,13 +614,14 @@ func (sg *kvStoreTokenGranter) tryGrant() {
620614

621615
// tryGrantLocked attempts to grant to as many requests as possible.
622616
func (sg *kvStoreTokenGranter) tryGrantLocked() {
623-
for sg.requesterHasWaitingRequests() && sg.tryGrantLockedOne() {
617+
for sg.tryGrantLockedOne() {
624618
}
625619
}
626620

627621
// tryGrantLocked is used to attempt to grant to waiting requests. Used by
628622
// storeGrantCoordinator. It successfully grants to at most one waiting
629-
// request.
623+
// request. If there are no waiting requests, or all waiters reject the grant,
624+
// it returns false.
630625
func (sg *kvStoreTokenGranter) tryGrantLockedOne() bool {
631626
// NB: We grant work in the following priority order: regular, snapshot
632627
// ingest, elastic work. Snapshot ingests are a special type of elastic work.
@@ -640,7 +635,8 @@ func (sg *kvStoreTokenGranter) tryGrantLockedOne() bool {
640635
} else if wt == admissionpb.SnapshotIngestStoreWorkType {
641636
req = sg.snapshotRequester
642637
}
643-
if req.hasWaitingRequests() {
638+
hasWaiting, _ := req.hasWaitingRequests()
639+
if hasWaiting {
644640
res := sg.tryGetLocked(1, wt)
645641
if res {
646642
tookTokenCount := req.granted(noGrantChain)

pkg/util/admission/granter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -512,8 +512,8 @@ type testRequester struct {
512512

513513
var _ requester = &testRequester{}
514514

515-
func (tr *testRequester) hasWaitingRequests() bool {
516-
return tr.waitingRequests
515+
func (tr *testRequester) hasWaitingRequests() (bool, burstQualification) {
516+
return tr.waitingRequests, canBurst /*arbitrary*/
517517
}
518518

519519
func (tr *testRequester) granted(grantChainID grantChainID) int64 {
@@ -527,7 +527,7 @@ func (tr *testRequester) granted(grantChainID grantChainID) int64 {
527527
func (tr *testRequester) close() {}
528528

529529
func (tr *testRequester) tryGet(count int64) {
530-
rv := tr.granter.tryGet(count)
530+
rv := tr.granter.tryGet(canBurst /*arbitrary*/, count)
531531
fmt.Fprintf(tr.buf, "%s%s: tryGet(%d) returned %t\n", tr.workKind,
532532
tr.additionalID, count, rv)
533533
}

pkg/util/admission/replicated_write_admission_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ func (tg *testReplicatedWriteGranter) grantKind() grantKind {
384384
return token
385385
}
386386

387-
func (tg *testReplicatedWriteGranter) tryGet(count int64) bool {
387+
func (tg *testReplicatedWriteGranter) tryGet(_ burstQualification, count int64) bool {
388388
if count > tg.tokens {
389389
tg.buf.printf("[%s] try-get=%s available=%s => insufficient tokens",
390390
tg.wc, printTrimmedBytes(count), printTrimmedBytes(tg.tokens))
@@ -414,7 +414,8 @@ func (tg *testReplicatedWriteGranter) grant() {
414414
if tg.tokens <= 0 {
415415
return // nothing left to do
416416
}
417-
if !tg.r.hasWaitingRequests() {
417+
hasWaiting, _ := tg.r.hasWaitingRequests()
418+
if !hasWaiting {
418419
return // nothing left to do
419420
}
420421
_ = tg.r.granted(0 /* unused */)

pkg/util/admission/snapshot_queue.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,10 @@ func makeSnapshotQueue(snapshotGranter granter, metrics *SnapshotMetrics) *Snaps
111111
var _ requester = &SnapshotQueue{}
112112
var _ snapshotRequester = &SnapshotQueue{}
113113

114-
func (s *SnapshotQueue) hasWaitingRequests() bool {
114+
func (s *SnapshotQueue) hasWaitingRequests() (bool, burstQualification) {
115115
s.mu.Lock()
116116
defer s.mu.Unlock()
117-
return !s.mu.q.Empty()
117+
return !s.mu.q.Empty(), canBurst /*arbitrary*/
118118
}
119119

120120
func (s *SnapshotQueue) granted(_ grantChainID) int64 {
@@ -160,7 +160,7 @@ func (s *SnapshotQueue) Admit(ctx context.Context, count int64) error {
160160
s.snapshotGranter.returnGrant(count)
161161
return nil
162162
}
163-
if s.snapshotGranter.tryGet(count) {
163+
if s.snapshotGranter.tryGet(canBurst /*arbitrary*/, count) {
164164
return nil
165165
}
166166
// We were unable to get tokens for admission, so we queue.

pkg/util/admission/work_queue.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,9 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
639639
// We have unlocked q.mu, so another concurrent request can also do tryGet
640640
// and get ahead of this request. We don't need to be fair for such
641641
// concurrent requests.
642-
if q.granter.tryGet(info.RequestedCount) {
642+
//
643+
// TODO(sumeer): set a proper burstQualification.
644+
if q.granter.tryGet(canBurst /*arbitrary*/, info.RequestedCount) {
643645
q.metrics.incAdmitted(info.Priority)
644646
if info.ReplicatedWorkInfo.Enabled {
645647
// TODO(irfansharif): There's a race here, and could lead to
@@ -871,10 +873,11 @@ func (q *WorkQueue) AdmittedWorkDone(tenantID roachpb.TenantID, cpuTime time.Dur
871873
q.granter.returnGrant(1)
872874
}
873875

874-
func (q *WorkQueue) hasWaitingRequests() bool {
876+
func (q *WorkQueue) hasWaitingRequests() (bool, burstQualification) {
875877
q.mu.Lock()
876878
defer q.mu.Unlock()
877-
return len(q.mu.tenantHeap) > 0
879+
// TODO(sumeer): return a proper burstQualification.
880+
return len(q.mu.tenantHeap) > 0, canBurst /*arbitrary*/
878881
}
879882

880883
func (q *WorkQueue) granted(grantChainID grantChainID) int64 {

pkg/util/admission/work_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (tg *testGranter) grantKind() grantKind {
7474
return tg.gk
7575
}
7676

77-
func (tg *testGranter) tryGet(count int64) bool {
77+
func (tg *testGranter) tryGet(_ burstQualification, count int64) bool {
7878
tg.buf.printf("tryGet%s: returning %t", tg.name, tg.returnValueFromTryGet)
7979
return tg.returnValueFromTryGet
8080
}

0 commit comments

Comments
 (0)