Skip to content

Commit 110d283

Browse files
committed
feat(scheduler): support inflight_events metric
1 parent 8486ed0 commit 110d283

File tree

6 files changed

+161
-13
lines changed

6 files changed

+161
-13
lines changed

pkg/scheduler/backend/queue/active_queue.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,17 @@ type activeQueue struct {
123123

124124
// isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled.
125125
isSchedulingQueueHintEnabled bool
126+
127+
metricsRecorder metrics.MetricAsyncRecorder
126128
}
127129

128-
func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool) *activeQueue {
130+
func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder) *activeQueue {
129131
aq := &activeQueue{
130132
queue: queue,
131133
inFlightPods: make(map[types.UID]*list.Element),
132134
inFlightEvents: list.New(),
133135
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
136+
metricsRecorder: metricRecorder,
134137
}
135138
aq.cond.L = &aq.lock
136139

@@ -201,6 +204,7 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
201204
aq.schedCycle++
202205
// In flight, no concurrent events yet.
203206
if aq.isSchedulingQueueHintEnabled {
207+
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1)
204208
aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod)
205209
}
206210

@@ -293,6 +297,7 @@ func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event frame
293297

294298
_, ok := aq.inFlightPods[newPod.UID]
295299
if ok {
300+
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1)
296301
aq.inFlightEvents.PushBack(&clusterEvent{
297302
event: event,
298303
oldObj: oldPod,
@@ -309,6 +314,7 @@ func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event f
309314
defer aq.lock.Unlock()
310315

311316
if len(aq.inFlightPods) != 0 {
317+
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1)
312318
aq.inFlightEvents.PushBack(&clusterEvent{
313319
event: event,
314320
oldObj: oldObj,
@@ -340,7 +346,9 @@ func (aq *activeQueue) done(pod types.UID) {
340346

341347
// Remove the pod from the list.
342348
aq.inFlightEvents.Remove(inFlightPod)
349+
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1)
343350

351+
aggrMetricsCounter := map[string]int{}
344352
// Remove events which are only referred to by this Pod
345353
// so that the inFlightEvents list doesn't grow infinitely.
346354
// If the pod was at the head of the list, then all
@@ -352,11 +360,17 @@ func (aq *activeQueue) done(pod types.UID) {
352360
// Empty list.
353361
break
354362
}
355-
if _, ok := e.Value.(*clusterEvent); !ok {
363+
ev, ok := e.Value.(*clusterEvent)
364+
if !ok {
356365
// A pod, must stop pruning.
357366
break
358367
}
359368
aq.inFlightEvents.Remove(e)
369+
aggrMetricsCounter[ev.event.Label]--
370+
}
371+
372+
for evLabel, count := range aggrMetricsCounter {
373+
aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count))
360374
}
361375
}
362376

pkg/scheduler/backend/queue/scheduling_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ func NewPriorityQueue(
331331
podInitialBackoffDuration: options.podInitialBackoffDuration,
332332
podMaxBackoffDuration: options.podMaxBackoffDuration,
333333
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
334-
activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled),
334+
activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder),
335335
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
336336
preEnqueuePluginMap: options.preEnqueuePluginMap,
337337
queueingHintMap: options.queueingHintMap,

pkg/scheduler/backend/queue/testing.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ package queue
1818

1919
import (
2020
"context"
21+
"time"
2122

2223
"k8s.io/apimachinery/pkg/runtime"
2324
"k8s.io/client-go/informers"
2425
"k8s.io/client-go/kubernetes/fake"
2526
"k8s.io/kubernetes/pkg/scheduler/framework"
27+
"k8s.io/kubernetes/pkg/scheduler/metrics"
2628
)
2729

2830
// NewTestQueue creates a priority queue with an empty informer factory.
@@ -39,6 +41,12 @@ func NewTestQueueWithObjects(
3941
opts ...Option,
4042
) *PriorityQueue {
4143
informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(objs...), 0)
44+
45+
// Because some major functions (e.g., Pop) require the metric recorder to be set,
46+
// we always set a metric recorder here.
47+
recorder := metrics.NewMetricsAsyncRecorder(10, 20*time.Microsecond, ctx.Done())
48+
// We set it before the options that users provide, so that users can override it.
49+
opts = append([]Option{WithMetricsRecorder(*recorder)}, opts...)
4250
return NewTestQueueWithInformerFactory(ctx, lessFn, informerFactory, opts...)
4351
}
4452

pkg/scheduler/metrics/metric_recorder.go

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,23 +80,42 @@ func (r *PendingPodsRecorder) Clear() {
8080
r.recorder.Set(float64(0))
8181
}
8282

