Skip to content
This repository was archived by the owner on Jul 30, 2021. It is now read-only.

Commit 645d99d

Browse files
author
Yifan Gu
committed
e2e: Add checkpointer_test.go
1 parent 3ce7d09 commit 645d99d

File tree

1 file changed

+361
-0
lines changed

1 file changed

+361
-0
lines changed

e2e/checkpointer_test.go

Lines changed: 361 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,361 @@
1+
package e2e
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"k8s.io/apimachinery/pkg/types"
11+
"k8s.io/client-go/pkg/api"
12+
"k8s.io/client-go/pkg/api/v1"
13+
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
14+
)
15+
16+
// nginxDS is the manifest of the test workload: an extensions/v1beta1
// DaemonSet named "nginx-daemonset" that runs a single nginx container with
// hostNetwork enabled. Its pod template carries the
// checkpointer.alpha.coreos.com/checkpoint: "true" annotation.
//
// The YAML nesting is significant; keep the two-space indentation intact.
var nginxDS = []byte(`apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: nginx-daemonset
spec:
  template:
    metadata:
      labels:
        app: nginx
      annotations:
        checkpointer.alpha.coreos.com/checkpoint: "true"
    spec:
      hostNetwork: true
      containers:
        - name: nginx
          image: nginx`)
