Skip to content

Commit 7afbd68

Browse files
committed
add metrics to record number of pending pods in different queues
1 parent 62f5fd4 commit 7afbd68

File tree

6 files changed

+233
-9
lines changed

6 files changed

+233
-9
lines changed

pkg/scheduler/internal/queue/scheduling_queue.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
4343
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
4444
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
45+
"k8s.io/kubernetes/pkg/scheduler/metrics"
4546
"k8s.io/kubernetes/pkg/scheduler/util"
4647
)
4748

@@ -283,13 +284,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
283284
clock: clock,
284285
stop: stop,
285286
podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second),
286-
activeQ: util.NewHeap(podInfoKeyFunc, activeQComp),
287-
unschedulableQ: newUnschedulablePodsMap(),
287+
activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, activeQComp, metrics.NewActivePodsRecorder()),
288+
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
288289
nominatedPods: newNominatedPodMap(),
289290
moveRequestCycle: -1,
290291
}
291292
pq.cond.L = &pq.lock
292-
pq.podBackoffQ = util.NewHeap(podInfoKeyFunc, pq.podsCompareBackoffCompleted)
293+
pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
293294

294295
pq.run()
295296

@@ -777,16 +778,27 @@ type UnschedulablePodsMap struct {
777778
// podInfoMap is a map keyed by a pod's full name; each value is a pointer to that pod's podInfo.
778779
podInfoMap map[string]*podInfo
779780
keyFunc func(*v1.Pod) string
781+
// metricRecorder updates the counter when elements of an unschedulablePodsMap
782+
// get added or removed, and it does nothing if it's nil
783+
metricRecorder metrics.MetricRecorder
780784
}
781785

782786
// Add adds a pod to the unschedulable podInfoMap.
783787
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *podInfo) {
784-
u.podInfoMap[u.keyFunc(pInfo.pod)] = pInfo
788+
podID := u.keyFunc(pInfo.pod)
789+
if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
790+
u.metricRecorder.Inc()
791+
}
792+
u.podInfoMap[podID] = pInfo
785793
}
786794

787795
// Delete deletes a pod from the unschedulable podInfoMap.
788796
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
789-
delete(u.podInfoMap, u.keyFunc(pod))
797+
podID := u.keyFunc(pod)
798+
if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil {
799+
u.metricRecorder.Dec()
800+
}
801+
delete(u.podInfoMap, podID)
790802
}
791803

