Skip to content

Commit 30ea0d1

Browse files
authored
Merge pull request kubernetes#130317 from macsko/increment_schedulerqueueincomingpods_metric_when_adding_pods_to_unschedulable_when_gated
Add missing increments of queue_incoming_pods_total metric in scheduling queue
2 parents 9d9e1af + 6975572 commit 30ea0d1

File tree

6 files changed

+98
-51
lines changed

6 files changed

+98
-51
lines changed

pkg/scheduler/backend/queue/active_queue.go

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,55 @@ type activeQueuer interface {
6161
// underLock() method should be used to protect these methods.
6262
type unlockedActiveQueuer interface {
6363
unlockedActiveQueueReader
64-
AddOrUpdate(pInfo *framework.QueuedPodInfo)
64+
// add adds a new pod to the activeQ.
65+
// The event should show which event triggered this addition and is used for the metric recording.
66+
// This method should be called in activeQueue.underLock().
67+
add(pInfo *framework.QueuedPodInfo, event string)
6568
}
6669

6770
// unlockedActiveQueueReader defines activeQ read-only methods that are not protected by the lock itself.
6871
// underLock() or underRLock() method should be used to protect these methods.
6972
type unlockedActiveQueueReader interface {
70-
Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
71-
Has(pInfo *framework.QueuedPodInfo) bool
73+
// get returns the pod matching pInfo inside the activeQ.
74+
// Returns false if the pInfo doesn't exist in the queue.
75+
// This method should be called in activeQueue.underLock() or activeQueue.underRLock().
76+
get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
77+
// has reports whether pInfo exists in the queue.
78+
// This method should be called in activeQueue.underLock() or activeQueue.underRLock().
79+
has(pInfo *framework.QueuedPodInfo) bool
80+
}
81+
82+
// unlockedActiveQueue defines activeQ methods that are not protected by the lock itself.
83+
// activeQueue.underLock() or activeQueue.underRLock() method should be used to protect these methods.
84+
type unlockedActiveQueue struct {
85+
queue *heap.Heap[*framework.QueuedPodInfo]
86+
}
87+
88+
func newUnlockedActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo]) *unlockedActiveQueue {
89+
return &unlockedActiveQueue{
90+
queue: queue,
91+
}
92+
}
93+
94+
// add adds a new pod to the activeQ.
95+
// The event should show which event triggered this addition and is used for the metric recording.
96+
// This method should be called in activeQueue.underLock().
97+
func (uaq *unlockedActiveQueue) add(pInfo *framework.QueuedPodInfo, event string) {
98+
uaq.queue.AddOrUpdate(pInfo)
99+
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
100+
}
101+
102+
// get returns the pod matching pInfo inside the activeQ.
103+
// Returns false if the pInfo doesn't exist in the queue.
104+
// This method should be called in activeQueue.underLock() or activeQueue.underRLock().
105+
func (uaq *unlockedActiveQueue) get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) {
106+
return uaq.queue.Get(pInfo)
107+
}
108+
109+
// has reports whether pInfo exists in the queue.
110+
// This method should be called in activeQueue.underLock() or activeQueue.underRLock().
111+
func (uaq *unlockedActiveQueue) has(pInfo *framework.QueuedPodInfo) bool {
112+
return uaq.queue.Has(pInfo)
72113
}
73114

74115
// activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
@@ -85,6 +126,10 @@ type activeQueue struct {
85126
// schedule. Head of heap is the highest priority pod.
86127
queue *heap.Heap[*framework.QueuedPodInfo]
87128

129+
// unlockedQueue is a wrapper of queue providing methods that are not locked themselves
130+
// and can be used in the underLock() or underRLock().
131+
unlockedQueue *unlockedActiveQueue
132+
88133
// cond is a condition that is notified when the pod is added to activeQ.
89134
// It is used with lock.
90135
cond sync.Cond
@@ -134,6 +179,7 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu
134179
inFlightEvents: list.New(),
135180
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
136181
metricsRecorder: metricRecorder,
182+
unlockedQueue: newUnlockedActiveQueue(queue),
137183
}
138184
aq.cond.L = &aq.lock
139185

@@ -146,7 +192,7 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu
146192
func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) {
147193
aq.lock.Lock()
148194
defer aq.lock.Unlock()
149-
fn(aq.queue)
195+
fn(aq.unlockedQueue)
150196
}
151197

152198
// underRLock runs the fn function under the lock.RLock.
@@ -155,7 +201,7 @@ func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer))
155201
func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueueReader)) {
156202
aq.lock.RLock()
157203
defer aq.lock.RUnlock()
158-
fn(aq.queue)
204+
fn(aq.unlockedQueue)
159205
}
160206

161207
// update updates the pod in activeQ if oldPodInfo is already in the queue.

