Skip to content

Commit eda07ad

Browse files
authored
Merge pull request kubernetes#91177 from MikeSpreitzer/more-concurrency-details
Introduce more metrics on concurrency
2 parents a789d56 + 57ecea2 commit eda07ad

25 files changed

+909
-336
lines changed

staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ var (
122122
Help: "Number of requests dropped with 'Try again later' response",
123123
StabilityLevel: compbasemetrics.ALPHA,
124124
},
125-
[]string{"requestKind"},
125+
[]string{"request_kind"},
126126
)
127127
// TLSHandshakeErrors is the number of requests dropped with a 'TLS handshake error from' error
128128
TLSHandshakeErrors = compbasemetrics.NewCounter(
@@ -166,7 +166,15 @@ var (
166166
Help: "Maximal number of currently used inflight request limit of this apiserver per request kind in last second.",
167167
StabilityLevel: compbasemetrics.ALPHA,
168168
},
169-
[]string{"requestKind"},
169+
[]string{"request_kind"},
170+
)
171+
currentInqueueRequests = compbasemetrics.NewGaugeVec(
172+
&compbasemetrics.GaugeOpts{
173+
Name: "apiserver_current_inqueue_requests",
174+
Help: "Maximal number of queued requests in this apiserver per request kind in last second.",
175+
StabilityLevel: compbasemetrics.ALPHA,
176+
},
177+
[]string{"request_kind"},
170178
)
171179

172180
requestTerminationsTotal = compbasemetrics.NewCounterVec(
@@ -191,6 +199,7 @@ var (
191199
WatchEvents,
192200
WatchEventsSizes,
193201
currentInflightRequests,
202+
currentInqueueRequests,
194203
requestTerminationsTotal,
195204
}
196205

@@ -231,6 +240,11 @@ const (
231240
ReadOnlyKind = "readOnly"
232241
// MutatingKind is a string identifying mutating request kind
233242
MutatingKind = "mutating"
243+
244+
// WaitingPhase is the phase value for a request waiting in a queue
245+
WaitingPhase = "waiting"
246+
// ExecutingPhase is the phase value for an executing request
247+
ExecutingPhase = "executing"
234248
)
235249

236250
const (
@@ -261,9 +275,19 @@ func Reset() {
261275
}
262276
}
263277

264-
func UpdateInflightRequestMetrics(nonmutating, mutating int) {
265-
currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating))
266-
currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating))
278+
// UpdateInflightRequestMetrics reports concurrency metrics classified by
279+
// mutating vs. read-only request kind, for the given phase.
280+
func UpdateInflightRequestMetrics(phase string, nonmutating, mutating int) {
281+
for _, kc := range []struct {
282+
kind string
283+
count int
284+
}{{ReadOnlyKind, nonmutating}, {MutatingKind, mutating}} {
285+
if phase == ExecutingPhase {
286+
currentInflightRequests.WithLabelValues(kc.kind).Set(float64(kc.count))
287+
} else {
288+
currentInqueueRequests.WithLabelValues(kc.kind).Set(float64(kc.count))
289+
}
290+
}
267291
}
268292

269293
// RecordRequestTermination records that the request was terminated early as part of a resource

staging/src/k8s.io/apiserver/pkg/server/filters/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ go_library(
6262
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
6363
"//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library",
6464
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
65+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
6566
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
6667
"//vendor/k8s.io/klog/v2:go_default_library",
6768
],

staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/apiserver/pkg/authentication/user"
2828
"k8s.io/apiserver/pkg/endpoints/metrics"
2929
apirequest "k8s.io/apiserver/pkg/endpoints/request"
30+
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
3031

3132
"k8s.io/klog/v2"
3233
)
@@ -50,13 +51,17 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
5051
klog.Errorf(err.Error())
5152
}
5253

53-
// requestWatermark is used to trak maximal usage of inflight requests.
54+
// requestWatermark is used to track maximal numbers of requests in a particular phase of handling
5455
type requestWatermark struct {
56+
phase string
57+
readOnlyObserver, mutatingObserver fcmetrics.TimedObserver
5558
lock sync.Mutex
5659
readOnlyWatermark, mutatingWatermark int
5760
}
5861

5962
func (w *requestWatermark) recordMutating(mutatingVal int) {
63+
w.mutatingObserver.Set(float64(mutatingVal))
64+
6065
w.lock.Lock()
6166
defer w.lock.Unlock()
6267

@@ -66,6 +71,8 @@ func (w *requestWatermark) recordMutating(mutatingVal int) {
6671
}
6772

6873
func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
74+
w.readOnlyObserver.Set(float64(readOnlyVal))
75+
6976
w.lock.Lock()
7077
defer w.lock.Unlock()
7178

@@ -74,9 +81,14 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
7481
}
7582
}
7683

77-
var watermark = &requestWatermark{}
84+
// watermark tracks requests being executed (not waiting in a queue)
85+
var watermark = &requestWatermark{
86+
phase: metrics.ExecutingPhase,
87+
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting,
88+
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
89+
}
7890

