
Commit 6691bf6

Dynamic reader buffer for tail (#1809)
1 parent 0e047b1 commit 6691bf6

3 files changed: +297 −12 lines


plugins/inputs/logfile/constants/constants.go

Lines changed: 4 additions & 0 deletions
@@ -4,6 +4,10 @@
 package constants

 const (
+	// DefaultReaderBufferSize is the default buffer size for file readers (256KB)
+	// This is much smaller than MaxEventSize to reduce memory usage
+	DefaultReaderBufferSize = 256 * 1024
+
 	// DefaultMaxEventSize is the default maximum size for log events (1MB)
 	DefaultMaxEventSize = 1024 * 1024

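For context on why the smaller default matters: bufio.NewReaderSize allocates its internal buffer up front, so every tailed file pins at least that much memory for the life of its reader. A minimal standalone sketch, with the two constants inlined here for illustration rather than imported from the agent:

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	// Inlined copies of the constants above, to keep the sketch self-contained.
	const (
		defaultReaderBufferSize = 256 * 1024  // DefaultReaderBufferSize
		defaultMaxEventSize     = 1024 * 1024 // DefaultMaxEventSize
	)

	small := bufio.NewReaderSize(strings.NewReader(""), defaultReaderBufferSize)
	large := bufio.NewReaderSize(strings.NewReader(""), defaultMaxEventSize)

	// Size reports the capacity of the reader's internal buffer, i.e. the
	// memory held for as long as the reader lives.
	fmt.Println(small.Size(), large.Size()) // 262144 1048576
}

With many files tailed concurrently, dropping the per-reader baseline from 1 MB to 256 KB is where the savings come from; the 1 MB buffer is only paid for files that actually produce oversized lines, as the tail.go change below shows.
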
plugins/inputs/logfile/tail/tail.go

Lines changed: 43 additions & 11 deletions
@@ -73,8 +73,9 @@ type Tail struct {
 	Lines chan *Line
 	Config

-	file   *os.File
-	reader *bufio.Reader
+	file           *os.File
+	reader         *bufio.Reader
+	useLargeBuffer bool

 	watcher watch.FileWatcher
 	changes *watch.FileChanges
@@ -544,14 +545,52 @@ func (tail *Tail) waitForChanges() error {

 func (tail *Tail) openReader() {
 	tail.lk.Lock()
-	if tail.MaxLineSize > 0 {
+	if tail.useLargeBuffer {
 		tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize)
 	} else {
-		tail.reader = bufio.NewReader(tail.file)
+		tail.reader = bufio.NewReaderSize(tail.file, constants.DefaultReaderBufferSize)
 	}
 	tail.lk.Unlock()
 }

+func (tail *Tail) readSlice(delim byte) ([]byte, error) {
+	// First try: normal ReadSlice
+	word, err := tail.reader.ReadSlice(delim)
+	if err != bufio.ErrBufferFull {
+		tail.curOffset += int64(len(word))
+		return word, err // fast path: no allocation
+	}
+
+	// Check if buffer already upgraded
+	if tail.reader.Size() == tail.MaxLineSize {
+		tail.curOffset += int64(len(word))
+		return word, bufio.ErrBufferFull
+	}
+
+	// Buffer was too small → allocate ONCE
+	buf := append([]byte(nil), word...)
+
+	// Copy any unread buffered data
+	unread := tail.reader.Buffered()
+	if unread > 0 {
+		peek, _ := tail.reader.Peek(unread)
+		buf = append(buf, peek...)
+		tail.reader.Discard(unread)
+	}
+
+	// Switch to a bigger buffer
+	tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize)
+	// In the event that the tail is re-opened, we don't want to have to do this
+	// re-sizing of the buffer again. The reader should just re-open with the larger buffer
+	tail.useLargeBuffer = true
+
+	word, err = tail.reader.ReadSlice(delim)
+	buf = append(buf, word...)
+	tail.curOffset += int64(len(buf))
+
+	return buf, err
+}
+
 func (tail *Tail) seekEnd() error {
 	return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END})
 }
@@ -618,13 +657,6 @@ func (tail *Tail) Cleanup() {
 	watch.Cleanup(tail.Filename)
 }

-// A wrapper of bufio ReadSlice
-func (tail *Tail) readSlice(delim byte) (line []byte, err error) {
-	line, err = tail.reader.ReadSlice(delim)
-	tail.curOffset += int64(len(line))
-	return
-}
-
 // A wrapper of bufio ReadByte
 func (tail *Tail) readByte() (b byte, err error) {
 	b, err = tail.reader.ReadByte()

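The new readSlice boils down to a general pattern: read through a small bufio.Reader, and only when bufio.ErrBufferFull fires copy the partial line, salvage whatever the small reader still has buffered, and recreate the reader with the larger size over the same underlying file. The standalone sketch below shows that flow outside the Tail type; the growingReader name, buffer sizes, and helpers are illustrative only, not part of the agent.

package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
)

const (
	smallBufSize = 64 * 1024   // analogous to DefaultReaderBufferSize (256 KB in the agent)
	largeBufSize = 1024 * 1024 // analogous to MaxLineSize / DefaultMaxEventSize (1 MB)
)

type growingReader struct {
	src   io.Reader
	r     *bufio.Reader
	large bool
}

func newGrowingReader(src io.Reader) *growingReader {
	return &growingReader{src: src, r: bufio.NewReaderSize(src, smallBufSize)}
}

// readLine mirrors the commit's readSlice: a fast path with no allocation,
// plus a one-time copy and buffer upgrade when a line overflows the small buffer.
func (g *growingReader) readLine() ([]byte, error) {
	word, err := g.r.ReadSlice('\n')
	if err != bufio.ErrBufferFull {
		return word, err // fast path
	}
	if g.large {
		return word, err // already at the maximum size; caller sees a truncated chunk
	}

	// Salvage what the small reader returned and still holds, then upgrade.
	buf := append([]byte(nil), word...)
	if n := g.r.Buffered(); n > 0 {
		peek, _ := g.r.Peek(n)
		buf = append(buf, peek...)
		g.r.Discard(n)
	}
	g.r = bufio.NewReaderSize(g.src, largeBufSize)
	g.large = true

	word, err = g.r.ReadSlice('\n')
	return append(buf, word...), err
}

func main() {
	// A short line followed by one that overflows the small buffer.
	data := "short line\n" + strings.Repeat("x", 200*1024) + "\n"
	g := newGrowingReader(strings.NewReader(data))

	for {
		line, err := g.readLine()
		if len(line) > 0 {
			fmt.Printf("read %d bytes (large buffer: %v)\n", len(line), g.large)
		}
		if err == io.EOF {
			return
		}
		if err != nil && err != bufio.ErrBufferFull {
			fmt.Fprintln(os.Stderr, "read error:", err)
			return
		}
	}
}

Short lines stay on the fast path with no copy; the first oversized line pays a single extra copy and the upgrade sticks, which is exactly the behavior the useLargeBuffer flag preserves across reopens.
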
plugins/inputs/logfile/tail/tail_test.go

Lines changed: 250 additions & 1 deletion
@@ -15,6 +15,8 @@ import (

 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+
+	"github.com/aws/amazon-cloudwatch-agent/plugins/inputs/logfile/constants"
 )

 const linesWrittenToFile int = 10
@@ -289,7 +291,7 @@ func TestTail_1MBWithExplicitMaxLineSize(t *testing.T) {
 	tail, err := TailFile(filename, Config{
 		Follow:      false,
 		MustExist:   true,
-		MaxLineSize: 1024 * 1024, // Explicitly set 1MB buffer
+		MaxLineSize: constants.DefaultMaxEventSize, // Explicitly set 1MB buffer
 	})
 	require.NoError(t, err)
 	defer tail.Stop()
@@ -401,3 +403,250 @@ func TestConcurrentLinePoolAccess(t *testing.T) {

 	wg.Wait()
 }
+
+// TestDynamicBufferSmallLines tests that small lines use the default small buffer
+func TestDynamicBufferSmallLines(t *testing.T) {
+	tempDir := t.TempDir()
+	filename := filepath.Join(tempDir, "small_lines.log")
+
+	// Create file with small lines (well within 256KB default buffer)
+	smallLine := strings.Repeat("a", 1024) // 1KB line
+	content := smallLine + "\n" + smallLine + "\n"
+	err := os.WriteFile(filename, []byte(content), 0600)
+	require.NoError(t, err)
+
+	tail, err := TailFile(filename, Config{
+		Follow:      false,
+		MustExist:   true,
+		MaxLineSize: constants.DefaultMaxEventSize, // 1MB max
+	})
+	require.NoError(t, err)
+	defer tail.Stop()
+
+	// Read first line to ensure reader is initialized
+	select {
+	case line := <-tail.Lines:
+		assert.NoError(t, line.Err)
+		assert.Equal(t, smallLine, line.Text)
+		tail.ReleaseLine(line)
+	case <-time.After(time.Second):
+		t.Fatal("Timeout waiting for first line")
+	}
+
+	// Now check buffer size after reader is initialized
+	assert.Equal(t, constants.DefaultReaderBufferSize, tail.reader.Size(), "Should use default 256KB buffer for small lines")
+	assert.False(t, tail.useLargeBuffer, "Should not be using large buffer for small lines")
+
+	// Read second line
+	select {
+	case line := <-tail.Lines:
+		assert.NoError(t, line.Err)
+		assert.Equal(t, smallLine, line.Text)
+		tail.ReleaseLine(line)
+	case <-time.After(time.Second):
+		t.Fatal("Timeout waiting for second line")
+	}

+	// Verify buffer is still small after reading small lines
+	assert.Equal(t, constants.DefaultReaderBufferSize, tail.reader.Size(), "Should still use 256KB buffer for small lines")
+	assert.False(t, tail.useLargeBuffer, "Should still not be using large buffer")
+}
+
+// TestDynamicBufferLargeLineUpgrade tests buffer upgrade when encountering large lines
+func TestDynamicBufferLargeLineUpgrade(t *testing.T) {
+	tempDir := t.TempDir()
+	filename := filepath.Join(tempDir, "large_line.log")
+
+	// Create file with multiple large lines to test buffer upgrade
+	largeLine := strings.Repeat("b", 300*1024)                                 // 300KB line
+	nearlyOneMBLine := strings.Repeat("x", constants.DefaultMaxEventSize-1024) // Nearly 1MB line (1MB - 1KB)
+	content := largeLine + "\nafter large line\n" + nearlyOneMBLine + "\nafter nearly 1MB line\n"
+	err := os.WriteFile(filename, []byte(content), 0600)
+	require.NoError(t, err)
+
+	tail, err := TailFile(filename, Config{
+		Follow:      false,
+		MustExist:   true,
+		MaxLineSize: constants.DefaultMaxEventSize, // 1MB max
+	})
+	require.NoError(t, err)
+	defer tail.Stop()
+
+	// Read first large line (300KB) - should trigger buffer upgrade
+	select {
+	case line := <-tail.Lines:
+		assert.NoError(t, line.Err)
+		assert.Equal(t, largeLine, line.Text)
+		assert.Equal(t, 300*1024, len(line.Text), "First line should be 300KB")
+		tail.ReleaseLine(line)
+	case <-time.After(time.Second):
+		t.Fatal("Timeout waiting for large line")
+	}
+
+	// Verify buffer was upgraded after reading large line
+	assert.Equal(t, constants.DefaultMaxEventSize, tail.reader.Size(), "Should upgrade to 1MB buffer after large line")
+	assert.True(t, tail.useLargeBuffer, "Should be using large buffer after upgrade")
+
+	// Read line after large line
+	select {
+	case line := <-tail.Lines:
+		assert.NoError(t, line.Err)
+		assert.Equal(t, "after large line", line.Text)
+		tail.ReleaseLine(line)
+	case <-time.After(time.Second):
+		t.Fatal("Timeout waiting for line after large")
+	}
+
+	// Read nearly 1MB line - should work with already upgraded buffer
+	select {
+	case line := <-tail.Lines:
+		assert.NoError(t, line.Err)
+		assert.Equal(t, nearlyOneMBLine, line.Text)
+		assert.Equal(t, constants.DefaultMaxEventSize-1024, len(line.Text), "Line should be nearly 1MB")
+		tail.ReleaseLine(line)
+	case <-time.After(2 * time.Second): // Longer timeout for very large line
+		t.Fatal("Timeout waiting for nearly 1MB line")
+	}
+
+	// Read final line
+	select {
+	case line := <-tail.Lines:
+		assert.NoError(t, line.Err)
+		assert.Equal(t, "after nearly 1MB line", line.Text)
+		tail.ReleaseLine(line)
+	case <-time.After(time.Second):
+		t.Fatal("Timeout waiting for final line")
+	}
+
+	// Verify buffer remains large throughout
+	assert.Equal(t, constants.DefaultMaxEventSize, tail.reader.Size(), "Should maintain 1MB buffer")
+	assert.True(t, tail.useLargeBuffer, "Should continue using large buffer")
+}
+
+// TestDynamicBufferPersistentUpgrade tests that buffer upgrade persists across file reopens
+func TestDynamicBufferPersistentUpgrade(t *testing.T) {
+	tempDir := t.TempDir()
+	filename := filepath.Join(tempDir, "persistent_test.log")
+
+	// Create file with large line to trigger upgrade
+	largeLine := strings.Repeat("c", 300*1024) // 300KB line
+	content := largeLine + "\n"
+	err := os.WriteFile(filename, []byte(content), 0600)
+	require.NoError(t, err)
+
+	tail, err := TailFile(filename, Config{
+		Follow:      true,
+		ReOpen:      true,
+		MustExist:   true,
+		MaxLineSize: constants.DefaultMaxEventSize, // 1MB max
+	})
+	require.NoError(t, err)
+	defer tail.Stop()
+
+	// Read large line to trigger upgrade
+	select {
+	case line := <-tail.Lines:
+		assert.NoError(t, line.Err)
+		assert.Equal(t, largeLine, line.Text)
+		tail.ReleaseLine(line)
+	case <-time.After(time.Second):
+		t.Fatal("Timeout waiting for large line")
+	}
+
+	// Verify buffer was upgraded
+	assert.True(t, tail.useLargeBuffer, "Should be using large buffer after upgrade")
+
+	// Force a reopen by simulating file recreation
+	err = tail.Reopen(false)
+	require.NoError(t, err)
+
+	// Verify buffer upgrade persisted across reopen
+	assert.Equal(t, constants.DefaultMaxEventSize, tail.reader.Size(), "Should use 1MB buffer after reopen")
+	assert.True(t, tail.useLargeBuffer, "Should maintain large buffer flag after reopen")
+}
+
+// TestDynamicBufferMaxLineSizeLimit tests behavior when line exceeds MaxLineSize
+func TestDynamicBufferMaxLineSizeLimit(t *testing.T) {
+	tempDir := t.TempDir()
+	filename := filepath.Join(tempDir, "max_size_test.log")
+
+	maxLineSize := 512 * 1024 // 512KB max
+	// Create line larger than MaxLineSize
+	hugeLine := strings.Repeat("d", maxLineSize+1024) // 513KB line
+	content := hugeLine + "\n"
+	err := os.WriteFile(filename, []byte(content), 0600)
+	require.NoError(t, err)
+
+	tail, err := TailFile(filename, Config{
+		Follow:      false,
+		MustExist:   true,
+		MaxLineSize: maxLineSize,
+	})
+	require.NoError(t, err)
+	defer tail.Stop()
+
+	// Read the line - should be truncated at buffer boundary
+	select {
+	case line := <-tail.Lines:
+		assert.NoError(t, line.Err)
+		// Line should be truncated to buffer size
+		assert.Equal(t, maxLineSize, len(line.Text))
+		assert.Equal(t, strings.Repeat("d", maxLineSize), line.Text)
+		tail.ReleaseLine(line)
+	case <-time.After(time.Second):
+		t.Fatal("Timeout waiting for truncated line")
+	}
+
+	// Verify buffer was upgraded to MaxLineSize
+	assert.Equal(t, maxLineSize, tail.reader.Size(), "Should upgrade to MaxLineSize buffer")
+	assert.True(t, tail.useLargeBuffer, "Should be using large buffer")
+}
+
+// TestDynamicBufferMultipleUpgrades tests that buffer doesn't upgrade multiple times
+func TestDynamicBufferMultipleUpgrades(t *testing.T) {
+	tempDir := t.TempDir()
+	filename := filepath.Join(tempDir, "multiple_upgrades.log")
+
+	// Create multiple large lines
+	largeLine1 := strings.Repeat("e", 300*1024) // 300KB
+	largeLine2 := strings.Repeat("f", 400*1024) // 400KB
+	content := largeLine1 + "\n" + largeLine2 + "\n"
+	err := os.WriteFile(filename, []byte(content), 0600)
+	require.NoError(t, err)
+
+	tail, err := TailFile(filename, Config{
+		Follow:      false,
+		MustExist:   true,
+		MaxLineSize: constants.DefaultMaxEventSize, // 1MB max
+	})
+	require.NoError(t, err)
+	defer tail.Stop()
+
+	// Read first large line
+	select {
+	case line := <-tail.Lines:
+		assert.NoError(t, line.Err)
+		assert.Equal(t, largeLine1, line.Text)
+		tail.ReleaseLine(line)
+	case <-time.After(time.Second):
+		t.Fatal("Timeout waiting for first large line")
+	}
+
+	// Verify buffer was upgraded
+	assert.Equal(t, constants.DefaultMaxEventSize, tail.reader.Size(), "Should upgrade to 1MB after first large line")
+	assert.True(t, tail.useLargeBuffer, "Should be using large buffer")
+
+	// Read second large line
+	select {
+	case line := <-tail.Lines:
+		assert.NoError(t, line.Err)
+		assert.Equal(t, largeLine2, line.Text)
+		tail.ReleaseLine(line)
+	case <-time.After(time.Second):
+		t.Fatal("Timeout waiting for second large line")
+	}
+
+	// Verify buffer size didn't change (no double upgrade)
+	assert.Equal(t, constants.DefaultMaxEventSize, tail.reader.Size(), "Should maintain 1MB buffer")
+	assert.True(t, tail.useLargeBuffer, "Should continue using large buffer")
+}

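Taken together, the tests pin down the intended behavior: readers start at 256 KB, a single oversized line upgrades them to MaxLineSize, and the upgrade survives reopens. Below is a minimal caller-side sketch modeled on the tests above; the tail package import path is inferred from the repository layout and the log file path is a placeholder.

package main

import (
	"fmt"
	"log"

	"github.com/aws/amazon-cloudwatch-agent/plugins/inputs/logfile/constants"
	"github.com/aws/amazon-cloudwatch-agent/plugins/inputs/logfile/tail"
)

func main() {
	// Placeholder path; the Config fields mirror those used in the tests above.
	t, err := tail.TailFile("/var/log/app.log", tail.Config{
		Follow:      true,
		ReOpen:      true,
		MustExist:   true,
		MaxLineSize: constants.DefaultMaxEventSize, // 1MB ceiling; the reader itself starts at 256KB
	})
	if err != nil {
		log.Fatal(err)
	}
	defer t.Stop()

	for line := range t.Lines {
		if line.Err != nil {
			log.Println("read error:", line.Err)
			continue
		}
		fmt.Println(len(line.Text))
		t.ReleaseLine(line) // return the pooled line, as the tests do
	}
}

Nothing changes at the call site: the buffer sizing is internal to the tail package, so existing configurations that set MaxLineSize simply allocate less memory per file until a large line actually shows up.
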