Skip to content

Commit 14a1106

Browse files
authored
Merge pull request kubernetes#94146 from MikeSpreitzer/limit-lag
Make sampleAndWaterMarkHistograms not fall very far behind
2 parents 61edc6f + 9e89b92 commit 14a1106

File tree

7 files changed

+43
-4
lines changed

7 files changed

+43
-4
lines changed

staging/src/k8s.io/apiserver/pkg/server/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
626626
if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) {
627627
} else if c.FlowControl != nil {
628628
err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error {
629+
go c.FlowControl.MaintainObservations(context.StopCh)
629630
go c.FlowControl.Run(context.StopCh)
630631
return nil
631632
})

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,23 @@ func (cfgCtlr *configController) initializeConfigController(informerFactory kube
227227
}})
228228
}
229229

230+
// MaintainObservations keeps the observers from
231+
// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling
232+
// too far behind
233+
func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
234+
wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh)
235+
}
236+
237+
func (cfgCtlr *configController) updateObservations() {
238+
cfgCtlr.lock.Lock()
239+
defer cfgCtlr.lock.Unlock()
240+
for _, plc := range cfgCtlr.priorityLevelStates {
241+
if plc.queues != nil {
242+
plc.queues.UpdateObservations()
243+
}
244+
}
245+
}
246+
230247
func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
231248
defer cfgCtlr.configQueue.ShutDown()
232249
klog.Info("Starting API Priority and Fairness config controller")

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,23 @@ import (
3838
type Interface interface {
3939
// Handle takes care of queuing and dispatching a request
4040
// characterized by the given digest. The given `noteFn` will be
41-
// invoked with the results of request classification. If Handle
42-
// decides that the request should be executed then `execute()`
43-
// will be invoked once to execute the request; otherwise
44-
// `execute()` will not be invoked.
41+
// invoked with the results of request classification. If the
42+
// request is queued then `queueNoteFn` will be called twice,
43+
// first with `true` and then with `false`; otherwise
44+
// `queueNoteFn` will not be called at all. If Handle decides
45+
// that the request should be executed then `execute()` will be
46+
// invoked once to execute the request; otherwise `execute()` will
47+
// not be invoked.
4548
Handle(ctx context.Context,
4649
requestDigest RequestDigest,
4750
noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration),
4851
queueNoteFn fq.QueueNoteFn,
4952
execFn func(),
5053
)
5154

55+
// MaintainObservations is a helper for maintaining statistics.
56+
MaintainObservations(stopCh <-chan struct{})
57+
5258
// Run monitors config objects from the main apiservers and causes
5359
// any needed changes to local behavior. This method ceases
5460
// activity and returns after the given channel is closed.

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ func (cqs *ctlrTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSet
9797
return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
9898
}
9999

100+
func (cqs *ctlrTestQueueSet) UpdateObservations() {
101+
}
102+
100103
func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump {
101104
return debug.QueueSetDump{}
102105
}

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ type QueueSet interface {
8282
// exactly once.
8383
StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)
8484

85+
// UpdateObservations makes sure any time-based statistics have
86+
// caught up with the current clock reading
87+
UpdateObservations()
88+
8589
// Dump saves and returns the instant internal state of the queue-set.
8690
// Note that dumping process will stop the queue-set from proceeding
8791
// any requests.

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,11 @@ func (qs *queueSet) goroutineDoneOrBlocked() {
743743
qs.counter.Add(-1)
744744
}
745745

746+
func (qs *queueSet) UpdateObservations() {
747+
qs.obsPair.RequestsWaiting.Add(0)
748+
qs.obsPair.RequestsExecuting.Add(0)
749+
}
750+
746751
func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
747752
qs.lock.Lock()
748753
defer qs.lock.Unlock()

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDisti
5959
return noRestraintRequest{}, false
6060
}
6161

62+
func (noRestraint) UpdateObservations() {
63+
}
64+
6265
func (noRestraint) Dump(bool) debug.QueueSetDump {
6366
return debug.QueueSetDump{}
6467
}

0 commit comments

Comments
 (0)