Skip to content

Commit 08bd75c

Browse files
authored
Merge pull request kubernetes#126680 from macsko/move_activeq_related_fields_to_separate_struct_scheduling_queue
Move activeQ related fields to separate struct in scheduling queue
2 parents e54c8ef + 8e630a9 commit 08bd75c

File tree

3 files changed

+465
-312
lines changed

3 files changed

+465
-312
lines changed
Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package queue
18+
19+
import (
20+
"container/list"
21+
"fmt"
22+
"sync"
23+
24+
v1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/types"
26+
"k8s.io/klog/v2"
27+
"k8s.io/kubernetes/pkg/scheduler/framework"
28+
"k8s.io/kubernetes/pkg/scheduler/internal/heap"
29+
"k8s.io/kubernetes/pkg/scheduler/metrics"
30+
)
31+
32+
// activeQueuer is a wrapper for activeQ related operations.
33+
// Its methods, except "unlocked" ones, take the lock inside.
34+
// Note: be careful when using unlocked() methods.
35+
// getLock() methods should be used only for unlocked() methods
36+
// and it is forbidden to call any other activeQueuer's method under this lock.
37+
type activeQueuer interface {
38+
getLock() *sync.RWMutex
39+
unlocked() unlockedActiveQueuer
40+
41+
pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
42+
list() []*v1.Pod
43+
len() int
44+
has(pInfo *framework.QueuedPodInfo) bool
45+
46+
listInFlightEvents() []interface{}
47+
listInFlightPods() []*v1.Pod
48+
clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error)
49+
addEventIfPodInFlight(oldPod, newPod *v1.Pod, event framework.ClusterEvent) bool
50+
addEventIfAnyInFlight(oldObj, newObj interface{}, event framework.ClusterEvent) bool
51+
52+
schedulingCycle() int64
53+
done(pod types.UID)
54+
close()
55+
broadcast()
56+
}
57+
58+
// unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself.
59+
// getLock() methods should be used to protect these methods.
60+
type unlockedActiveQueuer interface {
61+
Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
62+
Has(pInfo *framework.QueuedPodInfo) bool
63+
AddOrUpdate(pInfo *framework.QueuedPodInfo)
64+
Delete(pInfo *framework.QueuedPodInfo) error
65+
}
66+
67+
// activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
68+
type activeQueue struct {
69+
// lock synchronizes all operations related to activeQ.
70+
// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
71+
// Caution: DO NOT take "SchedulingQueue.lock" after taking "lock".
72+
// You should always take "SchedulingQueue.lock" first, otherwise the queue could end up in deadlock.
73+
// "lock" should not be taken after taking "nLock".
74+
// Correct locking order is: SchedulingQueue.lock > lock > nominator.nLock.
75+
lock sync.RWMutex
76+
77+
// activeQ is heap structure that scheduler actively looks at to find pods to
78+
// schedule. Head of heap is the highest priority pod.
79+
queue *heap.Heap[*framework.QueuedPodInfo]
80+
81+
// cond is a condition that is notified when the pod is added to activeQ.
82+
// It is used with lock.
83+
cond sync.Cond
84+
85+
// inFlightPods holds the UID of all pods which have been popped out for which Done
86+
// hasn't been called yet - in other words, all pods that are currently being
87+
// processed (being scheduled, in permit, or in the binding cycle).
88+
//
89+
// The values in the map are the entry of each pod in the inFlightEvents list.
90+
// The value of that entry is the *v1.Pod at the time that scheduling of that
91+
// pod started, which can be useful for logging or debugging.
92+
inFlightPods map[types.UID]*list.Element
93+
94+
// inFlightEvents holds the events received by the scheduling queue
95+
// (entry value is clusterEvent) together with in-flight pods (entry
96+
// value is *v1.Pod). Entries get added at the end while the mutex is
97+
// locked, so they get serialized.
98+
//
99+
// The pod entries are added in Pop and used to track which events
100+
// occurred after the pod scheduling attempt for that pod started.
101+
// They get removed when the scheduling attempt is done, at which
102+
// point all events that occurred in the meantime are processed.
103+
//
104+
// After removal of a pod, events at the start of the list are no
105+
// longer needed because all of the other in-flight pods started
106+
// later. Those events can be removed.
107+
inFlightEvents *list.List
108+
109+
// schedCycle represents sequence number of scheduling cycle and is incremented
110+
// when a pod is popped.
111+
schedCycle int64
112+
113+
// closed indicates that the queue is closed.
114+
// It is mainly used to let Pop() exit its control loop while waiting for an item.
115+
closed bool
116+
117+
// isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled.
118+
isSchedulingQueueHintEnabled bool
119+
}
120+
121+
func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool) *activeQueue {
122+
aq := &activeQueue{
123+
queue: queue,
124+
inFlightPods: make(map[types.UID]*list.Element),
125+
inFlightEvents: list.New(),
126+
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
127+
}
128+
aq.cond.L = &aq.lock
129+
130+
return aq
131+
}
132+
133+
// getLock returns lock of activeQueue. Its methods should be used only to protect the unlocked() methods.
134+
func (aq *activeQueue) getLock() *sync.RWMutex {
135+
return &aq.lock
136+
}
137+
138+
// unlocked returns queue methods, that are not protected by the lock itself.
139+
// getLock() methods should be used to protect queue methods.
140+
func (aq *activeQueue) unlocked() unlockedActiveQueuer {
141+
return aq.queue
142+
}
143+
144+
// pop removes the head of the queue and returns it.
145+
// It blocks if the queue is empty and waits until a new item is added to the queue.
146+
// It increments scheduling cycle when a pod is popped.
147+
func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
148+
aq.lock.Lock()
149+
defer aq.lock.Unlock()
150+
for aq.queue.Len() == 0 {
151+
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
152+
// When Close() is called, the p.closed is set and the condition is broadcast,
153+
// which causes this loop to continue and return from the Pop().
154+
if aq.closed {
155+
logger.V(2).Info("Scheduling queue is closed")
156+
return nil, nil
157+
}
158+
aq.cond.Wait()
159+
}
160+
pInfo, err := aq.queue.Pop()
161+
if err != nil {
162+
return nil, err
163+
}
164+
pInfo.Attempts++
165+
aq.schedCycle++
166+
// In flight, no concurrent events yet.
167+
if aq.isSchedulingQueueHintEnabled {
168+
aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod)
169+
}
170+
171+
// Update metrics and reset the set of unschedulable plugins for the next attempt.
172+
for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) {
173+
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec()
174+
}
175+
pInfo.UnschedulablePlugins.Clear()
176+
pInfo.PendingPlugins.Clear()
177+
178+
return pInfo, nil
179+
}
180+
181+
// list returns all pods that are in the queue.
182+
func (aq *activeQueue) list() []*v1.Pod {
183+
aq.lock.RLock()
184+
defer aq.lock.RUnlock()
185+
var result []*v1.Pod
186+
for _, pInfo := range aq.queue.List() {
187+
result = append(result, pInfo.Pod)
188+
}
189+
return result
190+
}
191+
192+
// len returns length of the queue.
193+
func (aq *activeQueue) len() int {
194+
return aq.queue.Len()
195+
}
196+
197+
// has inform if pInfo exists in the queue.
198+
func (aq *activeQueue) has(pInfo *framework.QueuedPodInfo) bool {
199+
aq.lock.RLock()
200+
defer aq.lock.RUnlock()
201+
return aq.queue.Has(pInfo)
202+
}
203+
204+
// listInFlightEvents returns all inFlightEvents.
205+
func (aq *activeQueue) listInFlightEvents() []interface{} {
206+
aq.lock.RLock()
207+
defer aq.lock.RUnlock()
208+
var values []interface{}
209+
for event := aq.inFlightEvents.Front(); event != nil; event = event.Next() {
210+
values = append(values, event.Value)
211+
}
212+
return values
213+
}
214+
215+
// listInFlightPods returns all inFlightPods.
216+
func (aq *activeQueue) listInFlightPods() []*v1.Pod {
217+
aq.lock.RLock()
218+
defer aq.lock.RUnlock()
219+
var pods []*v1.Pod
220+
for _, obj := range aq.inFlightPods {
221+
pods = append(pods, obj.Value.(*v1.Pod))
222+
}
223+
return pods
224+
}
225+
226+
// clusterEventsForPod gets all cluster events that have happened during pod for pInfo is being scheduled.
227+
func (aq *activeQueue) clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error) {
228+
aq.lock.RLock()
229+
defer aq.lock.RUnlock()
230+
logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", aq.inFlightEvents.Len(), "inFlightPodsSize", len(aq.inFlightPods))
231+
232+
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
233+
// So, given pInfo should have been Pop()ed before,
234+
// we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents.
235+
inFlightPod, ok := aq.inFlightPods[pInfo.Pod.UID]
236+
if !ok {
237+
return nil, fmt.Errorf("in flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler")
238+
}
239+
240+
var events []*clusterEvent
241+
for event := inFlightPod.Next(); event != nil; event = event.Next() {
242+
e, ok := event.Value.(*clusterEvent)
243+
if !ok {
244+
// Must be another in-flight Pod (*v1.Pod). Can be ignored.
245+
continue
246+
}
247+
events = append(events, e)
248+
}
249+
return events, nil
250+
}
251+
252+
// addEventIfPodInFlight adds clusterEvent to inFlightEvents if the newPod is in inFlightPods.
253+
// It returns true if pushed the event to the inFlightEvents.
254+
func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event framework.ClusterEvent) bool {
255+
aq.lock.Lock()
256+
defer aq.lock.Unlock()
257+
258+
_, ok := aq.inFlightPods[newPod.UID]
259+
if ok {
260+
aq.inFlightEvents.PushBack(&clusterEvent{
261+
event: event,
262+
oldObj: oldPod,
263+
newObj: newPod,
264+
})
265+
}
266+
return ok
267+
}
268+
269+
// addEventIfAnyInFlight adds clusterEvent to inFlightEvents if any pod is in inFlightPods.
270+
// It returns true if pushed the event to the inFlightEvents.
271+
func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event framework.ClusterEvent) bool {
272+
aq.lock.Lock()
273+
defer aq.lock.Unlock()
274+
275+
if len(aq.inFlightPods) != 0 {
276+
aq.inFlightEvents.PushBack(&clusterEvent{
277+
event: event,
278+
oldObj: oldObj,
279+
newObj: newObj,
280+
})
281+
return true
282+
}
283+
return false
284+
}
285+
286+
func (aq *activeQueue) schedulingCycle() int64 {
287+
aq.lock.RLock()
288+
defer aq.lock.RUnlock()
289+
return aq.schedCycle
290+
}
291+
292+
// done must be called for pod returned by Pop. This allows the queue to
293+
// keep track of which pods are currently being processed.
294+
func (aq *activeQueue) done(pod types.UID) {
295+
aq.lock.Lock()
296+
defer aq.lock.Unlock()
297+
298+
inFlightPod, ok := aq.inFlightPods[pod]
299+
if !ok {
300+
// This Pod is already done()ed.
301+
return
302+
}
303+
delete(aq.inFlightPods, pod)
304+
305+
// Remove the pod from the list.
306+
aq.inFlightEvents.Remove(inFlightPod)
307+
308+
// Remove events which are only referred to by this Pod
309+
// so that the inFlightEvents list doesn't grow infinitely.
310+
// If the pod was at the head of the list, then all
311+
// events between it and the next pod are no longer needed
312+
// and can be removed.
313+
for {
314+
e := aq.inFlightEvents.Front()
315+
if e == nil {
316+
// Empty list.
317+
break
318+
}
319+
if _, ok := e.Value.(*clusterEvent); !ok {
320+
// A pod, must stop pruning.
321+
break
322+
}
323+
aq.inFlightEvents.Remove(e)
324+
}
325+
}
326+
327+
// close closes the activeQueue.
328+
func (aq *activeQueue) close() {
329+
aq.lock.Lock()
330+
aq.closed = true
331+
aq.lock.Unlock()
332+
}
333+
334+
// broadcast notifies the pop() operation that new pod(s) was added to the activeQueue.
335+
func (aq *activeQueue) broadcast() {
336+
aq.cond.Broadcast()
337+
}

0 commit comments

Comments
 (0)