Skip to content

Commit a2b3a4f

Browse files
committed
chore: ensure the scheduler handles events before checking the pod position
1 parent 6df3b28 commit a2b3a4f

File tree

3 files changed

+164
-137
lines changed

3 files changed

+164
-137
lines changed

pkg/scheduler/backend/queue/scheduling_queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -920,9 +920,9 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
920920
events := framework.PodSchedulingPropertiesChange(newPod, oldPod)
921921
for _, evt := range events {
922922
hint := p.isPodWorthRequeuing(logger, pInfo, evt, oldPod, newPod)
923-
queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, framework.UnscheduledPodUpdate.Label)
923+
queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, evt.Label)
924924
if queue != unschedulablePods {
925-
logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", framework.PodUpdate, "queue", queue)
925+
logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", evt.Label, "queue", queue)
926926
p.unschedulablePods.delete(pInfo.Pod, gated)
927927
}
928928
if queue == activeQ {

pkg/scheduler/eventhandlers.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
134134
func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
135135
start := time.Now()
136136
defer metrics.EventHandlingLatency.WithLabelValues(framework.UnscheduledPodUpdate.Label).Observe(metrics.SinceInSeconds(start))
137+
137138
logger := sched.logger
138139
oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod)
139140
// Bypass update event that carries identical objects; otherwise, a duplicated
@@ -142,6 +143,10 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
142143
return
143144
}
144145

146+
for _, evt := range framework.PodSchedulingPropertiesChange(newPod, oldPod) {
147+
defer metrics.EventHandlingLatency.WithLabelValues(evt.Label).Observe(metrics.SinceInSeconds(start))
148+
}
149+
145150
isAssumed, err := sched.Cache.IsAssumedPod(newPod)
146151
if err != nil {
147152
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err))
@@ -246,7 +251,12 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
246251
}
247252

248253
events := framework.PodSchedulingPropertiesChange(newPod, oldPod)
254+
255+
// Save the time it takes to update the pod in the cache.
256+
updatingDuration := metrics.SinceInSeconds(start)
257+
249258
for _, evt := range events {
259+
startMoving := time.Now()
250260
// SchedulingQueue.AssignedPodUpdated has a problem:
251261
// It internally pre-filters Pods to move to activeQ,
252262
// while taking only in-tree plugins into consideration.
@@ -257,10 +267,12 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
257267
// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled.
258268
// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.)
259269
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
260-
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodUpdate, oldPod, newPod, nil)
270+
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, oldPod, newPod, nil)
261271
} else {
262272
sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod, evt)
263273
}
274+
movingDuration := metrics.SinceInSeconds(startMoving)
275+
metrics.EventHandlingLatency.WithLabelValues(evt.Label).Observe(updatingDuration + movingDuration)
264276
}
265277
}
266278

0 commit comments

Comments
 (0)