Skip to content

Commit c74f2f6

Browse files
committed
Make drain library more reusable
Move more functionality from the kubectl cmd to a package with fewer dependencies.
1 parent 0d579bf commit c74f2f6

File tree

7 files changed

+389
-283
lines changed

7 files changed

+389
-283
lines changed

staging/src/k8s.io/kubectl/pkg/cmd/drain/BUILD

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,10 @@ go_library(
88
visibility = ["//visibility:public"],
99
deps = [
1010
"//staging/src/k8s.io/api/core/v1:go_default_library",
11-
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
12-
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
1311
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
1412
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
1513
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
1614
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
17-
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
1815
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
1916
"//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library",
2017
"//staging/src/k8s.io/cli-runtime/pkg/resource:go_default_library",
@@ -36,15 +33,11 @@ go_test(
3633
"//staging/src/k8s.io/api/batch/v1:go_default_library",
3734
"//staging/src/k8s.io/api/core/v1:go_default_library",
3835
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
39-
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
4036
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
4137
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
4238
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
43-
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
4439
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
45-
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
4640
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
47-
"//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library",
4841
"//staging/src/k8s.io/client-go/rest/fake:go_default_library",
4942
"//staging/src/k8s.io/kubectl/pkg/cmd/testing:go_default_library",
5043
"//staging/src/k8s.io/kubectl/pkg/cmd/util:go_default_library",

staging/src/k8s.io/kubectl/pkg/cmd/drain/drain.go

Lines changed: 22 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,14 @@ package drain
1919
import (
2020
"errors"
2121
"fmt"
22-
"math"
23-
"time"
2422

2523
"github.com/spf13/cobra"
2624

2725
corev1 "k8s.io/api/core/v1"
28-
apierrors "k8s.io/apimachinery/pkg/api/errors"
29-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3026
"k8s.io/apimachinery/pkg/labels"
3127
"k8s.io/apimachinery/pkg/runtime/schema"
3228
utilerrors "k8s.io/apimachinery/pkg/util/errors"
3329
"k8s.io/apimachinery/pkg/util/sets"
34-
"k8s.io/apimachinery/pkg/util/wait"
3530

3631
"k8s.io/cli-runtime/pkg/genericclioptions"
3732
"k8s.io/cli-runtime/pkg/printers"
@@ -146,14 +141,34 @@ var (
146141
)
147142

148143
func NewDrainCmdOptions(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *DrainCmdOptions {
149-
return &DrainCmdOptions{
144+
o := &DrainCmdOptions{
150145
PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme),
151146
IOStreams: ioStreams,
152147
drainer: &drain.Helper{
153148
GracePeriodSeconds: -1,
149+
Out: ioStreams.Out,
154150
ErrOut: ioStreams.ErrOut,
155151
},
156152
}
153+
o.drainer.OnPodDeletedOrEvicted = o.onPodDeletedOrEvicted
154+
return o
155+
}
156+
157+
// onPodDeletedOrEvicted is called by drain.Helper, when the pod has been deleted or evicted
158+
func (o *DrainCmdOptions) onPodDeletedOrEvicted(pod *corev1.Pod, usingEviction bool) {
159+
var verbStr string
160+
if usingEviction {
161+
verbStr = "evicted"
162+
} else {
163+
verbStr = "deleted"
164+
}
165+
printObj, err := o.ToPrinter(verbStr)
166+
if err != nil {
167+
fmt.Fprintf(o.ErrOut, "error building printer: %v\n", err)
168+
fmt.Fprintf(o.Out, "pod %s/%s %s\n", pod.Namespace, pod.Name, verbStr)
169+
} else {
170+
printObj(pod, o.Out)
171+
}
157172
}
158173

159174
func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
@@ -313,7 +328,7 @@ func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error
313328
fmt.Fprintf(o.ErrOut, "WARNING: %s\n", warnings)
314329
}
315330

316-
if err := o.deleteOrEvictPods(list.Pods()); err != nil {
331+
if err := o.drainer.DeleteOrEvictPods(list.Pods()); err != nil {
317332
pendingList, newErrs := o.drainer.GetPodsForDeletion(nodeInfo.Name)
318333

319334
fmt.Fprintf(o.ErrOut, "There are pending pods in node %q when an error occurred: %v\n", nodeInfo.Name, err)
@@ -328,136 +343,6 @@ func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error
328343
return nil
329344
}
330345

331-
// deleteOrEvictPods deletes or evicts the pods on the api server
332-
func (o *DrainCmdOptions) deleteOrEvictPods(pods []corev1.Pod) error {
333-
if len(pods) == 0 {
334-
return nil
335-
}
336-
337-
policyGroupVersion, err := drain.CheckEvictionSupport(o.drainer.Client)
338-
if err != nil {
339-
return err
340-
}
341-
342-
getPodFn := func(namespace, name string) (*corev1.Pod, error) {
343-
return o.drainer.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
344-
}
345-
346-
if len(policyGroupVersion) > 0 {
347-
return o.evictPods(pods, policyGroupVersion, getPodFn)
348-
} else {
349-
return o.deletePods(pods, getPodFn)
350-
}
351-
}
352-
353-
func (o *DrainCmdOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
354-
returnCh := make(chan error, 1)
355-
356-
for _, pod := range pods {
357-
go func(pod corev1.Pod, returnCh chan error) {
358-
for {
359-
fmt.Fprintf(o.Out, "evicting pod %q\n", pod.Name)
360-
err := o.drainer.EvictPod(pod, policyGroupVersion)
361-
if err == nil {
362-
break
363-
} else if apierrors.IsNotFound(err) {
364-
returnCh <- nil
365-
return
366-
} else if apierrors.IsTooManyRequests(err) {
367-
fmt.Fprintf(o.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err)
368-
time.Sleep(5 * time.Second)
369-
} else {
370-
returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
371-
return
372-
}
373-
}
374-
_, err := o.waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn)
375-
if err == nil {
376-
returnCh <- nil
377-
} else {
378-
returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
379-
}
380-
}(pod, returnCh)
381-
}
382-
383-
doneCount := 0
384-
var errors []error
385-
386-
// 0 timeout means infinite, we use MaxInt64 to represent it.
387-
var globalTimeout time.Duration
388-
if o.drainer.Timeout == 0 {
389-
globalTimeout = time.Duration(math.MaxInt64)
390-
} else {
391-
globalTimeout = o.drainer.Timeout
392-
}
393-
globalTimeoutCh := time.After(globalTimeout)
394-
numPods := len(pods)
395-
for doneCount < numPods {
396-
select {
397-
case err := <-returnCh:
398-
doneCount++
399-
if err != nil {
400-
errors = append(errors, err)
401-
}
402-
case <-globalTimeoutCh:
403-
return fmt.Errorf("drain did not complete within %v", globalTimeout)
404-
}
405-
}
406-
return utilerrors.NewAggregate(errors)
407-
}
408-
409-
func (o *DrainCmdOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
410-
// 0 timeout means infinite, we use MaxInt64 to represent it.
411-
var globalTimeout time.Duration
412-
if o.drainer.Timeout == 0 {
413-
globalTimeout = time.Duration(math.MaxInt64)
414-
} else {
415-
globalTimeout = o.drainer.Timeout
416-
}
417-
for _, pod := range pods {
418-
err := o.drainer.DeletePod(pod)
419-
if err != nil && !apierrors.IsNotFound(err) {
420-
return err
421-
}
422-
}
423-
_, err := o.waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn)
424-
return err
425-
}
426-
427-
func (o *DrainCmdOptions) waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error)) ([]corev1.Pod, error) {
428-
var verbStr string
429-
if usingEviction {
430-
verbStr = "evicted"
431-
} else {
432-
verbStr = "deleted"
433-
}
434-
printObj, err := o.ToPrinter(verbStr)
435-
if err != nil {
436-
return pods, err
437-
}
438-
439-
err = wait.PollImmediate(interval, timeout, func() (bool, error) {
440-
pendingPods := []corev1.Pod{}
441-
for i, pod := range pods {
442-
p, err := getPodFn(pod.Namespace, pod.Name)
443-
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
444-
printObj(&pod, o.Out)
445-
continue
446-
} else if err != nil {
447-
return false, err
448-
} else {
449-
pendingPods = append(pendingPods, pods[i])
450-
}
451-
}
452-
pods = pendingPods
453-
if len(pendingPods) > 0 {
454-
return false, nil
455-
}
456-
return true, nil
457-
})
458-
return pods, err
459-
}
460-
461346
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
462347
// "Unschedulable" is passed as the first arg.
463348
func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error {

staging/src/k8s.io/kubectl/pkg/cmd/drain/drain_test.go

Lines changed: 0 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,11 @@ limitations under the License.
1717
package drain
1818

1919
import (
20-
"errors"
21-
"fmt"
22-
"io"
2320
"io/ioutil"
2421
"net/http"
2522
"net/url"
2623
"os"
2724
"reflect"
28-
"strconv"
2925
"strings"
3026
"sync/atomic"
3127
"testing"
@@ -38,14 +34,10 @@ import (
3834
batchv1 "k8s.io/api/batch/v1"
3935
corev1 "k8s.io/api/core/v1"
4036
policyv1beta1 "k8s.io/api/policy/v1beta1"
41-
apierrors "k8s.io/apimachinery/pkg/api/errors"
4237
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4338
"k8s.io/apimachinery/pkg/runtime"
4439
"k8s.io/apimachinery/pkg/runtime/schema"
45-
"k8s.io/apimachinery/pkg/types"
4640
"k8s.io/apimachinery/pkg/util/strategicpatch"
47-
"k8s.io/apimachinery/pkg/util/wait"
48-
"k8s.io/cli-runtime/pkg/printers"
4941
"k8s.io/client-go/rest/fake"
5042
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
5143
cmdutil "k8s.io/kubectl/pkg/cmd/util"
@@ -907,135 +899,6 @@ func TestDrain(t *testing.T) {
907899
}
908900
}
909901

910-
func TestDeletePods(t *testing.T) {
911-
ifHasBeenCalled := map[string]bool{}
912-
tests := []struct {
913-
description string
914-
interval time.Duration
915-
timeout time.Duration
916-
expectPendingPods bool
917-
expectError bool
918-
expectedError *error
919-
getPodFn func(namespace, name string) (*corev1.Pod, error)
920-
}{
921-
{
922-
description: "Wait for deleting to complete",
923-
interval: 100 * time.Millisecond,
924-
timeout: 10 * time.Second,
925-
expectPendingPods: false,
926-
expectError: false,
927-
expectedError: nil,
928-
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
929-
oldPodMap, _ := createPods(false)
930-
newPodMap, _ := createPods(true)
931-
if oldPod, found := oldPodMap[name]; found {
932-
if _, ok := ifHasBeenCalled[name]; !ok {
933-
ifHasBeenCalled[name] = true
934-
return &oldPod, nil
935-
}
936-
if oldPod.ObjectMeta.Generation < 4 {
937-
newPod := newPodMap[name]
938-
return &newPod, nil
939-
}
940-
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
941-
942-
}
943-
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
944-
},
945-
},
946-
{
947-
description: "Deleting could timeout",
948-
interval: 200 * time.Millisecond,
949-
timeout: 3 * time.Second,
950-
expectPendingPods: true,
951-
expectError: true,
952-
expectedError: &wait.ErrWaitTimeout,
953-
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
954-
oldPodMap, _ := createPods(false)
955-
if oldPod, found := oldPodMap[name]; found {
956-
return &oldPod, nil
957-
}
958-
return nil, fmt.Errorf("%q: not found", name)
959-
},
960-
},
961-
{
962-
description: "Client error could be passed out",
963-
interval: 200 * time.Millisecond,
964-
timeout: 5 * time.Second,
965-
expectPendingPods: true,
966-
expectError: true,
967-
expectedError: nil,
968-
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
969-
return nil, errors.New("This is a random error for testing")
970-
},
971-
},
972-
}
973-
974-
for _, test := range tests {
975-
t.Run(test.description, func(t *testing.T) {
976-
tf := cmdtesting.NewTestFactory()
977-
defer tf.Cleanup()
978-
979-
o := DrainCmdOptions{
980-
PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme),
981-
}
982-
o.Out = os.Stdout
983-
984-
o.ToPrinter = func(operation string) (printers.ResourcePrinterFunc, error) {
985-
return func(obj runtime.Object, out io.Writer) error {
986-
return nil
987-
}, nil
988-
}
989-
990-
_, pods := createPods(false)
991-
pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn)
992-
993-
if test.expectError {
994-
if err == nil {
995-
t.Fatalf("%s: unexpected non-error", test.description)
996-
} else if test.expectedError != nil {
997-
if *test.expectedError != err {
998-
t.Fatalf("%s: the error does not match expected error", test.description)
999-
}
1000-
}
1001-
}
1002-
if !test.expectError && err != nil {
1003-
t.Fatalf("%s: unexpected error", test.description)
1004-
}
1005-
if test.expectPendingPods && len(pendingPods) == 0 {
1006-
t.Fatalf("%s: unexpected empty pods", test.description)
1007-
}
1008-
if !test.expectPendingPods && len(pendingPods) > 0 {
1009-
t.Fatalf("%s: unexpected pending pods", test.description)
1010-
}
1011-
})
1012-
}
1013-
}
1014-
1015-
func createPods(ifCreateNewPods bool) (map[string]corev1.Pod, []corev1.Pod) {
1016-
podMap := make(map[string]corev1.Pod)
1017-
podSlice := []corev1.Pod{}
1018-
for i := 0; i < 8; i++ {
1019-
var uid types.UID
1020-
if ifCreateNewPods {
1021-
uid = types.UID(i)
1022-
} else {
1023-
uid = types.UID(strconv.Itoa(i) + strconv.Itoa(i))
1024-
}
1025-
pod := corev1.Pod{
1026-
ObjectMeta: metav1.ObjectMeta{
1027-
Name: "pod" + strconv.Itoa(i),
1028-
Namespace: "default",
1029-
UID: uid,
1030-
Generation: int64(i),
1031-
},
1032-
}
1033-
podMap[pod.Name] = pod
1034-
podSlice = append(podSlice, pod)
1035-
}
1036-
return podMap, podSlice
1037-
}
1038-
1039902
type MyReq struct {
1040903
Request *http.Request
1041904
}

0 commit comments

Comments
 (0)