Skip to content

Commit 1bd0911

Browse files
committed
fix: add timeout for stuck log streaming (#6961)
1 parent 22fb804 commit 1bd0911

File tree

2 files changed

+189
-2
lines changed

2 files changed

+189
-2
lines changed

pkg/testworkflows/executionworker/controller/logs.go

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"bufio"
55
"bytes"
66
"context"
7+
"fmt"
78
"io"
89
"strings"
910
"sync"
11+
"sync/atomic"
1012
"time"
1113
"unsafe"
1214

@@ -29,6 +31,7 @@ const (
2931

3032
LogRetryOnConnectionLostDelay = 300 * time.Millisecond
3133
LogRetryOnWaitingForStartDelay = 100 * time.Millisecond
34+
LogStreamIdleTimeout = 30 * time.Second
3235
)
3336

3437
type Comment struct {
@@ -114,10 +117,21 @@ func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface,
114117
return stream, nil
115118
}
116119

120+
type logStreamOpener func(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, isDone func() bool, since *time.Time) (io.Reader, error)
121+
117122
func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, isDone func() bool, isLastHint func(*instructions.Instruction) bool) <-chan ChannelMessage[ContainerLog] {
123+
return watchContainerLogsWithStream(parentCtx, getContainerLogsStream, clientSet, namespace, podName, containerName, bufferSize, isDone, isLastHint, LogStreamIdleTimeout)
124+
}
125+
126+
func watchContainerLogsWithStream(parentCtx context.Context, opener logStreamOpener, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, isDone func() bool, isLastHint func(*instructions.Instruction) bool, idleTimeout time.Duration) <-chan ChannelMessage[ContainerLog] {
118127
ctx, ctxCancel := context.WithCancel(parentCtx)
119128
ch := make(chan ChannelMessage[ContainerLog], bufferSize)
120129
var mu sync.Mutex
130+
var lastActivity int64
131+
atomic.StoreInt64(&lastActivity, time.Now().UnixNano())
132+
touch := func() {
133+
atomic.StoreInt64(&lastActivity, time.Now().UnixNano())
134+
}
121135

122136
sendError := func(err error) {
123137
defer func() {
@@ -144,14 +158,37 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac
144158
close(ch)
145159
}()
146160

161+
if idleTimeout > 0 {
162+
go func() {
163+
ticker := time.NewTicker(idleTimeout)
164+
defer ticker.Stop()
165+
for {
166+
select {
167+
case <-ctx.Done():
168+
return
169+
case <-ticker.C:
170+
if !isDone() {
171+
continue
172+
}
173+
last := time.Unix(0, atomic.LoadInt64(&lastActivity))
174+
if time.Since(last) >= idleTimeout {
175+
sendError(fmt.Errorf("log stream idle timeout after %s", idleTimeout))
176+
ctxCancel()
177+
return
178+
}
179+
}
180+
}
181+
}()
182+
}
183+
147184
go func() {
148185
defer ctxCancel()
149186
var err error
150187

151188
var since *time.Time
152189

153190
// Create logs stream request
154-
stream, err := getContainerLogsStream(ctx, clientSet, namespace, podName, containerName, isDone, since)
191+
stream, err := opener(ctx, clientSet, namespace, podName, containerName, isDone, since)
155192
if err == io.EOF {
156193
return
157194
} else if err != nil {
@@ -268,6 +305,9 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac
268305

269306
// Read next timestamp
270307
err = tsReader.Read(reader)
308+
if err == nil || errors.Is(err, ErrInvalidTimestamp) {
309+
touch()
310+
}
271311

272312
// Handle context canceled
273313
if errors.Is(err, context.Canceled) {
@@ -305,7 +345,7 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac
305345
// Similarly for GOAWAY, that may be caused by too long connection.
306346
if err == io.EOF || (err != nil && strings.Contains(err.Error(), "GOAWAY")) {
307347
since = common.Ptr(lastTs.Add(1))
308-
stream, err = getContainerLogsStream(ctx, clientSet, namespace, podName, containerName, isDone, since)
348+
stream, err = opener(ctx, clientSet, namespace, podName, containerName, isDone, since)
309349
if err != nil {
310350
return
311351
}

pkg/testworkflows/executionworker/controller/logs_test.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@ package controller
33
import (
44
"bufio"
55
"bytes"
6+
"context"
67
"io"
8+
"sync/atomic"
79
"testing"
810
"time"
911

1012
"github.com/stretchr/testify/assert"
13+
"k8s.io/client-go/kubernetes"
14+
15+
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
1116
)
1217

1318
func Test_ReadTimestamp_UTC_Initial(t *testing.T) {
@@ -65,3 +70,145 @@ func Test_ReadTimestamp_NonUTC_Recurring(t *testing.T) {
6570
assert.Equal(t, []byte(message), rest)
6671
assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), reader.ts)
6772
}
73+
74+
type blockingReader struct {
75+
ctx context.Context
76+
}
77+
78+
func (r *blockingReader) Read(p []byte) (int, error) {
79+
<-r.ctx.Done()
80+
return 0, r.ctx.Err()
81+
}
82+
83+
func TestWatchContainerLogsIdleTimeoutCancelsWhenDone(t *testing.T) {
84+
t.Parallel()
85+
86+
ctx, cancel := context.WithCancel(context.Background())
87+
defer cancel()
88+
89+
idleTimeout := 50 * time.Millisecond
90+
ch := watchContainerLogsWithStream(
91+
ctx,
92+
func(ctx context.Context, _ kubernetes.Interface, _, _, _ string, _ func() bool, _ *time.Time) (io.Reader, error) {
93+
return &blockingReader{ctx: ctx}, nil
94+
},
95+
nil,
96+
"default",
97+
"pod",
98+
"container",
99+
1,
100+
func() bool { return true },
101+
func(*instructions.Instruction) bool { return false },
102+
idleTimeout,
103+
)
104+
105+
deadline := time.NewTimer(500 * time.Millisecond)
106+
defer deadline.Stop()
107+
108+
var gotErr bool
109+
for {
110+
select {
111+
case msg, ok := <-ch:
112+
if !ok {
113+
assert.True(t, gotErr, "expected idle timeout error before channel close")
114+
return
115+
}
116+
if msg.Error != nil {
117+
gotErr = true
118+
assert.Contains(t, msg.Error.Error(), "idle timeout")
119+
}
120+
case <-deadline.C:
121+
t.Fatal("timed out waiting for idle timeout to close the channel")
122+
}
123+
}
124+
}
125+
126+
func TestWatchContainerLogsReopensOnEOF(t *testing.T) {
127+
t.Parallel()
128+
129+
ctx, cancel := context.WithCancel(context.Background())
130+
defer cancel()
131+
132+
var calls int32
133+
line := "2024-06-07T12:41:49.037275300Z hello\n"
134+
ch := watchContainerLogsWithStream(
135+
ctx,
136+
func(_ context.Context, _ kubernetes.Interface, _, _, _ string, _ func() bool, _ *time.Time) (io.Reader, error) {
137+
call := atomic.AddInt32(&calls, 1)
138+
if call == 1 {
139+
return bytes.NewBufferString(line), nil
140+
}
141+
return bytes.NewBuffer(nil), nil
142+
},
143+
nil,
144+
"default",
145+
"pod",
146+
"container",
147+
5,
148+
func() bool { return false },
149+
func(*instructions.Instruction) bool { return false },
150+
500*time.Millisecond,
151+
)
152+
153+
deadline := time.NewTimer(2 * time.Second)
154+
defer deadline.Stop()
155+
156+
var gotLog bool
157+
for {
158+
select {
159+
case msg, ok := <-ch:
160+
if !ok {
161+
assert.True(t, gotLog, "expected at least one log message before channel close")
162+
assert.GreaterOrEqual(t, atomic.LoadInt32(&calls), int32(2))
163+
return
164+
}
165+
if msg.Error != nil {
166+
t.Fatalf("unexpected error from logs channel: %v", msg.Error)
167+
}
168+
if bytes.Contains(msg.Value.Log, []byte("hello")) {
169+
gotLog = true
170+
}
171+
case <-deadline.C:
172+
t.Fatal("timed out waiting for log stream to close")
173+
}
174+
}
175+
}
176+
177+
func TestWatchContainerLogsDoneWithNoLogsCloses(t *testing.T) {
178+
t.Parallel()
179+
180+
ctx, cancel := context.WithCancel(context.Background())
181+
defer cancel()
182+
183+
ch := watchContainerLogsWithStream(
184+
ctx,
185+
func(_ context.Context, _ kubernetes.Interface, _, _, _ string, _ func() bool, _ *time.Time) (io.Reader, error) {
186+
return bytes.NewBuffer(nil), nil
187+
},
188+
nil,
189+
"default",
190+
"pod",
191+
"container",
192+
1,
193+
func() bool { return true },
194+
func(*instructions.Instruction) bool { return false },
195+
500*time.Millisecond,
196+
)
197+
198+
deadline := time.NewTimer(500 * time.Millisecond)
199+
defer deadline.Stop()
200+
201+
for {
202+
select {
203+
case msg, ok := <-ch:
204+
if !ok {
205+
return
206+
}
207+
if msg.Error != nil {
208+
t.Fatalf("unexpected error from logs channel: %v", msg.Error)
209+
}
210+
case <-deadline.C:
211+
t.Fatal("timed out waiting for log stream to close")
212+
}
213+
}
214+
}

0 commit comments

Comments
 (0)