792804
// Get returns the podInfo if a pod with the same key as the key of the given "pod"
@@ -802,13 +814,17 @@ func (u *UnschedulablePodsMap) get(pod *v1.Pod) *podInfo {
802814
// Clear removes all the entries from the unschedulable podInfoMap.
803815
func (u *UnschedulablePodsMap) clear() {
804816
u.podInfoMap = make(map[string]*podInfo)
817+
if u.metricRecorder != nil {
818+
u.metricRecorder.Clear()
819+
}
805820
}
806821

807822
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
808-
func newUnschedulablePodsMap() *UnschedulablePodsMap {
823+
func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
809824
return &UnschedulablePodsMap{
810-
podInfoMap: make(map[string]*podInfo),
811-
keyFunc: util.GetPodFullName,
825+
podInfoMap: make(map[string]*podInfo),
826+
keyFunc: util.GetPodFullName,
827+
metricRecorder: metricRecorder,
812828
}
813829
}
814830

pkg/scheduler/internal/queue/scheduling_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
647647

648648
for _, test := range tests {
649649
t.Run(test.name, func(t *testing.T) {
650-
upm := newUnschedulablePodsMap()
650+
upm := newUnschedulablePodsMap(nil)
651651
for _, p := range test.podsToAdd {
652652
upm.addOrUpdate(newPodInfoNoTimestamp(p))
653653
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"github.com/prometheus/client_golang/prometheus"
21+
)
22+
23+
// MetricRecorder is the set of operations a container uses to keep a metric
// in step with its contents.
type MetricRecorder interface {
	// Inc is called when an element is added.
	Inc()
	// Dec is called when an element is removed.
	Dec()
	// Clear is called when all elements are dropped at once.
	Clear()
}
30+
31+
var _ MetricRecorder = &PendingPodsRecorder{}
32+
33+
// PendingPodsRecorder is an implementation of MetricRecorder
34+
type PendingPodsRecorder struct {
35+
recorder prometheus.Gauge
36+
}
37+
38+
// NewActivePodsRecorder returns ActivePods in a Prometheus metric fashion
39+
func NewActivePodsRecorder() *PendingPodsRecorder {
40+
return &PendingPodsRecorder{
41+
recorder: ActivePods,
42+
}
43+
}
44+
45+
// NewUnschedulablePodsRecorder returns UnschedulablePods in a Prometheus metric fashion
46+
func NewUnschedulablePodsRecorder() *PendingPodsRecorder {
47+
return &PendingPodsRecorder{
48+
recorder: UnschedulablePods,
49+
}
50+
}
51+
52+
// NewBackoffPodsRecorder returns BackoffPods in a Prometheus metric fashion
53+
func NewBackoffPodsRecorder() *PendingPodsRecorder {
54+
return &PendingPodsRecorder{
55+
recorder: BackoffPods,
56+
}
57+
}
58+
59+
// Inc increases a metric counter by 1, in an atomic way
60+
func (r *PendingPodsRecorder) Inc() {
61+
r.recorder.Inc()
62+
}
63+
64+
// Dec decreases a metric counter by 1, in an atomic way
65+
func (r *PendingPodsRecorder) Dec() {
66+
r.recorder.Dec()
67+
}
68+
69+
// Clear set a metric counter to 0, in an atomic way
70+
func (r *PendingPodsRecorder) Clear() {
71+
r.recorder.Set(float64(0))
72+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"sync"
21+
"sync/atomic"
22+
"testing"
23+
)
24+
25+
var _ MetricRecorder = &fakePodsRecorder{}
26+
27+
// fakePodsRecorder is an in-memory stand-in for a metric recorder used by the
// tests in this file. The current count lives in counter and is modified only
// through sync/atomic operations.
type fakePodsRecorder struct {
	counter int64
}

// Inc atomically adds 1 to counter.
func (r *fakePodsRecorder) Inc() {
	const step int64 = 1
	atomic.AddInt64(&r.counter, step)
}

// Dec atomically subtracts 1 from counter.
func (r *fakePodsRecorder) Dec() {
	const step int64 = -1
	atomic.AddInt64(&r.counter, step)
}

// Clear atomically resets counter to 0.
func (r *fakePodsRecorder) Clear() {
	const zero int64 = 0
	atomic.StoreInt64(&r.counter, zero)
}
42+
43+
func TestInc(t *testing.T) {
44+
fakeRecorder := fakePodsRecorder{}
45+
var wg sync.WaitGroup
46+
loops := 100
47+
wg.Add(loops)
48+
for i := 0; i < loops; i++ {
49+
go func() {
50+
fakeRecorder.Inc()
51+
wg.Done()
52+
}()
53+
}
54+
wg.Wait()
55+
if fakeRecorder.counter != int64(loops) {
56+
t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter)
57+
}
58+
}
59+
60+
func TestDec(t *testing.T) {
61+
fakeRecorder := fakePodsRecorder{counter: 100}
62+
var wg sync.WaitGroup
63+
loops := 100
64+
wg.Add(loops)
65+
for i := 0; i < loops; i++ {
66+
go func() {
67+
fakeRecorder.Dec()
68+
wg.Done()
69+
}()
70+
}
71+
wg.Wait()
72+
if fakeRecorder.counter != int64(0) {
73+
t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter)
74+
}
75+
}
76+
77+
func TestClear(t *testing.T) {
78+
fakeRecorder := fakePodsRecorder{}
79+
var wg sync.WaitGroup
80+
incLoops, decLoops := 100, 80
81+
wg.Add(incLoops + decLoops)
82+
for i := 0; i < incLoops; i++ {
83+
go func() {
84+
fakeRecorder.Inc()
85+
wg.Done()
86+
}()
87+
}
88+
for i := 0; i < decLoops; i++ {
89+
go func() {
90+
fakeRecorder.Dec()
91+
wg.Done()
92+
}()
93+
}
94+
wg.Wait()
95+
if fakeRecorder.counter != int64(incLoops-decLoops) {
96+
t.Errorf("Expected %v, got %v", incLoops-decLoops, fakeRecorder.counter)
97+
}
98+
// verify Clear() works
99+
fakeRecorder.Clear()
100+
if fakeRecorder.counter != int64(0) {
101+
t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter)
102+
}
103+
}

pkg/scheduler/metrics/metrics.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,16 @@ var (
192192
Help: "Total preemption attempts in the cluster till now",
193193
})
194194

