Skip to content

Commit 358778e

Browse files
authored
fixing cloud agents log streaming early exit based on actual logs being received (#834)
* fixing log streaming early exit based on real logs being received * feedback
1 parent 49e93af commit 358778e

File tree

2 files changed

+191
-4
lines changed

2 files changed

+191
-4
lines changed

pkg/cloudagents/logs.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"io"
2323
"net/http"
2424
"net/url"
25-
"strings"
2625
)
2726

2827
type APIError struct {
@@ -71,9 +70,6 @@ func (c *Client) StreamLogs(ctx context.Context, logType, agentID string, writer
7170
return nil
7271
}
7372
line := scanner.Text()
74-
if strings.HasPrefix(line, "ERROR:") {
75-
return fmt.Errorf("%s", strings.TrimPrefix(line, "ERROR: "))
76-
}
7773
if _, err := fmt.Fprintln(writer, line); err != nil {
7874
return fmt.Errorf("failed to write log line: %w", err)
7975
}

pkg/cloudagents/logs_test.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package cloudagents
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"errors"
8+
"fmt"
9+
"io"
10+
"net/http"
11+
"net/http/httptest"
12+
"strings"
13+
"testing"
14+
"time"
15+
16+
"github.com/livekit/protocol/logger"
17+
)
18+
19+
func TestStreamLogs_WriterClosesEarly(t *testing.T) {
20+
_, pw := io.Pipe()
21+
22+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
23+
w.WriteHeader(http.StatusOK)
24+
flusher := w.(http.Flusher)
25+
for i := 0; i < 10; i++ {
26+
fmt.Fprintf(w, "log line %d\n", i)
27+
flusher.Flush()
28+
time.Sleep(50 * time.Millisecond)
29+
}
30+
}))
31+
defer server.Close()
32+
33+
client := &Client{
34+
httpClient: server.Client(),
35+
logger: logger.GetLogger(),
36+
apiKey: "test-api-key",
37+
apiSecret: "test-api-secret",
38+
projectURL: server.URL,
39+
}
40+
41+
go func() {
42+
time.Sleep(100 * time.Millisecond)
43+
pw.Close()
44+
}()
45+
46+
err := client.StreamLogs(context.Background(), "deploy", "test-agent", pw, "us-west")
47+
48+
if err == nil {
49+
t.Fatal("expected error when writer closes, got nil")
50+
}
51+
if !strings.Contains(err.Error(), "failed to write log line") {
52+
t.Errorf("expected 'failed to write log line' error, got: %v", err)
53+
}
54+
}
55+
56+
func TestStreamLogs_ContextCanceledDuringWrite(t *testing.T) {
57+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
58+
w.WriteHeader(http.StatusOK)
59+
flusher := w.(http.Flusher)
60+
for i := 0; i < 100; i++ {
61+
fmt.Fprintf(w, "log line %d\n", i)
62+
flusher.Flush()
63+
time.Sleep(50 * time.Millisecond)
64+
}
65+
}))
66+
defer server.Close()
67+
68+
client := &Client{
69+
httpClient: server.Client(),
70+
logger: logger.GetLogger(),
71+
apiKey: "test-api-key",
72+
apiSecret: "test-api-secret",
73+
projectURL: server.URL,
74+
}
75+
76+
ctx, cancel := context.WithCancel(context.Background())
77+
var buf bytes.Buffer
78+
79+
go func() {
80+
time.Sleep(100 * time.Millisecond)
81+
cancel()
82+
}()
83+
84+
err := client.StreamLogs(ctx, "deploy", "test-agent", &buf, "us-west")
85+
86+
if !errors.Is(err, context.Canceled) {
87+
t.Errorf("expected context.Canceled error, got: %v", err)
88+
}
89+
}
90+
91+
type failingWriter struct {
92+
failAfter int
93+
written int
94+
}
95+
96+
func (fw *failingWriter) Write(p []byte) (n int, err error) {
97+
fw.written++
98+
if fw.written > fw.failAfter {
99+
return 0, fmt.Errorf("writer closed")
100+
}
101+
return len(p), nil
102+
}
103+
104+
func TestStreamLogs_WriterReturnsError(t *testing.T) {
105+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
106+
w.WriteHeader(http.StatusOK)
107+
for i := 0; i < 10; i++ {
108+
fmt.Fprintf(w, "log line %d\n", i)
109+
}
110+
}))
111+
defer server.Close()
112+
113+
client := &Client{
114+
httpClient: server.Client(),
115+
logger: logger.GetLogger(),
116+
apiKey: "test-api-key",
117+
apiSecret: "test-api-secret",
118+
projectURL: server.URL,
119+
}
120+
121+
writer := &failingWriter{failAfter: 2}
122+
err := client.StreamLogs(context.Background(), "deploy", "test-agent", writer, "us-west")
123+
124+
if err == nil {
125+
t.Fatal("expected error from failing writer, got nil")
126+
}
127+
if !strings.Contains(err.Error(), "failed to write log line") {
128+
t.Errorf("expected 'failed to write log line' error, got: %v", err)
129+
}
130+
}
131+
132+
func TestStreamLogs_NonOKResponse(t *testing.T) {
133+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
134+
w.WriteHeader(http.StatusBadRequest)
135+
json.NewEncoder(w).Encode(APIError{
136+
Message: "bad request",
137+
Meta: map[string]string{"foo": "bar"},
138+
})
139+
}))
140+
defer server.Close()
141+
142+
client := &Client{
143+
httpClient: server.Client(),
144+
logger: logger.GetLogger(),
145+
apiKey: "test-api-key",
146+
apiSecret: "test-api-secret",
147+
projectURL: server.URL,
148+
}
149+
150+
err := client.StreamLogs(context.Background(), "deploy", "test-agent", &bytes.Buffer{}, "us-west")
151+
if err == nil {
152+
t.Fatal("expected error when server responds with non-200 status")
153+
}
154+
if !strings.Contains(err.Error(), "failed to get logs") {
155+
t.Errorf("unexpected error message: %v", err)
156+
}
157+
}
158+
159+
func TestStreamLogs_ServerClosesConnection(t *testing.T) {
160+
done := make(chan struct{})
161+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
162+
w.WriteHeader(http.StatusOK)
163+
flusher := w.(http.Flusher)
164+
fmt.Fprintln(w, "log line 1")
165+
flusher.Flush()
166+
<-done
167+
}))
168+
defer server.Close()
169+
defer close(done)
170+
171+
go func() {
172+
time.Sleep(50 * time.Millisecond)
173+
server.CloseClientConnections()
174+
}()
175+
176+
client := &Client{
177+
httpClient: server.Client(),
178+
logger: logger.GetLogger(),
179+
apiKey: "test-api-key",
180+
apiSecret: "test-api-secret",
181+
projectURL: server.URL,
182+
}
183+
184+
err := client.StreamLogs(context.Background(), "deploy", "test-agent", &bytes.Buffer{}, "us-west")
185+
if err == nil {
186+
t.Fatal("expected error when server closes connection mid-stream")
187+
}
188+
if !strings.Contains(err.Error(), "scanner error") && !strings.Contains(err.Error(), "unexpected EOF") {
189+
t.Fatalf("unexpected error when server disconnects: %v", err)
190+
}
191+
}

0 commit comments

Comments
 (0)