83-
// metric is the data structure passed in the buffer channel between the main framework thread
83+
// histgramVecMetric is the data structure passed in the buffer channel between the main framework thread
8484
// and the metricsRecorder goroutine.
85-
type metric struct {
85+
type histgramVecMetric struct {
8686
metric *metrics.HistogramVec
8787
labelValues []string
8888
value float64
8989
}
9090

91+
// gaugeVecMetric is the data structure passed through aggregatedInflightEventMetricBufferCh
// from ObserveInFlightEventsAsync to FlushMetrics, which applies valueToAdd to the gauge via Add.
type gaugeVecMetric struct {
92+
// metric is the gauge vector to update.
metric *metrics.GaugeVec
93+
// labelValues are the label values that select the gauge within metric.
labelValues []string
94+
// valueToAdd is the delta (possibly negative) to add to the selected gauge.
valueToAdd float64
95+
}
96+
97+
// gaugeVecMetricKey is the key of the aggregatedInflightEventMetric map
// in which ObserveInFlightEventsAsync accumulates values before flushing them.
type gaugeVecMetricKey struct {
98+
// metricName is the name of the gauge metric (ObserveInFlightEventsAsync sets it to InFlightEvents.Name).
metricName string
99+
// labelValue is the single label value (the event label) the aggregated value belongs to.
labelValue string
100+
}
101+
91102
// MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path.
92103
type MetricAsyncRecorder struct {
93104
// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
94-
bufferCh chan *metric
105+
bufferCh chan *histgramVecMetric
95106
// if bufferSize is reached, incoming metrics will be discarded.
96107
bufferSize int
97108
// how often the recorder runs to flush the metrics.
98109
interval time.Duration
99110

111+
// aggregatedInflightEventMetric is only to record InFlightEvents metric asynchronously.
112+
// It's a map from gaugeVecMetricKey to the aggregated value
113+
// and the aggregated value is flushed to Prometheus every time the interval is reached.
114+
// Note that we deliberately don't lock the map because we assume the queue takes its lock before updating the in-flight events.
115+
aggregatedInflightEventMetric map[gaugeVecMetricKey]int
116+
aggregatedInflightEventMetricLastFlushTime time.Time
117+
aggregatedInflightEventMetricBufferCh chan *gaugeVecMetric
118+
100119
// stopCh is used to stop the goroutine which periodically flushes metrics.
101120
stopCh <-chan struct{}
102121
// IsStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
@@ -106,11 +125,14 @@ type MetricAsyncRecorder struct {
106125

107126
func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder {
108127
recorder := &MetricAsyncRecorder{
109-
bufferCh: make(chan *metric, bufferSize),
110-
bufferSize: bufferSize,
111-
interval: interval,
112-
stopCh: stopCh,
113-
IsStoppedCh: make(chan struct{}),
128+
bufferCh: make(chan *histgramVecMetric, bufferSize),
129+
bufferSize: bufferSize,
130+
interval: interval,
131+
stopCh: stopCh,
132+
aggregatedInflightEventMetric: make(map[gaugeVecMetricKey]int),
133+
aggregatedInflightEventMetricLastFlushTime: time.Now(),
134+
aggregatedInflightEventMetricBufferCh: make(chan *gaugeVecMetric, bufferSize),
135+
IsStoppedCh: make(chan struct{}),
114136
}
115137
go recorder.run()
116138
return recorder
@@ -128,8 +150,32 @@ func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event
128150
r.observeMetricAsync(queueingHintExecutionDuration, value, pluginName, event, hint)
129151
}
130152

153+
// ObserveInFlightEventsAsync observes the in_flight_events metric.
154+
// Note that this function is not goroutine-safe;
155+
// we don't lock the map deliberately for the performance reason and we assume the queue (i.e., the caller) takes lock before updating the in-flight events.
156+
func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64) {
157+
r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd)
158+
159+
// Only flush the metric to the channal if the interval is reached.
160+
// The values are flushed to Prometheus in the run() function, which runs once the interval time.
161+
if time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval {
162+
for key, value := range r.aggregatedInflightEventMetric {
163+
newMetric := &gaugeVecMetric{
164+
metric: InFlightEvents,
165+
labelValues: []string{key.labelValue},
166+
valueToAdd: float64(value),
167+
}
168+
select {
169+
case r.aggregatedInflightEventMetricBufferCh <- newMetric:
170+
default:
171+
}
172+
}
173+
r.aggregatedInflightEventMetricLastFlushTime = time.Now()
174+
}
175+
}
176+
131177
func (r *MetricAsyncRecorder) observeMetricAsync(m *metrics.HistogramVec, value float64, labelsValues ...string) {
132-
newMetric := &metric{
178+
newMetric := &histgramVecMetric{
133179
metric: m,
134180
labelValues: labelsValues,
135181
value: value,
@@ -161,7 +207,14 @@ func (r *MetricAsyncRecorder) FlushMetrics() {
161207
case m := <-r.bufferCh:
162208
m.metric.WithLabelValues(m.labelValues...).Observe(m.value)
163209
default:
164-
return
210+
// no more value
211+
}
212+
213+
select {
214+
case m := <-r.aggregatedInflightEventMetricBufferCh:
215+
m.metric.WithLabelValues(m.labelValues...).Add(m.valueToAdd)
216+
default:
217+
// no more value
165218
}
166219
}
167220
}

pkg/scheduler/metrics/metric_recorder_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@ limitations under the License.
1717
package metrics
1818

1919
import (
20+
"sort"
2021
"sync"
2122
"sync/atomic"
2223
"testing"
24+
"time"
25+
26+
"github.com/google/go-cmp/cmp"
27+
"github.com/google/go-cmp/cmp/cmpopts"
2328
)
2429

2530
var _ MetricRecorder = &fakePodsRecorder{}
@@ -101,3 +106,59 @@ func TestClear(t *testing.T) {
101106
t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter)
102107
}
103108
}
109+
110+
func TestInFlightEventAsync(t *testing.T) {
111+
r := &MetricAsyncRecorder{
112+
aggregatedInflightEventMetric: map[gaugeVecMetricKey]int{},
113+
aggregatedInflightEventMetricLastFlushTime: time.Now(),
114+
aggregatedInflightEventMetricBufferCh: make(chan *gaugeVecMetric, 100),
115+
interval: time.Hour,
116+
}
117+
118+
podAddLabel := "Pod/Add"
119+
r.ObserveInFlightEventsAsync(podAddLabel, 10)
120+
r.ObserveInFlightEventsAsync(podAddLabel, -1)
121+
r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1)
122+
123+
if d := cmp.Diff(r.aggregatedInflightEventMetric, map[gaugeVecMetricKey]int{
124+
{metricName: InFlightEvents.Name, labelValue: podAddLabel}: 9,
125+
{metricName: InFlightEvents.Name, labelValue: PodPoppedInFlightEvent}: 1,
126+
}, cmp.AllowUnexported(gaugeVecMetric{})); d != "" {
127+
t.Errorf("unexpected aggregatedInflightEventMetric: %s", d)
128+
}
129+
130+
r.aggregatedInflightEventMetricLastFlushTime = time.Now().Add(-time.Hour) // to test flush
131+
132+
// It adds -4 and flushes the metric to the channel.
133+
r.ObserveInFlightEventsAsync(podAddLabel, -4)
134+
135+
got := []gaugeVecMetric{}
136+
for {
137+
select {
138+
case m := <-r.aggregatedInflightEventMetricBufferCh:
139+
got = append(got, *m)
140+
continue
141+
default:
142+
}
143+
// got all
144+
break
145+
}
146+
147+
// sort got to avoid the flaky test
148+
sort.Slice(got, func(i, j int) bool {
149+
return got[i].labelValues[0] < got[j].labelValues[0]
150+
})
151+
152+
if d := cmp.Diff(got, []gaugeVecMetric{
153+
{
154+
labelValues: []string{podAddLabel},
155+
valueToAdd: 5,
156+
},
157+
{
158+
labelValues: []string{PodPoppedInFlightEvent},
159+
valueToAdd: 1,
160+
},
161+
}, cmp.AllowUnexported(gaugeVecMetric{}), cmpopts.IgnoreFields(gaugeVecMetric{}, "metric")); d != "" {
162+
t.Errorf("unexpected metrics are sent to aggregatedInflightEventMetricBufferCh: %s", d)
163+
}
164+
}

