
Commit cf7a371

fix(e2e): make WaitUntilNodeReady robust to watch disconnections
Replace watch-based node waiting with ticker-based polling.

- Use List instead of Watch to avoid silent failures on network errors
- Return (string, error) instead of calling t.Fatalf()
- Log transient API errors for debugging
- Provide clear error messages distinguishing "not found" vs "not ready"
1 parent a18595d commit cf7a371
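For readers skimming the diff below, here is a minimal, self-contained sketch of the pattern this commit adopts: list nodes on a fixed ticker instead of holding a watch open, so a dropped API connection only costs one missed poll, and let the caller's context bound the wait. This is illustrative, not the repository code; the package name, the waitForReadyNode helper, the 5-second interval, and the plain kubernetes.Interface parameter are assumptions for the example.

// Illustrative sketch only -- the real implementation is WaitUntilNodeReady in e2e/kube.go below.
package example

import (
    "context"
    "fmt"
    "strings"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// waitForReadyNode polls the API server on a ticker until a node whose name
// starts with prefix reports NodeReady, or until ctx expires.
func waitForReadyNode(ctx context.Context, client kubernetes.Interface, prefix string) (string, error) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
        if err == nil {
            for _, node := range nodes.Items {
                if !strings.HasPrefix(node.Name, prefix) {
                    continue
                }
                for _, cond := range node.Status.Conditions {
                    if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
                        return node.Name, nil
                    }
                }
            }
        }
        // A transient List error or a not-yet-ready node both fall through
        // to the next tick; the context deadline is the only hard stop.
        select {
        case <-ctx.Done():
            return "", fmt.Errorf("node with prefix %q not ready: %w", prefix, ctx.Err())
        case <-ticker.C:
        }
    }
}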

File tree (2 files changed, +42 -53 lines)

e2e/kube.go
e2e/test_helpers.go

e2e/kube.go

Lines changed: 36 additions & 48 deletions
@@ -6,20 +6,17 @@ import (
     "encoding/json"
     "fmt"
     "strings"
-    "testing"
     "time"
 
     "github.com/Azure/agentbaker/e2e/config"
     "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
-    "github.com/stretchr/testify/require"
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     v1 "k8s.io/api/core/v1"
     errorsk8s "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/wait"
-    "k8s.io/apimachinery/pkg/watch"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/clientcmd"

@@ -143,60 +140,51 @@ func (k *Kubeclient) WaitUntilPodRunning(ctx context.Context, namespace string,
     return k.WaitUntilPodRunningWithRetry(ctx, namespace, labelSelector, fieldSelector, 0)
 }
 
-func (k *Kubeclient) WaitUntilNodeReady(ctx context.Context, t testing.TB, vmssName string) string {
-    startTime := time.Now()
-    t.Logf("waiting for node %s to be ready in k8s API", vmssName)
-    defer func() {
-        t.Logf("waited for node %s to be ready in k8s API for %s", vmssName, time.Since(startTime))
-    }()
-
-    var node *corev1.Node = nil
-    watcher, err := k.Typed.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{})
-    require.NoError(t, err, "failed to start watching nodes")
-    defer watcher.Stop()
-
-    for event := range watcher.ResultChan() {
-        if event.Type != watch.Added && event.Type != watch.Modified {
-            continue
-        }
-
-        var nodeFromEvent *corev1.Node
-        switch v := event.Object.(type) {
-        case *corev1.Node:
-            nodeFromEvent = v
+func (k *Kubeclient) WaitUntilNodeReady(ctx context.Context, vmssName string) (string, error) {
+    ticker := time.NewTicker(5 * time.Second)
+    defer ticker.Stop()
 
-        default:
-            t.Logf("skipping object type %T", event.Object)
-            continue
-        }
+    var foundNode *corev1.Node
 
-        if !strings.HasPrefix(nodeFromEvent.Name, vmssName) {
-            continue
+    for {
+        nodes, err := k.Typed.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+        if err != nil {
+            logf(ctx, "failed to list nodes: %v", err)
+        } else {
+            foundNode = nil
+            for _, node := range nodes.Items {
+                if strings.HasPrefix(node.Name, vmssName) {
+                    foundNode = &node
+                    if isNodeReady(&node) {
+                        return node.Name, nil
+                    }
+                    break
+                }
+            }
         }
 
-        // found the right node. Use it!
-        node = nodeFromEvent
-        nodeTaints, _ := json.Marshal(node.Spec.Taints)
-        nodeConditions, _ := json.Marshal(node.Status.Conditions)
-
-        for _, cond := range node.Status.Conditions {
-            if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
-                t.Logf("node %s is ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
-                return node.Name
+        select {
+        case <-ctx.Done():
+            if foundNode != nil {
+                return "", fmt.Errorf("node %s not ready: conditions=%v, taints=%v: %w",
+                    foundNode.Name,
+                    foundNode.Status.Conditions,
+                    foundNode.Spec.Taints,
+                    ctx.Err())
             }
+            return "", fmt.Errorf("node with prefix %q not found: %w", vmssName, ctx.Err())
+        case <-ticker.C:
         }
-
-        t.Logf("node %s is not ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
     }
+}
 
-    if node == nil {
-        t.Fatalf("%q haven't appeared in k8s API server", vmssName)
-        return ""
+func isNodeReady(node *corev1.Node) bool {
+    for _, cond := range node.Status.Conditions {
+        if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
+            return true
+        }
     }
-
-    nodeString, _ := json.Marshal(node)
-    t.Fatalf("failed to wait for %q (%s) to be ready %+v. Detail: %s", vmssName, node.Name, node.Status, string(nodeString))
-    return node.Name
+    return false
 }
 
 // GetPodNetworkDebugPodForNode returns a pod that's a member of the 'debugnonhost' daemonset running in the cluster - this will return

e2e/test_helpers.go

Lines changed: 6 additions & 5 deletions
@@ -280,12 +280,13 @@ func prepareAKSNode(ctx context.Context, s *Scenario) (*ScenarioVM, error) {
     require.NoError(s.T, err)
 
     if !s.Config.SkipDefaultValidation {
-        vmssCreatedAt := time.Now() // Record the start time
-        creationElapse := time.Since(start) // Calculate the elapsed time
-        scenarioVM.KubeName = s.Runtime.Cluster.Kube.WaitUntilNodeReady(ctx, s.T, s.Runtime.VMSSName)
-        readyElapse := time.Since(vmssCreatedAt) // Calculate the elapsed time
+        vmssCreatedAt := time.Now()
+        kubeName, err := s.Runtime.Cluster.Kube.WaitUntilNodeReady(ctx, s.Runtime.VMSSName)
+        require.NoError(s.T, err)
+        scenarioVM.KubeName = kubeName
+        nodeReadyElapse := time.Since(vmssCreatedAt)
         totalElapse := time.Since(start)
-        toolkit.LogDuration(ctx, totalElapse, 3*time.Minute, fmt.Sprintf("Node %s took %s to be created and %s to be ready", s.Runtime.VMSSName, creationElapse, readyElapse))
+        toolkit.LogDuration(ctx, totalElapse, 3*time.Minute, fmt.Sprintf("Node %s: total %s (VMSS creation + %s for node ready)", s.Runtime.VMSSName, totalElapse, nodeReadyElapse))
     }
 
     return scenarioVM, nil
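Because the rewritten WaitUntilNodeReady returns an error instead of failing the test itself, the length of the wait is bounded entirely by the context it receives; the call site above passes the test's context straight through and asserts on the error with require.NoError. As a hedged sketch of an alternative, a caller could attach an explicit deadline before calling it. The helper name and the 20-minute figure below are illustrative, not from the repository, and the snippet assumes the e2e package's existing context and time imports.

// waitWithDeadline is illustrative only: it bounds the node-ready wait with an
// explicit timeout instead of relying on the surrounding test's context.
func waitWithDeadline(ctx context.Context, kube *Kubeclient, vmssName string) (string, error) {
    waitCtx, cancel := context.WithTimeout(ctx, 20*time.Minute)
    defer cancel()
    return kube.WaitUntilNodeReady(waitCtx, vmssName)
}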
