Skip to content

Commit d040043

Browse files
authored
Merge pull request kubernetes#124736 from MikeSpreitzer/exempt-borrows-more
More assertive borrowing by exempt
2 parents c4bd05d + 9aa9d3d commit d040043

File tree

2 files changed

+226
-28
lines changed

2 files changed

+226
-28
lines changed

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go

Lines changed: 62 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ type priorityLevelState struct {
204204
// reached through this pointer is mutable.
205205
pl *flowcontrol.PriorityLevelConfiguration
206206

207-
// qsCompleter holds the QueueSetCompleter derived from `config`
207+
// qsCompleter holds the QueueSetCompleter derived from `pl`
208208
// and `queues`.
209209
qsCompleter fq.QueueSetCompleter
210210

@@ -255,12 +255,12 @@ type priorityLevelState struct {
255255
type seatDemandStats struct {
256256
avg float64
257257
stdDev float64
258-
highWatermark float64
258+
highWatermark int
259259
smoothed float64
260260
}
261261

262262
func (stats *seatDemandStats) update(obs fq.IntegratorResults) {
263-
stats.highWatermark = obs.Max
263+
stats.highWatermark = int(math.Round(obs.Max))
264264
if obs.Duration <= 0 {
265265
return
266266
}
@@ -398,38 +398,63 @@ func (cfgCtlr *configController) updateBorrowing() {
398398

399399
func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plStates map[string]*priorityLevelState) {
400400
items := make([]allocProblemItem, 0, len(plStates))
401-
plNames := make([]string, 0, len(plStates))
401+
nonExemptPLNames := make([]string, 0, len(plStates))
402+
idxOfNonExempt := map[string]int{} // items index of non-exempt classes
403+
cclOfExempt := map[string]int{} // minCurrentCL of exempt classes
404+
var minCLSum, minCurrentCLSum int // sums over non-exempt classes
405+
remainingServerCL := cfgCtlr.nominalCLSum
402406
for plName, plState := range plStates {
403407
obs := plState.seatDemandIntegrator.Reset()
404408
plState.seatDemandStats.update(obs)
405-
// Lower bound on this priority level's adjusted concurreny limit is the lesser of:
406-
// - its seat demamd high watermark over the last adjustment period, and
407-
// - its configured concurrency limit.
408-
// BUT: we do not want this to be lower than the lower bound from configuration.
409-
// See KEP-1040 for a more detailed explanation.
410-
minCurrentCL := math.Max(float64(plState.minCL), math.Min(float64(plState.nominalCL), plState.seatDemandStats.highWatermark))
411-
plNames = append(plNames, plName)
412-
items = append(items, allocProblemItem{
413-
lowerBound: minCurrentCL,
414-
upperBound: float64(plState.maxCL),
415-
target: math.Max(minCurrentCL, plState.seatDemandStats.smoothed),
416-
})
409+
var minCurrentCL int
410+
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
411+
minCurrentCL = max(plState.minCL, plState.seatDemandStats.highWatermark)
412+
cclOfExempt[plName] = minCurrentCL
413+
remainingServerCL -= minCurrentCL
414+
} else {
415+
// Lower bound on this priority level's adjusted concurreny limit is the lesser of:
416+
// - its seat demamd high watermark over the last adjustment period, and
417+
// - its configured concurrency limit.
418+
// BUT: we do not want this to be lower than the lower bound from configuration.
419+
// See KEP-1040 for a more detailed explanation.
420+
minCurrentCL = max(plState.minCL, min(plState.nominalCL, plState.seatDemandStats.highWatermark))
421+
idxOfNonExempt[plName] = len(items)
422+
nonExemptPLNames = append(nonExemptPLNames, plName)
423+
items = append(items, allocProblemItem{
424+
lowerBound: float64(minCurrentCL),
425+
upperBound: float64(plState.maxCL),
426+
target: math.Max(float64(minCurrentCL), plState.seatDemandStats.smoothed),
427+
})
428+
minCLSum += plState.minCL
429+
minCurrentCLSum += minCurrentCL
430+
}
417431
}
418432
if len(items) == 0 && cfgCtlr.nominalCLSum > 0 {
419433
klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates)
420434
return
421435
}
422-
allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items)
423-
if err != nil {
424-
klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", plNames, "items", items)
425-
allocs = make([]float64, len(items))
426-
for idx, plName := range plNames {
427-
plState := plStates[plName]
428-
allocs[idx] = float64(plState.currentCL)
436+
var allocs []float64
437+
var shareFrac, fairFrac float64
438+
var err error
439+
if remainingServerCL <= minCLSum {
440+
metrics.SetFairFrac(0)
441+
} else if remainingServerCL <= minCurrentCLSum {
442+
shareFrac = float64(remainingServerCL-minCLSum) / float64(minCurrentCLSum-minCLSum)
443+
metrics.SetFairFrac(0)
444+
} else {
445+
allocs, fairFrac, err = computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items)
446+
if err != nil {
447+
klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", nonExemptPLNames, "items", items)
448+
allocs = make([]float64, len(items))
449+
for idx, plName := range nonExemptPLNames {
450+
plState := plStates[plName]
451+
allocs[idx] = float64(plState.currentCL)
452+
}
429453
}
454+
metrics.SetFairFrac(float64(fairFrac))
430455
}
431-
for idx, plName := range plNames {
432-
plState := plStates[plName]
456+
for plName, plState := range plStates {
457+
idx, isNonExempt := idxOfNonExempt[plName]
433458
if setCompleters {
434459
qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
435460
plState.pl, plState.reqsGaugePair, plState.execSeatsObs,
@@ -440,10 +465,20 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
440465
}
441466
plState.qsCompleter = qsCompleter
442467
}
443-
currentCL := int(math.Round(float64(allocs[idx])))
468+
var currentCL int
469+
if !isNonExempt {
470+
currentCL = cclOfExempt[plName]
471+
} else if remainingServerCL <= minCLSum {
472+
currentCL = plState.minCL
473+
} else if remainingServerCL <= minCurrentCLSum {
474+
minCurrentCL := max(plState.minCL, min(plState.nominalCL, plState.seatDemandStats.highWatermark))
475+
currentCL = plState.minCL + int(math.Round(float64(minCurrentCL-plState.minCL)*shareFrac))
476+
} else {
477+
currentCL = int(math.Round(float64(allocs[idx])))
478+
}
444479
relChange := relDiff(float64(currentCL), float64(plState.currentCL))
445480
plState.currentCL = currentCL
446-
metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL)
481+
metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, float64(plState.seatDemandStats.highWatermark), plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL)
447482
logLevel := klog.Level(4)
448483
if relChange >= 0.05 {
449484
logLevel = 2
@@ -458,7 +493,6 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
458493
klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil)
459494
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator})
460495
}
461-
metrics.SetFairFrac(float64(fairFrac))
462496
}
463497

