Commit 4e6314d

add a trap to catch SIGHUP and improvements for pty
1 parent d8a6355 commit 4e6314d
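The SIGHUP trap named in the commit message is not visible in the hunks rendered below (only three of the five changed files are shown here). As a hedged illustration only — not the commit's actual code — a Go handler that traps SIGHUP so a long-running test process survives the loss of its controlling PTY might look like this:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

// Illustrative sketch only: trap SIGHUP so the process keeps running when
// its controlling terminal (e.g. a PTY master) is closed.
func trapSIGHUP() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP)
	go func() {
		for sig := range ch {
			log.Printf("ignoring %v, continuing to run", sig)
		}
	}()
}

func main() {
	trapSIGHUP()
	// the rest of the program keeps running even if SIGHUP arrives
	select {}
}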

5 files changed (+152 -81 lines changed)


e2e/common.go

Lines changed: 94 additions & 35 deletions
@@ -5,6 +5,7 @@ import (
 	"io"
 	"os/exec"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 
@@ -13,7 +14,7 @@ import (
 )
 
 const (
-	StartCommandWait = 30 * time.Second
+	StartCommandWait = 15 * time.Second
 	RunCommandTimeout = 60 * time.Second
 )
 
@@ -32,39 +33,56 @@ func StartCommand(log *logrus.Entry, commandName string, arg ...string) (string,
 	outPipe, _ := cmd.StdoutPipe()
 	errPipe, _ := cmd.StderrPipe()
 
-	var sb strings.Builder
+	var sbOut strings.Builder
+	var sbErr strings.Builder
+
 	go func(_ io.ReadCloser) {
 		reader := bufio.NewReader(errPipe)
-		line, err := reader.ReadString('\n')
-		for err == nil {
-			sb.WriteString(line)
-			line, err = reader.ReadString('\n')
+		for {
+			line, err := reader.ReadString('\n')
+			// Write line even if there's an error, as long as we got data
+			if len(line) > 0 {
+				sbErr.WriteString(line)
+			}
+			if err != nil {
+				break
+			}
 		}
 	}(errPipe)
 
 	go func(_ io.ReadCloser) {
 		reader := bufio.NewReader(outPipe)
-		line, err := reader.ReadString('\n')
-		for err == nil {
-			sb.WriteString(line)
-			line, err = reader.ReadString('\n')
+		for {
+			line, err := reader.ReadString('\n')
+			// Write line even if there's an error, as long as we got data
+			if len(line) > 0 {
+				sbOut.WriteString(line)
+			}
+			if err != nil {
+				break
+			}
 		}
 	}(outPipe)
 
 	// start async
 	go func() {
 		log.Debug("Starting async ...")
-		_, err := pty.Start(cmd)
+		ptmx, err := pty.Start(cmd)
 		if err != nil {
 			log.Errorf("Start returned error: %v", err)
+			return
 		}
+		// Note: PTY is intentionally NOT closed here as command continues running
+		// Keep the PTY file descriptor alive to prevent SIGHUP
+		_ = ptmx // Keep reference to prevent premature PTY closure
 	}()
 
 	log.Debugf("Waiting %v ...", StartCommandWait)
 	time.Sleep(StartCommandWait)
 
 	log.Debug("Returning result while command still running")
-	return sb.String(), nil
+	// Combine stderr first (errors more visible), then stdout
+	return sbErr.String() + sbOut.String(), nil
 }
 
 // run command with tty support and wait for stop
@@ -77,44 +95,65 @@ func RunCommand(log *logrus.Entry, commandName string, arg ...string) (string, e
 	outPipe, _ := cmd.StdoutPipe()
 	errPipe, _ := cmd.StderrPipe()
 
-	var sb strings.Builder
+	var sbOut strings.Builder
+	var sbErr strings.Builder
+	var wg sync.WaitGroup
+
+	wg.Add(2)
 	go func(_ io.ReadCloser) {
+		defer wg.Done()
 		reader := bufio.NewReader(errPipe)
-		line, err := reader.ReadString('\n')
-		for err == nil {
-			sb.WriteString(line)
-			line, err = reader.ReadString('\n')
+		for {
+			line, err := reader.ReadString('\n')
+			// Write line even if there's an error, as long as we got data
+			if len(line) > 0 {
+				sbErr.WriteString(line)
+			}
+			if err != nil {
+				break
+			}
 		}
 	}(errPipe)
 
 	go func(_ io.ReadCloser) {
+		defer wg.Done()
 		reader := bufio.NewReader(outPipe)
-		line, err := reader.ReadString('\n')
-		for err == nil {
-			sb.WriteString(line)
-			line, err = reader.ReadString('\n')
+		for {
+			line, err := reader.ReadString('\n')
+			// Write line even if there's an error, as long as we got data
+			if len(line) > 0 {
+				sbOut.WriteString(line)
+			}
+			if err != nil {
+				break
+			}
 		}
 	}(outPipe)
 
 	log.Debug("Starting ...")
-	_, err := pty.Start(cmd)
+	ptmx, err := pty.Start(cmd)
 	if err != nil {
 		log.Errorf("Start returned error: %v", err)
 		return "", err
 	}
+	defer ptmx.Close() // Ensure PTY is closed after command finishes
 
 	log.Debug("Waiting ...")
 	err = cmd.Wait()
 	if err != nil {
 		log.Errorf("Wait returned error: %v", err)
 	}
 
+	log.Debug("Waiting for output goroutines to finish...")
+	wg.Wait()
+
 	// TODO: find why this returns -1. That may be related to pty implementation
 	/*if cmd.ProcessState.ExitCode() != 0 {
-		return sb.String(), fmt.Errorf("Cmd returned code %d", cmd.ProcessState.ExitCode())
+		return sbErr.String() + sbOut.String(), fmt.Errorf("Cmd returned code %d", cmd.ProcessState.ExitCode())
 	}*/
 
-	return sb.String(), nil
+	// Combine stderr first (errors more visible), then stdout
+	return sbErr.String() + sbOut.String(), nil
 }
 
 // run command with tty support and terminate it after timeout
@@ -137,22 +176,38 @@ func RunCommandAndTerminate(log *logrus.Entry, commandName string, arg ...string
 	})
 	defer timer.Stop()
 
-	var sb strings.Builder
+	var sbOut strings.Builder
+	var sbErr strings.Builder
+	var wg sync.WaitGroup
+
+	wg.Add(2)
 	go func(_ io.ReadCloser) {
+		defer wg.Done()
 		reader := bufio.NewReader(errPipe)
-		line, err := reader.ReadString('\n')
-		for err == nil {
-			sb.WriteString(line)
-			line, err = reader.ReadString('\n')
+		for {
+			line, err := reader.ReadString('\n')
+			// Write line even if there's an error, as long as we got data
+			if len(line) > 0 {
+				sbErr.WriteString(line)
+			}
+			if err != nil {
+				break
+			}
 		}
 	}(errPipe)
 
 	go func(_ io.ReadCloser) {
+		defer wg.Done()
 		reader := bufio.NewReader(outPipe)
-		line, err := reader.ReadString('\n')
-		for err == nil {
-			sb.WriteString(line)
-			line, err = reader.ReadString('\n')
+		for {
+			line, err := reader.ReadString('\n')
+			// Write line even if there's an error, as long as we got data
+			if len(line) > 0 {
+				sbOut.WriteString(line)
+			}
+			if err != nil {
+				break
+			}
 		}
 	}(outPipe)
 
@@ -178,10 +233,14 @@ func RunCommandAndTerminate(log *logrus.Entry, commandName string, arg ...string
 		log.Errorf("Wait returned error: %v", err)
 	}
 
+	log.Debug("Waiting for output goroutines to finish...")
+	wg.Wait()
+
 	// TODO: find why this returns -1. That may be related to pty implementation
 	/*if cmd.ProcessState.ExitCode() != 0 {
-		return sb.String(), fmt.Errorf("Cmd returned code %d", cmd.ProcessState.ExitCode())
+		return sbErr.String() + sbOut.String(), fmt.Errorf("Cmd returned code %d", cmd.ProcessState.ExitCode())
 	}*/
 
-	return sb.String(), nil
+	// Combine stderr first (errors more visible), then stdout
+	return sbErr.String() + sbOut.String(), nil
 }

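The rewritten read loops in e2e/common.go rely on a documented property of bufio.Reader.ReadString: when the delimiter is not found (typically at io.EOF), it returns whatever data was read together with the error, so a final line without a trailing newline is no longer dropped; the added sync.WaitGroup then makes the functions wait until both pipes are fully drained before returning. A minimal standalone sketch of the same pattern (names here are illustrative, not taken from the commit):

package main

import (
	"bufio"
	"fmt"
	"strings"
	"sync"
)

// readAll drains r line by line, keeping a trailing line that has no '\n'.
func readAll(r *bufio.Reader, sb *strings.Builder, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		line, err := r.ReadString('\n')
		if len(line) > 0 {
			sb.WriteString(line) // keep partial data returned alongside io.EOF
		}
		if err != nil {
			return
		}
	}
}

func main() {
	var out strings.Builder
	var wg sync.WaitGroup
	wg.Add(1)
	// The last line deliberately has no newline; the old `for err == nil`
	// loop would have lost it, the new loop keeps it.
	go readAll(bufio.NewReader(strings.NewReader("first\nsecond\nlast-without-newline")), &out, &wg)
	wg.Wait()
	fmt.Printf("%q\n", out.String()) // "first\nsecond\nlast-without-newline"
}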
e2e/integration-tests/cli.go

Lines changed: 14 additions & 9 deletions
@@ -6,6 +6,7 @@ import (
 	"context"
 	"io/fs"
 	"os"
+	"path/filepath"
 	"time"
 
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -27,8 +28,8 @@
 )
 
 func isNamespace(clientset *kubernetes.Clientset, cliNS string, exists bool) (bool, error) {
-	err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(context.Context) (done bool, err error) {
-		namespace, err := getNamespace(clientset, cliNS)
+	err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(ctx context.Context) (done bool, err error) {
+		namespace, err := getNamespace(ctx, clientset, cliNS)
 		if exists {
 			if err != nil {
 				return false, err
@@ -46,8 +47,8 @@ func isNamespace(clientset *kubernetes.Clientset, cliNS string, exists bool) (bo
 }
 
 func isCollector(clientset *kubernetes.Clientset, cliNS string, ready bool) (bool, error) {
-	err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(context.Context) (done bool, err error) {
-		collectorPod, err := getNamespacePods(clientset, cliNS, &metav1.ListOptions{FieldSelector: "status.phase=Running", LabelSelector: "run=collector"})
+	err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(ctx context.Context) (done bool, err error) {
+		collectorPod, err := getNamespacePods(ctx, clientset, cliNS, &metav1.ListOptions{FieldSelector: "status.phase=Running", LabelSelector: "run=collector"})
 		if err != nil {
 			return false, err
 		}
@@ -63,11 +64,11 @@ func isCollector(clientset *kubernetes.Clientset, cliNS string, ready bool) (boo
 }
 
 func isDaemonsetReady(clientset *kubernetes.Clientset, daemonsetName string, cliNS string) (bool, error) {
-	err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(context.Context) (done bool, err error) {
-		cliDaemonset, err := getDaemonSet(clientset, daemonsetName, cliNS)
+	err := wait.PollUntilContextTimeout(context.Background(), PollInterval, PollTimeout, true, func(ctx context.Context) (done bool, err error) {
+		cliDaemonset, err := getDaemonSet(ctx, clientset, daemonsetName, cliNS)
 		if err != nil {
 			if errors.IsNotFound(err) {
-				clog.Info("daemonset not found")
+				clog.Infof("daemonset not found %v", err)
 				return false, nil
 			}
 			return false, err
@@ -124,13 +125,17 @@ func getFlowsJSONFile() (string, error) {
 	// this could be problematic if two tests are running in parallel with --copy=true
 	var mostRecentFile fs.FileInfo
 	for _, file := range files {
-		fileInfo, err := os.Stat(outputDir + file)
+		fileInfo, err := os.Stat(filepath.Join(outputDir, file))
 		if err != nil {
 			return "", nil
 		}
 		if mostRecentFile == nil || fileInfo.ModTime().After(mostRecentFile.ModTime()) {
 			mostRecentFile = fileInfo
 		}
 	}
-	return outputDir + mostRecentFile.Name(), nil
+	absPath, err := filepath.Abs(filepath.Join(outputDir, mostRecentFile.Name()))
+	if err != nil {
+		return "", err
+	}
+	return absPath, nil
 }

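The e2e/integration-tests/cli.go changes thread the context that wait.PollUntilContextTimeout hands to its condition function into the Kubernetes helpers, instead of discarding it via an unnamed func(context.Context) parameter, so each API call is bounded by the poll's own deadline. A hedged sketch of the pattern, with getThing standing in as a hypothetical equivalent of getNamespace/getDaemonSet/getNamespacePods:

package example

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// getThing is a hypothetical helper standing in for getNamespace/getDaemonSet/
// getNamespacePods: it accepts the caller's context instead of context.TODO().
func getThing(ctx context.Context, clientset *kubernetes.Clientset, name string) (*corev1.Namespace, error) {
	return clientset.CoreV1().Namespaces().Get(ctx, name, metav1.GetOptions{})
}

// waitForThing polls until getThing succeeds, passing the poll's context down
// so each API call is cancelled when the overall poll timeout expires.
func waitForThing(clientset *kubernetes.Clientset, name string) error {
	return wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 2*time.Minute, true,
		func(ctx context.Context) (bool, error) {
			_, err := getThing(ctx, clientset, name)
			if apierrors.IsNotFound(err) {
				return false, nil // not there yet, keep polling
			}
			if err != nil {
				return false, err // unexpected error, stop polling
			}
			return true, nil
		})
}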
e2e/integration-tests/cluster.go

Lines changed: 12 additions & 12 deletions
@@ -39,7 +39,7 @@ func getNewClient() (*kubernetes.Clientset, error) {
 }
 
 func getClusterNodes(clientset *kubernetes.Clientset, options *metav1.ListOptions) ([]string, error) {
-	nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), *options)
+	nodes, err := clientset.CoreV1().Nodes().List(context.Background(), *options)
 	var allNodes []string
 	if err != nil {
 		return allNodes, err
@@ -50,8 +50,8 @@ func getClusterNodes(clientset *kubernetes.Clientset, options *metav1.ListOption
 	return allNodes, nil
 }
 
-func getDaemonSet(clientset *kubernetes.Clientset, daemonset string, ns string) (*appsv1.DaemonSet, error) {
-	ds, err := clientset.AppsV1().DaemonSets(ns).Get(context.TODO(), daemonset, metav1.GetOptions{})
+func getDaemonSet(ctx context.Context, clientset *kubernetes.Clientset, daemonset string, ns string) (*appsv1.DaemonSet, error) {
+	ds, err := clientset.AppsV1().DaemonSets(ns).Get(ctx, daemonset, metav1.GetOptions{})
 
 	if err != nil {
 		return nil, err
@@ -60,16 +60,16 @@ func getDaemonSet(clientset *kubernetes.Clientset, daemonset string, ns string)
 	return ds, nil
 }
 
-func getNamespace(clientset *kubernetes.Clientset, name string) (*corev1.Namespace, error) {
-	namespace, err := clientset.CoreV1().Namespaces().Get(context.TODO(), name, metav1.GetOptions{})
+func getNamespace(ctx context.Context, clientset *kubernetes.Clientset, name string) (*corev1.Namespace, error) {
+	namespace, err := clientset.CoreV1().Namespaces().Get(ctx, name, metav1.GetOptions{})
 	if err != nil {
 		return nil, err
 	}
 	return namespace, nil
 }
 
-func getNamespacePods(clientset *kubernetes.Clientset, namespace string, options *metav1.ListOptions) ([]string, error) {
-	pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), *options)
+func getNamespacePods(ctx context.Context, clientset *kubernetes.Clientset, namespace string, options *metav1.ListOptions) ([]string, error) {
+	pods, err := clientset.CoreV1().Pods(namespace).List(ctx, *options)
 	var allPods []string
 	if err != nil {
 		return allPods, err
@@ -80,8 +80,8 @@ func getNamespacePods(clientset *kubernetes.Clientset, namespace string, options
 	return allPods, nil
 }
 
-func getConfigMap(clientset *kubernetes.Clientset, name string, namespace string) (*corev1.ConfigMap, error) {
-	cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+func getConfigMap(ctx context.Context, clientset *kubernetes.Clientset, name string, namespace string) (*corev1.ConfigMap, error) {
+	cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -111,7 +111,7 @@ func queryPrometheusMetric(clientset *kubernetes.Clientset, query string) (float
 
 	// Get the Prometheus route from openshift-monitoring namespace
 	var routeGVR = schema.GroupVersionResource{Group: "route.openshift.io", Version: "v1", Resource: "routes"}
-	unstructuredRoute, err := dynclient.Resource(routeGVR).Namespace("openshift-monitoring").Get(context.TODO(), "prometheus-k8s", metav1.GetOptions{})
+	unstructuredRoute, err := dynclient.Resource(routeGVR).Namespace("openshift-monitoring").Get(context.Background(), "prometheus-k8s", metav1.GetOptions{})
 	if err != nil {
 		return 0.0, fmt.Errorf("failed to get prometheus route: %w", err)
 	}
@@ -159,7 +159,7 @@ func queryPrometheusMetric(clientset *kubernetes.Clientset, query string) (float
 
 	var finalResult float64
 	// Poll for 5 minutes at 20-second intervals
-	err = wait.PollUntilContextTimeout(context.Background(), 20*time.Second, 5*time.Minute, false, func(context.Context) (done bool, err error) {
+	err = wait.PollUntilContextTimeout(context.Background(), 20*time.Second, 5*time.Minute, false, func(ctx context.Context) (done bool, err error) {
 		// Execute the request
 		resp, err := httpClient.Do(req)
 		if err != nil {
@@ -235,7 +235,7 @@ func createServiceAccountToken(clientset *kubernetes.Clientset, serviceAccountNa
 	}
 
 	token, err := clientset.CoreV1().ServiceAccounts(namespace).CreateToken(
-		context.TODO(),
+		context.Background(),
 		serviceAccountName,
 		tokenRequest,
 		metav1.CreateOptions{},

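Changing the cluster helpers to accept a context.Context (rather than creating context.TODO() internally) also lets callers cancel or bound individual API calls. A short hedged sketch of what that buys a caller, reusing getConfigMap with the new signature from this commit; readConfigMapWithDeadline is an illustrative name, not part of the change:

package example

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// getConfigMap with the ctx-aware signature introduced by this commit.
func getConfigMap(ctx context.Context, clientset *kubernetes.Clientset, name string, namespace string) (*corev1.ConfigMap, error) {
	cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	return cm, nil
}

// readConfigMapWithDeadline shows what the new signature buys the caller:
// the API call is aborted if it has not completed within 30 seconds.
func readConfigMapWithDeadline(clientset *kubernetes.Clientset, name, ns string) (*corev1.ConfigMap, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return getConfigMap(ctx, clientset, name, ns)
}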