Skip to content

Commit 746de97

Browse files
authored
Merge pull request #7 from negz/podfilter
Make pod eviction filters configurable
2 parents 2c77d78 + 8fd35d1 commit 746de97

File tree

8 files changed: +377 −172 lines changed

README.md

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,30 +27,24 @@ usage: draino [<flags>] [<node-conditions>...]
2727
Automatically cordons and drains nodes that match the supplied conditions.
2828
2929
Flags:
30-
--help Show context-sensitive help (also try --help-long
31-
and --help-man).
32-
-d, --debug Run with debug logging.
33-
--listen=":10002" Address at which to expose /metrics and /healthz.
34-
--kubeconfig=KUBECONFIG Path to kubeconfig file. Leave unset to use
35-
in-cluster config.
36-
--master=MASTER Address of Kubernetes API server. Leave unset to
37-
use in-cluster config.
38-
--dry-run Emit an event without cordoning or draining
39-
matching nodes.
40-
--max-grace-period=8m0s Maximum time evicted pods will be given to
41-
terminate gracefully.
42-
--eviction-headroom=30s Additional time to wait after a pod's termination
43-
grace period for it to have been deleted.
44-
--drain-buffer=10m0s Minimum time between starting each drain. Nodes
45-
are always cordoned immediately.
30+
--help Show context-sensitive help (also try --help-long and --help-man).
31+
-d, --debug Run with debug logging.
32+
--listen=":10002" Address at which to expose /metrics and /healthz.
33+
--kubeconfig=KUBECONFIG Path to kubeconfig file. Leave unset to use in-cluster config.
34+
--master=MASTER Address of Kubernetes API server. Leave unset to use in-cluster config.
35+
--dry-run Emit an event without cordoning or draining matching nodes.
36+
--max-grace-period=8m0s Maximum time evicted pods will be given to terminate gracefully.
37+
--eviction-headroom=30s Additional time to wait after a pod's termination grace period for it to have been deleted.
38+
--drain-buffer=10m0s Minimum time between starting each drain. Nodes are always cordoned immediately.
4639
--node-label=KEY=VALUE ...
47-
Only nodes with this label will be eligible for
48-
cordoning and draining. May be specified multiple
49-
times.
40+
Only nodes with this label will be eligible for cordoning and draining. May be specified multiple times.
41+
--evict-daemonset-pods Evict pods that were created by an extant DaemonSet.
42+
--evict-emptydir-pods Evict pods with local storage, i.e. with emptyDir volumes.
43+
--evict-unreplicated-pods Evict pods that were not created by a replication controller.
5044
5145
Args:
52-
[<node-conditions>] Nodes for which any of these conditions are true will be
53-
cordoned and drained.
46+
[<node-conditions>] Nodes for which any of these conditions are true will be cordoned and drained.
47+
5448
```
5549

5650
## Considerations
@@ -59,9 +53,6 @@ Keep the following in mind before deploying Draino:
5953
* Always run Draino in `--dry-run` mode first to ensure it would drain the nodes
6054
you expect it to. In dry run mode Draino will emit logs, metrics, and events
6155
but will not actually cordon or drain nodes.
62-
* Draino will not evict [DaemonSet](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
63-
or [mirror](https://kubernetes.io/docs/tasks/administer-cluster/static-pod/)
64-
pods. It _will_ evict pods with local storage, and unreplicated pods.
6556
* Draino immediately cordons nodes that match its configured labels and node
6657
conditions, but will wait a configurable amount of time (10 minutes by default)
6758
between draining nodes. i.e. If two nodes begin exhibiting a node condition

cmd/draino/draino.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ func main() {
3838
drainBuffer = app.Flag("drain-buffer", "Minimum time between starting each drain. Nodes are always cordoned immediately.").Default(kubernetes.DefaultDrainBuffer.String()).Duration()
3939
nodeLabels = app.Flag("node-label", "Only nodes with this label will be eligible for cordoning and draining. May be specified multiple times.").PlaceHolder("KEY=VALUE").StringMap()
4040

41+
evictDaemonSetPods = app.Flag("evict-daemonset-pods", "Evict pods that were created by an extant DaemonSet.").Bool()
42+
evictLocalStoragePods = app.Flag("evict-emptydir-pods", "Evict pods with local storage, i.e. with emptyDir volumes.").Bool()
43+
evictUnreplicatedPods = app.Flag("evict-unreplicated-pods", "Evict pods that were not created by a replication controller.").Bool()
44+
4145
conditions = app.Arg("node-conditions", "Nodes for which any of these conditions are true will be cordoned and drained.").Strings()
4246
)
4347
kingpin.MustParse(app.Parse(os.Args[1:]))
@@ -82,11 +86,20 @@ func main() {
8286
cs, err := client.NewForConfig(c)
8387
kingpin.FatalIfError(err, "cannot create Kubernetes client")
8488

89+
pf := []kubernetes.PodFilterFunc{kubernetes.MirrorPodFilter}
90+
switch {
91+
case !*evictLocalStoragePods:
92+
pf = append(pf, kubernetes.LocalStoragePodFilter)
93+
case !*evictUnreplicatedPods:
94+
pf = append(pf, kubernetes.UnreplicatedPodFilter)
95+
case !*evictDaemonSetPods:
96+
pf = append(pf, kubernetes.NewDaemonSetPodFilter(cs))
97+
}

> Review note: because this is a `switch` statement, at most one `case` runs. If two or more of the evict flags are left unset (their default), only the first matching filter is appended to `pf`, and the other pod categories will be evicted despite their flags — e.g. with all three flags false, only `LocalStoragePodFilter` is added, so daemonset and unreplicated pods become evictable. Three independent `if` statements would append every applicable filter.
8598
var h cache.ResourceEventHandler = kubernetes.NewDrainingResourceEventHandler(
8699
kubernetes.NewAPICordonDrainer(cs,
87100
kubernetes.MaxGracePeriod(*maxGracePeriod),
88101
kubernetes.EvictionHeadroom(*evictionHeadroom),
89-
kubernetes.WithPodFilter(kubernetes.NewPodFilters(kubernetes.MirrorPodFilter, kubernetes.NewDaemonSetPodFilter(cs)))),
102+
kubernetes.WithPodFilter(kubernetes.NewPodFilters(pf...))),
90103
kubernetes.NewEventRecorder(cs),
91104
kubernetes.WithLogger(log),
92105
kubernetes.WithDrainBuffer(*drainBuffer))

internal/kubernetes/drainer.go

Lines changed: 8 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -38,54 +38,6 @@ func IsTimeout(err error) bool {
3838
return ok
3939
}
4040

41-
// A FilterFunc returns true if the supplied pod passes the filter.
42-
type FilterFunc func(p core.Pod) (bool, error)
43-
44-
// MirrorPodFilter returns true if the supplied pod is not a mirror pod, i.e. a
45-
// pod created by a manifest on the node rather than the API server.
46-
func MirrorPodFilter(p core.Pod) (bool, error) {
47-
_, mirrorPod := p.GetAnnotations()[core.MirrorPodAnnotationKey]
48-
return !mirrorPod, nil
49-
}
50-
51-
// NewDaemonSetPodFilter returns a FilterFunc that returns true if the supplied
52-
// pod is not managed by an extant DaemonSet.
53-
func NewDaemonSetPodFilter(client kubernetes.Interface) FilterFunc {
54-
return func(p core.Pod) (bool, error) {
55-
c := meta.GetControllerOf(&p)
56-
if c == nil || c.Kind != kindDaemonSet {
57-
return true, nil
58-
}
59-
60-
// Pods pass the filter if they were created by a DaemonSet that no
61-
// longer exists.
62-
if _, err := client.ExtensionsV1beta1().DaemonSets(p.GetNamespace()).Get(c.Name, meta.GetOptions{}); err != nil {
63-
if apierrors.IsNotFound(err) {
64-
return true, nil
65-
}
66-
return false, errors.Wrapf(err, "cannot get DaemonSet %s/%s", p.GetNamespace(), c.Name)
67-
}
68-
return false, nil
69-
}
70-
}
71-
72-
// NewPodFilters returns a FilterFunc that returns true if all of the supplied
73-
// FilterFuncs return true.
74-
func NewPodFilters(filters ...FilterFunc) FilterFunc {
75-
return func(p core.Pod) (bool, error) {
76-
for _, fn := range filters {
77-
passes, err := fn(p)
78-
if err != nil {
79-
return false, errors.Wrap(err, "cannot apply filters")
80-
}
81-
if !passes {
82-
return false, nil
83-
}
84-
}
85-
return true, nil
86-
}
87-
}
88-
8941
// A Cordoner cordons nodes.
9042
type Cordoner interface {
9143
// Cordon the supplied node. Marks it unschedulable for new pods.
@@ -117,7 +69,7 @@ func (d *NoopCordonDrainer) Drain(n *core.Node) error { return nil }
11769
type APICordonDrainer struct {
11870
c kubernetes.Interface
11971

120-
filter FilterFunc
72+
filter PodFilterFunc
12173

12274
maxGracePeriod time.Duration
12375
evictionHeadroom time.Duration
@@ -145,7 +97,7 @@ func EvictionHeadroom(h time.Duration) APICordonDrainerOption {
14597

14698
// WithPodFilter configures a filter that may be used to exclude certain pods
14799
// from eviction when draining.
148-
func WithPodFilter(f FilterFunc) APICordonDrainerOption {
100+
func WithPodFilter(f PodFilterFunc) APICordonDrainerOption {
149101
return func(d *APICordonDrainer) {
150102
d.filter = f
151103
}
@@ -154,7 +106,12 @@ func WithPodFilter(f FilterFunc) APICordonDrainerOption {
154106
// NewAPICordonDrainer returns a CordonDrainer that cordons and drains nodes via
155107
// the Kubernetes API.
156108
func NewAPICordonDrainer(c kubernetes.Interface, ao ...APICordonDrainerOption) *APICordonDrainer {
157-
d := &APICordonDrainer{c: c, maxGracePeriod: DefaultMaxGracePeriod, evictionHeadroom: DefaultEvictionOverhead}
109+
d := &APICordonDrainer{
110+
c: c,
111+
filter: NewPodFilters(),
112+
maxGracePeriod: DefaultMaxGracePeriod,
113+
evictionHeadroom: DefaultEvictionOverhead,
114+
}
158115
for _, o := range ao {
159116
o(d)
160117
}

internal/kubernetes/drainer_test.go

Lines changed: 43 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,30 @@ import (
1010
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
1111
"k8s.io/apimachinery/pkg/runtime"
1212
"k8s.io/apimachinery/pkg/runtime/schema"
13+
"k8s.io/client-go/kubernetes"
1314
"k8s.io/client-go/kubernetes/fake"
1415
clienttesting "k8s.io/client-go/testing"
1516
)
1617

1718
const (
18-
ns = "coolNamespace"
19-
nodeName = "coolNode"
20-
podName = "coolPod"
21-
daemonsetName = "coolDaemonSet"
19+
ns = "coolNamespace"
20+
21+
nodeName = "coolNode"
22+
podName = "coolPod"
23+
24+
daemonsetName = "coolDaemonSet"
25+
deploymentName = "coolDeployment"
26+
kindDeployment = "Deployment"
27+
)
28+
29+
var (
30+
_ CordonDrainer = (*APICordonDrainer)(nil)
31+
_ CordonDrainer = (*NoopCordonDrainer)(nil)
2232
)
2333

2434
var podGracePeriodSeconds int64 = 10
2535
var isController = true
36+
var errExploded = errors.New("kaboom")
2637

2738
type reactor struct {
2839
verb string
@@ -41,6 +52,14 @@ func (r reactor) Fn() clienttesting.ReactionFunc {
4152
}
4253
}
4354

55+
func newFakeClientSet(rs ...reactor) kubernetes.Interface {
56+
cs := &fake.Clientset{}
57+
for _, r := range rs {
58+
cs.AddReactor(r.verb, r.resource, r.Fn())
59+
}
60+
return cs
61+
}
62+
4463
func TestCordon(t *testing.T) {
4564
cases := []struct {
4665
name string
@@ -274,18 +293,22 @@ func TestDrain(t *testing.T) {
274293
},
275294
},
276295
{
277-
name: "FilterMirrorPod",
296+
name: "PodDoesNotPassFilter",
278297
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
298+
options: []APICordonDrainerOption{WithPodFilter(func(p core.Pod) (bool, error) {
299+
if p.GetName() == "lamePod" {
300+
// This pod does not pass the filter.
301+
return false, nil
302+
}
303+
return true, nil
304+
})},
279305
reactions: []reactor{
280306
reactor{
281307
verb: "list",
282308
resource: "pods",
283309
ret: &core.PodList{Items: []core.Pod{
310+
core.Pod{ObjectMeta: meta.ObjectMeta{Name: "lamePod"}},
284311
core.Pod{ObjectMeta: meta.ObjectMeta{Name: podName}},
285-
core.Pod{ObjectMeta: meta.ObjectMeta{
286-
Name: "mirrorPod",
287-
Annotations: map[string]string{core.MirrorPodAnnotationKey: "true"},
288-
}},
289312
}},
290313
},
291314
reactor{
@@ -301,23 +324,20 @@ func TestDrain(t *testing.T) {
301324
},
302325
},
303326
{
304-
name: "FilterDaemonSetPod",
327+
name: "PodFilterErrors",
305328
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
329+
options: []APICordonDrainerOption{WithPodFilter(func(p core.Pod) (bool, error) {
330+
if p.GetName() == "explodeyPod" {
331+
return false, errExploded
332+
}
333+
return true, nil
334+
})},
306335
reactions: []reactor{
307336
reactor{
308337
verb: "list",
309338
resource: "pods",
310339
ret: &core.PodList{Items: []core.Pod{
311-
core.Pod{
312-
ObjectMeta: meta.ObjectMeta{
313-
Name: "daemonsetPod",
314-
OwnerReferences: []meta.OwnerReference{meta.OwnerReference{
315-
Controller: &isController,
316-
Kind: kindDaemonSet,
317-
}},
318-
},
319-
Spec: core.PodSpec{TerminationGracePeriodSeconds: &podGracePeriodSeconds},
320-
},
340+
core.Pod{ObjectMeta: meta.ObjectMeta{Name: "explodeyPod"}},
321341
core.Pod{ObjectMeta: meta.ObjectMeta{Name: podName}},
322342
}},
323343
},
@@ -332,74 +352,7 @@ func TestDrain(t *testing.T) {
332352
err: apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, podName),
333353
},
334354
},
335-
},
336-
{
337-
name: "EvictOrphanedDaemonSetPod",
338-
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
339-
reactions: []reactor{
340-
reactor{
341-
verb: "list",
342-
resource: "pods",
343-
ret: &core.PodList{Items: []core.Pod{
344-
core.Pod{
345-
ObjectMeta: meta.ObjectMeta{
346-
Name: "orphanedDaemonsetPod",
347-
Namespace: ns,
348-
OwnerReferences: []meta.OwnerReference{meta.OwnerReference{
349-
Controller: &isController,
350-
Kind: kindDaemonSet,
351-
Name: daemonsetName,
352-
}},
353-
},
354-
Spec: core.PodSpec{TerminationGracePeriodSeconds: &podGracePeriodSeconds},
355-
},
356-
}},
357-
},
358-
reactor{
359-
verb: "get",
360-
resource: "daemonsets",
361-
err: apierrors.NewNotFound(schema.GroupResource{Resource: "daemonsets"}, daemonsetName),
362-
},
363-
reactor{
364-
verb: "create",
365-
resource: "pods",
366-
subresource: "eviction",
367-
},
368-
reactor{
369-
verb: "get",
370-
resource: "pods",
371-
err: apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, podName),
372-
},
373-
},
374-
},
375-
{
376-
name: "ErrorGettingDaemonset",
377-
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
378-
reactions: []reactor{
379-
reactor{
380-
verb: "list",
381-
resource: "pods",
382-
ret: &core.PodList{Items: []core.Pod{
383-
core.Pod{
384-
ObjectMeta: meta.ObjectMeta{
385-
Name: "orphanedDaemonsetPod",
386-
Namespace: ns,
387-
OwnerReferences: []meta.OwnerReference{meta.OwnerReference{
388-
Controller: &isController,
389-
Kind: kindDaemonSet,
390-
Name: daemonsetName,
391-
}},
392-
},
393-
Spec: core.PodSpec{TerminationGracePeriodSeconds: &podGracePeriodSeconds},
394-
},
395-
}},
396-
},
397-
reactor{
398-
verb: "get",
399-
resource: "daemonsets",
400-
err: errors.New("nope"),
401-
},
402-
},
355+
errFn: func(err error) bool { return errors.Cause(err) == errExploded },
403356
},
404357
{
405358
name: "ErrorListingPods",
@@ -416,14 +369,8 @@ func TestDrain(t *testing.T) {
416369

417370
for _, tc := range cases {
418371
t.Run(tc.name, func(t *testing.T) {
419-
c := &fake.Clientset{}
420-
for _, r := range tc.reactions {
421-
c.AddReactor(r.verb, r.resource, r.Fn())
422-
}
423-
424-
o := tc.options
425-
o = append(o, WithPodFilter(NewPodFilters(MirrorPodFilter, NewDaemonSetPodFilter(c))))
426-
d := NewAPICordonDrainer(c, o...)
372+
c := newFakeClientSet(tc.reactions...)
373+
d := NewAPICordonDrainer(c, tc.options...)
427374
if err := d.Drain(tc.node); err != nil {
428375
for _, r := range tc.reactions {
429376
if errors.Cause(err) == r.err {

0 commit comments

Comments (0)