Skip to content

Commit 57ecea2

Browse files
committed
Introduce more metrics on concurrency
Introduce min, average, and standard deviation for the number of executing mutating and readOnly requests. Introduce min, max, average, and standard deviation for the number waiting and number waiting per priority level. Later: Revised to use a series of windows Use three individuals instead of array of powers Later: Add coarse queue count metrics, removed windowed avg and stddev Add metrics for number of queued mutating and readOnly requests, to complement metrics for number executing. Later: Removed windowed average and standard deviation because consumers can derive such from integrals of consumer's chosen window. Also replaced "requestKind" Prometheus label with "request_kind". Later: Revised to focus on sampling Make the clock intrinsic to a TimedObserver ... so that the clock can be read while holding the observer's lock; otherwise, forward progress is not guaranteed (and violations were observed in testing). Bug fixes and histogram buckets revision SetX1 to 1 when queue length limit is zero, beause dividing by zero is nasty. Remove obsolete argument in gen_test.go. Add a bucket boundary at 0 for sample-and-water-mark histograms, to distinguish zeroes from non-zeros. This includes adding Integrator test. Simplified test code. More pervasively used "ctlr" instead of "ctl" as abbreviation for "controller".
1 parent 30b0ebd commit 57ecea2

25 files changed

+909
-336
lines changed

staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ var (
122122
Help: "Number of requests dropped with 'Try again later' response",
123123
StabilityLevel: compbasemetrics.ALPHA,
124124
},
125-
[]string{"requestKind"},
125+
[]string{"request_kind"},
126126
)
127127
// TLSHandshakeErrors is a number of requests dropped with 'TLS handshake error from' error
128128
TLSHandshakeErrors = compbasemetrics.NewCounter(
@@ -166,7 +166,15 @@ var (
166166
Help: "Maximal number of currently used inflight request limit of this apiserver per request kind in last second.",
167167
StabilityLevel: compbasemetrics.ALPHA,
168168
},
169-
[]string{"requestKind"},
169+
[]string{"request_kind"},
170+
)
171+
currentInqueueRequests = compbasemetrics.NewGaugeVec(
172+
&compbasemetrics.GaugeOpts{
173+
Name: "apiserver_current_inqueue_requests",
174+
Help: "Maximal number of queued requests in this apiserver per request kind in last second.",
175+
StabilityLevel: compbasemetrics.ALPHA,
176+
},
177+
[]string{"request_kind"},
170178
)
171179

172180
requestTerminationsTotal = compbasemetrics.NewCounterVec(
@@ -191,6 +199,7 @@ var (
191199
WatchEvents,
192200
WatchEventsSizes,
193201
currentInflightRequests,
202+
currentInqueueRequests,
194203
requestTerminationsTotal,
195204
}
196205

@@ -231,6 +240,11 @@ const (
231240
ReadOnlyKind = "readOnly"
232241
// MutatingKind is a string identifying mutating request kind
233242
MutatingKind = "mutating"
243+
244+
// WaitingPhase is the phase value for a request waiting in a queue
245+
WaitingPhase = "waiting"
246+
// ExecutingPhase is the phase value for an executing request
247+
ExecutingPhase = "executing"
234248
)
235249

236250
const (
@@ -261,9 +275,19 @@ func Reset() {
261275
}
262276
}
263277

264-
func UpdateInflightRequestMetrics(nonmutating, mutating int) {
265-
currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating))
266-
currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating))
278+
// UpdateInflightRequestMetrics reports concurrency metrics classified by
279+
// mutating vs Readonly.
280+
func UpdateInflightRequestMetrics(phase string, nonmutating, mutating int) {
281+
for _, kc := range []struct {
282+
kind string
283+
count int
284+
}{{ReadOnlyKind, nonmutating}, {MutatingKind, mutating}} {
285+
if phase == ExecutingPhase {
286+
currentInflightRequests.WithLabelValues(kc.kind).Set(float64(kc.count))
287+
} else {
288+
currentInqueueRequests.WithLabelValues(kc.kind).Set(float64(kc.count))
289+
}
290+
}
267291
}
268292

