Skip to content

Commit af07d14

Browse files
committed
fix: update StartK8sStreamWithHeartBeat to use context for cancellation and improve error handling
1 parent bd5eb10 commit af07d14

3 files changed

Lines changed: 182 additions & 23 deletions

File tree

api/connector/Connector.go

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,30 @@ package connector
1818

1919
import (
2020
"bufio"
21+
"context"
2122
"encoding/json"
2223
"fmt"
24+
"io"
25+
"net/http"
26+
"strconv"
27+
"strings"
28+
"sync"
29+
"time"
30+
2331
"github.com/devtron-labs/devtron/api/bean"
2432
"github.com/gogo/protobuf/proto"
2533
"github.com/grpc-ecosystem/grpc-gateway/runtime"
2634
"github.com/juju/errors"
2735
"go.uber.org/zap"
2836
"google.golang.org/grpc/codes"
2937
"google.golang.org/grpc/status"
30-
"io"
31-
"net/http"
32-
"regexp"
33-
"strconv"
34-
"strings"
35-
"sync"
36-
"time"
3738
)
3839

3940
var delimiter = []byte("\n\n")
4041

4142
type Pump interface {
4243
StartStreamWithTransformer(w http.ResponseWriter, recv func() (proto.Message, error), err error, transformer func(interface{}) interface{})
43-
StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error)
44+
StartK8sStreamWithHeartBeat(ctx context.Context, w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error)
4445
}
4546

4647
type PumpImpl struct {
@@ -53,7 +54,7 @@ func NewPumpImpl(logger *zap.SugaredLogger) *PumpImpl {
5354
}
5455
}
5556