32+
33+
func verifyCheckpoint(c *Cluster, namespace, daemonsetName string, shouldExist, shouldBeActive bool) error {
34+
checkpointed := func() error {
35+
dirs := []string{
36+
"/etc/kubernetes/inactive-manifests/",
37+
"/etc/kubernetes/checkpoint-secrets/" + namespace,
38+
// TODO(yifan): Add configmap.
39+
}
40+
41+
if shouldBeActive {
42+
dirs = append(dirs, "/etc/kubernetes/manifests")
43+
}
44+
45+
for _, dir := range dirs {
46+
stdout, stderr, err := c.Workers[0].SSH("sudo ls " + dir)
47+
if err != nil {
48+
return fmt.Errorf("unable to ls %q, error: %v\nstdout: %s\nstderr: %s", dir, err, stdout, stderr)
49+
}
50+
51+
if shouldExist && !bytes.Contains(stdout, []byte(daemonsetName)) {
52+
return fmt.Errorf("unable to find checkpoint %q in %q: error: %v, output: %q", daemonsetName, dir, err, stdout)
53+
}
54+
if !shouldExist && bytes.Contains(stdout, []byte(daemonsetName)) {
55+
return fmt.Errorf("should not find checkpoint %q in %q, error: %v, output: %q", daemonsetName, dir, err, stdout)
56+
}
57+
}
58+
59+
// Check active checkpoints.
60+
dir := "/etc/kubernetes/manifests"
61+
stdout, stderr, err := c.Workers[0].SSH("sudo ls " + dir)
62+
if err != nil {
63+
return fmt.Errorf("unable to ls %q, error: %v\nstdout: %s\nstderr: %s", dir, err, stdout, stderr)
64+
}
65+
if shouldBeActive && !bytes.Contains(stdout, []byte(daemonsetName)) {
66+
return fmt.Errorf("unable to find checkpoint %q in %q: error: %v, output: %q", daemonsetName, dir, err, stdout)
67+
}
68+
if !shouldBeActive && bytes.Contains(stdout, []byte(daemonsetName)) {
69+
return fmt.Errorf("should not find checkpoint %q in %q, error: %v, output: %q", daemonsetName, dir, err, stdout)
70+
}
71+
72+
return nil
73+
}
74+
return retry(20, 10*time.Second, checkpointed)
75+
}
76+
77+
func verifyPod(c *Cluster, daemonsetName string, shouldRun bool) error {
78+
checkpointsRunning := func() error {
79+
stdout, stderr, err := c.Workers[0].SSH("docker ps")
80+
if err != nil {
81+
return fmt.Errorf("unable to docker ps, error: %v\nstdout: %s\nstderr: %s", err, stdout, stderr)
82+
}
83+
84+
if shouldRun && !bytes.Contains(stdout, []byte(daemonsetName)) {
85+
return fmt.Errorf("unable to find running checkpoints %q, error: %v, output: %q", daemonsetName, err, stdout)
86+
}
87+
if !shouldRun && bytes.Contains(stdout, []byte(daemonsetName)) {
88+
return fmt.Errorf("should not find running checkpoints %q, error: %v, output: %q", daemonsetName, err, stdout)
89+
}
90+
return nil
91+
}
92+
return retry(20, 10*time.Second, checkpointsRunning)
93+
}
94+
95+
func isNodeReady(n *Node) bool {
96+
for _, condition := range n.Status.Conditions {
97+
if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
98+
return true
99+
}
100+
}
101+
return false
102+
}
103+
104+
// waitCluster waits for master and workers to be ready.
105+
func waitCluster(t *testing.T) *Cluster {
106+
var c *Cluster
107+
var err error
108+
109+
f := func() error {
110+
c, err = GetCluster()
111+
if err != nil {
112+
t.Fatalf("Failed to get cluster")
113+
}
114+
if len(c.Masters) == 0 {
115+
return fmt.Errorf("no masters")
116+
}
117+
if len(c.Workers) == 0 {
118+
return fmt.Errorf("no workers")
119+
}
120+
for i := range c.Masters {
121+
if !isNodeReady(c.Masters[i]) {
122+
return fmt.Errorf("masters[%d] is not ready", i)
123+
}
124+
}
125+
for i := range c.Workers {
126+
if !isNodeReady(c.Workers[i]) {
127+
return fmt.Errorf("workers[%d] is not ready", i)
128+
}
129+
}
130+
return nil
131+
}
132+
133+
if err := retry(20, 10*time.Second, f); err != nil {
134+
t.Fatalf("Failed to wait cluster: %v", err)
135+
}
136+
return c
137+
}
138+
139+
// 1. Schedule a pod checkpointer on worker node.
140+
// 2. Schedule a test pod on worker node.
141+
// 3. Reboot the worker without starting the kubelet.
142+
// 4. Delete the checkpointer on API server.
143+
// 5. Reboot the masters without starting the kubelet.
144+
// 6. Start the worker kubelet, verify the checkpointer and the pod are still running as static pods.
145+
// 7. Start the master kubelets, verify the checkpointer is removed but the pod is still running.
146+
func TestCheckpointerUnscheduleCheckpointer(t *testing.T) {
147+
// Get the cluster
148+
c := waitCluster(t)
149+
150+
testNS := strings.ToLower(fmt.Sprintf("%s-%s", namespace, t.Name()))
151+
if _, err := createNamespace(client, testNS); err != nil {
152+
t.Fatalf("Failed to create namespace: %v", err)
153+
}
154+
defer deleteNamespace(client, testNS)
155+
156+
// Run the pod checkpointer on worker nodes as well.
157+
patch := `[{"op": "replace", "path": "/spec/template/spec/nodeSelector", "value": {}}]`
158+
_, err := client.ExtensionsV1beta1().DaemonSets("kube-system").Patch("pod-checkpointer", types.JSONPatchType, []byte(patch))
159+
if err != nil {
160+
t.Fatalf("Failed to patch checkpointer: %v", err)
161+
}
162+
163+
// Create test pod.
164+
obj, _, err := api.Codecs.UniversalDecoder().Decode(nginxDS, nil, &v1beta1.DaemonSet{})
165+
if err != nil {
166+
t.Fatalf("Unable to decode manifest: %v", err)
167+
}
168+
ds, ok := obj.(*v1beta1.DaemonSet)
169+
if !ok {
170+
t.Fatalf("Expected manifest to decode into *api.Daemonset, got %T", ds)
171+
}
172+
_, err = client.ExtensionsV1beta1().DaemonSets(testNS).Create(ds)
173+
if err != nil {
174+
t.Fatalf("Failed to create the checkpoint parent: %v", err)
175+
}
176+
177+
// Verify the checkpoints are created.
178+
if err := verifyCheckpoint(c, "kube-system", "pod-checkpointer", true, true); err != nil {
179+
t.Fatalf("Failed to verify checkpoint: %v", err)
180+
}
181+
if err := verifyCheckpoint(c, testNS, "nginx-daemonset", true, false); err != nil {
182+
t.Fatalf("Failed to verify checkpoint: %v", err)
183+
}
184+
185+
// Disable the kubelet and reboot the worker.
186+
stdout, stderr, err := c.Workers[0].SSH("sudo systemctl disable kubelet")
187+
if err != nil {
188+
t.Fatalf("Failed to disable kubelet on workder %q: %v\nstdout: %s\nstderr: %s", c.Workers[0].Name, err, stdout, stderr)
189+
}
190+
if err := c.Workers[0].Reboot(); err != nil {
191+
t.Fatalf("Failed to reboot worker: %v", err)
192+
}
193+
194+
// Delete the pod checkpointer on the worker node by updating the daemonset.
195+
patch = `[{"op": "replace", "path": "/spec/template/spec/nodeSelector", "value": {"node-role.kubernetes.io/master":""}}]`
196+
_, err = client.ExtensionsV1beta1().DaemonSets("kube-system").Patch("pod-checkpointer", types.JSONPatchType, []byte(patch))
197+
if err != nil {
198+
t.Fatalf("Failed to patch checkpointer: %v", err)
199+
}
200+
201+
// Disable the kubelet and reboot the masters.
202+
for i := range c.Masters {
203+
stdout, stderr, err = c.Masters[i].SSH("sudo systemctl disable kubelet")
204+
if err != nil {
205+
t.Fatalf("Failed to disable kubelet on master %q: %v\nstdout: %s\nstderr: %s", c.Masters[i].Name, err, stdout, stderr)
206+
}
207+
if err := c.Masters[i].Reboot(); err != nil {
208+
t.Fatalf("Failed to reboot master: %v", err)
209+
}
210+
}
211+
212+
// Start the worker kubelet.
213+
stdout, stderr, err = c.Workers[0].SSH("sudo systemctl enable kubelet && sudo systemctl start kubelet")
214+
if err != nil {
215+
t.Fatalf("Failed to start kubelet on worker %q: %v\nstdout: %s\nstderr: %s", c.Workers[0].Name, err, stdout, stderr)
216+
}
217+
218+
// Verify that the checkpoints are still running.
219+
if err := verifyPod(c, "pod-checkpointer", true); err != nil {
220+
t.Fatalf("Failed to verifyPod: %s", err)
221+
222+
}
223+
if err := verifyPod(c, "nginx-daemonset", true); err != nil {
224+
t.Fatalf("Failed to verifyPod: %s", err)
225+
226+
}
227+
228+
// Start the master kubelet.
229+
for i := range c.Masters {
230+
stdout, stderr, err = c.Masters[i].SSH("sudo systemctl enable kubelet && sudo systemctl start kubelet")
231+
if err != nil {
232+
t.Fatalf("Failed to start kubelet on master %q: %v\nstdout: %s\nstderr: %s", c.Masters[i].Name, err, stdout, stderr)
233+
}
234+
}
235+
236+
// Verify that the pod-checkpointer is cleaned up but the daemonset is still running.
237+
if err := verifyPod(c, "pod-checkpointer", false); err != nil {
238+
t.Fatalf("Failed to verifyPod: %s", err)
239+
}
240+
if err := verifyPod(c, "nginx-daemonset", true); err != nil {
241+
t.Fatalf("Failed to verifyPod: %s", err)
242+
}
243+
if err := verifyCheckpoint(c, "kube-system", "pod-checkpointer", false, false); err != nil {
244+
t.Fatalf("Failed to verifyCheckpoint: %s", err)
245+
}
246+
if err := verifyCheckpoint(c, testNS, "nginx-daemonset", false, false); err != nil {
247+
t.Fatalf("Failed to verifyCheckpoint: %s", err)
248+
}
249+
}
250+
251+
// 1. Schedule a pod checkpointer on worker node.
252+
// 2. Schedule a test pod on worker node.
253+
// 3. Reboot the worker without starting the kubelet.
254+
// 4. Delete the test pod on API server.
255+
// 5. Reboot the masters without starting the kubelet.
256+
// 6. Start the worker kubelet, verify the checkpointer and the pod are still running as static pods.
257+
// 7. Start the master kubelets, verify the test pod is removed, but not the checkpointer.
258+
func TestCheckpointerUnscheduleParent(t *testing.T) {
259+
// Get the cluster
260+
c := waitCluster(t)
261+
262+
testNS := strings.ToLower(fmt.Sprintf("%s-%s", namespace, t.Name()))
263+
if _, err := createNamespace(client, testNS); err != nil {
264+
t.Fatalf("Failed to create namespace: %v", err)
265+
}
266+
defer deleteNamespace(client, testNS)
267+
268+
// Run the pod checkpointer on worker nodes as well.
269+
patch := `[{"op": "replace", "path": "/spec/template/spec/nodeSelector", "value": {}}]`
270+
_, err := client.ExtensionsV1beta1().DaemonSets("kube-system").Patch("pod-checkpointer", types.JSONPatchType, []byte(patch))
271+
if err != nil {
272+
t.Fatalf("Failed to patch checkpointer: %v", err)
273+
}
274+
275+
// Create test pod.
276+
obj, _, err := api.Codecs.UniversalDecoder().Decode(nginxDS, nil, &v1beta1.DaemonSet{})
277+
if err != nil {
278+
t.Fatalf("Unable to decode manifest: %v", err)
279+
}
280+
ds, ok := obj.(*v1beta1.DaemonSet)
281+
if !ok {
282+
t.Fatalf("Expected manifest to decode into *api.Daemonset, got %T", ds)
283+
}
284+
_, err = client.ExtensionsV1beta1().DaemonSets(testNS).Create(ds)
285+
if err != nil {
286+
t.Fatalf("Failed to create the checkpoint parent: %v", err)
287+
}
288+
289+
// Verify the checkpoints are created.
290+
if err := verifyCheckpoint(c, "kube-system", "pod-checkpointer", true, true); err != nil {
291+
t.Fatalf("verifyCheckpoint: %s", err)
292+
}
293+
if err := verifyCheckpoint(c, testNS, "nginx-daemonset", true, false); err != nil {
294+
t.Fatalf("verifyCheckpoint: %s", err)
295+
}
296+
297+
// Disable the kubelet and reboot the worker.
298+
stdout, stderr, err := c.Workers[0].SSH("sudo systemctl disable kubelet")
299+
if err != nil {
300+
t.Fatalf("Failed to disable kubelet on worker %q: %v\nstdout: %s\nstderr: %s", c.Workers[0].Name, err, stdout, stderr)
301+
}
302+
if err := c.Workers[0].Reboot(); err != nil {
303+
t.Fatalf("Failed to reboot worker: %v", err)
304+
}
305+
306+
// Delete test pod on the workers.
307+
patch = `[{"op": "replace", "path": "/spec/template/spec/nodeSelector", "value": {"node-role.kubernetes.io/master":""}}]`
308+
_, err = client.ExtensionsV1beta1().DaemonSets(testNS).Patch("nginx-daemonset", types.JSONPatchType, []byte(patch))
309+
if err != nil {
310+
t.Fatalf("unable to patch daemonset: %v", err)
311+
}
312+
313+
// Disable the kubelet and reboot the masters.
314+
for i := range c.Masters {
315+
stdout, stderr, err = c.Masters[i].SSH("sudo systemctl disable kubelet")
316+
if err != nil {
317+
t.Fatalf("Failed to disable kubelet on master %q: %v\nstdout: %s\nstderr: %s", c.Masters[i].Name, err, stdout, stderr)
318+
}
319+
if err := c.Masters[i].Reboot(); err != nil {
320+
t.Fatalf("Failed to reboot master: %v", err)
321+
}
322+
}
323+
324+
// Start the worker kubelet.
325+
stdout, stderr, err = c.Workers[0].SSH("sudo systemctl enable kubelet && sudo systemctl start kubelet")
326+
if err != nil {
327+
t.Fatalf("unable to start kubelet on worker %q: %v\nstdout: %s\nstderr: %s", c.Workers[0].Name, err, stdout, stderr)
328+
}
329+
330+
// Verify that the checkpoints are running.
331+
if err := verifyPod(c, "pod-checkpointer", true); err != nil {
332+
t.Fatalf("verifyPod: %s", err)
333+
334+
}
335+
if err := verifyPod(c, "nginx-daemonset", true); err != nil {
336+
t.Fatalf("verifyPod: %s", err)
337+
}
338+
339+
// Start the master kubelets.
340+
for i := range c.Masters {
341+
stdout, stderr, err = c.Masters[i].SSH("sudo systemctl enable kubelet && sudo systemctl start kubelet")
342+
if err != nil {
343+
t.Fatalf("unable to start kubelet on master %q: %v\nstdout: %s\nstderr: %s", c.Masters[i].Name, err, stdout, stderr)
344+
}
345+
}
346+
347+
// Verify that checkpoint is cleaned up and not running, but the pod checkpointer should still be running.
348+
if err := verifyPod(c, "pod-checkpointer", true); err != nil {
349+
t.Fatalf("verifyPod: %s", err)
350+
}
351+
if err := verifyPod(c, "nginx-daemonset", false); err != nil {
352+
t.Fatalf("verifyPod: %s", err)
353+
}
354+
if err := verifyCheckpoint(c, "kube-system", "pod-checkpointer", true, true); err != nil {
355+
t.Fatalf("verifyCheckpoint: %s", err)
356+
}
357+
if err := verifyCheckpoint(c, testNS, "nginx-daemonset", false, false); err != nil {
358+
t.Fatalf("verifyCheckpoint: %s", err)
359+
}
360+
return
361+
}

0 commit comments

Comments
 (0)