Skip to content

Commit 65321bf

Browse files
authored
Merge pull request kubernetes#130214 from macsko/split_backoffq_into_backoffq_and_errorbackoffq
Split backoffQ into backoffQ and errorBackoffQ in scheduler
2 parents b14fad5 + 0f24b9f commit 65321bf

File tree

4 files changed

+490
-191
lines changed

4 files changed

+490
-191
lines changed
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package queue
18+
19+
import (
20+
"time"
21+
22+
v1 "k8s.io/api/core/v1"
23+
"k8s.io/klog/v2"
24+
"k8s.io/kubernetes/pkg/scheduler/backend/heap"
25+
"k8s.io/kubernetes/pkg/scheduler/framework"
26+
"k8s.io/kubernetes/pkg/scheduler/metrics"
27+
"k8s.io/utils/clock"
28+
)
29+
30+
// backoffQueuer is a wrapper for backoffQ related operations.
31+
type backoffQueuer interface {
32+
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
33+
// If this returns true, the pod should not be re-tried.
34+
isPodBackingoff(podInfo *framework.QueuedPodInfo) bool
35+
// getBackoffTime returns the time that podInfo completes backoff
36+
getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time
37+
// popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
38+
popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo))
39+
40+
// podInitialBackoffDuration returns initial backoff duration that pod can get.
41+
podInitialBackoffDuration() time.Duration
42+
// podMaxBackoffDuration returns maximum backoff duration that pod can get.
43+
podMaxBackoffDuration() time.Duration
44+
45+
// add adds the pInfo to backoffQueue.
46+
// It also ensures that pInfo is not in both queues.
47+
add(logger klog.Logger, pInfo *framework.QueuedPodInfo)
48+
// update updates the pod in backoffQueue if oldPodInfo is already in the queue.
49+
// It returns new pod info if updated, nil otherwise.
50+
update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
51+
// delete deletes the pInfo from backoffQueue.
52+
delete(pInfo *framework.QueuedPodInfo)
53+
// get returns the pInfo matching given pInfoLookup, if exists.
54+
get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
55+
// has inform if pInfo exists in the queue.
56+
has(pInfo *framework.QueuedPodInfo) bool
57+
// list returns all pods that are in the queue.
58+
list() []*framework.QueuedPodInfo
59+
// len returns length of the queue.
60+
len() int
61+
}
62+
63+
// backoffQueue implements backoffQueuer and wraps two queues inside,
64+
// providing seamless access as if it were one queue.
65+
type backoffQueue struct {
66+
clock clock.Clock
67+
68+
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
69+
// are popped from this heap before the scheduler looks at activeQ
70+
podBackoffQ *heap.Heap[*framework.QueuedPodInfo]
71+
// podErrorBackoffQ is a heap ordered by error backoff expiry. Pods which have completed backoff
72+
// are popped from this heap before the scheduler looks at activeQ
73+
podErrorBackoffQ *heap.Heap[*framework.QueuedPodInfo]
74+
75+
podInitialBackoff time.Duration
76+
podMaxBackoff time.Duration
77+
}
78+
79+
func newBackoffQueue(clock clock.Clock, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration) *backoffQueue {
80+
bq := &backoffQueue{
81+
clock: clock,
82+
podInitialBackoff: podInitialBackoffDuration,
83+
podMaxBackoff: podMaxBackoffDuration,
84+
}
85+
bq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
86+
bq.podErrorBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
87+
88+
return bq
89+
}
90+
91+
// podInitialBackoffDuration returns initial backoff duration that pod can get.
92+
func (bq *backoffQueue) podInitialBackoffDuration() time.Duration {
93+
return bq.podInitialBackoff
94+
}
95+
96+
// podMaxBackoffDuration returns maximum backoff duration that pod can get.
97+
func (bq *backoffQueue) podMaxBackoffDuration() time.Duration {
98+
return bq.podMaxBackoff
99+
}
100+
101+
// lessBackoffCompleted is a less function of podBackoffQ and podErrorBackoffQ.
102+
func (bq *backoffQueue) lessBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
103+
bo1 := bq.getBackoffTime(pInfo1)
104+
bo2 := bq.getBackoffTime(pInfo2)
105+
return bo1.Before(bo2)
106+
}
107+
108+
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
109+
// If this returns true, the pod should not be re-tried.
110+
func (bq *backoffQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
111+
boTime := bq.getBackoffTime(podInfo)
112+
return boTime.After(bq.clock.Now())
113+
}
114+
115+
// getBackoffTime returns the time that podInfo completes backoff
116+
func (bq *backoffQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
117+
duration := bq.calculateBackoffDuration(podInfo)
118+
backoffTime := podInfo.Timestamp.Add(duration)
119+
return backoffTime
120+
}
121+
122+
// calculateBackoffDuration is a helper function for calculating the backoffDuration
123+
// based on the number of attempts the pod has made.
124+
func (bq *backoffQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
125+
if podInfo.Attempts == 0 {
126+
// When the Pod hasn't experienced any scheduling attempts,
127+
// they aren't obliged to get a backoff penalty at all.
128+
return 0
129+
}
130+
131+
duration := bq.podInitialBackoff
132+
for i := 1; i < podInfo.Attempts; i++ {
133+
// Use subtraction instead of addition or multiplication to avoid overflow.
134+
if duration > bq.podMaxBackoff-duration {
135+
return bq.podMaxBackoff
136+
}
137+
duration += duration
138+
}
139+
return duration
140+
}
141+
142+
func (bq *backoffQueue) popEachBackoffCompletedWithQueue(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo), queue *heap.Heap[*framework.QueuedPodInfo]) {
143+
for {
144+
pInfo, ok := queue.Peek()
145+
if !ok || pInfo == nil {
146+
break
147+
}
148+
pod := pInfo.Pod
149+
if bq.isPodBackingoff(pInfo) {
150+
break
151+
}
152+
_, err := queue.Pop()
153+
if err != nil {
154+
logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
155+
break
156+
}
157+
if fn != nil {
158+
fn(pInfo)
159+
}
160+
}
161+
}
162+
163+
// popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
164+
func (bq *backoffQueue) popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo)) {
165+
// Ensure both queues are called
166+
bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podBackoffQ)
167+
bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podErrorBackoffQ)
168+
}
169+
170+
// add adds the pInfo to backoffQueue.
171+
// It also ensures that pInfo is not in both queues.
172+
func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo) {
173+
// If pod has empty both unschedulable plugins and pending plugins,
174+
// it means that it failed because of error and should be moved to podErrorBackoffQ.
175+
if pInfo.UnschedulablePlugins.Len() == 0 && pInfo.PendingPlugins.Len() == 0 {
176+
bq.podErrorBackoffQ.AddOrUpdate(pInfo)
177+
// Ensure the pod is not in the podBackoffQ and report the error if it happens.
178+
err := bq.podBackoffQ.Delete(pInfo)
179+
if err == nil {
180+
logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podBackoffQ", "pod", klog.KObj(pInfo.Pod))
181+
}
182+
return
183+
}
184+
bq.podBackoffQ.AddOrUpdate(pInfo)
185+
// Ensure the pod is not in the podErrorBackoffQ and report the error if it happens.
186+
err := bq.podErrorBackoffQ.Delete(pInfo)
187+
if err == nil {
188+
logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podErrorBackoffQ", "pod", klog.KObj(pInfo.Pod))
189+
}
190+
}
191+
192+
// update updates the pod in backoffQueue if oldPodInfo is already in the queue.
193+
// It returns new pod info if updated, nil otherwise.
194+
func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
195+
// If the pod is in the backoff queue, update it there.
196+
if pInfo, exists := bq.podBackoffQ.Get(oldPodInfo); exists {
197+
_ = pInfo.Update(newPod)
198+
bq.podBackoffQ.AddOrUpdate(pInfo)
199+
return pInfo
200+
}
201+
// If the pod is in the error backoff queue, update it there.
202+
if pInfo, exists := bq.podErrorBackoffQ.Get(oldPodInfo); exists {
203+
_ = pInfo.Update(newPod)
204+
bq.podErrorBackoffQ.AddOrUpdate(pInfo)
205+
return pInfo
206+
}
207+
return nil
208+
}
209+
210+
// delete deletes the pInfo from backoffQueue.
211+
func (bq *backoffQueue) delete(pInfo *framework.QueuedPodInfo) {
212+
_ = bq.podBackoffQ.Delete(pInfo)
213+
_ = bq.podErrorBackoffQ.Delete(pInfo)
214+
}
215+
216+
// get returns the pInfo matching given pInfoLookup, if exists.
217+
func (bq *backoffQueue) get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) {
218+
pInfo, exists := bq.podBackoffQ.Get(pInfoLookup)
219+
if exists {
220+
return pInfo, true
221+
}
222+
return bq.podErrorBackoffQ.Get(pInfoLookup)
223+
}
224+
225+
// has inform if pInfo exists in the queue.
226+
func (bq *backoffQueue) has(pInfo *framework.QueuedPodInfo) bool {
227+
return bq.podBackoffQ.Has(pInfo) || bq.podErrorBackoffQ.Has(pInfo)
228+
}
229+
230+
// list returns all pods that are in the queue.
231+
func (bq *backoffQueue) list() []*framework.QueuedPodInfo {
232+
return append(bq.podBackoffQ.List(), bq.podErrorBackoffQ.List()...)
233+
}
234+
235+
// len returns length of the queue.
236+
func (bq *backoffQueue) len() int {
237+
return bq.podBackoffQ.Len() + bq.podErrorBackoffQ.Len()
238+
}

0 commit comments

Comments
 (0)