Skip to content

Commit 6303016

Browse files
Improving port forwarding error handling (#1839)
* adding error handling hook to portforwarder. documenting exported symbols. removing unnecessary build constraints * improving the port forward wrapper api
1 parent 70aafa5 commit 6303016

File tree

3 files changed

+186
-109
lines changed

3 files changed

+186
-109
lines changed

test/integration/k8s_test.go

Lines changed: 74 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ package k8s
44

55
import (
66
"context"
7-
"log"
8-
97
//"dnc/test/integration/goldpinger"
108
"errors"
119
"flag"
@@ -43,7 +41,6 @@ var (
4341
kubeconfig = flag.String("test-kubeconfig", filepath.Join(homedir.HomeDir(), ".kube", "config"), "(optional) absolute path to the kubeconfig file")
4442
delegatedSubnetID = flag.String("delegated-subnet-id", "", "delegated subnet id for node labeling")
4543
delegatedSubnetName = flag.String("subnet-name", "", "subnet name for node labeling")
46-
gpPodScaleCounts = []int{2, 10, 100, 2}
4744
)
4845

4946
func shouldLabelNodes() bool {
@@ -141,8 +138,12 @@ func TestPodScaling(t *testing.T) {
141138
}
142139
})
143140

141+
podsClient := clientset.CoreV1().Pods(deployment.Namespace)
142+
143+
gpPodScaleCounts := []int{2, 10, 100, 2}
144144
for _, c := range gpPodScaleCounts {
145145
count := c
146+
146147
t.Run(fmt.Sprintf("replica count %d", count), func(t *testing.T) {
147148
replicaCtx, cancel := context.WithTimeout(ctx, (retryAttempts+1)*retryDelaySec)
148149
defer cancel()
@@ -151,93 +152,98 @@ func TestPodScaling(t *testing.T) {
151152
t.Fatalf("could not scale deployment: %v", err)
152153
}
153154

154-
if !t.Run("all pods have IPs assigned", func(t *testing.T) {
155-
podsClient := clientset.CoreV1().Pods(deployment.Namespace)
155+
t.Log("checking that all pods have IPs assigned")
156156

157-
checkPodIPsFn := func() error {
158-
podList, err := podsClient.List(ctx, metav1.ListOptions{LabelSelector: "app=goldpinger"})
159-
if err != nil {
160-
return err
161-
}
157+
checkPodIPsFn := func() error {
158+
podList, err := podsClient.List(ctx, metav1.ListOptions{LabelSelector: "app=goldpinger"})
159+
if err != nil {
160+
return err
161+
}
162162

163-
if len(podList.Items) == 0 {
164-
return errors.New("no pods scheduled")
165-
}
163+
if len(podList.Items) == 0 {
164+
return errors.New("no pods scheduled")
165+
}
166166

167-
for _, pod := range podList.Items {
168-
if pod.Status.Phase == apiv1.PodPending {
169-
return errors.New("some pods still pending")
170-
}
167+
for _, pod := range podList.Items {
168+
if pod.Status.Phase == apiv1.PodPending {
169+
return errors.New("some pods still pending")
171170
}
171+
}
172172

173-
for _, pod := range podList.Items {
174-
if pod.Status.PodIP == "" {
175-
return errors.New("a pod has not been allocated an IP")
176-
}
173+
for _, pod := range podList.Items {
174+
if pod.Status.PodIP == "" {
175+
return errors.New("a pod has not been allocated an IP")
177176
}
178-
179-
return nil
180177
}
181-
err := defaultRetrier.Do(ctx, checkPodIPsFn)
178+
179+
return nil
180+
}
181+
182+
if err := defaultRetrier.Do(ctx, checkPodIPsFn); err != nil {
183+
t.Fatalf("not all pods were allocated IPs: %v", err)
184+
}
185+
186+
t.Log("all pods have been allocated IPs")
187+
t.Log("checking that all pods can ping each other")
188+
189+
clusterCheckCtx, cancel := context.WithTimeout(ctx, 20*time.Minute)
190+
defer cancel()
191+
192+
pfOpts := PortForwardingOpts{
193+
Namespace: "default",
194+
LabelSelector: "type=goldpinger-pod",
195+
LocalPort: 9090,
196+
DestPort: 8080,
197+
}
198+
199+
pingCheckFn := func() error {
200+
pf, err := NewPortForwarder(restConfig, t, pfOpts)
182201
if err != nil {
183-
t.Fatalf("not all pods were allocated IPs: %v", err)
202+
t.Fatalf("could not build port forwarder: %v", err)
184203
}
185-
t.Log("all pods have been allocated IPs")
186-
}) {
187-
errors.New("Pods don't have IP's")
188-
return
189-
}
190204

191-
t.Run("all pods can ping each other", func(t *testing.T) {
192-
clusterCheckCtx, cancel := context.WithTimeout(ctx, 20*time.Minute)
205+
portForwardCtx, cancel := context.WithTimeout(ctx, (retryAttempts+1)*retryDelaySec)
193206
defer cancel()
194-
clusterCheckFn := func() error {
195-
pf, err := NewPortForwarder(restConfig)
196-
if err != nil {
197-
t.Fatal(err)
198-
}
199-
200-
portForwardCtx, cancel := context.WithTimeout(ctx, (retryAttempts+1)*retryDelaySec)
201-
defer cancel()
202207

203-
var streamHandle PortForwardStreamHandle
204-
portForwardFn := func() error {
205-
log.Printf("attempting port forward")
206-
handle, err := pf.Forward(ctx, "default", "type=goldpinger-pod", 9090, 8080)
207-
if err != nil {
208-
return err
209-
}
208+
portForwardFn := func() error {
209+
t.Log("attempting port forward")
210210

211-
streamHandle = handle
212-
return nil
211+
if err := pf.Forward(portForwardCtx); err != nil {
212+
return fmt.Errorf("could not start port forward: %w", err)
213213
}
214-
if err := defaultRetrier.Do(portForwardCtx, portForwardFn); err != nil {
215-
t.Fatalf("could not start port forward within %v: %v", retryDelaySec.String(), err)
216-
}
217-
defer streamHandle.Stop()
218214

219-
gpClient := goldpinger.Client{Host: streamHandle.Url()}
215+
return nil
216+
}
220217

221-
clusterState, err := gpClient.CheckAll(clusterCheckCtx)
222-
if err != nil {
223-
return err
224-
}
218+
if err := defaultRetrier.Do(portForwardCtx, portForwardFn); err != nil {
219+
t.Fatalf("could not start port forward within %v: %v", retryDelaySec.String(), err)
220+
}
225221

226-
stats := goldpinger.ClusterStats(clusterState)
227-
stats.PrintStats()
228-
if stats.AllPingsHealthy() {
229-
return nil
230-
}
222+
go pf.KeepAlive(clusterCheckCtx)
223+
224+
defer pf.Stop()
225+
226+
gpClient := goldpinger.Client{Host: pf.Address()}
231227

232-
return errors.New("not all pings are healthy")
228+
clusterState, err := gpClient.CheckAll(clusterCheckCtx)
229+
if err != nil {
230+
return fmt.Errorf("could not check all goldpinger pods: %w", err)
233231
}
234232

235-
if err := defaultRetrier.Do(clusterCheckCtx, clusterCheckFn); err != nil {
236-
t.Fatalf("cluster could not reach healthy state: %v", err)
233+
stats := goldpinger.ClusterStats(clusterState)
234+
stats.PrintStats()
235+
if stats.AllPingsHealthy() {
236+
return nil
237237
}
238238

239-
t.Log("all pings successful!")
240-
})
239+
return errors.New("not all pings are healthy")
240+
}
241+
242+
if err := defaultRetrier.Do(clusterCheckCtx, pingCheckFn); err != nil {
243+
t.Fatalf("cluster could not reach healthy state: %v", err)
244+
}
245+
246+
t.Log("all pings successful!")
241247
})
242248
}
243249
}

test/integration/label.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
//go:build integration
2-
31
package k8s
42

53
import (

0 commit comments

Comments
 (0)