
Commit 11ef951

apf: remove RequestWaitLimit from queueset config
Parent: da8a472

14 files changed (+21 −60 lines)
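
The caller-visible effect is simplest to see at the constructor: utilflowcontrol.New loses its final argument, the per-request wait limit that callers derived as a quarter of the server request timeout (RequestTimeout/4). Below is a minimal sketch, not taken from the commit itself; factory, clientset, and serverConcurrency are assumed to be set up as in the test files further down:

```go
// Before this commit (now removed): the fourth argument was the wait limit,
// typically derived as requestTimeout/4.
//
//	controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(),
//		serverConcurrency, requestTimeout/4)

// After this commit: the constructor takes no wait limit.
controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency)
```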

pkg/controlplane/apiserver/config.go

Lines changed: 0 additions & 1 deletion
@@ -196,7 +196,6 @@ func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclien
 		versionedInformer,
 		extclient.FlowcontrolV1beta3(),
 		s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
-		s.GenericServerRunOptions.RequestTimeout/4,
 	), nil
 }

staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go

Lines changed: 7 additions & 7 deletions
@@ -687,7 +687,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

 	apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 	stopCh := make(chan struct{})
-	controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+	controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)

 	headerMatcher := headerMatcher{}
 	var executed bool
@@ -757,7 +757,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

 	apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 	stopCh := make(chan struct{})
-	controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+	controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)

 	headerMatcher := headerMatcher{}
 	var executed bool
@@ -833,7 +833,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

 	apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 	stopCh := make(chan struct{})
-	controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+	controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)

 	headerMatcher := headerMatcher{}
 	var innerHandlerWriteErr error
@@ -911,7 +911,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

 	apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 	stopCh := make(chan struct{})
-	controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+	controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)

 	headerMatcher := headerMatcher{}
 	var innerHandlerWriteErr error
@@ -986,7 +986,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

 	apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, queueLength)
 	stopCh := make(chan struct{})
-	controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+	controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)

 	headerMatcher := headerMatcher{}
 	var firstRequestInnerHandlerWriteErr error
@@ -1118,11 +1118,11 @@ func fmtError(err error) string {
 }

 func startAPFController(t *testing.T, stopCh <-chan struct{}, apfConfiguration []runtime.Object, serverConcurrency int,
-	requestWaitLimit time.Duration, plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) {
+	plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) {
 	clientset := newClientset(t, apfConfiguration...)
 	// this test does not rely on resync, so resync period is set to zero
 	factory := informers.NewSharedInformerFactory(clientset, 0)
-	controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency, requestWaitLimit)
+	controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency)

 	factory.Start(stopCh)

staging/src/k8s.io/apiserver/pkg/server/options/recommended.go

Lines changed: 0 additions & 1 deletion
@@ -154,7 +154,6 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
 			config.SharedInformerFactory,
 			kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1beta3(),
 			config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight,
-			config.RequestTimeout/4,
 		)
 	} else {
 		klog.Warningf("Neither kubeconfig is provided nor service-account is mounted, so APIPriorityAndFairness will be disabled")

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go

Lines changed: 10 additions & 16 deletions
@@ -150,9 +150,6 @@
 	// from server configuration.
 	serverConcurrencyLimit int

-	// requestWaitLimit comes from server configuration.
-	requestWaitLimit time.Duration
-
 	// watchTracker implements the necessary WatchTracker interface.
 	WatchTracker

@@ -287,13 +284,12 @@
 		asFieldManager:         config.AsFieldManager,
 		foundToDangling:        config.FoundToDangling,
 		serverConcurrencyLimit: config.ServerConcurrencyLimit,
-		requestWaitLimit:       config.RequestWaitLimit,
 		flowcontrolClient:      config.FlowcontrolClient,
 		priorityLevelStates:    make(map[string]*priorityLevelState),
 		WatchTracker:           NewWatchTracker(),
 		MaxSeatsTracker:        NewMaxSeatsTracker(),
 	}
-	klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
+	klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
 	// Start with longish delay because conflicts will be between
 	// different processes, so take some time to go away.
 	cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
@@ -433,7 +429,7 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
 		plState := plStates[plName]
 		if setCompleters {
 			qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
-				plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
+				plState.pl, plState.reqsGaugePair, plState.execSeatsObs,
 				metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
 			if err != nil {
 				klog.ErrorS(err, "Inconceivable! Configuration error in existing priority level", "pl", plState.pl)
@@ -657,10 +653,10 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro

 	// Supply missing mandatory PriorityLevelConfiguration objects
 	if !meal.haveExemptPL {
-		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit)
+		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt)
 	}
 	if !meal.haveCatchAllPL {
-		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit)
+		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll)
 	}

 	meal.finishQueueSetReconfigsLocked()
@@ -692,7 +688,7 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
 			}
 		}
 		qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues,
-			pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs,
+			pl, state.reqsGaugePair, state.execSeatsObs,
 			metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge))
 		if err != nil {
 			klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
@@ -798,7 +794,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
 		}
 		var err error
 		plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues,
-			plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
+			plState.pl, plState.reqsGaugePair, plState.execSeatsObs,
 			metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
 		if err != nil {
 			// This can not happen because queueSetCompleterForPL already approved this config
@@ -880,7 +876,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 // queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
 // given priority level configuration. Returns nil and an error if the given
 // object is malformed in a way that is a problem for this package.
-func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
+func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
 	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited) != (pl.Spec.Limited != nil) {
 		return nil, errors.New("broken union structure at the top, for Limited")
 	}
@@ -902,7 +898,6 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
 			DesiredNumQueues: int(qcAPI.Queues),
 			QueueLengthLimit: int(qcAPI.QueueLengthLimit),
 			HandSize:         int(qcAPI.HandSize),
-			RequestWaitLimit: requestWaitLimit,
 		}
 	}
 } else {
@@ -956,16 +951,15 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl

 // imaginePL adds a priority level based on one of the mandatory ones
 // that does not actually exist (right now) as a real API object.
-func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
+func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration) {
 	klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
 	labelValues := []string{proto.Name}
 	reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues)
 	execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
 	seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name)
 	seatDemandRatioedGauge := metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{proto.Name})
-	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto,
-		requestWaitLimit, reqsGaugePair, execSeatsObs,
-		metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge))
+	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, reqsGaugePair,
+		execSeatsObs, metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge))
 	if err != nil {
 		// This can not happen because proto is one of the mandatory
 		// objects and these are not erroneous
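
With the parameter gone, queueSetCompleterForPL derives the queuing configuration for a limited priority level entirely from the API object. A hedged sketch of the post-change construction; qcAPI is the queuing section of the priority-level spec as in the hunk above, and the Name field is an assumption, not shown in this diff:

```go
// Post-change shape of the queuing config built inside queueSetCompleterForPL.
qcQS := fq.QueuingConfig{
	Name:             pl.Name, // assumption: the queue set is named after the priority level
	DesiredNumQueues: int(qcAPI.Queues),
	QueueLengthLimit: int(qcAPI.QueueLengthLimit),
	HandSize:         int(qcAPI.HandSize),
	// RequestWaitLimit is gone from fq.QueuingConfig as of this commit.
}
```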

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go

Lines changed: 0 additions & 5 deletions
@@ -90,7 +90,6 @@ func New(
 	informerFactory kubeinformers.SharedInformerFactory,
 	flowcontrolClient flowcontrolclient.FlowcontrolV1beta3Interface,
 	serverConcurrencyLimit int,
-	requestWaitLimit time.Duration,
 ) Interface {
 	clk := eventclock.Real{}
 	return NewTestable(TestableConfig{
@@ -101,7 +100,6 @@
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: serverConcurrencyLimit,
-		RequestWaitLimit:       requestWaitLimit,
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        fqs.NewQueueSetFactory(clk),
@@ -139,9 +137,6 @@ type TestableConfig struct {
 	// ServerConcurrencyLimit for the controller to enforce
 	ServerConcurrencyLimit int

-	// RequestWaitLimit configured on the server
-	RequestWaitLimit time.Duration
-
 	// GaugeVec for metrics about requests, broken down by phase and priority_level
 	ReqsGaugeVec metrics.RatioedGaugeVec

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter_test.go

Lines changed: 0 additions & 1 deletion
@@ -109,7 +109,6 @@ func TestQueueWaitTimeLatencyTracker(t *testing.T) {
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 24,
-		RequestWaitLimit:       time.Minute,
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        fqs.NewQueueSetFactory(clk),

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/borrowing_test.go

Lines changed: 0 additions & 1 deletion
@@ -143,7 +143,6 @@ func TestBorrowing(t *testing.T) {
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 24,
-		RequestWaitLimit:       time.Minute,
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        fqs.NewQueueSetFactory(clk),

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go

Lines changed: 1 addition & 3 deletions
@@ -251,8 +251,7 @@ func TestConfigConsumer(t *testing.T) {
 		FoundToDangling:        func(found bool) bool { return !found },
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
-		ServerConcurrencyLimit: 100,         // server concurrency limit
-		RequestWaitLimit:       time.Minute, // request wait limit
+		ServerConcurrencyLimit: 100, // server concurrency limit
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        cts,
@@ -384,7 +383,6 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) {
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 100,
-		RequestWaitLimit:       time.Minute,
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        cts,

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go

Lines changed: 1 addition & 6 deletions
@@ -18,7 +18,6 @@ package fairqueuing

 import (
 	"context"
-	"time"

 	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
 	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
@@ -117,7 +116,7 @@

 	// DesiredNumQueues is the number of queues that the API says
 	// should exist now. This may be non-positive, in which case
-	// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
+	// QueueLengthLimit, and HandSize are ignored.
 	// A value of zero means to respect the ConcurrencyLimit of the DispatchingConfig.
 	// A negative value means to always dispatch immediately upon arrival
 	// (i.e., the requests are "exempt" from limitation).
@@ -129,10 +128,6 @@
 	// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
 	// dealing a "hand" of this many queues and then picking one of minimum length.
 	HandSize int
-
-	// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
-	// If, by the end of that time, the request has not been dispatched then it is rejected.
-	RequestWaitLimit time.Duration
 }

 // DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet.

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go

Lines changed: 0 additions & 1 deletion
@@ -272,7 +272,6 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
 	} else {
 		qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit
 		qCfg.HandSize = qs.qCfg.HandSize
-		qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit
 	}

 	qs.qCfg = qCfg