464498
// runWorker is the logic of the one and only worker goroutine. We
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package flowcontrol
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
24+
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
25+
testeventclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
26+
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
27+
"k8s.io/client-go/informers"
28+
clientsetfake "k8s.io/client-go/kubernetes/fake"
29+
30+
flowcontrol "k8s.io/api/flowcontrol/v1"
31+
)
32+
33+
func TestUpdateBorrowing(t *testing.T) {
34+
startTime := time.Now()
35+
clk, _ := testeventclock.NewFake(startTime, 0, nil)
36+
plcExempt := fcboot.MandatoryPriorityLevelConfigurationExempt
37+
plcHigh := fcboot.SuggestedPriorityLevelConfigurationWorkloadHigh
38+
plcMid := fcboot.SuggestedPriorityLevelConfigurationWorkloadLow
39+
plcLow := fcboot.MandatoryPriorityLevelConfigurationCatchAll
40+
plcs := []*flowcontrol.PriorityLevelConfiguration{plcHigh, plcExempt, plcMid, plcLow}
41+
fses := []*flowcontrol.FlowSchema{}
42+
k8sClient := clientsetfake.NewSimpleClientset(plcLow, plcExempt, plcHigh, plcMid)
43+
informerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
44+
flowcontrolClient := k8sClient.FlowcontrolV1()
45+
serverCL := int(*plcHigh.Spec.Limited.NominalConcurrencyShares+
46+
*plcMid.Spec.Limited.NominalConcurrencyShares+
47+
*plcLow.Spec.Limited.NominalConcurrencyShares) * 6
48+
config := TestableConfig{
49+
Name: "test",
50+
Clock: clk,
51+
AsFieldManager: "testfm",
52+
FoundToDangling: func(found bool) bool { return !found },
53+
InformerFactory: informerFactory,
54+
FlowcontrolClient: flowcontrolClient,
55+
ServerConcurrencyLimit: serverCL,
56+
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
57+
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
58+
QueueSetFactory: fqs.NewQueueSetFactory(clk),
59+
}
60+
ctlr := newTestableController(config)
61+
_ = ctlr.lockAndDigestConfigObjects(plcs, fses)
62+
if ctlr.nominalCLSum != serverCL {
63+
t.Fatalf("Unexpected rounding: nominalCLSum=%d", ctlr.nominalCLSum)
64+
}
65+
stateExempt := ctlr.priorityLevelStates[plcExempt.Name]
66+
stateHigh := ctlr.priorityLevelStates[plcHigh.Name]
67+
stateMid := ctlr.priorityLevelStates[plcMid.Name]
68+
stateLow := ctlr.priorityLevelStates[plcLow.Name]
69+
70+
// Scenario 1: everybody wants more than ServerConcurrencyLimit.
71+
// Test the case of exempt borrowing so much that less than minCL
72+
// is available to each non-exempt.
73+
stateExempt.seatDemandIntegrator.Set(float64(serverCL + 100))
74+
stateHigh.seatDemandIntegrator.Set(float64(serverCL + 100))
75+
stateMid.seatDemandIntegrator.Set(float64(serverCL + 100))
76+
stateLow.seatDemandIntegrator.Set(float64(serverCL + 100))
77+
clk.SetTime(startTime.Add(borrowingAdjustmentPeriod))
78+
ctlr.updateBorrowing()
79+
if expected, actual := serverCL+100, stateExempt.currentCL; expected != actual {
80+
t.Errorf("Scenario 1: expected %d, got %d for exempt", expected, actual)
81+
} else {
82+
t.Logf("Scenario 1: expected and got %d for exempt", expected)
83+
}
84+
if expected, actual := stateHigh.minCL, stateHigh.currentCL; expected != actual {
85+
t.Errorf("Scenario 1: expected %d, got %d for hi", expected, actual)
86+
} else {
87+
t.Logf("Scenario 1: expected and got %d for hi", expected)
88+
}
89+
if expected, actual := stateMid.minCL, stateMid.currentCL; expected != actual {
90+
t.Errorf("Scenario 1: expected %d, got %d for mid", expected, actual)
91+
} else {
92+
t.Logf("Scenario 1: expected and got %d for mid", expected)
93+
}
94+
if expected, actual := stateLow.minCL, stateLow.currentCL; expected != actual {
95+
t.Errorf("Scenario 1: expected %d, got %d for lo", expected, actual)
96+
} else {
97+
t.Logf("Scenario 1: expected and got %d for lo", expected)
98+
}
99+
100+
// Scenario 2: non-exempt want more than serverCL but get halfway between minCL and minCurrentCL.
101+
expectedHigh := (stateHigh.nominalCL + stateHigh.minCL) / 2
102+
expectedMid := (stateMid.nominalCL + stateMid.minCL) / 2
103+
expectedLow := (stateLow.nominalCL + stateLow.minCL) / 2
104+
expectedExempt := serverCL - (expectedHigh + expectedMid + expectedLow)
105+
stateExempt.seatDemandIntegrator.Set(float64(expectedExempt))
106+
clk.SetTime(startTime.Add(2 * borrowingAdjustmentPeriod))
107+
ctlr.updateBorrowing()
108+
clk.SetTime(startTime.Add(3 * borrowingAdjustmentPeriod))
109+
ctlr.updateBorrowing()
110+
if expected, actual := expectedExempt, stateExempt.currentCL; expected != actual {
111+
t.Errorf("Scenario 2: expected %d, got %d for exempt", expected, actual)
112+
} else {
113+
t.Logf("Scenario 2: expected and got %d for exempt", expected)
114+
}
115+
if expected, actual := expectedHigh, stateHigh.currentCL; expected != actual {
116+
t.Errorf("Scenario 2: expected %d, got %d for hi", expected, actual)
117+
} else {
118+
t.Logf("Scenario 2: expected and got %d for hi", expected)
119+
}
120+
if expected, actual := expectedMid, stateMid.currentCL; expected != actual {
121+
t.Errorf("Scenario 2: expected %d, got %d for mid", expected, actual)
122+
} else {
123+
t.Logf("Scenario 2: expected and got %d for mid", expected)
124+
}
125+
if expected, actual := expectedLow, stateLow.currentCL; expected != actual {
126+
t.Errorf("Scenario 2: expected %d, got %d for lo", expected, actual)
127+
} else {
128+
t.Logf("Scenario 2: expected and got %d for lo", expected)
129+
}
130+
131+
// Scenario 3: only mid is willing to lend, and exempt borrows all of that.
132+
// Test the case of regular borrowing.
133+
expectedHigh = stateHigh.nominalCL
134+
expectedMid = stateMid.minCL
135+
expectedLow = stateLow.nominalCL
136+
expectedExempt = serverCL - (expectedHigh + expectedMid + expectedLow)
137+
stateExempt.seatDemandIntegrator.Set(float64(expectedExempt))
138+
stateMid.seatDemandIntegrator.Set(float64(1))
139+
clk.SetTime(startTime.Add(4 * borrowingAdjustmentPeriod))
140+
ctlr.updateBorrowing()
141+
clk.SetTime(startTime.Add(5 * borrowingAdjustmentPeriod))
142+
ctlr.updateBorrowing()
143+
if expected, actual := expectedExempt, stateExempt.currentCL; expected != actual {
144+
t.Errorf("Scenario 3: expected %d, got %d for exempt", expected, actual)
145+
} else {
146+
t.Logf("Scenario 3: expected and got %d for exempt", expected)
147+
}
148+
if expected, actual := expectedHigh, stateHigh.currentCL; expected != actual {
149+
t.Errorf("Scenario 3: expected %d, got %d for hi", expected, actual)
150+
} else {
151+
t.Logf("Scenario 3: expected and got %d for hi", expected)
152+
}
153+
if expected, actual := expectedMid, stateMid.currentCL; expected != actual {
154+
t.Errorf("Scenario 3: expected %d, got %d for mid", expected, actual)
155+
} else {
156+
t.Logf("Scenario 3: expected and got %d for mid", expected)
157+
}
158+
if expected, actual := expectedLow, stateLow.currentCL; expected != actual {
159+
t.Errorf("Scenario 3: expected %d, got %d for lo", expected, actual)
160+
} else {
161+
t.Logf("Scenario 3: expected and got %d for lo", expected)
162+
}
163+
164+
}

0 commit comments

Comments
 (0)