Commit 05df9f4

Merge pull request kubernetes#127052 from sanposhiho/add-inflight-event-metric
feat(scheduler): support inflight_events metric
2 parents cc27d21 + b5ed15b commit 05df9f4

6 files changed: +179 -13 lines


pkg/scheduler/backend/queue/active_queue.go

Lines changed: 21 additions & 2 deletions

@@ -123,14 +123,17 @@ type activeQueue struct {

     // isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled.
     isSchedulingQueueHintEnabled bool
+
+    metricsRecorder metrics.MetricAsyncRecorder
 }

-func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool) *activeQueue {
+func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder) *activeQueue {
     aq := &activeQueue{
         queue: queue,
         inFlightPods: make(map[types.UID]*list.Element),
         inFlightEvents: list.New(),
         isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
+        metricsRecorder: metricRecorder,
     }
     aq.cond.L = &aq.lock

@@ -201,6 +204,7 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
     aq.schedCycle++
     // In flight, no concurrent events yet.
     if aq.isSchedulingQueueHintEnabled {
+        aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false)
         aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod)
     }

@@ -293,6 +297,7 @@ func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event frame

     _, ok := aq.inFlightPods[newPod.UID]
     if ok {
+        aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1, false)
         aq.inFlightEvents.PushBack(&clusterEvent{
             event: event,
             oldObj: oldPod,
@@ -309,6 +314,7 @@ func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event f
     defer aq.lock.Unlock()

     if len(aq.inFlightPods) != 0 {
+        aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1, false)
         aq.inFlightEvents.PushBack(&clusterEvent{
             event: event,
             oldObj: oldObj,
@@ -341,6 +347,7 @@ func (aq *activeQueue) done(pod types.UID) {
     // Remove the pod from the list.
     aq.inFlightEvents.Remove(inFlightPod)

+    aggrMetricsCounter := map[string]int{}
     // Remove events which are only referred to by this Pod
     // so that the inFlightEvents list doesn't grow infinitely.
     // If the pod was at the head of the list, then all
@@ -352,12 +359,24 @@ func (aq *activeQueue) done(pod types.UID) {
             // Empty list.
             break
         }
-        if _, ok := e.Value.(*clusterEvent); !ok {
+        ev, ok := e.Value.(*clusterEvent)
+        if !ok {
             // A pod, must stop pruning.
             break
         }
         aq.inFlightEvents.Remove(e)
+        aggrMetricsCounter[ev.event.Label]--
     }
+
+    for evLabel, count := range aggrMetricsCounter {
+        aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count), false)
+    }
+
+    aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1,
+        // If it's the last Pod in inFlightPods, we should force-flush the metrics.
+        // Otherwise, especially in small clusters, which don't get a new Pod frequently,
+        // the metrics might not be flushed for a long time.
+        len(aq.inFlightPods) == 0)
 }

 // close closes the activeQueue.
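
The net effect of this bookkeeping: pop() adds +1 under the PodPopped label, every concurrent cluster event adds +1 under its own label, and done() subtracts the pruned events per label before subtracting the PodPopped entry, so each label's gauge returns to zero once nothing is in flight. A minimal standalone sketch of that flow, using a toy recorder and illustrative label strings rather than the scheduler's types:

package main

import "fmt"

// fakeRecorder stands in for MetricAsyncRecorder; it just sums deltas per label.
type fakeRecorder struct{ gauges map[string]float64 }

func (r *fakeRecorder) observe(label string, delta float64) { r.gauges[label] += delta }

func main() {
    r := &fakeRecorder{gauges: map[string]float64{}}

    // pop(): the pod enters in-flight tracking.
    r.observe("PodPopped", 1)

    // Cluster events observed while the pod is being scheduled (labels are illustrative).
    events := []string{"Pod/Add", "Node/Add"}
    for _, ev := range events {
        r.observe(ev, 1)
    }

    // done(): aggregate the pruned events per label, report each delta once,
    // then remove the popped pod itself.
    aggr := map[string]int{}
    for _, ev := range events {
        aggr[ev]--
    }
    for label, delta := range aggr {
        r.observe(label, float64(delta))
    }
    r.observe("PodPopped", -1)

    fmt.Println(r.gauges) // every label is back at 0 once nothing is in flight
}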

pkg/scheduler/backend/queue/scheduling_queue.go

Lines changed: 1 addition & 1 deletion

@@ -331,7 +331,7 @@ func NewPriorityQueue(
         podInitialBackoffDuration: options.podInitialBackoffDuration,
         podMaxBackoffDuration: options.podMaxBackoffDuration,
         podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
-        activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled),
+        activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder),
         unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
         preEnqueuePluginMap: options.preEnqueuePluginMap,
         queueingHintMap: options.queueingHintMap,

pkg/scheduler/backend/queue/testing.go

Lines changed: 8 additions & 0 deletions

@@ -18,11 +18,13 @@ package queue

 import (
     "context"
+    "time"

     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes/fake"
     "k8s.io/kubernetes/pkg/scheduler/framework"
+    "k8s.io/kubernetes/pkg/scheduler/metrics"
 )

 // NewTestQueue creates a priority queue with an empty informer factory.
@@ -39,6 +41,12 @@ func NewTestQueueWithObjects(
     opts ...Option,
 ) *PriorityQueue {
     informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(objs...), 0)
+
+    // Because some major functions (e.g., Pop) requires the metric recorder to be set,
+    // we always set a metric recorder here.
+    recorder := metrics.NewMetricsAsyncRecorder(10, 20*time.Microsecond, ctx.Done())
+    // We set it before the options that users provide, so that users can override it.
+    opts = append([]Option{WithMetricsRecorder(*recorder)}, opts...)
     return NewTestQueueWithInformerFactory(ctx, lessFn, informerFactory, opts...)
 }
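
Because the default recorder option is prepended, a test that needs its own recorder can still pass one and have it take precedence. A sketch of that override, placed in the queue package alongside testing.go and reusing its imports, assuming the NewTestQueueWithObjects, WithMetricsRecorder, and NewMetricsAsyncRecorder signatures shown in this commit (the helper name and the chosen buffer size/interval are hypothetical):

// newQueueWithCustomRecorder is a hypothetical test helper: the recorder passed
// here comes after the default option, so it is the value that sticks.
func newQueueWithCustomRecorder(ctx context.Context, lessFn framework.LessFunc) *PriorityQueue {
    recorder := metrics.NewMetricsAsyncRecorder(100, time.Second, ctx.Done())
    return NewTestQueueWithObjects(ctx, lessFn, nil, WithMetricsRecorder(*recorder))
}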

pkg/scheduler/metrics/metric_recorder.go

Lines changed: 67 additions & 10 deletions

@@ -80,23 +80,42 @@ func (r *PendingPodsRecorder) Clear() {
     r.recorder.Set(float64(0))
 }

-// metric is the data structure passed in the buffer channel between the main framework thread
+// histogramVecMetric is the data structure passed in the buffer channel between the main framework thread
 // and the metricsRecorder goroutine.
-type metric struct {
+type histogramVecMetric struct {
     metric *metrics.HistogramVec
     labelValues []string
     value float64
 }

+type gaugeVecMetric struct {
+    metric *metrics.GaugeVec
+    labelValues []string
+    valueToAdd float64
+}
+
+type gaugeVecMetricKey struct {
+    metricName string
+    labelValue string
+}
+
 // MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path.
 type MetricAsyncRecorder struct {
     // bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
-    bufferCh chan *metric
+    bufferCh chan *histogramVecMetric
     // if bufferSize is reached, incoming metrics will be discarded.
     bufferSize int
     // how often the recorder runs to flush the metrics.
     interval time.Duration

+    // aggregatedInflightEventMetric is only to record InFlightEvents metric asynchronously.
+    // It's a map from gaugeVecMetricKey to the aggregated value
+    // and the aggregated value is flushed to Prometheus every time the interval is reached.
+    // Note that we don't lock the map deliberately because we assume the queue takes lock before updating the in-flight events.
+    aggregatedInflightEventMetric map[gaugeVecMetricKey]int
+    aggregatedInflightEventMetricLastFlushTime time.Time
+    aggregatedInflightEventMetricBufferCh chan *gaugeVecMetric
+
     // stopCh is used to stop the goroutine which periodically flushes metrics.
     stopCh <-chan struct{}
     // IsStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
@@ -106,11 +125,14 @@ type MetricAsyncRecorder struct {

 func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder {
     recorder := &MetricAsyncRecorder{
-        bufferCh: make(chan *metric, bufferSize),
-        bufferSize: bufferSize,
-        interval: interval,
-        stopCh: stopCh,
-        IsStoppedCh: make(chan struct{}),
+        bufferCh: make(chan *histogramVecMetric, bufferSize),
+        bufferSize: bufferSize,
+        interval: interval,
+        stopCh: stopCh,
+        aggregatedInflightEventMetric: make(map[gaugeVecMetricKey]int),
+        aggregatedInflightEventMetricLastFlushTime: time.Now(),
+        aggregatedInflightEventMetricBufferCh: make(chan *gaugeVecMetric, bufferSize),
+        IsStoppedCh: make(chan struct{}),
     }
     go recorder.run()
     return recorder
@@ -128,8 +150,36 @@ func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event
     r.observeMetricAsync(queueingHintExecutionDuration, value, pluginName, event, hint)
 }

+// ObserveInFlightEventsAsync observes the in_flight_events metric.
+//
+// Note that this function is not goroutine-safe;
+// we don't lock the map deliberately for the performance reason and we assume the queue (i.e., the caller) takes lock before updating the in-flight events.
+func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64, forceFlush bool) {
+    r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd)
+
+    // Only flush the metric to the channel if the interval is reached.
+    // The values are flushed to Prometheus in the run() function, which runs once the interval time.
+    // Note: we implement this flushing here, not in FlushMetrics, because, if we did so, we would need to implement a lock for the map, which we want to avoid.
+    if forceFlush || time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval {
+        for key, value := range r.aggregatedInflightEventMetric {
+            newMetric := &gaugeVecMetric{
+                metric: InFlightEvents,
+                labelValues: []string{key.labelValue},
+                valueToAdd: float64(value),
+            }
+            select {
+            case r.aggregatedInflightEventMetricBufferCh <- newMetric:
+            default:
+            }
+        }
+        r.aggregatedInflightEventMetricLastFlushTime = time.Now()
+        // reset
+        r.aggregatedInflightEventMetric = make(map[gaugeVecMetricKey]int)
+    }
+}
+
 func (r *MetricAsyncRecorder) observeMetricAsync(m *metrics.HistogramVec, value float64, labelsValues ...string) {
-    newMetric := &metric{
+    newMetric := &histogramVecMetric{
         metric: m,
         labelValues: labelsValues,
         value: value,
@@ -161,7 +211,14 @@ func (r *MetricAsyncRecorder) FlushMetrics() {
         case m := <-r.bufferCh:
             m.metric.WithLabelValues(m.labelValues...).Observe(m.value)
         default:
-            return
+            // no more value
+        }
+
+        select {
+        case m := <-r.aggregatedInflightEventMetricBufferCh:
+            m.metric.WithLabelValues(m.labelValues...).Add(m.valueToAdd)
+        default:
+            // no more value
         }
     }
 }
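
The recorder batches in-flight event deltas without taking a lock: observations accumulate in aggregatedInflightEventMetric, and only when the interval has elapsed (or a force flush is requested) is the batch pushed onto the buffered channel and the map reset; if the channel is full, the batch is simply dropped. A self-contained toy sketch of that flow, with simplified types and the whole batch sent as one value for brevity (the real recorder enqueues one gaugeVecMetric per label):

package main

import (
    "fmt"
    "time"
)

type flusher struct {
    aggr      map[string]int
    lastFlush time.Time
    interval  time.Duration
    out       chan map[string]int
}

func (f *flusher) observe(label string, delta int, force bool) {
    f.aggr[label] += delta
    if force || time.Since(f.lastFlush) > f.interval {
        select {
        case f.out <- f.aggr: // hand the batch to the reporting goroutine
        default: // buffer full: drop it, like the real recorder
        }
        f.lastFlush = time.Now()
        f.aggr = map[string]int{} // reset for the next window
    }
}

func main() {
    f := &flusher{aggr: map[string]int{}, lastFlush: time.Now(), interval: time.Hour, out: make(chan map[string]int, 1)}
    f.observe("Pod/Add", 2, false) // buffered only; interval not reached
    f.observe("Pod/Add", -1, true) // force flush: the net +1 is emitted
    fmt.Println(<-f.out)           // map[Pod/Add:1]
}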

pkg/scheduler/metrics/metric_recorder_test.go

Lines changed: 70 additions & 0 deletions

@@ -17,9 +17,14 @@ limitations under the License.
 package metrics

 import (
+    "sort"
     "sync"
     "sync/atomic"
     "testing"
+    "time"
+
+    "github.com/google/go-cmp/cmp"
+    "github.com/google/go-cmp/cmp/cmpopts"
 )

 var _ MetricRecorder = &fakePodsRecorder{}
@@ -101,3 +106,68 @@ func TestClear(t *testing.T) {
         t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter)
     }
 }
+
+func TestInFlightEventAsync(t *testing.T) {
+    r := &MetricAsyncRecorder{
+        aggregatedInflightEventMetric: map[gaugeVecMetricKey]int{},
+        aggregatedInflightEventMetricLastFlushTime: time.Now(),
+        aggregatedInflightEventMetricBufferCh: make(chan *gaugeVecMetric, 100),
+        interval: time.Hour,
+    }
+
+    podAddLabel := "Pod/Add"
+    r.ObserveInFlightEventsAsync(podAddLabel, 10, false)
+    r.ObserveInFlightEventsAsync(podAddLabel, -1, false)
+    r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1, false)
+
+    if d := cmp.Diff(r.aggregatedInflightEventMetric, map[gaugeVecMetricKey]int{
+        {metricName: InFlightEvents.Name, labelValue: podAddLabel}: 9,
+        {metricName: InFlightEvents.Name, labelValue: PodPoppedInFlightEvent}: 1,
+    }, cmp.AllowUnexported(gaugeVecMetric{})); d != "" {
+        t.Errorf("unexpected aggregatedInflightEventMetric: %s", d)
+    }
+
+    r.aggregatedInflightEventMetricLastFlushTime = time.Now().Add(-time.Hour) // to test flush
+
+    // It adds -4 and flushes the metric to the channel.
+    r.ObserveInFlightEventsAsync(podAddLabel, -4, false)
+    if len(r.aggregatedInflightEventMetric) != 0 {
+        t.Errorf("aggregatedInflightEventMetric should be cleared, but got: %v", r.aggregatedInflightEventMetric)
+    }
+
+    got := []gaugeVecMetric{}
+    for {
+        select {
+        case m := <-r.aggregatedInflightEventMetricBufferCh:
+            got = append(got, *m)
+            continue
+        default:
+        }
+        // got all
+        break
+    }
+
+    // sort got to avoid the flaky test
+    sort.Slice(got, func(i, j int) bool {
+        return got[i].labelValues[0] < got[j].labelValues[0]
+    })
+
+    if d := cmp.Diff(got, []gaugeVecMetric{
+        {
+            labelValues: []string{podAddLabel},
+            valueToAdd: 5,
+        },
+        {
+            labelValues: []string{PodPoppedInFlightEvent},
+            valueToAdd: 1,
+        },
+    }, cmp.AllowUnexported(gaugeVecMetric{}), cmpopts.IgnoreFields(gaugeVecMetric{}, "metric")); d != "" {
+        t.Errorf("unexpected metrics are sent to aggregatedInflightEventMetricBufferCh: %s", d)
+    }
+
+    // Test force flush
+    r.ObserveInFlightEventsAsync(podAddLabel, 1, true)
+    if len(r.aggregatedInflightEventMetric) != 0 {
+        t.Errorf("aggregatedInflightEventMetric should be force-flushed, but got: %v", r.aggregatedInflightEventMetric)
+    }
+}

pkg/scheduler/metrics/metrics.go

Lines changed: 12 additions & 0 deletions

@@ -81,6 +81,10 @@ const (
     QueueingHintResultError = "Error"
 )

+const (
+    PodPoppedInFlightEvent = "PodPopped"
+)
+
 // All the histogram based metrics have 1ms as size for the smallest bucket.
 var (
     scheduleAttempts = metrics.NewCounterVec(
@@ -141,6 +145,13 @@ var (
             Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.",
             StabilityLevel: metrics.STABLE,
         }, []string{"queue"})
+    InFlightEvents = metrics.NewGaugeVec(
+        &metrics.GaugeOpts{
+            Subsystem: SchedulerSubsystem,
+            Name: "inflight_events",
+            Help: "Number of events currently tracked in the scheduling queue.",
+            StabilityLevel: metrics.ALPHA,
+        }, []string{"event"})
     Goroutines = metrics.NewGaugeVec(
         &metrics.GaugeOpts{
             Subsystem: SchedulerSubsystem,
@@ -292,6 +303,7 @@ func Register() {
         RegisterMetrics(metricsList...)
         if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
             RegisterMetrics(queueingHintExecutionDuration)
+            RegisterMetrics(InFlightEvents)
         }
         volumebindingmetrics.RegisterVolumeSchedulingMetrics()
     })
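
The new gauge is registered only when the SchedulerQueueingHints feature gate is enabled, and FlushMetrics applies the buffered deltas with Add rather than Set. Assuming SchedulerSubsystem resolves to "scheduler" as elsewhere in this package, the exposed series would look like scheduler_inflight_events{event="PodPopped"}. A small stand-in built with the upstream Prometheus client to show that shape (illustration only; the real metric uses the k8s.io/component-base/metrics wrappers):

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // Stand-in for the InFlightEvents GaugeVec defined above.
    inFlightEvents := prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Subsystem: "scheduler", // assumed value of SchedulerSubsystem
        Name:      "inflight_events",
        Help:      "Number of events currently tracked in the scheduling queue.",
    }, []string{"event"})

    // What a flush effectively does: apply the aggregated delta with Add.
    inFlightEvents.WithLabelValues("PodPopped").Add(1)
    inFlightEvents.WithLabelValues("PodPopped").Add(-1)

    fmt.Println(testutil.ToFloat64(inFlightEvents.WithLabelValues("PodPopped"))) // 0
}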
