Skip to content

Commit 0e047b1

Browse files
authored
Zero-allocation optimization for sendLine and readLine (#1803)
1 parent e36dc44 commit 0e047b1

File tree

3 files changed

+137
-9
lines changed

3 files changed

+137
-9
lines changed

plugins/inputs/logfile/tail/tail.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,6 @@ type Line struct {
3737
Offset int64 // offset of current reader
3838
}
3939

40-
// NewLine returns a Line with present time.
41-
func NewLine(text string, offset int64) *Line {
42-
return &Line{text, time.Now(), nil, offset}
43-
}
44-
4540
// SeekInfo represents arguments to `os.Seek`
4641
type SeekInfo struct {
4742
Offset int64
@@ -91,6 +86,8 @@ type Tail struct {
9186
lk sync.Mutex
9287

9388
FileDeletedCh chan struct{}
89+
90+
linePool sync.Pool
9491
}
9592

9693
// TailFile begins tailing the file. Output stream is made available
@@ -107,6 +104,9 @@ func TailFile(filename string, config Config) (*Tail, error) {
107104
Lines: make(chan *Line),
108105
Config: config,
109106
FileDeletedCh: make(chan struct{}),
107+
linePool: sync.Pool{
108+
New: func() any { return &Line{} },
109+
},
110110
}
111111

112112
// when Logger was not specified in config, create new one
@@ -418,7 +418,13 @@ func (tail *Tail) tailFileSync() {
418418
// Wait a second before seeking till the end of
419419
// file when rate limit is reached.
420420
msg := "Too much log activity; waiting a second before resuming tailing"
421-
tail.Lines <- &Line{msg, time.Now(), errors.New(msg), tail.curOffset}
421+
// Warning: Make sure to release line once done!
422+
lineObject := tail.linePool.Get().(*Line)
423+
lineObject.Text = msg
424+
lineObject.Time = time.Now()
425+
lineObject.Err = errors.New(msg)
426+
lineObject.Offset = tail.curOffset
427+
tail.Lines <- lineObject
422428
select {
423429
case <-time.After(time.Second):
424430
case <-tail.Dying():
@@ -574,12 +580,18 @@ func (tail *Tail) sendLine(line string, offset int64) bool {
574580

575581
for i, line := range lines {
576582
// This select is to avoid blockage on the tail.Lines chan
583+
// Warning: Make sure to release line once done!
584+
lineObject := tail.linePool.Get().(*Line)
585+
lineObject.Text = line
586+
lineObject.Time = now
587+
lineObject.Err = nil
588+
lineObject.Offset = offset
577589
select {
578-
case tail.Lines <- &Line{line, now, nil, offset}:
590+
case tail.Lines <- lineObject:
579591
case <-tail.Dying():
580592
if tail.Err() == errStopAtEOF {
581593
// Try sending, even if it blocks.
582-
tail.Lines <- &Line{line, now, nil, offset}
594+
tail.Lines <- lineObject
583595
} else {
584596
tail.dropCnt += len(lines) - i
585597
return true
@@ -627,6 +639,11 @@ func (tail *Tail) unreadByte() (err error) {
627639
return
628640
}
629641

642+
func (tail *Tail) ReleaseLine(line *Line) {
643+
*line = Line{}
644+
tail.linePool.Put(line)
645+
}
646+
630647
// A wrapper of tomb Err()
631648
func (tail *Tail) UnexpectedError() (err error) {
632649
err = tail.Err()

plugins/inputs/logfile/tail/tail_test.go

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"path/filepath"
1111
"strings"
12+
"sync"
1213
"testing"
1314
"time"
1415

@@ -105,7 +106,8 @@ func TestStopAtEOF(t *testing.T) {
105106

106107
// Read to EOF
107108
for i := 0; i < linesWrittenToFile-3; i++ {
108-
<-tail.Lines
109+
line := <-tail.Lines
110+
tail.ReleaseLine(line)
109111
}
110112

111113
// Verify tail.Wait() has completed.
@@ -163,11 +165,13 @@ func readThreelines(t *testing.T, tail *Tail) {
163165
line := <-tail.Lines
164166
if line.Err != nil {
165167
t.Errorf("error tailing test file: %v", line.Err)
168+
tail.ReleaseLine(line) // Release even on error
166169
continue
167170
}
168171
if !strings.HasSuffix(line.Text, "some log line") {
169172
t.Errorf("wrong line from tail found: '%v'", line.Text)
170173
}
174+
tail.ReleaseLine(line) // Release line back to pool
171175
}
172176
// If file was readable, then expect it to exist.
173177
assert.Equal(t, int64(1), OpenFileCount.Load())
@@ -238,6 +242,7 @@ func TestUtf16LineSize(t *testing.T) {
238242
case line := <-tail.Lines:
239243
// The line should be truncated to maxLineSize
240244
assert.LessOrEqual(t, len(line.Text), maxLineSize)
245+
tail.ReleaseLine(line)
241246
case <-time.After(1 * time.Second):
242247
t.Fatal("timeout waiting for line")
243248
}
@@ -265,6 +270,7 @@ func TestTail_DefaultBuffer(t *testing.T) {
265270
case line := <-tail.Lines:
266271
assert.NoError(t, line.Err)
267272
assert.Equal(t, normalContent, line.Text)
273+
tail.ReleaseLine(line)
268274
case <-time.After(time.Second):
269275
t.Fatal("Timeout waiting for line")
270276
}
@@ -292,7 +298,106 @@ func TestTail_1MBWithExplicitMaxLineSize(t *testing.T) {
292298
case line := <-tail.Lines:
293299
assert.NoError(t, line.Err)
294300
assert.Equal(t, largeContent, line.Text)
301+
tail.ReleaseLine(line)
295302
case <-time.After(time.Second):
296303
t.Fatal("Timeout waiting for line")
297304
}
298305
}
306+
307+
// TestLinePooling verifies that Line objects are properly pooled and reused
308+
func TestLinePooling(t *testing.T) {
309+
tmpfile, err := os.CreateTemp("", "pool_test")
310+
require.NoError(t, err)
311+
defer os.Remove(tmpfile.Name())
312+
313+
content := "line1\nline2\nline3\n"
314+
err = os.WriteFile(tmpfile.Name(), []byte(content), 0600)
315+
require.NoError(t, err)
316+
317+
tail, err := TailFile(tmpfile.Name(), Config{
318+
Follow: false,
319+
MustExist: true,
320+
})
321+
require.NoError(t, err)
322+
defer tail.Stop()
323+
324+
var lines []*Line
325+
for i := 0; i < 3; i++ {
326+
select {
327+
case line := <-tail.Lines:
328+
lines = append(lines, line)
329+
case <-time.After(time.Second):
330+
t.Fatal("Timeout waiting for line")
331+
}
332+
}
333+
334+
assert.Equal(t, "line1", lines[0].Text)
335+
assert.Equal(t, "line2", lines[1].Text)
336+
assert.Equal(t, "line3", lines[2].Text)
337+
338+
// Release all lines back to pool
339+
for _, line := range lines {
340+
tail.ReleaseLine(line)
341+
}
342+
343+
// Line object should be zeroed out because we released it
344+
pooledLine := tail.linePool.Get().(*Line)
345+
assert.Empty(t, pooledLine.Text, "Pooled line should remain zeroed")
346+
assert.Empty(t, pooledLine.Time, "Pooled line should remain zeroed")
347+
assert.Empty(t, pooledLine.Err, "Pooled line should remain zeroed")
348+
assert.Empty(t, pooledLine.Offset, "Pooled line should remain zeroed")
349+
tail.ReleaseLine(pooledLine)
350+
}
351+
352+
// TestConcurrentLinePoolAccess tests that the line pool is thread-safe
353+
func TestConcurrentLinePoolAccess(t *testing.T) {
354+
tmpfile, err := os.CreateTemp("", "concurrent_test")
355+
require.NoError(t, err)
356+
defer os.Remove(tmpfile.Name())
357+
358+
// Create content with multiple lines
359+
numLines := 100
360+
content := strings.Repeat("concurrent test line\n", numLines)
361+
err = os.WriteFile(tmpfile.Name(), []byte(content), 0600)
362+
require.NoError(t, err)
363+
364+
tail, err := TailFile(tmpfile.Name(), Config{
365+
Follow: false,
366+
MustExist: true,
367+
})
368+
require.NoError(t, err)
369+
defer tail.Stop()
370+
371+
// Process lines concurrently
372+
var wg sync.WaitGroup
373+
linesChan := make(chan *Line, numLines)
374+
375+
wg.Add(1)
376+
go func() {
377+
defer wg.Done()
378+
for i := 0; i < numLines; i++ {
379+
select {
380+
case line := <-tail.Lines:
381+
linesChan <- line
382+
case <-time.After(5 * time.Second):
383+
t.Errorf("Timeout waiting for line %d", i)
384+
return
385+
}
386+
}
387+
close(linesChan)
388+
}()
389+
390+
numWorkers := 5
391+
for w := 0; w < numWorkers; w++ {
392+
wg.Add(1)
393+
go func() {
394+
defer wg.Done()
395+
for line := range linesChan {
396+
assert.Equal(t, "concurrent test line", line.Text)
397+
tail.ReleaseLine(line) // Release back to pool
398+
}
399+
}()
400+
}
401+
402+
wg.Wait()
403+
}

plugins/inputs/logfile/tailersrc.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ func (ts *tailerSrc) runTail() {
199199

200200
for {
201201
select {
202+
// Warning: Make sure to release line once done!
202203
case line, ok := <-ts.tailer.Lines:
203204
if !ok {
204205
ts.publishEvent(msgBuf, fo)
@@ -207,6 +208,7 @@ func (ts *tailerSrc) runTail() {
207208

208209
if line.Err != nil {
209210
log.Printf("E! [logfile] Error tailing line in file %s, Error: %s\n", ts.tailer.Filename, line.Err)
211+
ts.tailer.ReleaseLine(line)
210212
continue
211213
}
212214

@@ -216,6 +218,7 @@ func (ts *tailerSrc) runTail() {
216218
text, err = ts.enc.NewDecoder().String(text)
217219
if err != nil {
218220
log.Printf("E! [logfile] Cannot decode the log file content for %s: %v\n", ts.tailer.Filename, err)
221+
ts.tailer.ReleaseLine(line)
219222
continue
220223
}
221224
}
@@ -231,18 +234,21 @@ func (ts *tailerSrc) runTail() {
231234
} else if ignoreUntilNextEvent || msgBuf.Len() >= ts.maxEventSize {
232235
ignoreUntilNextEvent = true
233236
fo.ShiftInt64(line.Offset)
237+
ts.tailer.ReleaseLine(line)
234238
continue
235239
} else {
236240
msgBuf.WriteString("\n")
237241
msgBuf.WriteString(text)
238242
fo.ShiftInt64(line.Offset)
243+
ts.tailer.ReleaseLine(line)
239244
continue
240245
}
241246

242247
ts.publishEvent(msgBuf, fo)
243248
msgBuf.Reset()
244249
msgBuf.WriteString(init)
245250
fo.ShiftInt64(line.Offset)
251+
ts.tailer.ReleaseLine(line)
246252
cnt = 0
247253
case <-t.C:
248254
if msgBuf.Len() > 0 {

0 commit comments

Comments
 (0)