Skip to content

Commit f8ca238

Browse files
authored
fix: Add custom log reader implementation to fix hang on long log lines (#263)
This fixes an issue where the CLI would hang indefinitely when it encountered a log line from a plugin that was longer than the buffer size allowed for by `bufio.Scanner`. The fix uses a wrapper around `bufio.NewReader` to skip lines that are too long to be parsed, printing an error in the log with the first 1000 characters to help with identification.
1 parent 0042973 commit f8ca238

File tree

4 files changed

+205
-12
lines changed

4 files changed

+205
-12
lines changed

clients/destination.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package clients
22

33
import (
4-
"bufio"
54
"context"
65
"encoding/json"
6+
"errors"
77
"fmt"
8+
"io"
89
"net"
910
"os"
1011
"os/exec"
@@ -124,12 +125,23 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) (
124125
c.wg.Add(1)
125126
go func() {
126127
defer c.wg.Done()
127-
scanner := bufio.NewScanner(reader)
128-
for scanner.Scan() {
128+
lr := newLogReader(reader)
129+
for {
130+
line, err := lr.NextLine()
131+
if errors.Is(err, io.EOF) {
132+
break
133+
}
134+
if errors.Is(err, errLogLineToLong) {
135+
c.logger.Err(err).Str("line", string(line)).Msg("skipping too long log line")
136+
continue
137+
}
138+
if err != nil {
139+
c.logger.Err(err).Msg("failed to read log line from plugin")
140+
break
141+
}
129142
var structuredLogLine map[string]interface{}
130-
b := scanner.Bytes()
131-
if err := json.Unmarshal(b, &structuredLogLine); err != nil {
132-
c.logger.Err(err).Str("line", string(b)).Msg("failed to unmarshal log line from plugin")
143+
if err := json.Unmarshal(line, &structuredLogLine); err != nil {
144+
c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin")
133145
} else {
134146
jsonToLog(c.logger, structuredLogLine)
135147
}

clients/log_reader.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package clients
2+
3+
import (
4+
"bufio"
5+
"errors"
6+
"io"
7+
)
8+
9+
// logReaderPrefixLen is used when returning a partial line as context in NextLine
10+
const logReaderPrefixLen = 1000
11+
12+
var errLogLineToLong = errors.New("log line too long, discarding")
13+
14+
// logReader is a custom implementation similar to bufio.Scanner, but provides a way to handle lines
15+
// (or tokens) that exceed the buffer size.
16+
type logReader struct {
17+
bufferedReader *bufio.Reader
18+
reader io.ReadCloser // reader provided by the client
19+
}
20+
21+
// newLogReader creates a new logReader to read log lines from an io.ReadCloser
22+
func newLogReader(reader io.ReadCloser) *logReader {
23+
return &logReader{
24+
reader: reader,
25+
bufferedReader: bufio.NewReader(reader),
26+
}
27+
}
28+
29+
// NextLine reads and returns the next log line from the reader. An io.EOF error is returned
30+
// if the end of the stream has been reached. This implementation is different from bufio.Scanner as it
31+
// also returns an error if a line is too long to fit into the buffer. In this case, an error is returned
32+
// together with a limited prefix of the line.
33+
func (r *logReader) NextLine() ([]byte, error) {
34+
line, isPrefix, err := r.bufferedReader.ReadLine()
35+
if !isPrefix || err != nil {
36+
return line, err
37+
}
38+
prefix := make([]byte, logReaderPrefixLen)
39+
for i := 0; isPrefix; i++ {
40+
// this loop is entered if a log line is too long to fit into the buffer. We discard it by
41+
// iterating until isPrefix becomes false. We only log the first few bytes of the line to help with
42+
// identification.
43+
if i == 0 {
44+
prefixLen := logReaderPrefixLen
45+
if len(line) < prefixLen {
46+
prefixLen = len(line)
47+
}
48+
copy(prefix, line[:prefixLen])
49+
}
50+
line, isPrefix, err = r.bufferedReader.ReadLine()
51+
if err != nil {
52+
return nil, err
53+
}
54+
}
55+
return prefix, errLogLineToLong
56+
}

clients/log_reader_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package clients
2+
3+
import (
4+
"bufio"
5+
"github.com/google/go-cmp/cmp"
6+
"io"
7+
"strings"
8+
"testing"
9+
)
10+
11+
func longStr(len int) string {
12+
b := make([]byte, len)
13+
for i := 0; i < len; i++ {
14+
b[i] = byte(65 + (i % 26)) // cycle through letters A to Z
15+
}
16+
return string(b)
17+
}
18+
19+
func genLogs(num, lineLen int) string {
20+
s := make([]string, num)
21+
for i := 0; i < num; i++ {
22+
s[i] = longStr(lineLen)
23+
}
24+
return strings.Join(s, "\n")
25+
}
26+
27+
func Test_LogReader(t *testing.T) {
28+
cases := []struct {
29+
name string
30+
text string
31+
wantLines []string
32+
wantErr bool
33+
}{
34+
{
35+
name: "basic case",
36+
text: `{"k": "v"}
37+
{"k2": "v2"}`,
38+
wantErr: false,
39+
wantLines: []string{
40+
`{"k": "v"}`,
41+
`{"k2": "v2"}`,
42+
}},
43+
{
44+
name: "very long line",
45+
text: longStr(10000000),
46+
wantLines: []string{
47+
longStr(logReaderPrefixLen),
48+
},
49+
wantErr: true,
50+
},
51+
}
52+
for _, tc := range cases {
53+
t.Run(tc.name, func(t *testing.T) {
54+
r := io.NopCloser(strings.NewReader(tc.text))
55+
lr := newLogReader(r)
56+
var gotErr error
57+
gotLines := make([]string, 0)
58+
for i := 0; i < len(tc.wantLines)+1; i++ {
59+
line, err := lr.NextLine()
60+
if err == io.EOF {
61+
break
62+
} else if err != nil {
63+
gotErr = err
64+
}
65+
gotLines = append(gotLines, string(line))
66+
}
67+
if gotErr == nil && tc.wantErr {
68+
t.Fatal("NextLine() was expected to return error, but didn't")
69+
}
70+
if len(gotLines) != len(tc.wantLines) {
71+
t.Fatalf("NextLine() calls got %d lines, want %d", len(gotLines), len(tc.wantLines))
72+
}
73+
if diff := cmp.Diff(gotLines, tc.wantLines); diff != "" {
74+
t.Errorf("NextLine() lines differ from expected. Diff (-got, +want): %s", diff)
75+
}
76+
})
77+
}
78+
}
79+
80+
// we store these package-level variables so that the compiler cannot eliminate the Benchmarks themselves
81+
var (
82+
bufScannerResult []byte
83+
logReaderResult []byte
84+
)
85+
86+
func Benchmark_BufferedScanner(b *testing.B) {
87+
logs := genLogs(10, 10000)
88+
bs := bufio.NewScanner(io.NopCloser(strings.NewReader(logs)))
89+
b.ResetTimer()
90+
var got []byte
91+
for n := 0; n < b.N; n++ {
92+
for bs.Scan() {
93+
got = bs.Bytes()
94+
}
95+
}
96+
bufScannerResult = got
97+
}
98+
99+
func Benchmark_LogReader(b *testing.B) {
100+
logs := genLogs(10, 10000)
101+
lr := newLogReader(io.NopCloser(strings.NewReader(logs)))
102+
b.ResetTimer()
103+
var got []byte
104+
for n := 0; n < b.N; n++ {
105+
for {
106+
line, err := lr.NextLine()
107+
if err == io.EOF {
108+
break
109+
}
110+
got = line
111+
}
112+
}
113+
logReaderResult = got
114+
}

clients/source.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package clients
22

33
import (
4-
"bufio"
54
"context"
65
"encoding/json"
6+
"errors"
77
"fmt"
88
"io"
99
"net"
@@ -129,12 +129,23 @@ func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*Sour
129129
c.wg.Add(1)
130130
go func() {
131131
defer c.wg.Done()
132-
scanner := bufio.NewScanner(reader)
133-
for scanner.Scan() {
132+
lr := newLogReader(reader)
133+
for {
134+
line, err := lr.NextLine()
135+
if errors.Is(err, io.EOF) {
136+
break
137+
}
138+
if errors.Is(err, errLogLineToLong) {
139+
c.logger.Err(err).Str("line", string(line)).Msg("skipping too long log line")
140+
continue
141+
}
142+
if err != nil {
143+
c.logger.Err(err).Msg("failed to read log line from plugin")
144+
break
145+
}
134146
var structuredLogLine map[string]interface{}
135-
b := scanner.Bytes()
136-
if err := json.Unmarshal(b, &structuredLogLine); err != nil {
137-
c.logger.Err(err).Str("line", string(b)).Msg("failed to unmarshal log line from plugin")
147+
if err := json.Unmarshal(line, &structuredLogLine); err != nil {
148+
c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin")
138149
} else {
139150
jsonToLog(c.logger, structuredLogLine)
140151
}

0 commit comments

Comments
 (0)