Commit 65d9066

Merge pull request kubernetes#130680 from macsko/update_backoffq_less_function_to_order_by_priority_in_windows
Update backoffQ's less function to order pods by priority in windows
2 parents: 16abcd7 + e367dca

File tree: 10 files changed, +332 −122 lines

pkg/scheduler/backend/queue/backoff_queue.go

114 additions, 12 deletions
```diff
@@ -27,10 +27,18 @@ import (
 	"k8s.io/utils/clock"
 )
 
+// backoffQOrderingWindowDuration is the duration of an ordering window in the podBackoffQ.
+// In each window, represented as a whole second, pods are ordered by priority.
+// It equals the interval at which pods are flushed from the podBackoffQ to the activeQ, so that whole windows are flushed there.
+// This works only if the PopFromBackoffQ feature is enabled.
+// See KEP-5142 (http://kep.k8s.io/5142) for the rationale.
+const backoffQOrderingWindowDuration = time.Second
+
 // backoffQueuer is a wrapper for backoffQ related operations.
 type backoffQueuer interface {
 	// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
 	// If this returns true, the pod should not be re-tried.
+	// If the pod's backoff time falls in the current ordering window, it is still backing off.
 	isPodBackingoff(podInfo *framework.QueuedPodInfo) bool
 	// popEachBackoffCompleted runs fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
 	popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo))
@@ -39,6 +47,11 @@ type backoffQueuer interface {
 	podInitialBackoffDuration() time.Duration
 	// podMaxBackoffDuration returns the maximum backoff duration that a pod can get.
 	podMaxBackoffDuration() time.Duration
+	// waitUntilAlignedWithOrderingWindow waits until the time reaches a multiple of backoffQOrderingWindowDuration.
+	// It then runs the f function at the backoffQOrderingWindowDuration interval using a ticker.
+	// Aligning the flushing time matters because podBackoffQ's ordering is based on the windows,
+	// and whole windows have to be flushed at once without visible latency.
+	waitUntilAlignedWithOrderingWindow(f func(), stopCh <-chan struct{})
 
 	// add adds the pInfo to backoffQueue.
 	// The event should show which event triggered this addition and is used for the metric recording.
@@ -54,15 +67,15 @@ type backoffQueuer interface {
 	// has informs whether pInfo exists in the queue.
 	has(pInfo *framework.QueuedPodInfo) bool
 	// list returns all pods that are in the queue.
-	list() []*framework.QueuedPodInfo
+	list() []*v1.Pod
 	// len returns the length of the queue.
 	len() int
 }
 
 // backoffQueue implements backoffQueuer and wraps two queues inside,
 // providing seamless access as if it were one queue.
 type backoffQueue struct {
-	clock clock.Clock
+	clock clock.WithTicker
 
 	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
 	// are popped from this heap before the scheduler looks at activeQ
@@ -73,15 +86,27 @@ type backoffQueue struct {
 
 	podInitialBackoff time.Duration
 	podMaxBackoff     time.Duration
+	// activeQLessFn is used as a tie-break less function when two backoff times are equal,
+	// if the SchedulerPopFromBackoffQ feature is enabled.
+	activeQLessFn framework.LessFunc
+
+	// isPopFromBackoffQEnabled indicates whether the feature gate SchedulerPopFromBackoffQ is enabled.
+	isPopFromBackoffQEnabled bool
 }
 
-func newBackoffQueue(clock clock.Clock, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration) *backoffQueue {
+func newBackoffQueue(clock clock.WithTicker, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration, activeQLessFn framework.LessFunc, popFromBackoffQEnabled bool) *backoffQueue {
 	bq := &backoffQueue{
-		clock:             clock,
-		podInitialBackoff: podInitialBackoffDuration,
-		podMaxBackoff:     podMaxBackoffDuration,
+		clock:                    clock,
+		podInitialBackoff:        podInitialBackoffDuration,
+		podMaxBackoff:            podMaxBackoffDuration,
+		isPopFromBackoffQEnabled: popFromBackoffQEnabled,
+		activeQLessFn:            activeQLessFn,
 	}
-	bq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
+	podBackoffQLessFn := bq.lessBackoffCompleted
+	if popFromBackoffQEnabled {
+		podBackoffQLessFn = bq.lessBackoffCompletedWithPriority
+	}
+	bq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, podBackoffQLessFn, metrics.NewBackoffPodsRecorder())
 	bq.podErrorBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
 
 	return bq
@@ -97,7 +122,70 @@ func (bq *backoffQueue) podMaxBackoffDuration() time.Duration {
 	return bq.podMaxBackoff
 }
 
-// lessBackoffCompleted is a less function of podBackoffQ and podErrorBackoffQ.
+// alignToWindow truncates the provided time to the podBackoffQ ordering window.
+// It returns the lowest possible timestamp in the window.
+func (bq *backoffQueue) alignToWindow(t time.Time) time.Time {
+	if !bq.isPopFromBackoffQEnabled {
+		return t
+	}
+	return t.Truncate(backoffQOrderingWindowDuration)
+}
+
+// waitUntilAlignedWithOrderingWindow waits until the time reaches a multiple of backoffQOrderingWindowDuration.
+// It then runs the f function at the backoffQOrderingWindowDuration interval using a ticker.
+// Aligning the flushing time matters because podBackoffQ's ordering is based on the windows,
+// and whole windows have to be flushed at once without visible latency.
+func (bq *backoffQueue) waitUntilAlignedWithOrderingWindow(f func(), stopCh <-chan struct{}) {
+	now := bq.clock.Now()
+	// Wait until the time reaches the next multiple of backoffQOrderingWindowDuration.
+	durationToNextWindow := bq.alignToWindow(now.Add(backoffQOrderingWindowDuration)).Sub(now)
+	timer := bq.clock.NewTimer(durationToNextWindow)
+	select {
+	case <-stopCh:
+		timer.Stop()
+		return
+	case <-timer.C():
+	}
+
+	// Run a ticker to make sure the invocations of the f function
+	// are aligned with the backoffQ's ordering window.
+	ticker := bq.clock.NewTicker(backoffQOrderingWindowDuration)
+	for {
+		select {
+		case <-stopCh:
+			return
+		default:
+		}
+
+		f()
+
+		// NOTE: Because Go's select has no case priority, this can race:
+		// ticker.C and stopCh may both be ready, and the ticker.C case may win.
+		// To mitigate this, we re-check stopCh at the beginning
+		// of every loop iteration to prevent extra executions of f().
+		select {
+		case <-stopCh:
+			ticker.Stop()
+			return
+		case <-ticker.C():
+		}
+	}
+}
+
+// lessBackoffCompletedWithPriority is the less function of podBackoffQ when the PopFromBackoffQ feature is enabled.
+// Within one ordering window it orders pods the same way the activeQ would,
+// to improve the popping order from backoffQ when activeQ is empty.
+func (bq *backoffQueue) lessBackoffCompletedWithPriority(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
+	bo1 := bq.getBackoffTime(pInfo1)
+	bo2 := bq.getBackoffTime(pInfo2)
+	if !bo1.Equal(bo2) {
+		return bo1.Before(bo2)
+	}
+	// If the backoff time is the same, sort the pods in the same manner as activeQ does.
+	return bq.activeQLessFn(pInfo1, pInfo2)
+}
+
+// lessBackoffCompleted is the less function of podErrorBackoffQ.
 func (bq *backoffQueue) lessBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
 	bo1 := bq.getBackoffTime(pInfo1)
 	bo2 := bq.getBackoffTime(pInfo2)
@@ -106,9 +194,11 @@ func (bq *backoffQueue) lessBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPod
 
 // isPodBackingoff returns true if a pod is still waiting for its backoff timer.
 // If this returns true, the pod should not be re-tried.
+// If the pod's backoff time falls in the current ordering window, it is still backing off.
 func (bq *backoffQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
 	boTime := bq.getBackoffTime(podInfo)
-	return boTime.After(bq.clock.Now())
+	// Don't use After, because we want to return true when the windows are equal.
+	return !boTime.Before(bq.alignToWindow(bq.clock.Now()))
 }
 
 // getBackoffTime returns the time that podInfo completes backoff.
@@ -117,9 +207,14 @@ func (bq *backoffQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
 // because the backoff time is calculated based on podInfo.Attempts,
 // which doesn't get changed until the pod's scheduling is retried.
 func (bq *backoffQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
+	if podInfo.Attempts == 0 {
+		// Don't store a backoff expiration for a pod that hasn't been tried at all,
+		// so that isPodBackingoff correctly lets it skip backoff.
+		return time.Time{}
+	}
 	if podInfo.BackoffExpiration.IsZero() {
 		duration := bq.calculateBackoffDuration(podInfo)
-		podInfo.BackoffExpiration = podInfo.Timestamp.Add(duration)
+		podInfo.BackoffExpiration = bq.alignToWindow(podInfo.Timestamp.Add(duration))
 	}
 	return podInfo.BackoffExpiration
 }
@@ -238,8 +333,15 @@ func (bq *backoffQueue) has(pInfo *framework.QueuedPodInfo) bool {
 }
 
 // list returns all pods that are in the queue.
-func (bq *backoffQueue) list() []*framework.QueuedPodInfo {
-	return append(bq.podBackoffQ.List(), bq.podErrorBackoffQ.List()...)
+func (bq *backoffQueue) list() []*v1.Pod {
+	var result []*v1.Pod
+	for _, pInfo := range bq.podBackoffQ.List() {
+		result = append(result, pInfo.Pod)
+	}
+	for _, pInfo := range bq.podErrorBackoffQ.List() {
+		result = append(result, pInfo.Pod)
+	}
+	return result
 }
 
 // len returns the length of the queue.
```
pkg/scheduler/backend/queue/backoff_queue_test.go

Lines changed: 100 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package queue
1818

1919
import (
20+
"fmt"
2021
"math"
2122
"testing"
2223
"time"
@@ -69,7 +70,7 @@ func TestBackoffQueue_calculateBackoffDuration(t *testing.T) {
6970
}
7071
for _, tt := range tests {
7172
t.Run(tt.name, func(t *testing.T) {
72-
bq := newBackoffQueue(clock.RealClock{}, tt.initialBackoffDuration, tt.maxBackoffDuration)
73+
bq := newBackoffQueue(clock.RealClock{}, tt.initialBackoffDuration, tt.maxBackoffDuration, newDefaultQueueSort(), true)
7374
if got := bq.calculateBackoffDuration(tt.podInfo); got != tt.want {
7475
t.Errorf("backoffQueue.calculateBackoffDuration() = %v, want %v", got, tt.want)
7576
}
@@ -84,7 +85,7 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
8485
PodInfo: &framework.PodInfo{
8586
Pod: st.MakePod().Name("pod0").Obj(),
8687
},
87-
Timestamp: fakeClock.Now().Add(-time.Second),
88+
Timestamp: fakeClock.Now().Add(-2 * time.Second),
8889
Attempts: 1,
8990
UnschedulablePlugins: sets.New("plugin"),
9091
},
@@ -100,7 +101,7 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
100101
PodInfo: &framework.PodInfo{
101102
Pod: st.MakePod().Name("pod2").Obj(),
102103
},
103-
Timestamp: fakeClock.Now().Add(-time.Second),
104+
Timestamp: fakeClock.Now().Add(-2 * time.Second),
104105
Attempts: 1,
105106
},
106107
"pod3": {
@@ -147,12 +148,105 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
147148
wantPods: nil,
148149
},
149150
}
151+
for _, tt := range tests {
152+
for _, popFromBackoffQEnabled := range []bool{true, false} {
153+
t.Run(fmt.Sprintf("%s popFromBackoffQEnabled(%v)", tt.name, popFromBackoffQEnabled), func(t *testing.T) {
154+
logger, _ := ktesting.NewTestContext(t)
155+
bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration, newDefaultQueueSort(), popFromBackoffQEnabled)
156+
for _, podName := range tt.podsInBackoff {
157+
bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label())
158+
}
159+
var gotPods []string
160+
bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
161+
gotPods = append(gotPods, pInfo.Pod.Name)
162+
})
163+
if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
164+
t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
165+
}
166+
podsToStayInBackoff := len(tt.podsInBackoff) - len(tt.wantPods)
167+
if bq.len() != podsToStayInBackoff {
168+
t.Errorf("Expected %v pods to stay in backoffQ, but got: %v", podsToStayInBackoff, bq.len())
169+
}
170+
})
171+
}
172+
}
173+
}
174+
175+
func TestBackoffQueueOrdering(t *testing.T) {
176+
// Align the fake clock with ordering window.
177+
fakeClock := testingclock.NewFakeClock(time.Now().Truncate(backoffQOrderingWindowDuration))
178+
podInfos := []*framework.QueuedPodInfo{
179+
{
180+
PodInfo: &framework.PodInfo{
181+
Pod: st.MakePod().Name("pod0").Priority(1).Obj(),
182+
},
183+
Timestamp: fakeClock.Now(),
184+
Attempts: 1,
185+
UnschedulablePlugins: sets.New("plugin"),
186+
},
187+
{
188+
PodInfo: &framework.PodInfo{
189+
Pod: st.MakePod().Name("pod1").Priority(1).Obj(),
190+
},
191+
Timestamp: fakeClock.Now().Add(-time.Second),
192+
Attempts: 1,
193+
UnschedulablePlugins: sets.New("plugin"),
194+
},
195+
{
196+
PodInfo: &framework.PodInfo{
197+
Pod: st.MakePod().Name("pod2").Priority(2).Obj(),
198+
},
199+
Timestamp: fakeClock.Now().Add(-2*time.Second + time.Millisecond),
200+
Attempts: 1,
201+
UnschedulablePlugins: sets.New("plugin"),
202+
},
203+
{
204+
PodInfo: &framework.PodInfo{
205+
Pod: st.MakePod().Name("pod3").Priority(1).Obj(),
206+
},
207+
Timestamp: fakeClock.Now().Add(-2 * time.Second),
208+
Attempts: 1,
209+
UnschedulablePlugins: sets.New("plugin"),
210+
},
211+
{
212+
PodInfo: &framework.PodInfo{
213+
Pod: st.MakePod().Name("pod4").Priority(2).Obj(),
214+
},
215+
Timestamp: fakeClock.Now().Add(-2 * time.Second),
216+
Attempts: 1,
217+
UnschedulablePlugins: sets.New("plugin"),
218+
},
219+
{
220+
PodInfo: &framework.PodInfo{
221+
Pod: st.MakePod().Name("pod5").Priority(1).Obj(),
222+
},
223+
Timestamp: fakeClock.Now().Add(-3 * time.Second),
224+
Attempts: 1,
225+
UnschedulablePlugins: sets.New("plugin"),
226+
},
227+
}
228+
tests := []struct {
229+
name string
230+
popFromBackoffQEnabled bool
231+
wantPods []string
232+
}{
233+
{
234+
name: "Pods with the same window are ordered by priority if PopFromBackoffQ is enabled",
235+
popFromBackoffQEnabled: true,
236+
wantPods: []string{"pod5", "pod4", "pod2", "pod3"},
237+
},
238+
{
239+
name: "Pods priority doesn't matter if PopFromBackoffQ is disabled",
240+
popFromBackoffQEnabled: false,
241+
wantPods: []string{"pod5", "pod3", "pod4", "pod2"},
242+
},
243+
}
150244
for _, tt := range tests {
151245
t.Run(tt.name, func(t *testing.T) {
152246
logger, _ := ktesting.NewTestContext(t)
153-
bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration)
154-
for _, podName := range tt.podsInBackoff {
155-
bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label())
247+
bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration, newDefaultQueueSort(), tt.popFromBackoffQEnabled)
248+
for _, podInfo := range podInfos {
249+
bq.add(logger, podInfo, framework.EventUnscheduledPodAdd.Label())
156250
}
157251
var gotPods []string
158252
bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
@@ -161,10 +255,6 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
161255
if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
162256
t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
163257
}
164-
podsToStayInBackoff := len(tt.podsInBackoff) - len(tt.wantPods)
165-
if bq.len() != podsToStayInBackoff {
166-
t.Errorf("Expected %v pods to stay in backoffQ, but got: %v", podsToStayInBackoff, bq.len())
167-
}
168258
})
169259
}
170260
}
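The tests above rely on flushes landing exactly on window boundaries, which is what `waitUntilAlignedWithOrderingWindow` guarantees. The align-then-tick pattern is easier to see outside the scheduler; below is a minimal, self-contained sketch of it using the standard `time` package instead of `k8s.io/utils/clock` (so `timer.C` is a channel field rather than the `C()` method the patch calls). `runAligned` and its flush callback are illustrative names, not part of the patch.

```go
// Sketch of the align-then-tick loop, assuming a one-second window.
package main

import (
	"fmt"
	"time"
)

const window = time.Second

func runAligned(f func(), stopCh <-chan struct{}) {
	now := time.Now()
	// Sleep until the next whole-window boundary so every invocation
	// of f lands on a multiple of the window duration.
	untilNext := now.Add(window).Truncate(window).Sub(now)
	timer := time.NewTimer(untilNext)
	select {
	case <-stopCh:
		timer.Stop()
		return
	case <-timer.C:
	}

	ticker := time.NewTicker(window)
	defer ticker.Stop()
	for {
		// Re-check stopCh first: select picks randomly among ready cases,
		// so ticker.C could win even after stopCh has closed. This mirrors
		// the NOTE in the patch about the missing select priority.
		select {
		case <-stopCh:
			return
		default:
		}

		f()

		select {
		case <-stopCh:
			return
		case <-ticker.C:
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	go runAligned(func() { fmt.Println("flush at", time.Now()) }, stopCh)
	time.Sleep(3 * time.Second)
	close(stopCh)
	time.Sleep(100 * time.Millisecond) // let the goroutine observe stopCh
}
```

Because every flush fires on a boundary, a whole window of pods becomes poppable at once, which is the precondition for the in-window priority ordering that `lessBackoffCompletedWithPriority` provides.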
