Skip to content

Commit 70af0ff

Browse files
authored
fix: Collect logs from all pods specified in logs collector spec (#952)
Fixes: #945
1 parent 4601db5 commit 70af0ff

File tree

3 files changed

+106
-51
lines changed

3 files changed

+106
-51
lines changed

pkg/collect/logs.go

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/pkg/errors"
1212
troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2"
13+
"github.com/replicatedhq/troubleshoot/pkg/constants"
1314
"github.com/replicatedhq/troubleshoot/pkg/logger"
1415
corev1 "k8s.io/api/core/v1"
1516
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -20,7 +21,7 @@ import (
2021
type CollectLogs struct {
2122
Collector *troubleshootv1beta2.Logs
2223
BundlePath string
23-
Namespace string // There is a Namespace parameter in troubleshootv1beta2.Logs. Should we remove this?
24+
Namespace string // TODO: There is a Namespace parameter in troubleshootv1beta2.Logs. Should we remove this?
2425
ClientConfig *rest.Config
2526
Client kubernetes.Interface
2627
Context context.Context
@@ -42,24 +43,30 @@ func (c *CollectLogs) Collect(progressChan chan<- interface{}) (CollectorResult,
4243
return nil, err
4344
}
4445

45-
output := NewResult()
46+
return c.CollectWithClient(progressChan, client)
4647

47-
ctx := context.Background()
48+
}
4849

49-
const timeout = 60 //timeout in seconds used for context timeout value
50+
// CollectWithClient is a helper function that allows passing in a kubernetes client
51+
// It's a stopgap implementation before it's decided whether to either always use a single
52+
// client for collectors or leave the implementation as is.
53+
// Ref: https://github.com/replicatedhq/troubleshoot/pull/821#discussion_r1026258904
54+
func (c *CollectLogs) CollectWithClient(progressChan chan<- interface{}, client kubernetes.Interface) (CollectorResult, error) {
55+
out := NewResult()
5056

51-
// timeout context
52-
ctxTimeout, cancel := context.WithTimeout(context.Background(), timeout*time.Second)
57+
ctx, cancel := context.WithTimeout(c.Context, constants.DEFAULT_LOGS_COLLECTOR_TIMEOUT)
5358
defer cancel()
5459

5560
errCh := make(chan error, 1)
56-
resultCh := make(chan CollectorResult, 1)
57-
58-
//wrapped code go func for context timeout solution
59-
go func() {
60-
61-
output := NewResult()
62-
61+
done := make(chan struct{}, 1)
62+
63+
// Collect logs in a go routine to allow timing out of long running operations
64+
// If a timeout occurs, the passed in collector result will contain logs collected
65+
// prior. We want this to be the case so as to have some logs in the support bundle
66+
// even if not from all expected pods.
67+
// TODO: In future all collectors will have a timeout. This will be implemented in the
68+
// framework level (caller of Collect function). Remove this code when we get there.
69+
go func(output CollectorResult) {
6370
if c.SinceTime != nil {
6471
if c.Collector.Limits == nil {
6572
c.Collector.Limits = new(troubleshootv1beta2.LogLimits)
@@ -98,8 +105,6 @@ func (c *CollectLogs) Collect(progressChan chan<- interface{}) (CollectorResult,
98105
continue
99106
}
100107
output.AddResult(podLogs)
101-
102-
resultCh <- output
103108
}
104109
} else {
105110
for _, container := range c.Collector.ContainerNames {
@@ -113,24 +118,24 @@ func (c *CollectLogs) Collect(progressChan chan<- interface{}) (CollectorResult,
113118
continue
114119
}
115120
output.AddResult(containerLogs)
116-
resultCh <- output
117121
}
118122
}
119123
}
120-
} else {
121-
resultCh <- output
122124
}
123-
}()
125+
126+
// Send a signal to indicate that we are done collecting logs
127+
done <- struct{}{}
128+
}(out)
124129

125130
select {
126-
case <-ctxTimeout.Done():
127-
return nil, fmt.Errorf("%s (%s) collector timeout exceeded", c.Title(), c.Collector.CollectorName)
128-
case o := <-resultCh:
129-
output = o
131+
case <-ctx.Done():
132+
// When we timeout, return the logs we have collected so far
133+
return out, fmt.Errorf("%s (%s) collector timeout exceeded", c.Title(), c.Collector.CollectorName)
134+
case <-done:
135+
return out, nil
130136
case err := <-errCh:
131137
return nil, err
132138
}
133-
return output, nil
134139
}
135140

