Skip to content

Commit 83f9e4b

Browse files
committed
cleanup: remove event list
1 parent b1b4e5d commit 83f9e4b

File tree

15 files changed

+529
-483
lines changed

15 files changed

+529
-483
lines changed

pkg/scheduler/backend/queue/active_queue.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type activeQueuer interface {
4848
listInFlightEvents() []interface{}
4949
listInFlightPods() []*v1.Pod
5050
clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error)
51-
addEventIfPodInFlight(oldPod, newPod *v1.Pod, event framework.ClusterEvent) bool
51+
addEventsIfPodInFlight(oldPod, newPod *v1.Pod, events []framework.ClusterEvent) bool
5252
addEventIfAnyInFlight(oldObj, newObj interface{}, event framework.ClusterEvent) bool
5353

5454
schedulingCycle() int64
@@ -304,20 +304,22 @@ func (aq *activeQueue) clusterEventsForPod(logger klog.Logger, pInfo *framework.
304304
return events, nil
305305
}
306306

307-
// addEventIfPodInFlight adds clusterEvent to inFlightEvents if the newPod is in inFlightPods.
307+
// addEventsIfPodInFlight adds clusterEvent to inFlightEvents if the newPod is in inFlightPods.
308308
// It returns true if pushed the event to the inFlightEvents.
309-
func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event framework.ClusterEvent) bool {
309+
func (aq *activeQueue) addEventsIfPodInFlight(oldPod, newPod *v1.Pod, events []framework.ClusterEvent) bool {
310310
aq.lock.Lock()
311311
defer aq.lock.Unlock()
312312

313313
_, ok := aq.inFlightPods[newPod.UID]
314314
if ok {
315-
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1, false)
316-
aq.inFlightEvents.PushBack(&clusterEvent{
317-
event: event,
318-
oldObj: oldPod,
319-
newObj: newPod,
320-
})
315+
for _, event := range events {
316+
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label(), 1, false)
317+
aq.inFlightEvents.PushBack(&clusterEvent{
318+
event: event,
319+
oldObj: oldPod,
320+
newObj: newPod,
321+
})
322+
}
321323
}
322324
return ok
323325
}
@@ -329,7 +331,7 @@ func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event f
329331
defer aq.lock.Unlock()
330332