pkg/scheduler/backend/queue/active_queue_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ func TestClose(t *testing.T) {
3434
aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](newDefaultQueueSort()), metrics.NewActivePodsRecorder()), true, *rr)
3535

3636
aq.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
37-
unlockedActiveQ.AddOrUpdate(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}})
38-
unlockedActiveQ.AddOrUpdate(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("bar").Name("p2").UID("p2").Obj()}})
37+
unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}, framework.EventUnscheduledPodAdd.Label())
38+
unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("bar").Name("p2").UID("p2").Obj()}}, framework.EventUnscheduledPodAdd.Label())
3939
})
4040

4141
_, err := aq.pop(logger)

pkg/scheduler/backend/queue/backoff_queue.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ type backoffQueuer interface {
4343
podMaxBackoffDuration() time.Duration
4444

4545
// add adds the pInfo to backoffQueue.
46+
// The event should show which event triggered this addition and is used for the metric recording.
4647
// It also ensures that pInfo is not in both queues.
47-
add(logger klog.Logger, pInfo *framework.QueuedPodInfo)
48+
add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string)
4849
// update updates the pod in backoffQueue if oldPodInfo is already in the queue.
4950
// It returns new pod info if updated, nil otherwise.
5051
update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
@@ -168,8 +169,9 @@ func (bq *backoffQueue) popEachBackoffCompleted(logger klog.Logger, fn func(pInf
168169
}
169170

170171
// add adds the pInfo to backoffQueue.
172+
// The event should show which event triggered this addition and is used for the metric recording.
171173
// It also ensures that pInfo is not in both queues.
172-
func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo) {
174+
func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) {
173175
// If the pod has neither unschedulable plugins nor pending plugins,
174176
// it means that it failed because of error and should be moved to podErrorBackoffQ.
175177
if pInfo.UnschedulablePlugins.Len() == 0 && pInfo.PendingPlugins.Len() == 0 {
@@ -178,15 +180,19 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo)
178180
err := bq.podBackoffQ.Delete(pInfo)
179181
if err == nil {
180182
logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podBackoffQ", "pod", klog.KObj(pInfo.Pod))
183+
return
181184
}
185+
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
182186
return
183187
}
184188
bq.podBackoffQ.AddOrUpdate(pInfo)
185189
// Ensure the pod is not in the podErrorBackoffQ and report the error if it happens.
186190
err := bq.podErrorBackoffQ.Delete(pInfo)
187191
if err == nil {
188192
logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podErrorBackoffQ", "pod", klog.KObj(pInfo.Pod))
193+
return
189194
}
195+
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
190196
}
191197

192198
// update updates the pod in backoffQueue if oldPodInfo is already in the queue.

pkg/scheduler/backend/queue/backoff_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
152152
logger, _ := ktesting.NewTestContext(t)
153153
bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration)
154154
for _, podName := range tt.podsInBackoff {
155-
bq.add(logger, podInfos[podName])
155+
bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label())
156156
}
157157
var gotPods []string
158158
bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {

pkg/scheduler/backend/queue/scheduling_queue.go

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -557,27 +557,30 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
557557
p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
558558
if pInfo.Gated {
559559
// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
560-
if unlockedActiveQ.Has(pInfo) {
560+
if unlockedActiveQ.has(pInfo) {
561561
return
562562
}
563563
if p.backoffQ.has(pInfo) {
564564
return
565565
}
566-
p.unschedulablePods.addOrUpdate(pInfo)
566+
if p.unschedulablePods.get(pInfo.Pod) != nil {
567+
return
568+
}
569+
p.unschedulablePods.addOrUpdate(pInfo, event)
570+
logger.V(5).Info("Pod moved to an internal scheduling queue, because the pod is gated", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", unschedulablePods)
567571
return
568572
}
569573
if pInfo.InitialAttemptTimestamp == nil {
570574
now := p.clock.Now()
571575
pInfo.InitialAttemptTimestamp = &now
572576
}
573577

574-
unlockedActiveQ.AddOrUpdate(pInfo)
578+
unlockedActiveQ.add(pInfo, event)
575579
added = true
576580

577581
p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
578582
p.backoffQ.delete(pInfo)
579583
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
580-
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
581584
if event == framework.EventUnscheduledPodAdd.Label() || event == framework.EventUnscheduledPodUpdate.Label() {
582585
p.AddNominatedPod(logger, pInfo.PodInfo, nil)
583586
}
@@ -721,13 +724,11 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
721724
// - No unschedulable plugins are associated with this Pod,
722725
// meaning something unusual (a temporary failure on kube-apiserver, etc.) happened and this Pod gets moved back to the queue.
723726
// In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
724-
p.backoffQ.add(logger, pInfo)
727+
p.backoffQ.add(logger, pInfo, framework.ScheduleAttemptFailure)
725728
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ)
726-
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", framework.ScheduleAttemptFailure).Inc()
727729
} else {
728-
p.unschedulablePods.addOrUpdate(pInfo)
730+
p.unschedulablePods.addOrUpdate(pInfo, framework.ScheduleAttemptFailure)
729731
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods)
730-
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", framework.ScheduleAttemptFailure).Inc()
731732
}
732733

