
Commit 746f08a

Merge pull request kubernetes#127033 from dims/fix-for-DaemonRestart-Disruptive-tests
Fix for DaemonRestart [Disruptive] tests
2 parents df577d7 + 0a124e2 commit 746f08a

2 files changed (+94, -40 lines)

test/e2e/apps/daemon_restart.go

Lines changed: 91 additions & 40 deletions
@@ -40,6 +40,7 @@ import (
 	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
+	testfwk "k8s.io/kubernetes/test/integration/framework"
 	testutils "k8s.io/kubernetes/test/utils"
 	imageutils "k8s.io/kubernetes/test/utils/image"
 	admissionapi "k8s.io/pod-security-admission/api"
@@ -106,17 +107,20 @@ func (r *RestartDaemonConfig) waitUp(ctx context.Context) {
 	var healthzCheck string
 	if r.enableHTTPS {
 		healthzCheck = fmt.Sprintf(
-			"curl -sk -o %v -I -w \"%%{http_code}\" https://localhost:%v/healthz", nullDev, r.healthzPort)
+			"curl -sk -o %v -I -w \"%%{http_code}\" https://127.0.0.1:%v/healthz", nullDev, r.healthzPort)
 	} else {
 		healthzCheck = fmt.Sprintf(
-			"curl -s -o %v -I -w \"%%{http_code}\" http://localhost:%v/healthz", nullDev, r.healthzPort)
+			"curl -s -o %v -I -w \"%%{http_code}\" http://127.0.0.1:%v/healthz", nullDev, r.healthzPort)
 
 	}
+
 	err := wait.PollUntilContextTimeout(ctx, r.pollInterval, r.pollTimeout, false, func(ctx context.Context) (bool, error) {
+
 		result, err := e2essh.NodeExec(ctx, r.nodeName, healthzCheck, framework.TestContext.Provider)
 		if err != nil {
 			return false, err
 		}
+		e2essh.LogResult(result)
 		if result.Code == 0 {
 			httpCode, err := strconv.Atoi(result.Stdout)
 			if err != nil {
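For illustration (not part of the diff): the format string above expands to a plain curl probe that waitUp runs on the target node over SSH, and the surrounding poll loop then parses the printed status code from stdout via the strconv.Atoi call. A minimal sketch of the HTTPS branch, assuming nullDev = "/dev/null" and 10257 (the kube-controller-manager secure port) as an example healthz port:

package main

import "fmt"

func main() {
	nullDev := "/dev/null"
	healthzPort := 10257 // assumed example port, not taken from the diff

	// Same Sprintf as the HTTPS branch of waitUp; %% escapes the percent sign so
	// curl's -w writes only the HTTP status code to stdout.
	cmd := fmt.Sprintf(
		"curl -sk -o %v -I -w \"%%{http_code}\" https://127.0.0.1:%v/healthz", nullDev, healthzPort)
	fmt.Println(cmd)
	// Prints: curl -sk -o /dev/null -I -w "%{http_code}" https://127.0.0.1:10257/healthz
}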
@@ -274,49 +278,57 @@ var _ = SIGDescribe("DaemonRestart", framework.WithDisruptive(), func() {
 
 		// Requires master ssh access.
 		e2eskipper.SkipUnlessProviderIs("gce", "aws")
-		restarter := NewRestartConfig(
-			framework.APIAddress(), "kube-controller", ports.KubeControllerManagerPort, restartPollInterval, restartTimeout, true)
-		restarter.restart(ctx)
-
-		// The intent is to ensure the replication controller manager has observed and reported status of
-		// the replication controller at least once since the manager restarted, so that we can determine
-		// that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC
-		// to the same size achieves this, because the scale operation advances the RC's sequence number
-		// and awaits it to be observed and reported back in the RC's status.
-		e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods, true)
-
-		// Only check the keys, the pods can be different if the kubelet updated it.
-		// TODO: Can it really?
-		existingKeys := sets.NewString()
-		newKeys := sets.NewString()
-		for _, k := range existingPods.ListKeys() {
-			existingKeys.Insert(k)
-		}
-		for _, k := range newPods.ListKeys() {
-			newKeys.Insert(k)
-		}
-		if len(newKeys.List()) != len(existingKeys.List()) ||
-			!newKeys.IsSuperset(existingKeys) {
-			framework.Failf("RcManager created/deleted pods after restart \n\n %+v", tracker)
+		nodes, err := getControlPlaneNodes(ctx, f.ClientSet)
+		framework.ExpectNoError(err)
+		for i := range nodes.Items {
+
+			restarter := NewRestartConfig(
+				getFirstIPforNode(&nodes.Items[i]), "kube-controller", ports.KubeControllerManagerPort, restartPollInterval, restartTimeout, true)
+			restarter.restart(ctx)
+
+			// The intent is to ensure the replication controller manager has observed and reported status of
+			// the replication controller at least once since the manager restarted, so that we can determine
+			// that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC
+			// to the same size achieves this, because the scale operation advances the RC's sequence number
+			// and awaits it to be observed and reported back in the RC's status.
+			framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods, true))
+
+			// Only check the keys, the pods can be different if the kubelet updated it.
+			// TODO: Can it really?
+			existingKeys := sets.NewString()
+			newKeys := sets.NewString()
+			for _, k := range existingPods.ListKeys() {
+				existingKeys.Insert(k)
+			}
+			for _, k := range newPods.ListKeys() {
+				newKeys.Insert(k)
+			}
+			if len(newKeys.List()) != len(existingKeys.List()) ||
+				!newKeys.IsSuperset(existingKeys) {
+				framework.Failf("RcManager created/deleted pods after restart \n\n %+v", tracker)
+			}
 		}
 	})
 
 	ginkgo.It("Scheduler should continue assigning pods to nodes across restart", func(ctx context.Context) {
-
 		// Requires master ssh access.
 		e2eskipper.SkipUnlessProviderIs("gce", "aws")
-		restarter := NewRestartConfig(
-			framework.APIAddress(), "kube-scheduler", kubeschedulerconfig.DefaultKubeSchedulerPort, restartPollInterval, restartTimeout, true)
-
-		// Create pods while the scheduler is down and make sure the scheduler picks them up by
-		// scaling the rc to the same size.
-		restarter.waitUp(ctx)
-		restarter.kill(ctx)
-		// This is best effort to try and create pods while the scheduler is down,
-		// since we don't know exactly when it is restarted after the kill signal.
-		framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, false))
-		restarter.waitUp(ctx)
-		framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, true))
+		nodes, err := getControlPlaneNodes(ctx, f.ClientSet)
+		framework.ExpectNoError(err)
+		for i := range nodes.Items {
+			restarter := NewRestartConfig(
+				getFirstIPforNode(&nodes.Items[i]), "kube-scheduler", kubeschedulerconfig.DefaultKubeSchedulerPort, restartPollInterval, restartTimeout, true)
+
+			// Create pods while the scheduler is down and make sure the scheduler picks them up by
+			// scaling the rc to the same size.
+			restarter.waitUp(ctx)
+			restarter.kill(ctx)
+			// This is best effort to try and create pods while the scheduler is down,
+			// since we don't know exactly when it is restarted after the kill signal.
+			framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, false))
+			restarter.waitUp(ctx)
+			framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, true))
+		}
 	})
 
 	ginkgo.It("Kubelet should not restart containers across restart", func(ctx context.Context) {
@@ -331,7 +343,7 @@ var _ = SIGDescribe("DaemonRestart", framework.WithDisruptive(), func() {
 		}
 		for _, ip := range nodeIPs {
 			restarter := NewRestartConfig(
-				ip, "kubelet", ports.KubeletReadOnlyPort, restartPollInterval, restartTimeout, false)
+				ip, "kubelet", ports.KubeletHealthzPort, restartPollInterval, restartTimeout, false)
 			restarter.restart(ctx)
 		}
 		postRestarts, badNodes := getContainerRestarts(ctx, f.ClientSet, ns, labelSelector)
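Context for the port swap above, not part of the diff: the kubelet's read-only port is frequently disabled on hardened clusters, while the healthz endpoint is served locally, which pairs with the 127.0.0.1 change in waitUp. A small sketch using the default values these constants are assumed to have (verify against pkg/cluster/ports for the release in use):

package main

import "fmt"

// Assumed default values for the two constants swapped above; illustration only.
const (
	kubeletReadOnlyPort = 10255 // ports.KubeletReadOnlyPort: previous probe target, often disabled
	kubeletHealthzPort  = 10248 // ports.KubeletHealthzPort: new probe target, served on localhost
)

func main() {
	// With enableHTTPS=false, waitUp ends up polling a command along these lines.
	fmt.Printf("curl -s -o /dev/null -I -w %q http://127.0.0.1:%d/healthz\n", "%{http_code}", kubeletHealthzPort)
}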
@@ -355,3 +367,42 @@ var _ = SIGDescribe("DaemonRestart", framework.WithDisruptive(), func() {
 		}
 	})
 })
+
+func getFirstIPforNode(node *v1.Node) string {
+	var ips []string
+	ips = append(ips, getAddresses(node, v1.NodeExternalIP)...)
+	if len(ips) == 0 {
+		// If ExternalIP isn't set, assume the test programs can reach the InternalIP
+		ips = append(ips, getAddresses(node, v1.NodeInternalIP)...)
+	}
+	if len(ips) == 0 {
+		framework.Failf("did not find any ip(s) for node: %v", node)
+	}
+	return ips[0]
+}
+
+func getAddresses(node *v1.Node, addressType v1.NodeAddressType) (ips []string) {
+	for j := range node.Status.Addresses {
+		nodeAddress := &node.Status.Addresses[j]
+		if nodeAddress.Type == addressType && nodeAddress.Address != "" {
+			ips = append(ips, nodeAddress.Address)
+		}
+	}
+	return
+}
+
+func getControlPlaneNodes(ctx context.Context, c clientset.Interface) (nodes *v1.NodeList, err error) {
+	nodes, err = c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+	testfwk.Filter(nodes, func(node v1.Node) bool {
+		_, isMaster := node.Labels["node-role.kubernetes.io/master"]
+		_, isControlPlane := node.Labels["node-role.kubernetes.io/control-plane"]
+		return isMaster || isControlPlane
+	})
+	if len(nodes.Items) == 0 {
+		return nil, fmt.Errorf("there are currently no ready, schedulable control plane nodes in the cluster")
+	}
+	return nodes, nil
+}
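As a standalone sketch of the selection logic getControlPlaneNodes relies on, here is the same predicate with the integration-framework Filter call replaced by a plain loop; the node names and labels are hypothetical examples of the well-known node-role.kubernetes.io role labels:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isControlPlaneNode mirrors the predicate passed to testfwk.Filter above: keep a
// node if it carries either the legacy "master" role label or the current
// "control-plane" one.
func isControlPlaneNode(node *v1.Node) bool {
	_, isMaster := node.Labels["node-role.kubernetes.io/master"]
	_, isControlPlane := node.Labels["node-role.kubernetes.io/control-plane"]
	return isMaster || isControlPlane
}

func main() {
	nodes := []v1.Node{
		{ObjectMeta: metav1.ObjectMeta{Name: "cp-0", Labels: map[string]string{"node-role.kubernetes.io/control-plane": ""}}},
		{ObjectMeta: metav1.ObjectMeta{Name: "worker-0"}},
	}
	for i := range nodes {
		if isControlPlaneNode(&nodes[i]) {
			fmt.Println(nodes[i].Name) // prints: cp-0
		}
	}
}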

test/e2e/framework/ssh/ssh.go

Lines changed: 3 additions & 0 deletions
@@ -68,6 +68,9 @@ func GetSigner(provider string) (ssh.Signer, error) {
 	switch provider {
 	case "gce", "gke", "kubemark":
 		keyfile = os.Getenv("GCE_SSH_KEY")
+		if keyfile == "" {
+			keyfile = os.Getenv("GCE_SSH_PRIVATE_KEY_FILE")
+		}
 		if keyfile == "" {
 			keyfile = "google_compute_engine"
 		}
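For illustration, a minimal sketch of the key-file resolution order GetSigner now follows for the gce/gke/kubemark providers; how a bare filename is later resolved against the SSH key directory is unchanged code outside this hunk and not shown here:

package main

import (
	"fmt"
	"os"
)

// keyfileForGCE mirrors the lookup order in the hunk above: an explicit GCE_SSH_KEY
// wins, the new GCE_SSH_PRIVATE_KEY_FILE is consulted next, and the default key
// name is the last resort. Illustration only; the real logic lives in GetSigner.
func keyfileForGCE() string {
	keyfile := os.Getenv("GCE_SSH_KEY")
	if keyfile == "" {
		keyfile = os.Getenv("GCE_SSH_PRIVATE_KEY_FILE") // fallback added by this commit
	}
	if keyfile == "" {
		keyfile = "google_compute_engine" // default key name
	}
	return keyfile
}

func main() {
	fmt.Println(keyfileForGCE())
}

Exporting GCE_SSH_PRIVATE_KEY_FILE before running the e2e suite should therefore be enough to point the SSH helper at a non-default key when GCE_SSH_KEY is unset.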