56-
func (impl PumpImpl) StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error) {
57+
func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error) {
5758
f, ok := w.(http.Flusher)
5859
if !ok {
5960
http.Error(w, "unexpected server doesnt support streaming", http.StatusInternalServerError)
@@ -82,47 +83,69 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconn
8283
}
8384
// heartbeat start
8485
ticker := time.NewTicker(30 * time.Second)
85-
done := make(chan bool)
86+
done := make(chan struct{}) // close(done) never blocks, so no buffer needed
8687
var mux sync.Mutex
87-
go func() error {
88+
89+
go func() {
8890
for {
8991
select {
9092
case <-done:
91-
return nil
93+
return
94+
case <-ctx.Done():
95+
stream.Close() // unblocks the blocking bufReader.ReadString below
96+
return
9297
case t := <-ticker.C:
9398
mux.Lock()
9499
err := impl.sendEvent(nil, []byte("PING"), []byte(t.String()), w)
100+
if err == nil {
101+
f.Flush()
102+
}
95103
mux.Unlock()
96104
if err != nil {
97105
impl.logger.Errorw("error in writing PING over sse", "err", err)
98-
return err
106+
return
99107
}
100-
f.Flush()
101108
}
102109
}
103110
}()
104111
defer func() {
105112
ticker.Stop()
106-
done <- true
113+
stream.Close() // idempotent: safe to call after goroutine already closed it
114+
close(done) // signals goroutine to exit if still running
107115
}()
108116

109117
bufReader := bufio.NewReader(stream)
110118
eof := false
111119
for !eof {
120+
// fast-exit: if ctx expired between reads, return immediately
121+
select {
122+
case <-ctx.Done():
123+
return
124+
default:
125+
}
126+
112127
log, err := bufReader.ReadString('\n')
113128
if err == io.EOF {
114129
eof = true
115-
// stop if we reached end of stream and the next line is empty
116130
if log == "" {
117131
return
118132
}
119-
} else if err != nil && err != io.EOF {
133+
} else if err != nil {
134+
if ctx.Err() != nil {
135+
// stream was closed because ctx expired — not an application error
136+
return
137+
}
120138
impl.logger.Errorw("error in reading buffer string, StartK8sStreamWithHeartBeat", "err", err)
121139
return
122140
}
123-
log = strings.TrimSpace(log) // Remove trailing line ending
124-
a := regexp.MustCompile(" ")
125-
splitLog := a.Split(log, 2)
141+
log = strings.TrimSpace(log)
142+
if log == "" {
143+
continue // blank line mid-stream: skip without aborting
144+
}
145+
splitLog := strings.SplitN(log, " ", 2)
146+
if len(splitLog) < 2 {
147+
continue // no space separator: not a valid log line, skip
148+
}
126149
parsedTime, err := time.Parse(time.RFC3339, splitLog[0])
127150
if err != nil {
128151
impl.logger.Errorw("error in writing data over sse", "err", err)
@@ -133,12 +156,14 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconn
133156
if len(splitLog) == 2 {
134157
err = impl.sendEvent([]byte(eventId), nil, []byte(splitLog[1]), w)
135158
}
159+
if err == nil {
160+
f.Flush()
161+
}
136162
mux.Unlock()
137163
if err != nil {
138164
impl.logger.Errorw("error in writing data over sse", "err", err)
139165
return
140166
}
141-
f.Flush()
142167
}
143168
// heartbeat end
144169
}

api/connector/connector_test.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package connector
2+
3+
import (
4+
"context"
5+
"io"
6+
"net/http/httptest"
7+
"strings"
8+
"testing"
9+
"time"
10+
11+
"go.uber.org/zap"
12+
)
13+
14+
func newTestPump() PumpImpl {
15+
logger, _ := zap.NewDevelopment()
16+
return PumpImpl{logger: logger.Sugar()}
17+
}
18+
19+
// blockingReader blocks on Read until Close is called, then returns io.EOF
20+
type blockingReader struct {
21+
ch chan struct{}
22+
}
23+
24+
func (b *blockingReader) Read(p []byte) (int, error) {
25+
<-b.ch
26+
return 0, io.EOF
27+
}
28+
func (b *blockingReader) Close() error {
29+
select {
30+
case <-b.ch:
31+
// already closed
32+
default:
33+
close(b.ch)
34+
}
35+
return nil
36+
}
37+
38+
func TestStartK8sStreamWithHeartBeat_CtxCancelled_Returns(t *testing.T) {
39+
t.Parallel()
40+
41+
cases := []struct {
42+
name string
43+
waitForStart bool // synchronise so cancel fires after goroutine entry
44+
}{
45+
{"cancel_before_goroutine_blocks", false},
46+
{"cancel_after_goroutine_entry", true},
47+
}
48+
49+
for _, tc := range cases {
50+
tc := tc
51+
t.Run(tc.name, func(t *testing.T) {
52+
t.Parallel()
53+
54+
pump := newTestPump()
55+
ctx, cancel := context.WithCancel(context.Background())
56+
defer cancel()
57+
58+
br := &blockingReader{ch: make(chan struct{})}
59+
w := httptest.NewRecorder()
60+
61+
started := make(chan struct{})
62+
done := make(chan struct{})
63+
go func() {
64+
defer close(done)
65+
close(started)
66+
pump.StartK8sStreamWithHeartBeat(ctx, w, false, br, nil)
67+
}()
68+
69+
if tc.waitForStart {
70+
<-started
71+
}
72+
cancel()
73+
74+
select {
75+
case <-done:
76+
// success: function returned after ctx cancel without deadlock
77+
case <-time.After(3 * time.Second):
78+
t.Fatalf("StartK8sStreamWithHeartBeat did not return after ctx cancel (%s)", tc.name)
79+
}
80+
})
81+
}
82+
}
83+
84+
// fakeStream returns a ReadCloser that emits fixed content then EOF.
85+
type fakeStream struct {
86+
r io.Reader
87+
done chan struct{}
88+
}
89+
90+
func newFakeStream(content string) *fakeStream {
91+
return &fakeStream{r: strings.NewReader(content), done: make(chan struct{})}
92+
}
93+
func (f *fakeStream) Read(p []byte) (int, error) { return f.r.Read(p) }
94+
func (f *fakeStream) Close() error {
95+
select {
96+
case <-f.done:
97+
default:
98+
close(f.done)
99+
}
100+
return nil
101+
}
102+
103+
func TestStartK8sStreamWithHeartBeat_MalformedLines_DoNotAbortStream(t *testing.T) {
104+
t.Parallel()
105+
cases := []struct {
106+
name string
107+
payload string
108+
}{
109+
{"blank_line_mid_stream", "2024-01-01T00:00:00Z hello world\n\n2024-01-01T00:00:01Z second line\n"},
110+
{"line_without_space", "2024-01-01T00:00:00Z hello\nnotimestamp\n2024-01-01T00:00:02Z after bad\n"},
111+
{"empty_stream", ""},
112+
}
113+
for _, tc := range cases {
114+
tc := tc
115+
t.Run(tc.name, func(t *testing.T) {
116+
t.Parallel()
117+
pump := newTestPump()
118+
ctx, cancel := context.WithCancel(context.Background())
119+
defer cancel()
120+
stream := newFakeStream(tc.payload)
121+
w := httptest.NewRecorder()
122+
done := make(chan struct{})
123+
go func() {
124+
defer close(done)
125+
pump.StartK8sStreamWithHeartBeat(ctx, w, false, stream, nil)
126+
}()
127+
select {
128+
case <-done:
129+
// success: returned cleanly without deadlock
130+
case <-time.After(3 * time.Second):
131+
t.Fatalf("stream did not complete in time (%s)", tc.name)
132+
}
133+
})
134+
}
135+
}

api/k8s/application/k8sApplicationRestHandler.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,8 +591,7 @@ func (handler *K8sApplicationRestHandlerImpl) GetPodLogs(w http.ResponseWriter,
591591
}(ctx.Done(), cn.CloseNotify())
592592
}
593593
defer cancel()
594-
defer util.Close(stream, handler.logger)
595-
handler.pump.StartK8sStreamWithHeartBeat(w, isReconnect, stream, err)
594+
handler.pump.StartK8sStreamWithHeartBeat(ctx, w, isReconnect, stream, err)
596595
}
597596

598597
func (handler *K8sApplicationRestHandlerImpl) DownloadPodLogs(w http.ResponseWriter, r *http.Request) {

0 commit comments

Comments
 (0)