Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,37 @@ func TestFilestreamEmptyLinesOnly(t *testing.T) {
env.requireNoEntryInRegistry(testlogName, id)
}

// test_exceed_buffer from test_harvester.py
func TestFilestreamExceedBuffer(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"buffer_size": 10,
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

ctx, cancelInput := context.WithCancel(t.Context())
env.startInput(ctx, id, inp)

message := "This exceeds the buffer"
testlines := []byte(message + "\n")
env.mustWriteToFile(testlogName, testlines)

env.waitUntilEventCount(1)
env.requireEventsReceived([]string{message})

cancelInput()
env.waitUntilInputStops()

env.requireOffsetInRegistry(testlogName, id, len(testlines))
}

// test_bom_utf8 from test_harvester.py
func TestFilestreamBOMUTF8(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand Down Expand Up @@ -840,6 +871,38 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
env.waitUntilInputStops()
}

// test_ignore_symlink from test_harvester.py
func TestFilestreamIgnoreSymlink(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

line := []byte("first line\n")
env.mustWriteToFile(testlogName, line)
env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(t.Context())
env.startInput(ctx, id, inp)

env.WaitLogsContains("is a symlink and they're disabled", 5*time.Second)
require.Empty(t, env.getOutputMessages())

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(0)
}

// test_symlinks_enabled from test_harvester.py
func TestFilestreamSymlinksEnabled(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand Down Expand Up @@ -972,6 +1035,40 @@ func TestFilestreamSymlinkRemoved(t *testing.T) {
env.requireRegistryEntryCount(1)
}

// test_symlink_and_file from test_harvester.py
func TestFilestreamSymlinkAndFile(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{
env.abspath(testlogName),
env.abspath(symlinkName),
},
"prospector.scanner.symlinks": "true",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

line := []byte("first line\n")
env.mustWriteToFile(testlogName, line)
env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(t.Context())
env.startInput(ctx, id, inp)

env.waitUntilEventCount(1)
env.requireEventsReceived([]string{"first line"})

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(1)
}

// test_truncate from test_harvester.py
func TestFilestreamTruncate(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand Down Expand Up @@ -1061,6 +1158,86 @@ func TestFilestreamHarvestAllFilesWhenHarvesterLimitExceeded(t *testing.T) {
env.waitUntilInputStops()
}

// test_decode_error from test_harvester.py
func TestFilestreamDecodeError(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"encoding": "utf-16be",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

encoder := unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewEncoder()
buf := bytes.NewBuffer(nil)
writer := transform.NewWriter(buf, encoder)
_, err := writer.Write([]byte("hello world1\n"))
require.NoError(t, err)
_, err = writer.Write([]byte("\U00012345=Ra"))
require.NoError(t, err)
_, err = writer.Write([]byte("\nhello world2\n"))
require.NoError(t, err)
require.NoError(t, writer.Close())

env.mustWriteToFile(testlogName, buf.Bytes())

ctx, cancelInput := context.WithCancel(t.Context())
env.startInput(ctx, id, inp)

env.waitUntilEventCount(3)
messages := env.getOutputMessages()
require.Equal(t, "hello world2", messages[2])

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(1)
}

// test_debug_reader from test_harvester.py
func TestFilestreamDebugReader(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

lines := [][]byte{
[]byte("hello world1"),
[]byte("\n"),
{0, 0, 0, 0},
[]byte("\n"),
[]byte("hello world2"),
[]byte("\n"),
{0, 0, 0, 0},
[]byte("Hello World\n"),
bytes.Repeat([]byte("A"), 16*1024),
}

var fileContents []byte
for _, line := range lines {
fileContents = append(fileContents, line...)
}
env.mustWriteToFile(testlogName, fileContents)

ctx, cancelInput := context.WithCancel(t.Context())
env.startInput(ctx, id, inp)

env.WaitLogsContains("Matching null byte found at offset", 10*time.Second)

cancelInput()
env.waitUntilInputStops()
}

func TestGlobalIDCannotBeUsed(t *testing.T) {
env := newInputTestingEnvironment(t)
testlogName := "test.log"
Expand Down
Loading
Loading