733734
return nil
@@ -933,7 +934,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
933934
// Pod might have completed its backoff time while being in unschedulablePods,
934935
// so we should check isPodBackingoff before moving the pod to backoffQ.
935936
if p.backoffQ.isPodBackingoff(pInfo) {
936-
p.backoffQ.add(logger, pInfo)
937+
p.backoffQ.add(logger, pInfo, framework.EventUnscheduledPodUpdate.Label())
937938
p.unschedulablePods.delete(pInfo.Pod, gated)
938939
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.EventUnscheduledPodUpdate.Label(), "queue", backoffQ)
939940
return
@@ -946,7 +947,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
946947
}
947948

948949
// Pod update didn't make it schedulable, keep it in the unschedulable queue.
949-
p.unschedulablePods.addOrUpdate(pInfo)
950+
p.unschedulablePods.addOrUpdate(pInfo, framework.EventUnscheduledPodUpdate.Label())
950951
return
951952
}
952953
// If pod is not in any of the queues, we put it in the active queue.
@@ -1036,30 +1037,22 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event
10361037
// NOTE: this function assumes lock has been acquired in caller
10371038
func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, strategy queueingStrategy, event string) string {
10381039
if strategy == queueSkip {
1039-
p.unschedulablePods.addOrUpdate(pInfo)
1040-
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", event).Inc()
1040+
p.unschedulablePods.addOrUpdate(pInfo, event)
10411041
return unschedulablePods
10421042
}
10431043

10441044
// Pod might have completed its backoff time while being in unschedulablePods,
10451045
// so we should check isPodBackingoff before moving the pod to backoffQ.
10461046
if strategy == queueAfterBackoff && p.backoffQ.isPodBackingoff(pInfo) {
1047-
p.backoffQ.add(logger, pInfo)
1048-
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
1047+
p.backoffQ.add(logger, pInfo, event)
10491048
return backoffQ
10501049
}
10511050

10521051
// Reach here if schedulingHint is QueueImmediately, or schedulingHint is Queue but the pod is not backing off.
10531052
if added := p.moveToActiveQ(logger, pInfo, event); added {
10541053
return activeQ
10551054
}
1056-
if pInfo.Gated {
1057-
// In case the pod is gated, the Pod is pushed back to unschedulable Pods pool in moveToActiveQ.
1058-
return unschedulablePods
1059-
}
1060-
1061-
p.unschedulablePods.addOrUpdate(pInfo)
1062-
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", framework.ScheduleAttemptFailure).Inc()
1055+
// Pod is gated. We don't have to push it back to unschedulable queue, because moveToActiveQ should already have done that.
10631056
return unschedulablePods
10641057
}
10651058

@@ -1178,7 +1171,7 @@ func (p *PriorityQueue) GetPod(name, namespace string) (pInfo *framework.QueuedP
11781171
}
11791172

11801173
p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) {
1181-
pInfo, ok = unlockedActiveQ.Get(pInfoLookup)
1174+
pInfo, ok = unlockedActiveQ.get(pInfoLookup)
11821175
})
11831176
return
11841177
}
@@ -1205,7 +1198,7 @@ func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedAc
12051198
pod := np.toPod()
12061199
pInfoLookup := newQueuedPodInfoForLookup(pod)
12071200

1208-
queuedPodInfo, exists := unlockedActiveQ.Get(pInfoLookup)
1201+
queuedPodInfo, exists := unlockedActiveQ.get(pInfoLookup)
12091202
if exists {
12101203
return queuedPodInfo.PodInfo
12111204
}
@@ -1275,14 +1268,16 @@ type UnschedulablePods struct {
12751268
}
12761269

12771270
// addOrUpdate adds a pod to the unschedulable podInfoMap.
1278-
func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) {
1271+
// The event should show which event triggered the addition and is used for the metric recording.
1272+
func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo, event string) {
12791273
podID := u.keyFunc(pInfo.Pod)
12801274
if _, exists := u.podInfoMap[podID]; !exists {
12811275
if pInfo.Gated && u.gatedRecorder != nil {
12821276
u.gatedRecorder.Inc()
12831277
} else if !pInfo.Gated && u.unschedulableRecorder != nil {
12841278
u.unschedulableRecorder.Inc()
12851279
}
1280+
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", event).Inc()
12861281
}
12871282
u.podInfoMap[podID] = pInfo
12881283
}

0 commit comments

Comments
 (0)