269293
// RecordRequestTermination records that the request was terminated early as part of a resource

staging/src/k8s.io/apiserver/pkg/server/filters/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ go_library(
6262
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
6363
"//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library",
6464
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
65+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
6566
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
6667
"//vendor/k8s.io/klog/v2:go_default_library",
6768
],

staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/apiserver/pkg/authentication/user"
2828
"k8s.io/apiserver/pkg/endpoints/metrics"
2929
apirequest "k8s.io/apiserver/pkg/endpoints/request"
30+
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
3031

3132
"k8s.io/klog/v2"
3233
)
@@ -50,13 +51,17 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
5051
klog.Errorf(err.Error())
5152
}
5253

53-
// requestWatermark is used to trak maximal usage of inflight requests.
54+
// requestWatermark is used to track maximal numbers of requests in a particular phase of handling
5455
type requestWatermark struct {
56+
phase string
57+
readOnlyObserver, mutatingObserver fcmetrics.TimedObserver
5558
lock sync.Mutex
5659
readOnlyWatermark, mutatingWatermark int
5760
}
5861

5962
func (w *requestWatermark) recordMutating(mutatingVal int) {
63+
w.mutatingObserver.Set(float64(mutatingVal))
64+
6065
w.lock.Lock()
6166
defer w.lock.Unlock()
6267

@@ -66,6 +71,8 @@ func (w *requestWatermark) recordMutating(mutatingVal int) {
6671
}
6772

6873
func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
74+
w.readOnlyObserver.Set(float64(readOnlyVal))
75+
6976
w.lock.Lock()
7077
defer w.lock.Unlock()
7178

@@ -74,9 +81,14 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
7481
}
7582
}
7683

77-
var watermark = &requestWatermark{}
84+
// watermark tracks requests being executed (not waiting in a queue)
85+
var watermark = &requestWatermark{
86+
phase: metrics.ExecutingPhase,
87+
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting,
88+
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
89+
}
7890