195+
pendingPods = prometheus.NewGaugeVec(
196+
prometheus.GaugeOpts{
197+
Subsystem: SchedulerSubsystem,
198+
Name: "pending_pods_total",
199+
Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulableQ.",
200+
}, []string{"queue"})
201+
ActivePods = pendingPods.With(prometheus.Labels{"queue": "active"})
202+
BackoffPods = pendingPods.With(prometheus.Labels{"queue": "backoff"})
203+
UnschedulablePods = pendingPods.With(prometheus.Labels{"queue": "unschedulable"})
204+
195205
metricsList = []prometheus.Collector{
196206
scheduleAttempts,
197207
SchedulingLatency,
@@ -210,6 +220,7 @@ var (
210220
DeprecatedSchedulingAlgorithmPremptionEvaluationDuration,
211221
PreemptionVictims,
212222
PreemptionAttempts,
223+
pendingPods,
213224
}
214225
)
215226

pkg/scheduler/util/heap.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"fmt"
2626

2727
"k8s.io/client-go/tools/cache"
28+
"k8s.io/kubernetes/pkg/scheduler/metrics"
2829
)
2930

3031
// KeyFunc is a function type to get the key from an object.
@@ -127,6 +128,9 @@ type Heap struct {
127128
// data stores objects and has a queue that keeps their ordering according
128129
// to the heap invariant.
129130
data *heapData
131+
// metricRecorder updates the counter when elements of a heap get added or
132+
// removed, and it does nothing if it's nil
133+
metricRecorder metrics.MetricRecorder
130134
}
131135

132136
// Add inserts an item, and puts it in the queue. The item is updated if it
@@ -141,6 +145,9 @@ func (h *Heap) Add(obj interface{}) error {
141145
heap.Fix(h.data, h.data.items[key].index)
142146
} else {
143147
heap.Push(h.data, &itemKeyValue{key, obj})
148+
if h.metricRecorder != nil {
149+
h.metricRecorder.Inc()
150+
}
144151
}
145152
return nil
146153
}
@@ -154,6 +161,9 @@ func (h *Heap) AddIfNotPresent(obj interface{}) error {
154161
}
155162
if _, exists := h.data.items[key]; !exists {
156163
heap.Push(h.data, &itemKeyValue{key, obj})
164+
if h.metricRecorder != nil {
165+
h.metricRecorder.Inc()
166+
}
157167
}
158168
return nil
159169
}
@@ -172,6 +182,9 @@ func (h *Heap) Delete(obj interface{}) error {
172182
}
173183
if item, ok := h.data.items[key]; ok {
174184
heap.Remove(h.data, item.index)
185+
if h.metricRecorder != nil {
186+
h.metricRecorder.Dec()
187+
}
175188
return nil
176189
}
177190
return fmt.Errorf("object not found")
@@ -186,6 +199,9 @@ func (h *Heap) Peek() interface{} {
186199
func (h *Heap) Pop() (interface{}, error) {
187200
obj := heap.Pop(h.data)
188201
if obj != nil {
202+
if h.metricRecorder != nil {
203+
h.metricRecorder.Dec()
204+
}
189205
return obj, nil
190206
}
191207
return nil, fmt.Errorf("object was removed from heap data")
@@ -225,12 +241,18 @@ func (h *Heap) Len() int {
225241

226242
// NewHeap returns a Heap which can be used to queue up items to process.
227243
func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
244+
return NewHeapWithRecorder(keyFn, lessFn, nil)
245+
}
246+
247+
// NewHeapWithRecorder wraps an optional metricRecorder to compose a Heap object.
248+
func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.MetricRecorder) *Heap {
228249
return &Heap{
229250
data: &heapData{
230251
items: map[string]*heapItem{},
231252
queue: []string{},
232253
keyFunc: keyFn,
233254
lessFunc: lessFn,
234255
},
256+
metricRecorder: metricRecorder,
235257
}
236258
}

0 commit comments

Comments
 (0)