@@ -27,10 +27,18 @@ import (
27
27
"k8s.io/utils/clock"
28
28
)
29
29
30
+ // backoffQOrderingWindowDuration is the duration of an ordering window in the podBackoffQ.
31
+ // Within each window, represented as a whole second, pods are ordered by priority.
32
+ // It is the same as the interval at which pods are flushed from the podBackoffQ to the activeQ, so that whole windows are flushed together.
33
+ // This applies only if the PopFromBackoffQ feature is enabled.
34
+ // See KEP-5142 (http://kep.k8s.io/5142) for the rationale.
35
+ const backoffQOrderingWindowDuration = time .Second
36
+
30
37
// backoffQueuer is a wrapper for backoffQ related operations.
31
38
type backoffQueuer interface {
32
39
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
33
40
// If this returns true, the pod should not be re-tried.
41
+ // If the pod backoff time is in the actual ordering window, it should still be backing off.
34
42
isPodBackingoff (podInfo * framework.QueuedPodInfo ) bool
35
43
// popEachBackoffCompleted runs fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff, popping them in the process.
36
44
popEachBackoffCompleted (logger klog.Logger , fn func (pInfo * framework.QueuedPodInfo ))
@@ -39,6 +47,11 @@ type backoffQueuer interface {
39
47
podInitialBackoffDuration () time.Duration
40
48
// podMaxBackoffDuration returns maximum backoff duration that pod can get.
41
49
podMaxBackoffDuration () time.Duration
50
+ // waitUntilAlignedWithOrderingWindow waits until the time reaches a multiple of backoffQOrderingWindowDuration.
51
+ // It then runs the f function at the backoffQOrderingWindowDuration interval using a ticker.
52
+ // It's important to align the flushing time, because podBackoffQ's ordering is based on the windows
53
+ // and whole windows have to be flushed at one time without a visible latency.
54
+ waitUntilAlignedWithOrderingWindow (f func (), stopCh <- chan struct {})
42
55
43
56
// add adds the pInfo to backoffQueue.
44
57
// The event should show which event triggered this addition and is used for the metric recording.
@@ -54,15 +67,15 @@ type backoffQueuer interface {
54
67
// has reports whether pInfo exists in the queue.
55
68
has (pInfo * framework.QueuedPodInfo ) bool
56
69
// list returns all pods that are in the queue.
57
- list () []* framework. QueuedPodInfo
70
+ list () []* v1. Pod
58
71
// len returns length of the queue.
59
72
len () int
60
73
}
61
74
62
75
// backoffQueue implements backoffQueuer and wraps two queues inside,
63
76
// providing seamless access as if it were one queue.
64
77
type backoffQueue struct {
65
- clock clock.Clock
78
+ clock clock.WithTicker
66
79
67
80
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
68
81
// are popped from this heap before the scheduler looks at activeQ
@@ -73,15 +86,27 @@ type backoffQueue struct {
73
86
74
87
podInitialBackoff time.Duration
75
88
podMaxBackoff time.Duration
89
+ // activeQLessFn is used as an eventual less function if two backoff times are equal,
90
+ // when the SchedulerPopFromBackoffQ feature is enabled.
91
+ activeQLessFn framework.LessFunc
92
+
93
+ // isPopFromBackoffQEnabled indicates whether the feature gate SchedulerPopFromBackoffQ is enabled.
94
+ isPopFromBackoffQEnabled bool
76
95
}
77
96
78
- func newBackoffQueue (clock clock.Clock , podInitialBackoffDuration time.Duration , podMaxBackoffDuration time.Duration ) * backoffQueue {
97
+ func newBackoffQueue (clock clock.WithTicker , podInitialBackoffDuration time.Duration , podMaxBackoffDuration time.Duration , activeQLessFn framework. LessFunc , popFromBackoffQEnabled bool ) * backoffQueue {
79
98
bq := & backoffQueue {
80
- clock : clock ,
81
- podInitialBackoff : podInitialBackoffDuration ,
82
- podMaxBackoff : podMaxBackoffDuration ,
99
+ clock : clock ,
100
+ podInitialBackoff : podInitialBackoffDuration ,
101
+ podMaxBackoff : podMaxBackoffDuration ,
102
+ isPopFromBackoffQEnabled : popFromBackoffQEnabled ,
103
+ activeQLessFn : activeQLessFn ,
83
104
}
84
- bq .podBackoffQ = heap .NewWithRecorder (podInfoKeyFunc , bq .lessBackoffCompleted , metrics .NewBackoffPodsRecorder ())
105
+ podBackoffQLessFn := bq .lessBackoffCompleted
106
+ if popFromBackoffQEnabled {
107
+ podBackoffQLessFn = bq .lessBackoffCompletedWithPriority
108
+ }
109
+ bq .podBackoffQ = heap .NewWithRecorder (podInfoKeyFunc , podBackoffQLessFn , metrics .NewBackoffPodsRecorder ())
85
110
bq .podErrorBackoffQ = heap .NewWithRecorder (podInfoKeyFunc , bq .lessBackoffCompleted , metrics .NewBackoffPodsRecorder ())
86
111
87
112
return bq
@@ -97,7 +122,70 @@ func (bq *backoffQueue) podMaxBackoffDuration() time.Duration {
97
122
return bq .podMaxBackoff
98
123
}
99
124
100
- // lessBackoffCompleted is a less function of podBackoffQ and podErrorBackoffQ.
125
+ // alignToWindow truncates the provided time to the podBackoffQ ordering window.
126
+ // It returns the lowest possible timestamp in the window.
127
+ func (bq * backoffQueue ) alignToWindow (t time.Time ) time.Time {
128
+ if ! bq .isPopFromBackoffQEnabled {
129
+ return t
130
+ }
131
+ return t .Truncate (backoffQOrderingWindowDuration )
132
+ }
133
+
134
+ // waitUntilAlignedWithOrderingWindow waits until the time reaches a multiple of backoffQOrderingWindowDuration.
135
+ // It then runs the f function at the backoffQOrderingWindowDuration interval using a ticker.
136
+ // It's important to align the flushing time, because podBackoffQ's ordering is based on the windows
137
+ // and whole windows have to be flushed at one time without a visible latency.
138
+ func (bq * backoffQueue ) waitUntilAlignedWithOrderingWindow (f func (), stopCh <- chan struct {}) {
139
+ now := bq .clock .Now ()
140
+ // Wait until the time reaches the multiple of backoffQOrderingWindowDuration.
141
+ durationToNextWindow := bq .alignToWindow (now .Add (backoffQOrderingWindowDuration )).Sub (now )
142
+ timer := bq .clock .NewTimer (durationToNextWindow )
143
+ select {
144
+ case <- stopCh :
145
+ timer .Stop ()
146
+ return
147
+ case <- timer .C ():
148
+ }
149
+
150
+ // Run a ticker to make sure the invocations of f function
151
+ // are aligned with the backoffQ's ordering window.
152
+ ticker := bq .clock .NewTicker (backoffQOrderingWindowDuration )
153
+ for {
154
+ select {
155
+ case <- stopCh :
156
+ return
157
+ default :
158
+ }
159
+
160
+ f ()
161
+
162
+ // NOTE: b/c there is no priority selection in golang
163
+ // it is possible for this to race, meaning we could
164
+ // trigger ticker.C and stopCh, and ticker.C select falls through.
165
+ // In order to mitigate we re-check stopCh at the beginning
166
+ // of every loop to prevent extra executions of f().
167
+ select {
168
+ case <- stopCh :
169
+ ticker .Stop ()
170
+ return
171
+ case <- ticker .C ():
172
+ }
173
+ }
174
+ }
175
+
176
+ // lessBackoffCompletedWithPriority is a less function of podBackoffQ if PopFromBackoffQ feature is enabled.
177
+ // It orders the pods in the same BackoffOrderingWindow the same as the activeQ will do to improve popping order from backoffQ when activeQ is empty.
178
+ func (bq * backoffQueue ) lessBackoffCompletedWithPriority (pInfo1 , pInfo2 * framework.QueuedPodInfo ) bool {
179
+ bo1 := bq .getBackoffTime (pInfo1 )
180
+ bo2 := bq .getBackoffTime (pInfo2 )
181
+ if ! bo1 .Equal (bo2 ) {
182
+ return bo1 .Before (bo2 )
183
+ }
184
+ // If the backoff time is the same, sort the pod in the same manner as activeQ does.
185
+ return bq .activeQLessFn (pInfo1 , pInfo2 )
186
+ }
187
+
188
+ // lessBackoffCompleted is a less function of podErrorBackoffQ, and of podBackoffQ when the PopFromBackoffQ feature is disabled.
101
189
func (bq * backoffQueue ) lessBackoffCompleted (pInfo1 , pInfo2 * framework.QueuedPodInfo ) bool {
102
190
bo1 := bq .getBackoffTime (pInfo1 )
103
191
bo2 := bq .getBackoffTime (pInfo2 )
@@ -106,9 +194,11 @@ func (bq *backoffQueue) lessBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPod
106
194
107
195
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
108
196
// If this returns true, the pod should not be re-tried.
197
+ // If the pod backoff time is in the actual ordering window, it should still be backing off.
109
198
func (bq * backoffQueue ) isPodBackingoff (podInfo * framework.QueuedPodInfo ) bool {
110
199
boTime := bq .getBackoffTime (podInfo )
111
- return boTime .After (bq .clock .Now ())
200
+ // Don't use After, because in case of windows equality we want to return true.
201
+ return ! boTime .Before (bq .alignToWindow (bq .clock .Now ()))
112
202
}
113
203
114
204
// getBackoffTime returns the time that podInfo completes backoff.
@@ -117,9 +207,14 @@ func (bq *backoffQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
117
207
// because of the fact that the backoff time is calculated based on podInfo.Attempts,
118
208
// which doesn't get changed until the pod's scheduling is retried.
119
209
func (bq * backoffQueue ) getBackoffTime (podInfo * framework.QueuedPodInfo ) time.Time {
210
+ if podInfo .Attempts == 0 {
211
+ // Don't store backoff expiration if the duration is 0
212
+ // to correctly handle isPodBackingoff, if pod should skip backoff, when it wasn't tried at all.
213
+ return time.Time {}
214
+ }
120
215
if podInfo .BackoffExpiration .IsZero () {
121
216
duration := bq .calculateBackoffDuration (podInfo )
122
- podInfo .BackoffExpiration = podInfo .Timestamp .Add (duration )
217
+ podInfo .BackoffExpiration = bq . alignToWindow ( podInfo .Timestamp .Add (duration ) )
123
218
}
124
219
return podInfo .BackoffExpiration
125
220
}
@@ -238,8 +333,15 @@ func (bq *backoffQueue) has(pInfo *framework.QueuedPodInfo) bool {
238
333
}
239
334
240
335
// list returns all pods that are in the queue.
241
- func (bq * backoffQueue ) list () []* framework.QueuedPodInfo {
242
- return append (bq .podBackoffQ .List (), bq .podErrorBackoffQ .List ()... )
336
+ func (bq * backoffQueue ) list () []* v1.Pod {
337
+ var result []* v1.Pod
338
+ for _ , pInfo := range bq .podBackoffQ .List () {
339
+ result = append (result , pInfo .Pod )
340
+ }
341
+ for _ , pInfo := range bq .podErrorBackoffQ .List () {
342
+ result = append (result , pInfo .Pod )
343
+ }
344
+ return result
243
345
}
244
346
245
347
// len returns length of the queue.
0 commit comments