79-
func startRecordingUsage() {
91+
func startRecordingUsage(watermark *requestWatermark) {
8092
go func() {
8193
wait.Forever(func() {
8294
watermark.lock.Lock()
@@ -86,7 +98,7 @@ func startRecordingUsage() {
8698
watermark.mutatingWatermark = 0
8799
watermark.lock.Unlock()
88100

89-
metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark)
101+
metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
90102
}, inflightUsageMetricUpdatePeriod)
91103
}()
92104
}
@@ -100,17 +112,19 @@ func WithMaxInFlightLimit(
100112
mutatingLimit int,
101113
longRunningRequestCheck apirequest.LongRunningRequestCheck,
102114
) http.Handler {
103-
startOnce.Do(startRecordingUsage)
115+
startOnce.Do(func() { startRecordingUsage(watermark) })
104116
if nonMutatingLimit == 0 && mutatingLimit == 0 {
105117
return handler
106118
}
107119
var nonMutatingChan chan bool
108120
var mutatingChan chan bool
109121
if nonMutatingLimit != 0 {
110122
nonMutatingChan = make(chan bool, nonMutatingLimit)
123+
watermark.readOnlyObserver.SetX1(float64(nonMutatingLimit))
111124
}
112125
if mutatingLimit != 0 {
113126
mutatingChan = make(chan bool, mutatingLimit)
127+
watermark.mutatingObserver.SetX1(float64(mutatingLimit))
114128
}
115129

116130
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -141,21 +155,22 @@ func WithMaxInFlightLimit(
141155

142156
select {
143157
case c <- true:
144-
var mutatingLen, readOnlyLen int
158+
// We note the concurrency level both while the
159+
// request is being served and after it is done being
160+
// served, because both states contribute to the
161+
// sampled stats on concurrency.
145162
if isMutatingRequest {
146-
mutatingLen = len(mutatingChan)
163+
watermark.recordMutating(len(c))
147164
} else {
148-
readOnlyLen = len(nonMutatingChan)
165+
watermark.recordReadOnly(len(c))
149166
}
150-
151167
defer func() {
152168
<-c
153169
if isMutatingRequest {
154-
watermark.recordMutating(mutatingLen)
170+
watermark.recordMutating(len(c))
155171
} else {
156-
watermark.recordReadOnly(readOnlyLen)
172+
watermark.recordReadOnly(len(c))
157173
}
158-
159174
}()
160175
handler.ServeHTTP(w, r)
161176

staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ import (
2424

2525
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
2626
apitypes "k8s.io/apimachinery/pkg/types"
27+
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
2728
apirequest "k8s.io/apiserver/pkg/endpoints/request"
2829
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
30+
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
2931
"k8s.io/klog/v2"
3032
)
3133

@@ -53,7 +55,15 @@ func GetClassification(ctx context.Context) *PriorityAndFairnessClassification {
5355
return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification)
5456
}
5557

56-
var atomicMutatingLen, atomicNonMutatingLen int32
58+
// waitingMark tracks requests waiting rather than being executed
59+
var waitingMark = &requestWatermark{
60+
phase: epmetrics.WaitingPhase,
61+
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting,
62+
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
63+
}
64+
65+
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
66+
var atomicMutatingWaiting, atomicReadOnlyWaiting int32
5767

5868
// WithPriorityAndFairness limits the number of in-flight
5969
// requests in a fine-grained way.
@@ -66,7 +76,10 @@ func WithPriorityAndFairness(
6676
klog.Warningf("priority and fairness support not found, skipping")
6777
return handler
6878
}
69-
startOnce.Do(startRecordingUsage)
79+
startOnce.Do(func() {
80+
startRecordingUsage(watermark)
81+
startRecordingUsage(waitingMark)
82+
})
7083
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
7184
ctx := r.Context()
7285
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
@@ -98,22 +111,23 @@ func WithPriorityAndFairness(
98111

99112
var served bool
100113
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
101-
execute := func() {
102-
var mutatingLen, readOnlyLen int
114+
noteExecutingDelta := func(delta int32) {
103115
if isMutatingRequest {
104-
mutatingLen = int(atomic.AddInt32(&atomicMutatingLen, 1))
116+
watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
105117
} else {
106-
readOnlyLen = int(atomic.AddInt32(&atomicNonMutatingLen, 1))
118+
watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
107119
}
108-
defer func() {
109-
if isMutatingRequest {
110-
atomic.AddInt32(&atomicMutatingLen, -11)
111-
watermark.recordMutating(mutatingLen)
112-
} else {
113-
atomic.AddInt32(&atomicNonMutatingLen, -1)
114-
watermark.recordReadOnly(readOnlyLen)
115-
}
116-
}()
120+
}
121+
noteWaitingDelta := func(delta int32) {
122+
if isMutatingRequest {
123+
waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
124+
} else {
125+
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
126+
}
127+
}
128+
execute := func() {
129+
noteExecutingDelta(1)
130+
defer noteExecutingDelta(-1)
117131
served = true
118132
innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification)
119133
innerReq := r.Clone(innerCtx)
@@ -122,10 +136,15 @@ func WithPriorityAndFairness(
122136
handler.ServeHTTP(w, innerReq)
123137
}
124138
digest := utilflowcontrol.RequestDigest{requestInfo, user}
125-
fcIfc.Handle(ctx, digest, note, execute)
139+
fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
140+
if inQueue {
141+
noteWaitingDelta(1)
142+
} else {
143+
noteWaitingDelta(-1)
144+
}
145+
}, execute)
126146
if !served {
127147
tooManyRequests(r, w)
128-
return
129148
}
130149

131150
})

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ go_test(
8585
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
8686
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
8787
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:go_default_library",
88+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
8889
"//staging/src/k8s.io/client-go/informers:go_default_library",
8990
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
9091
"//staging/src/k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1:go_default_library",

0 commit comments

Comments
 (0)