79-
func startRecordingUsage() {
91+
func startRecordingUsage(watermark *requestWatermark) {
8092
go func() {
8193
wait.Forever(func() {
8294
watermark.lock.Lock()
@@ -86,7 +98,7 @@ func startRecordingUsage() {
8698
watermark.mutatingWatermark = 0
8799
watermark.lock.Unlock()
88100

89-
metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark)
101+
metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
90102
}, inflightUsageMetricUpdatePeriod)
91103
}()
92104
}
@@ -100,17 +112,19 @@ func WithMaxInFlightLimit(
100112
mutatingLimit int,
101113
longRunningRequestCheck apirequest.LongRunningRequestCheck,
102114
) http.Handler {
103-
startOnce.Do(startRecordingUsage)
115+
startOnce.Do(func() { startRecordingUsage(watermark) })
104116
if nonMutatingLimit == 0 && mutatingLimit == 0 {
105117
return handler
106118
}
107119
var nonMutatingChan chan bool
108120
var mutatingChan chan bool
109121
if nonMutatingLimit != 0 {
110122
nonMutatingChan = make(chan bool, nonMutatingLimit)
123+
watermark.readOnlyObserver.SetX1(float64(nonMutatingLimit))
111124
}
112125
if mutatingLimit != 0 {
113126
mutatingChan = make(chan bool, mutatingLimit)
127+
watermark.mutatingObserver.SetX1(float64(mutatingLimit))
114128
}
115129

116130
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -141,21 +155,22 @@ func WithMaxInFlightLimit(
141155

142156
select {
143157
case c <- true:
144-
var mutatingLen, readOnlyLen int
158+
// We note the concurrency level both while the
159+
// request is being served and after it is done being
160+
// served, because both states contribute to the
161+
// sampled stats on concurrency.
145162
if isMutatingRequest {
146-
mutatingLen = len(mutatingChan)
163+
watermark.recordMutating(len(c))
147164
} else {
148-
readOnlyLen = len(nonMutatingChan)
165+
watermark.recordReadOnly(len(c))
149166
}
150-
151167
defer func() {
152168
<-c
153169
if isMutatingRequest {
154-
watermark.recordMutating(mutatingLen)
170+
watermark.recordMutating(len(c))
155171
} else {
156-
watermark.recordReadOnly(readOnlyLen)
172+
watermark.recordReadOnly(len(c))
157173
}
158-
159174
}()
160175
handler.ServeHTTP(w, r)
161176

staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ import (
2424

2525
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
2626
apitypes "k8s.io/apimachinery/pkg/types"
27+
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
2728
apirequest "k8s.io/apiserver/pkg/endpoints/request"
2829
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
30+
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
2931
"k8s.io/klog/v2"
3032
)
3133

@@ -53,7 +55,15 @@ func GetClassification(ctx context.Context) *PriorityAndFairnessClassification {
5355
return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification)
5456
}
5557

56-
var atomicMutatingLen, atomicNonMutatingLen int32
58+
// waitingMark tracks requests waiting rather than being executed
59+
var waitingMark = &requestWatermark{
60+
phase: epmetrics.WaitingPhase,
61+
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting,
62+
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
63+
}
64+
65+
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
66+
var atomicMutatingWaiting, atomicReadOnlyWaiting int32
5767

5868
// WithPriorityAndFairness limits the number of in-flight
5969
// requests in a fine-grained way.
@@ -66,7 +76,10 @@ func WithPriorityAndFairness(
6676
klog.Warningf("priority and fairness support not found, skipping")
6777
return handler
6878
}
69-
startOnce.Do(startRecordingUsage)
79+
startOnce.Do(func() {
80+
startRecordingUsage(watermark)
81+
startRecordingUsage(waitingMark)
82+
})
7083
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
7184
ctx := r.Context()
7285
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
@@ -98,22 +111,23 @@ func WithPriorityAndFairness(
98111

99112
var served bool
100113
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
101-
execute := func() {
102-
var mutatingLen, readOnlyLen int
114+
noteExecutingDelta := func(delta int32) {
103115
if isMutatingRequest {
104-
mutatingLen = int(atomic.AddInt32(&atomicMutatingLen, 1))
116+
watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
105117
} else {
106-
readOnlyLen = int(atomic.AddInt32(&atomicNonMutatingLen, 1))
118+
watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
107119
}
108-
defer func() {
109-
if isMutatingRequest {
110-
atomic.AddInt32(&atomicMutatingLen, -11)
111-
watermark.recordMutating(mutatingLen)
112-
} else {
113-
atomic.AddInt32(&atomicNonMutatingLen, -1)
114-
watermark.recordReadOnly(readOnlyLen)
115-
}
116-
}()
120+
}
121+
noteWaitingDelta := func(delta int32) {
122+
if isMutatingRequest {
123+
waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
124+
} else {
125+
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
126+
}
127+
}
128+
execute := func() {
129+
noteExecutingDelta(1)
130+
defer noteExecutingDelta(-1)
117131
served = true
118132
innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification)
119133
innerReq := r.Clone(innerCtx)
@@ -122,10 +136,15 @@ func WithPriorityAndFairness(
122136
handler.ServeHTTP(w, innerReq)
123137
}
124138
digest := utilflowcontrol.RequestDigest{requestInfo, user}
125-
fcIfc.Handle(ctx, digest, note, execute)
139+
fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
140+
if inQueue {
141+
noteWaitingDelta(1)
142+
} else {
143+
noteWaitingDelta(-1)
144+
}
145+
}, execute)
126146
if !served {
127147
tooManyRequests(r, w)
128-
return
129148
}
130149

131150
})

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ go_test(
8585
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
8686
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
8787
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:go_default_library",
88+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
8889
"//staging/src/k8s.io/client-go/informers:go_default_library",
8990
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
9091
"//staging/src/k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1:go_default_library",

0 commit comments

Comments
 (0)