pkg/scheduler/metrics/metrics.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ const (
8181
QueueingHintResultError = "Error"
8282
)
8383

84+
const (
85+
// PodPoppedInFlightEvent is the event label value under which the active queue
// records popped (in-flight) Pods in the in-flight events metric.
PodPoppedInFlightEvent = "PodPopped"
86+
)
87+
8488
// All the histogram based metrics have 1ms as size for the smallest bucket.
8589
var (
8690
scheduleAttempts = metrics.NewCounterVec(
@@ -141,6 +145,13 @@ var (
141145
Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.",
142146
StabilityLevel: metrics.STABLE,
143147
}, []string{"queue"})
148+
InFlightEvents = metrics.NewGaugeVec(
149+
&metrics.GaugeOpts{
150+
Subsystem: SchedulerSubsystem,
151+
Name: "inflight_events",
152+
Help: "Number of events recorded in the scheduling queue.",
153+
StabilityLevel: metrics.ALPHA,
154+
}, []string{"event"})
144155
Goroutines = metrics.NewGaugeVec(
145156
&metrics.GaugeOpts{
146157
Subsystem: SchedulerSubsystem,
@@ -292,6 +303,7 @@ func Register() {
292303
RegisterMetrics(metricsList...)
293304
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
294305
RegisterMetrics(queueingHintExecutionDuration)
306+
RegisterMetrics(InFlightEvents)
295307
}
296308
volumebindingmetrics.RegisterVolumeSchedulingMetrics()
297309
})

0 commit comments

Comments
 (0)