Skip to content

Commit 7285314

Browse files
authored
Refactor apps logs log streaming (#4086)
## Changes - Show body details on dial error only if it's a JSON - Extract backoff, tail buffer, formatter and consume state for better readability ## Why To increase code readability for easier maintenance. ## Tests Unit tests are in place. I also did manual tests for the command usage as before. ## Screenshots ### Before We printed HTML which weren't usefull really: <img width="1007" height="772" alt="Screenshot 2025-12-04 at 12 17 05" src="https://github.com/user-attachments/assets/d0281c44-dbed-4db5-8a46-656460b317c3" /> ### After We don't print HTML response. Only if JSON is returned then it would be printed. <img width="800" height="108" alt="image" src="https://github.com/user-attachments/assets/78bcf52c-8a45-48a9-a5f2-d83031e4538b" />
1 parent 8af2555 commit 7285314

File tree

8 files changed

+607
-349
lines changed

8 files changed

+607
-349
lines changed

libs/apps/logstream/backoff.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package logstream
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
const (
9+
initialReconnectBackoff = 200 * time.Millisecond
10+
maxReconnectBackoff = 5 * time.Second
11+
)
12+
13+
// backoffStrategy manages exponential backoff for reconnection attempts.
14+
type backoffStrategy struct {
15+
initial time.Duration
16+
max time.Duration
17+
current time.Duration
18+
}
19+
20+
// newBackoffStrategy creates a new backoffStrategy with the given initial and max durations.
21+
func newBackoffStrategy(initial, max time.Duration) *backoffStrategy {
22+
return &backoffStrategy{
23+
initial: initial,
24+
max: max,
25+
current: initial,
26+
}
27+
}
28+
29+
// Wait blocks until the current backoff duration has elapsed or the context is canceled.
30+
func (b *backoffStrategy) Wait(ctx context.Context) error {
31+
if b.current <= 0 {
32+
select {
33+
case <-ctx.Done():
34+
return ctx.Err()
35+
default:
36+
return nil
37+
}
38+
}
39+
40+
select {
41+
case <-ctx.Done():
42+
return ctx.Err()
43+
case <-time.After(b.current):
44+
return nil
45+
}
46+
}
47+
48+
// Next increases the backoff duration exponentially, capped at the max duration.
49+
func (b *backoffStrategy) Next() {
50+
b.current = min(b.current*2, b.max)
51+
}
52+
53+
// Reset returns the backoff duration to the initial value.
54+
func (b *backoffStrategy) Reset() {
55+
b.current = b.initial
56+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package logstream
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestBackoffStrategy_Next(t *testing.T) {
13+
b := newBackoffStrategy(100*time.Millisecond, 500*time.Millisecond)
14+
assert.Equal(t, 100*time.Millisecond, b.current)
15+
16+
b.Next()
17+
assert.Equal(t, 200*time.Millisecond, b.current)
18+
19+
b.Next()
20+
assert.Equal(t, 400*time.Millisecond, b.current)
21+
22+
assertMsg := "should be capped at max"
23+
b.Next()
24+
assert.Equal(t, 500*time.Millisecond, b.current, assertMsg)
25+
b.Next()
26+
assert.Equal(t, 500*time.Millisecond, b.current, assertMsg)
27+
}
28+
29+
func TestBackoffStrategy_Reset(t *testing.T) {
30+
b := newBackoffStrategy(100*time.Millisecond, 1*time.Second)
31+
assert.Equal(t, 100*time.Millisecond, b.current)
32+
33+
b.Next()
34+
b.Next()
35+
assert.Equal(t, 400*time.Millisecond, b.current)
36+
37+
b.Reset()
38+
assert.Equal(t, 100*time.Millisecond, b.current)
39+
}
40+
41+
func TestBackoffStrategy_Wait(t *testing.T) {
42+
t.Run("blocks for duration", func(t *testing.T) {
43+
b := newBackoffStrategy(50*time.Millisecond, 100*time.Millisecond)
44+
45+
start := time.Now()
46+
err := b.Wait(context.Background())
47+
elapsed := time.Since(start)
48+
49+
require.NoError(t, err)
50+
assert.GreaterOrEqual(t, elapsed, 50*time.Millisecond)
51+
assert.Less(t, elapsed, 100*time.Millisecond)
52+
})
53+
54+
t.Run("returns early on cancel", func(t *testing.T) {
55+
b := newBackoffStrategy(1*time.Second, 5*time.Second)
56+
57+
ctx, cancel := context.WithCancel(context.Background())
58+
go func() {
59+
time.Sleep(20 * time.Millisecond)
60+
cancel()
61+
}()
62+
63+
start := time.Now()
64+
err := b.Wait(ctx)
65+
elapsed := time.Since(start)
66+
67+
assert.ErrorIs(t, err, context.Canceled)
68+
assert.Less(t, elapsed, 100*time.Millisecond, "should return early on cancel")
69+
})
70+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package logstream
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"time"
8+
)
9+
10+
// consumeState manages the tail buffer and flush timing during log consumption.
11+
type consumeState struct {
12+
buffer *tailBuffer
13+
writer io.Writer
14+
flushed bool
15+
flushDeadline time.Time
16+
tail int
17+
follow bool
18+
}
19+
20+
// newConsumeState creates a new consume state.
21+
func newConsumeState(tail int, follow bool, prefetch time.Duration, writer io.Writer, alreadyFlushed bool) *consumeState {
22+
s := &consumeState{
23+
buffer: &tailBuffer{size: tail},
24+
writer: writer,
25+
flushed: tail == 0 || alreadyFlushed,
26+
tail: tail,
27+
follow: follow,
28+
}
29+
if tail > 0 && prefetch > 0 && !alreadyFlushed {
30+
s.flushDeadline = time.Now().Add(prefetch)
31+
}
32+
return s
33+
}
34+
35+
// ReadDeadline returns the effective read deadline considering context and flush deadline.
36+
func (s *consumeState) ReadDeadline(ctx context.Context) time.Time {
37+
ctxDeadline, hasCtxDeadline := ctx.Deadline()
38+
39+
if !s.flushDeadline.IsZero() {
40+
if !hasCtxDeadline || s.flushDeadline.Before(ctxDeadline) {
41+
return s.flushDeadline
42+
}
43+
}
44+
45+
if hasCtxDeadline {
46+
return ctxDeadline
47+
}
48+
return time.Time{}
49+
}
50+
51+
// HasPendingFlushDeadline returns true if a prefetch flush deadline is pending.
52+
func (s *consumeState) HasPendingFlushDeadline() bool {
53+
return !s.flushDeadline.IsZero()
54+
}
55+
56+
// HandleFlushTimeout handles a prefetch timeout by flushing the buffer.
57+
// Only call this when HasPendingFlushDeadline() returns true.
58+
// Returns true if reading should continue (following), false if done (not following).
59+
func (s *consumeState) HandleFlushTimeout() (shouldContinue bool, err error) {
60+
s.flushDeadline = time.Time{}
61+
if s.tail > 0 && !s.flushed {
62+
if err := s.buffer.Flush(s.writer); err != nil {
63+
return false, err
64+
}
65+
s.flushed = true
66+
}
67+
return s.follow, nil
68+
}
69+
70+
// ProcessLine either buffers or writes the line depending on flush state.
71+
func (s *consumeState) ProcessLine(line string) error {
72+
if s.tail > 0 && !s.flushed {
73+
s.buffer.Add(line)
74+
if s.flushDeadline.IsZero() && s.buffer.Len() >= s.tail && s.follow {
75+
if err := s.buffer.Flush(s.writer); err != nil {
76+
return err
77+
}
78+
s.flushed = true
79+
}
80+
return nil
81+
}
82+
_, err := fmt.Fprintln(s.writer, line)
83+
return err
84+
}
85+
86+
// FlushRemaining flushes any remaining buffered lines.
87+
func (s *consumeState) FlushRemaining() error {
88+
if s.tail > 0 && !s.flushed {
89+
if err := s.buffer.Flush(s.writer); err != nil {
90+
return err
91+
}
92+
s.flushed = true
93+
}
94+
return nil
95+
}
96+
97+
// IsFlushed returns whether the buffer has been flushed.
98+
func (s *consumeState) IsFlushed() bool {
99+
return s.flushed
100+
}

libs/apps/logstream/formatter.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package logstream
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"strings"
7+
"time"
8+
9+
"github.com/fatih/color"
10+
)
11+
12+
// wsEntry represents a structured log entry from the websocket stream.
13+
type wsEntry struct {
14+
Source string `json:"source"`
15+
Timestamp float64 `json:"timestamp"`
16+
Message string `json:"message"`
17+
}
18+
19+
// parseLogEntry parses a raw log entry from the websocket stream.
20+
func parseLogEntry(raw []byte) (*wsEntry, error) {
21+
var entry wsEntry
22+
if err := json.Unmarshal(raw, &entry); err != nil {
23+
return nil, err
24+
}
25+
return &entry, nil
26+
}
27+
28+
// logFormatter formats log entries for output.
29+
type logFormatter struct {
30+
colorize bool
31+
}
32+
33+
// newLogFormatter creates a new log formatter.
34+
func newLogFormatter(colorize bool) *logFormatter {
35+
return &logFormatter{colorize: colorize}
36+
}
37+
38+
// FormatEntry formats a structured log entry for output.
39+
func (f *logFormatter) FormatEntry(entry *wsEntry) string {
40+
timestamp := formatTimestamp(entry.Timestamp)
41+
source := strings.ToUpper(entry.Source)
42+
message := strings.TrimRight(entry.Message, "\r\n")
43+
44+
if f.colorize {
45+
timestamp = color.HiBlackString(timestamp)
46+
source = color.HiBlueString(source)
47+
}
48+
49+
return fmt.Sprintf("%s [%s] %s", timestamp, source, message)
50+
}
51+
52+
// FormatPlain formats a plain text message by trimming trailing newlines.
53+
func (f *logFormatter) FormatPlain(raw []byte) string {
54+
return strings.TrimRight(string(raw), "\r\n")
55+
}
56+
57+
// formatTimestamp formats a timestamp as a string.
58+
func formatTimestamp(ts float64) string {
59+
if ts <= 0 {
60+
return "----------"
61+
}
62+
sec := int64(ts)
63+
nsec := int64((ts - float64(sec)) * 1e9)
64+
t := time.Unix(sec, nsec).UTC()
65+
return t.Format(time.RFC3339)
66+
}

0 commit comments

Comments
 (0)