136141
func listPodsInSelectors(ctx context.Context, client kubernetes.Interface, namespace string, selector []string) ([]corev1.Pod, []string) {
@@ -149,19 +154,6 @@ func listPodsInSelectors(ctx context.Context, client kubernetes.Interface, names
149154
}
150155

151156
func savePodLogs(
152-
ctx context.Context,
153-
bundlePath string,
154-
client *kubernetes.Clientset,
155-
pod *corev1.Pod,
156-
collectorName, container string,
157-
limits *troubleshootv1beta2.LogLimits,
158-
follow bool,
159-
createSymLinks bool,
160-
) (CollectorResult, error) {
161-
return savePodLogsWithInterface(ctx, bundlePath, client, pod, collectorName, container, limits, follow, createSymLinks)
162-
}
163-
164-
func savePodLogsWithInterface(
165157
ctx context.Context,
166158
bundlePath string,
167159
client kubernetes.Interface,

pkg/collect/logs_test.go

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ import (
77

88
troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2"
99
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
1011
corev1 "k8s.io/api/core/v1"
1112
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/client-go/kubernetes"
1214
testclient "k8s.io/client-go/kubernetes/fake"
1315
)
1416

@@ -158,25 +160,82 @@ func Test_savePodLogs(t *testing.T) {
158160
MaxLines: 500,
159161
MaxBytes: 10000000,
160162
}
161-
pod, err := client.CoreV1().Pods("my-namespace").Create(ctx, &corev1.Pod{
162-
ObjectMeta: metav1.ObjectMeta{
163-
Name: "test-pod",
164-
},
165-
Spec: corev1.PodSpec{
166-
Containers: []corev1.Container{
167-
{
168-
Name: containerName,
169-
},
170-
},
171-
},
172-
}, metav1.CreateOptions{})
173-
assert.NoError(t, err)
163+
pod, err := createPod(client, containerName, "test-pod", "my-namespace")
164+
require.NoError(t, err)
174165
if !tt.withContainerName {
175166
containerName = ""
176167
}
177-
got, err := savePodLogsWithInterface(ctx, "", client, pod, tt.collectorName, containerName, limits, false, tt.createSymLinks)
168+
got, err := savePodLogs(ctx, "", client, pod, tt.collectorName, containerName, limits, false, tt.createSymLinks)
178169
assert.NoError(t, err)
179170
assert.Equal(t, tt.want, got)
180171
})
181172
}
182173
}
174+
175+
func Test_CollectLogs(t *testing.T) {
176+
tests := []struct {
177+
name string
178+
collectorName string
179+
podNames []string
180+
want CollectorResult
181+
}{
182+
{
183+
name: "from multiple pods",
184+
collectorName: "all-logs",
185+
podNames: []string{
186+
"firstPod",
187+
"secondPod",
188+
},
189+
want: CollectorResult{
190+
"all-logs/firstPod/nginx.log": []byte("fake logs"),
191+
"all-logs/firstPod/nginx-previous.log": []byte("fake logs"),
192+
"all-logs/secondPod/nginx.log": []byte("fake logs"),
193+
"all-logs/secondPod/nginx-previous.log": []byte("fake logs"),
194+
"cluster-resources/pods/logs/my-namespace/firstPod/nginx.log": []byte("fake logs"),
195+
"cluster-resources/pods/logs/my-namespace/firstPod/nginx-previous.log": []byte("fake logs"),
196+
"cluster-resources/pods/logs/my-namespace/secondPod/nginx.log": []byte("fake logs"),
197+
"cluster-resources/pods/logs/my-namespace/secondPod/nginx-previous.log": []byte("fake logs"),
198+
},
199+
},
200+
}
201+
for _, tt := range tests {
202+
t.Run(tt.name, func(t *testing.T) {
203+
ctx := context.TODO()
204+
ns := "my-namespace"
205+
client := testclient.NewSimpleClientset()
206+
207+
for _, podName := range tt.podNames {
208+
_, err := createPod(client, "nginx", podName, ns)
209+
require.NoError(t, err)
210+
}
211+
212+
progresChan := make(chan any)
213+
c := &CollectLogs{
214+
Context: ctx,
215+
Namespace: ns,
216+
Collector: &troubleshootv1beta2.Logs{
217+
Name: tt.collectorName,
218+
},
219+
}
220+
got, err := c.CollectWithClient(progresChan, client)
221+
222+
assert.NoError(t, err)
223+
assert.Equal(t, tt.want, got)
224+
})
225+
}
226+
}
227+
228+
func createPod(client kubernetes.Interface, containerName, podName, ns string) (*corev1.Pod, error) {
229+
return client.CoreV1().Pods(ns).Create(context.TODO(), &corev1.Pod{
230+
ObjectMeta: metav1.ObjectMeta{
231+
Name: podName,
232+
},
233+
Spec: corev1.PodSpec{
234+
Containers: []corev1.Container{
235+
{
236+
Name: containerName,
237+
},
238+
},
239+
},
240+
}, metav1.CreateOptions{})
241+
}

pkg/constants/constants.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package constants
22

3+
import "time"
4+
35
const (
46
// DEFAULT_CLIENT_QPS indicates the maximum QPS from troubleshoot client.
57
DEFAULT_CLIENT_QPS = 100
@@ -9,4 +11,6 @@ const (
911
DEFAULT_CLIENT_USER_AGENT = "ReplicatedTroubleshoot"
1012
// VersionFilename is the name of the file that contains the support bundle version.
1113
VersionFilename = "version.yaml"
14+
// DEFAULT_LOGS_COLLECTOR_TIMEOUT is the default timeout for logs collector.
15+
DEFAULT_LOGS_COLLECTOR_TIMEOUT = 60 * time.Second
1216
)

0 commit comments

Comments
 (0)