Skip to content

Commit 4d30835

Browse files
authored
Fix state file truncation detection (#1858)
1 parent 12cbbb4 commit 4d30835

File tree

9 files changed

+224
-35
lines changed

9 files changed

+224
-35
lines changed

internal/state/range.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ const (
2525
// Range represents a pair of offsets [start, end).
2626
type Range struct {
2727
start, end uint64
28-
// seq handles file truncation, when a file is truncated, we increase the seq
29-
seq uint64
28+
seq uint64
3029
}
3130

3231
var _ encoding.TextMarshaler = (*Range)(nil)
@@ -74,6 +73,11 @@ func (r Range) EndOffsetInt64() int64 {
7473
return convertInt64(r.end)
7574
}
7675

76+
// SequenceNumber returns the range's sequence number, which is used to detect file truncation: it increases each time the file is truncated.
77+
func (r Range) SequenceNumber() uint64 {
78+
return r.seq
79+
}
80+
7781
// Shift moves the previous end to the start and sets the new end. If the new end is before the previous one, it resets
7882
// the range to [0, newEnd) and increments the sequence number.
7983
func (r *Range) Shift(newEnd uint64) {

internal/state/range_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ func TestRange(t *testing.T) {
1818
r.Set(10, 20)
1919
assert.Equal(t, uint64(10), r.StartOffset())
2020
assert.Equal(t, uint64(20), r.EndOffset())
21-
assert.Equal(t, uint64(0), r.seq)
21+
assert.Equal(t, uint64(0), r.SequenceNumber())
2222
r.Set(5, 30)
23-
assert.Equal(t, uint64(1), r.seq)
23+
assert.Equal(t, uint64(1), r.SequenceNumber())
2424
})
2525
t.Run("SetGet/Int64", func(t *testing.T) {
2626
var r Range
@@ -37,21 +37,21 @@ func TestRange(t *testing.T) {
3737
t.Run("Shift", func(t *testing.T) {
3838
var r Range
3939
r.ShiftInt64(100)
40-
assert.Equal(t, uint64(0), r.start)
41-
assert.Equal(t, uint64(100), r.end)
42-
assert.Equal(t, uint64(0), r.seq)
40+
assert.Equal(t, uint64(0), r.StartOffset())
41+
assert.Equal(t, uint64(100), r.EndOffset())
42+
assert.Equal(t, uint64(0), r.SequenceNumber())
4343
r.ShiftInt64(200)
44-
assert.Equal(t, uint64(100), r.start)
45-
assert.Equal(t, uint64(200), r.end)
46-
assert.Equal(t, uint64(0), r.seq)
44+
assert.Equal(t, uint64(100), r.StartOffset())
45+
assert.Equal(t, uint64(200), r.EndOffset())
46+
assert.Equal(t, uint64(0), r.SequenceNumber())
4747
r.ShiftInt64(50)
48-
assert.Equal(t, uint64(0), r.start)
49-
assert.Equal(t, uint64(50), r.end)
50-
assert.Equal(t, uint64(1), r.seq)
48+
assert.Equal(t, uint64(0), r.StartOffset())
49+
assert.Equal(t, uint64(50), r.EndOffset())
50+
assert.Equal(t, uint64(1), r.SequenceNumber())
5151
r.ShiftInt64(-1)
52-
assert.Equal(t, uint64(0), r.start)
53-
assert.Equal(t, uint64(50), r.end)
54-
assert.Equal(t, uint64(1), r.seq)
52+
assert.Equal(t, uint64(0), r.StartOffset())
53+
assert.Equal(t, uint64(50), r.EndOffset())
54+
assert.Equal(t, uint64(1), r.SequenceNumber())
5555
})
5656
t.Run("Contains", func(t *testing.T) {
5757
r1 := Range{start: 0, end: 10}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package statetest
5+
6+
import "github.com/aws/amazon-cloudwatch-agent/internal/state"
7+
8+
type FileRangeManagerSink struct {
9+
base state.FileRangeManager
10+
sink state.RangeList
11+
}
12+
13+
var _ state.FileRangeManager = (*FileRangeManagerSink)(nil)
14+
15+
func NewFileManagerSink(base state.FileRangeManager) *FileRangeManagerSink {
16+
return &FileRangeManagerSink{
17+
base: base,
18+
sink: make(state.RangeList, 0),
19+
}
20+
}
21+
22+
func (f *FileRangeManagerSink) ID() string {
23+
return f.base.ID()
24+
}
25+
26+
func (f *FileRangeManagerSink) Enqueue(r state.Range) {
27+
f.sink = append(f.sink, r)
28+
f.base.Enqueue(r)
29+
}
30+
31+
func (f *FileRangeManagerSink) Restore() (state.RangeList, error) {
32+
return f.base.Restore()
33+
}
34+
35+
func (f *FileRangeManagerSink) Run(ch state.Notification) {
36+
f.base.Run(ch)
37+
}
38+
39+
func (f *FileRangeManagerSink) GetSink() state.RangeList {
40+
return f.sink
41+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package statetest
5+
6+
import (
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
13+
"github.com/aws/amazon-cloudwatch-agent/internal/state"
14+
)
15+
16+
func TestNewFileManagerSink(t *testing.T) {
17+
tmpDir := t.TempDir()
18+
sink := NewFileManagerSink(state.NewFileRangeManager(state.ManagerConfig{
19+
StateFileDir: tmpDir,
20+
Name: "sink",
21+
MaxPersistedItems: 1,
22+
}))
23+
done := make(chan struct{})
24+
var wg sync.WaitGroup
25+
wg.Add(1)
26+
go func() {
27+
defer wg.Done()
28+
sink.Run(state.Notification{Done: done})
29+
}()
30+
assert.Equal(t, "sink", sink.ID())
31+
sink.Enqueue(state.NewRange(0, 5))
32+
sink.Enqueue(state.NewRange(5, 10))
33+
time.Sleep(time.Millisecond)
34+
close(done)
35+
wg.Wait()
36+
37+
got, err := sink.Restore()
38+
assert.NoError(t, err)
39+
assert.Equal(t, state.RangeList{
40+
state.NewRange(0, 10),
41+
}, got)
42+
43+
assert.Equal(t, state.RangeList{
44+
state.NewRange(0, 5),
45+
state.NewRange(5, 10),
46+
}, sink.GetSink())
47+
}

plugins/inputs/logfile/logfile.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,11 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc {
203203
seekFile = &tail.SeekInfo{Whence: io.SeekEnd, Offset: 0}
204204
}
205205

206+
var initialStateOffset int64
206207
var gapsToRead state.RangeList
207-
if !restored.OnlyUseMaxOffset() {
208+
if restored.OnlyUseMaxOffset() {
209+
initialStateOffset = restored.Last().EndOffsetInt64()
210+
} else {
208211
gapsToRead = state.InvertRanges(restored)
209212
}
210213
isutf16 := false
@@ -257,6 +260,7 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc {
257260
groupName, streamName,
258261
t.Destination,
259262
stateManager,
263+
initialStateOffset,
260264
fileconfig.LogGroupClass,
261265
fileconfig.FilePath,
262266
tailer,

plugins/inputs/logfile/logfile_test.go

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"golang.org/x/text/transform"
1919

2020
"github.com/aws/amazon-cloudwatch-agent/internal/state"
21+
"github.com/aws/amazon-cloudwatch-agent/internal/state/statetest"
2122
"github.com/aws/amazon-cloudwatch-agent/logs"
2223
)
2324

@@ -571,7 +572,7 @@ func TestLogsMultilineTimeout(t *testing.T) {
571572
func TestLogsFileTruncate(t *testing.T) {
572573
multilineWaitPeriod = 10 * time.Millisecond
573574
lineBeforeFileTruncate := "lineBeforeFileTruncate"
574-
lineAfterFileTruncate := "lineAfterFileTruncate"
575+
lineAfterFileTruncate := "afterTruncate"
575576

576577
tmpfile, err := createTempFile("", "")
577578
defer os.Remove(tmpfile.Name())
@@ -589,9 +590,17 @@ func TestLogsFileTruncate(t *testing.T) {
589590
}
590591

591592
lsrc := lsrcs[0]
593+
ts, ok := lsrc.(*tailerSrc)
594+
assert.True(t, ok)
595+
sink := statetest.NewFileManagerSink(ts.stateManager)
596+
ts.stateManager = sink
597+
592598
evts := make(chan logs.LogEvent)
593599
lsrc.SetOutput(func(e logs.LogEvent) {
594-
evts <- e
600+
if e != nil {
601+
e.Done()
602+
evts <- e
603+
}
595604
})
596605

597606
go func() {
@@ -620,6 +629,73 @@ func TestLogsFileTruncate(t *testing.T) {
620629

621630
lsrc.Stop()
622631
tt.Stop()
632+
633+
got := sink.GetSink()
634+
assert.Len(t, got, 2)
635+
assert.EqualValues(t, 0, got[0].SequenceNumber())
636+
assert.EqualValues(t, 1, got.Last().SequenceNumber())
637+
}
638+
639+
func TestLogsFileTruncateRestart(t *testing.T) {
640+
logEntryString := "postTruncateRestart"
641+
multilineWaitPeriod = 10 * time.Millisecond
642+
643+
tmpfile, err := createTempFile("", "")
644+
defer os.Remove(tmpfile.Name())
645+
require.NoError(t, err)
646+
647+
stateDir, err := os.MkdirTemp("", "state")
648+
require.NoError(t, err)
649+
defer os.Remove(stateDir)
650+
651+
stateFileName := state.FilePath(stateDir, tmpfile.Name())
652+
stateFile, err := os.OpenFile(stateFileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
653+
require.NoError(t, err)
654+
defer os.Remove(stateFileName)
655+
656+
_, err = stateFile.WriteString("1000")
657+
require.NoError(t, err)
658+
659+
_, err = tmpfile.WriteString(logEntryString + "\n")
660+
require.NoError(t, err)
661+
662+
tt := NewLogFile()
663+
tt.FileStateFolder = stateDir
664+
tt.Log = TestLogger{t}
665+
tt.FileConfig = []FileConfig{{FilePath: tmpfile.Name(), FromBeginning: true}}
666+
tt.FileConfig[0].init()
667+
tt.started = true
668+
669+
lsrcs := tt.FindLogSrc()
670+
if len(lsrcs) != 1 {
671+
t.Fatalf("%v log src was returned when 1 should be available", len(lsrcs))
672+
}
673+
674+
lsrc := lsrcs[0]
675+
ts, ok := lsrc.(*tailerSrc)
676+
assert.True(t, ok)
677+
sink := statetest.NewFileManagerSink(ts.stateManager)
678+
ts.stateManager = sink
679+
680+
evts := make(chan logs.LogEvent)
681+
lsrc.SetOutput(func(e logs.LogEvent) {
682+
if e != nil {
683+
e.Done()
684+
evts <- e
685+
}
686+
})
687+
688+
e := <-evts
689+
if e.Message() != logEntryString {
690+
t.Errorf("Wrong log found after offset: \n%v\nExpecting:\n%v\n", e.Message(), logEntryString)
691+
}
692+
693+
lsrc.Stop()
694+
tt.Stop()
695+
696+
got := sink.GetSink()
697+
assert.Len(t, got, 1)
698+
assert.EqualValues(t, 1, got.Last().SequenceNumber())
623699
}
624700

625701
func TestLogsFileWithOffset(t *testing.T) {

plugins/inputs/logfile/tailersrc.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,19 @@ func (le LogEvent) RangeQueue() state.FileRangeQueue {
5959
}
6060

6161
type tailerSrc struct {
62-
group string
63-
stream string
64-
class string
65-
fileGlobPath string
66-
destination string
67-
stateManager state.FileRangeManager
68-
tailer *tail.Tail
69-
autoRemoval bool
70-
timestampFn func(string) (time.Time, string)
71-
enc encoding.Encoding
72-
maxEventSize int
73-
retentionInDays int
62+
group string
63+
stream string
64+
class string
65+
fileGlobPath string
66+
destination string
67+
stateManager state.FileRangeManager
68+
initialStateOffset int64
69+
tailer *tail.Tail
70+
autoRemoval bool
71+
timestampFn func(string) (time.Time, string)
72+
enc encoding.Encoding
73+
maxEventSize int
74+
retentionInDays int
7475

7576
outputFn func(logs.LogEvent)
7677
isMLStart func(string) bool
@@ -89,6 +90,7 @@ var _ logs.LogSrc = (*tailerSrc)(nil)
8990
func NewTailerSrc(
9091
group, stream, destination string,
9192
stateManager state.FileRangeManager,
93+
initialStateOffset int64,
9294
logClass, fileGlobPath string,
9395
tailer *tail.Tail,
9496
autoRemoval bool,
@@ -105,6 +107,7 @@ func NewTailerSrc(
105107
stream: stream,
106108
destination: destination,
107109
stateManager: stateManager,
110+
initialStateOffset: initialStateOffset,
108111
class: logClass,
109112
fileGlobPath: fileGlobPath,
110113
tailer: tailer,
@@ -195,6 +198,9 @@ func (ts *tailerSrc) runTail() {
195198
var msgBuf bytes.Buffer
196199
var cnt int
197200
fo := state.Range{}
201+
if ts.initialStateOffset > 0 {
202+
fo.SetInt64(0, ts.initialStateOffset)
203+
}
198204
ignoreUntilNextEvent := false
199205

200206
for {

plugins/inputs/logfile/tailersrc_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,11 @@ func TestTailerSrc(t *testing.T) {
7272
})
7373

7474
ts := NewTailerSrc(
75-
"groupName", "streamName",
76-
"destination", m,
75+
"groupName",
76+
"streamName",
77+
"destination",
78+
m,
79+
0,
7780
util.InfrequentAccessLogGroupClass,
7881
"tailsrctest-*.log",
7982
tailer,
@@ -181,9 +184,11 @@ func TestEventDoneCallback(t *testing.T) {
181184
})
182185

183186
ts := NewTailerSrc(
184-
"groupName", "streamName",
187+
"groupName",
188+
"streamName",
185189
"destination",
186190
m,
191+
0,
187192
util.InfrequentAccessLogGroupClass,
188193
"tailsrctest-*.log",
189194
tailer,
@@ -414,6 +419,7 @@ func setupTailer(t *testing.T, multiLineFn func(string) bool, maxEventSize int,
414419
t.Name(),
415420
"destination",
416421
m,
422+
0,
417423
util.InfrequentAccessLogGroupClass,
418424
"tailsrctest-*.log",
419425
tailer,

0 commit comments

Comments
 (0)