@@ -18,9 +18,11 @@ package v1alpha1
18
18
19
19
import (
20
20
"fmt"
21
+ "time"
21
22
22
23
"k8s.io/api/core/v1"
23
24
"k8s.io/apimachinery/pkg/runtime"
25
+ "k8s.io/apimachinery/pkg/types"
24
26
"k8s.io/klog"
25
27
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
26
28
)
@@ -30,12 +32,19 @@ import (
30
32
type framework struct {
31
33
registry Registry
32
34
nodeInfoSnapshot * cache.NodeInfoSnapshot
35
+ waitingPods * waitingPodsMap
33
36
plugins map [string ]Plugin // a map of initialized plugins. Plugin name:plugin instance.
34
37
reservePlugins []ReservePlugin
35
38
prebindPlugins []PrebindPlugin
36
39
unreservePlugins []UnreservePlugin
40
+ permitPlugins []PermitPlugin
37
41
}
38
42
43
// maxTimeout is the upper bound on the wait duration a permit plugin can
// request; it is also the starting value when computing the minimum timeout.
const maxTimeout = 15 * time.Minute
47
+
39
48
// Compile-time check that *framework satisfies the Framework interface.
var _ Framework = (*framework)(nil)
40
49
41
50
// NewFramework initializes plugins given the configuration and the registry.
@@ -44,6 +53,7 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
44
53
registry : r ,
45
54
nodeInfoSnapshot : cache .NewNodeInfoSnapshot (),
46
55
plugins : make (map [string ]Plugin ),
56
+ waitingPods : newWaitingPodsMap (),
47
57
}
48
58
49
59
// TODO: The framework needs to read the scheduler config and initialize only
@@ -68,6 +78,9 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
68
78
if up , ok := p .(UnreservePlugin ); ok {
69
79
f .unreservePlugins = append (f .unreservePlugins , up )
70
80
}
81
+ if pr , ok := p .(PermitPlugin ); ok {
82
+ f .permitPlugins = append (f .permitPlugins , pr )
83
+ }
71
84
}
72
85
return f , nil
73
86
}
@@ -117,10 +130,83 @@ func (f *framework) RunUnreservePlugins(
117
130
}
118
131
}
119
132
133
+ // RunPermitPlugins runs the set of configured permit plugins. If any of these
134
+ // plugins returns a status other than "Success" or "Wait", it does not continue
135
+ // running the remaining plugins and returns an error. Otherwise, if any of the
136
+ // plugins returns "Wait", then this function will block for the timeout period
137
+ // returned by the plugin, if the time expires, then it will return an error.
138
+ // Note that if multiple plugins asked to wait, then we wait for the minimum
139
+ // timeout duration.
140
+ func (f * framework ) RunPermitPlugins (
141
+ pc * PluginContext , pod * v1.Pod , nodeName string ) * Status {
142
+ timeout := maxTimeout
143
+ statusCode := Success
144
+ for _ , pl := range f .permitPlugins {
145
+ status , d := pl .Permit (pc , pod , nodeName )
146
+ if ! status .IsSuccess () {
147
+ if status .Code () == Unschedulable {
148
+ msg := fmt .Sprintf ("rejected by %v at permit: %v" , pl .Name (), status .Message ())
149
+ klog .V (4 ).Infof (msg )
150
+ return NewStatus (status .Code (), msg )
151
+ }
152
+ if status .Code () == Wait {
153
+ // Use the minimum timeout duration.
154
+ if timeout > d {
155
+ timeout = d
156
+ }
157
+ statusCode = Wait
158
+ } else {
159
+ msg := fmt .Sprintf ("error while running %v permit plugin for pod %v: %v" , pl .Name (), pod .Name , status .Message ())
160
+ klog .Error (msg )
161
+ return NewStatus (Error , msg )
162
+ }
163
+ }
164
+ }
165
+
166
+ // We now wait for the minimum duration if at least one plugin asked to
167
+ // wait (and no plugin rejected the pod)
168
+ if statusCode == Wait {
169
+ w := newWaitingPod (pod )
170
+ f .waitingPods .add (w )
171
+ defer f .waitingPods .remove (pod .UID )
172
+ timer := time .NewTimer (timeout )
173
+ klog .V (4 ).Infof ("waiting for %v for pod %v at permit" , timeout , pod .Name )
174
+ select {
175
+ case <- timer .C :
176
+ msg := fmt .Sprintf ("pod %v rejected due to timeout after waiting %v at permit" , pod .Name , timeout )
177
+ klog .V (4 ).Infof (msg )
178
+ return NewStatus (Unschedulable , msg )
179
+ case s := <- w .s :
180
+ if ! s .IsSuccess () {
181
+ if s .Code () == Unschedulable {
182
+ msg := fmt .Sprintf ("rejected while waiting at permit: %v" , s .Message ())
183
+ klog .V (4 ).Infof (msg )
184
+ return NewStatus (s .Code (), msg )
185
+ }
186
+ msg := fmt .Sprintf ("error received while waiting at permit for pod %v: %v" , pod .Name , s .Message ())
187
+ klog .Error (msg )
188
+ return NewStatus (Error , msg )
189
+ }
190
+ }
191
+ }
192
+
193
+ return nil
194
+ }
195
+
120
196
// NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
121
197
// is taken at the beginning of a scheduling cycle and remains unchanged until a
122
198
// pod finishes "Reserve". There is no guarantee that the information remains
123
199
// unchanged after "Reserve".
124
200
func (f * framework ) NodeInfoSnapshot () * cache.NodeInfoSnapshot {
125
201
return f .nodeInfoSnapshot
126
202
}
203
+
204
+ // IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
205
+ func (f * framework ) IterateOverWaitingPods (callback func (WaitingPod )) {
206
+ f .waitingPods .iterate (callback )
207
+ }
208
+
209
+ // GetWaitingPod returns a reference to a WaitingPod given its UID.
210
+ func (f * framework ) GetWaitingPod (uid types.UID ) WaitingPod {
211
+ return f .waitingPods .get (uid )
212
+ }
0 commit comments