331333
if len(aq.inFlightPods) != 0 {
332-
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1, false)
334+
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label(), 1, false)
333335
aq.inFlightEvents.PushBack(&clusterEvent{
334336
event: event,
335337
oldObj: oldObj,
@@ -380,7 +382,7 @@ func (aq *activeQueue) done(pod types.UID) {
380382
break
381383
}
382384
aq.inFlightEvents.Remove(e)
383-
aggrMetricsCounter[ev.event.Label]--
385+
aggrMetricsCounter[ev.event.Label()]--
384386
}
385387

386388
for evLabel, count := range aggrMetricsCounter {

pkg/scheduler/backend/queue/active_queue_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ func TestClose(t *testing.T) {
4646
if err != nil {
4747
t.Fatalf("unexpected error while pop(): %v", err)
4848
}
49-
aq.addEventIfAnyInFlight(nil, nil, framework.NodeAdd)
50-
aq.addEventIfAnyInFlight(nil, nil, framework.NodeConditionChange)
49+
aq.addEventIfAnyInFlight(nil, nil, nodeAdd)
50+
aq.addEventIfAnyInFlight(nil, nil, csiNodeUpdate)
5151

5252
if len(aq.listInFlightEvents()) != 4 {
5353
t.Fatalf("unexpected number of in-flight events: %v", len(aq.listInFlightEvents()))

pkg/scheduler/backend/queue/scheduling_queue.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework
450450
}
451451
hint = framework.Queue
452452
}
453-
p.metricsRecorder.ObserveQueueingHintDurationAsync(hintfn.PluginName, event.Label, queueingHintToLabel(hint, err), metrics.SinceInSeconds(start))
453+
p.metricsRecorder.ObserveQueueingHintDurationAsync(hintfn.PluginName, event.Label(), queueingHintToLabel(hint, err), metrics.SinceInSeconds(start))
454454

455455
if hint == framework.QueueSkip {
456456
continue
@@ -571,7 +571,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
571571
_ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found.
572572
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
573573
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
574-
if event == framework.PodAdd || event == framework.PodUpdate {
574+
if event == framework.EventUnscheduledPodAdd.Label() || event == framework.EventUnscheduledPodUpdate.Label() {
575575
p.AddNominatedPod(logger, pInfo.PodInfo, nil)
576576
}
577577
})
@@ -585,7 +585,7 @@ func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) {
585585
defer p.lock.Unlock()
586586

587587
pInfo := p.newQueuedPodInfo(pod)
588-
if added := p.moveToActiveQ(logger, pInfo, framework.PodAdd); added {
588+
if added := p.moveToActiveQ(logger, pInfo, framework.EventUnscheduledPodAdd.Label()); added {
589589
p.activeQ.broadcast()
590590
}
591591
}
@@ -660,7 +660,7 @@ func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger
660660
// check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins.
661661
queueingStrategy := queueSkip
662662
for _, e := range events {
663-
logger.V(5).Info("Checking event for in-flight pod", "pod", klog.KObj(pInfo.Pod), "event", e.event.Label)
663+
logger.V(5).Info("Checking event for in-flight pod", "pod", klog.KObj(pInfo.Pod), "event", e.event.Label())
664664

665665
switch p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj) {
666666
case queueSkip:
@@ -818,7 +818,7 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
818818
}
819819

820820
if len(podsToMove) > 0 {
821-
p.movePodsToActiveOrBackoffQueue(logger, podsToMove, framework.UnschedulableTimeout, nil, nil)
821+
p.movePodsToActiveOrBackoffQueue(logger, podsToMove, framework.EventUnschedulableTimeout, nil, nil)
822822
}
823823
}
824824

@@ -878,13 +878,15 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
878878
p.lock.Lock()
879879
defer p.lock.Unlock()
880880

881+
var events []framework.ClusterEvent
881882
if p.isSchedulingQueueHintEnabled {
883+
events = framework.PodSchedulingPropertiesChange(newPod, oldPod)
882884
// The inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers.
883-
// Record this update as Pod/Update because
885+
// Record this Pod update because
884886
// this update may make the Pod schedulable in case it gets rejected and comes back to the queue.
885887
// We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue.
886888
// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context.
887-
if exists := p.activeQ.addEventIfPodInFlight(oldPod, newPod, framework.UnscheduledPodUpdate); exists {
889+
if exists := p.activeQ.addEventsIfPodInFlight(oldPod, newPod, events); exists {
888890
logger.V(6).Info("The pod doesn't be queued for now because it's being scheduled and will be queued back if necessary", "pod", klog.KObj(newPod))
889891
return
890892
}
@@ -917,12 +919,11 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
917919
// whether the update may make the pods schedulable.
918920
// Plugins have to implement a QueueingHint for Pod/Update event
919921
// if the rejection from them could be resolved by updating unscheduled Pods itself.
920-
events := framework.PodSchedulingPropertiesChange(newPod, oldPod)
921922
for _, evt := range events {
922923
hint := p.isPodWorthRequeuing(logger, pInfo, evt, oldPod, newPod)
923-
queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, evt.Label)
924+
queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, evt.Label())
924925
if queue != unschedulablePods {
925-
logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", evt.Label, "queue", queue)
926+
logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", evt.Label(), "queue", queue)
926927
p.unschedulablePods.delete(pInfo.Pod, gated)
927928
}
928929
if queue == activeQ {
@@ -936,7 +937,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
936937
if p.isPodBackingoff(pInfo) {
937938
p.podBackoffQ.AddOrUpdate(pInfo)
938939
p.unschedulablePods.delete(pInfo.Pod, gated)
939-
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.PodUpdate, "queue", backoffQ)
940+
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.EventUnscheduledPodUpdate.Label(), "queue", backoffQ)
940941
return
941942
}
942943

@@ -952,7 +953,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
952953
}
953954
// If pod is not in any of the queues, we put it in the active queue.
954955
pInfo := p.newQueuedPodInfo(newPod)
955-
if added := p.moveToActiveQ(logger, pInfo, framework.PodUpdate); added {
956+
if added := p.moveToActiveQ(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()); added {
956957
p.activeQ.broadcast()
957958
}
958959
}
@@ -980,7 +981,7 @@ func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) {
980981

981982
// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm
982983
// because Pod related events shouldn't make Pods that rejected by single-node scheduling requirement schedulable.
983-
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, pod), framework.AssignedPodAdd, nil, pod)
984+
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, pod), framework.EventAssignedPodAdd, nil, pod)
984985
p.lock.Unlock()
985986
}
986987

@@ -991,7 +992,7 @@ func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v
991992
if event.Resource == framework.Pod && event.ActionType&framework.UpdatePodScaleDown != 0 {
992993
// In this case, we don't want to pre-filter Pods by getUnschedulablePodsWithCrossTopologyTerm
993994
// because Pod related events may make Pods that were rejected by NodeResourceFit schedulable.
994-
p.moveAllToActiveOrBackoffQueue(logger, framework.AssignedPodUpdate, oldPod, newPod, nil)
995+
p.moveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodUpdate, oldPod, newPod, nil)
995996
} else {
996997
// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm
997998
// because Pod related events only make Pods rejected by cross topology term schedulable.
@@ -1093,13 +1094,13 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
10931094
schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj)
10941095
if schedulingHint == queueSkip {
10951096
// QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event.
1096-
logger.V(5).Info("Event is not making pod schedulable", "pod", klog.KObj(pInfo.Pod), "event", event.Label)
1097+
logger.V(5).Info("Event is not making pod schedulable", "pod", klog.KObj(pInfo.Pod), "event", event.Label())
10971098
continue
10981099
}
10991100

11001101
p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated)
1101-
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, event.Label)
1102-
logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label, "queue", queue, "hint", schedulingHint)
1102+
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, event.Label())
1103+
logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label(), "queue", queue, "hint", schedulingHint)
11031104
if queue == activeQ {
11041105
activated = true
11051106
}
@@ -1112,7 +1113,7 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
11121113
// AddUnschedulableIfNotPresent we need to know whether events were
11131114
// observed while scheduling them.
11141115
if added := p.activeQ.addEventIfAnyInFlight(oldObj, newObj, event); added {
1115-
logger.V(5).Info("Event received while pods are in flight", "event", event.Label)
1116+
logger.V(5).Info("Event received while pods are in flight", "event", event.Label())
11161117
}
11171118
}
11181119

0 commit comments

Comments
 (0)