Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"golang.org/x/text/transform"

"github.com/elastic/beats/v7/libbeat/tests/integration"
"github.com/elastic/elastic-agent-libs/logp"
)

// test_close_renamed from test_harvester.py
Expand Down Expand Up @@ -299,6 +300,37 @@
env.requireNoEntryInRegistry(testlogName, id)
}

// test_exceed_buffer from test_harvester.py
func TestFilestreamExceedBuffer(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"buffer_size": 10,
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

ctx, cancelInput := context.WithCancel(t.Context())
env.startInput(ctx, id, inp)

message := "This exceeds the buffer"
testlines := []byte(message + "\n")
env.mustWriteToFile(testlogName, testlines)

env.waitUntilEventCount(1)
env.requireEventsReceived([]string{message})

cancelInput()
env.waitUntilInputStops()

env.requireOffsetInRegistry(testlogName, id, len(testlines))
}

// test_bom_utf8 from test_harvester.py
func TestFilestreamBOMUTF8(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand Down Expand Up @@ -361,7 +393,7 @@
line := []byte("first line\n")
buf := bytes.NewBuffer(nil)
writer := transform.NewWriter(buf, encoder)
writer.Write(line)

Check failure on line 396 in filebeat/input/filestream/input_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Error return value of `writer.Write` is not checked (errcheck)

Check failure on line 396 in filebeat/input/filestream/input_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (windows-latest)

Error return value of `writer.Write` is not checked (errcheck)

Check failure on line 396 in filebeat/input/filestream/input_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value of `writer.Write` is not checked (errcheck)
writer.Close()

env.mustWriteToFile(testlogName, buf.Bytes())
Expand Down Expand Up @@ -842,6 +874,38 @@
env.waitUntilInputStops()
}

// test_ignore_symlink from test_harvester.py
func TestFilestreamIgnoreSymlink(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

line := []byte("first line\n")
env.mustWriteToFile(testlogName, line)
env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(t.Context())
env.startInput(ctx, id, inp)

env.WaitLogsContains("is a symlink and they're disabled", 5*time.Second)
require.Empty(t, env.getOutputMessages())

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(0)
}

// test_symlinks_enabled from test_harvester.py
func TestFilestreamSymlinksEnabled(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand Down Expand Up @@ -974,6 +1038,40 @@
env.requireRegistryEntryCount(1)
}

// test_symlink_and_file from test_harvester.py
func TestFilestreamSymlinkAndFile(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
symlinkName := "test.log.symlink"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{
env.abspath(testlogName),
env.abspath(symlinkName),
},
"prospector.scanner.symlinks": "true",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

line := []byte("first line\n")
env.mustWriteToFile(testlogName, line)
env.mustSymlink(testlogName, symlinkName)

ctx, cancelInput := context.WithCancel(t.Context())
env.startInput(ctx, id, inp)

env.waitUntilEventCount(1)
env.requireEventsReceived([]string{"first line"})

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(1)
}

// test_truncate from test_harvester.py
func TestFilestreamTruncate(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand Down Expand Up @@ -1063,6 +1161,91 @@
env.waitUntilInputStops()
}

// test_decode_error from test_harvester.py
func TestFilestreamDecodeError(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"encoding": "utf-16be",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

encoder := unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewEncoder()
buf := bytes.NewBuffer(nil)
writer := transform.NewWriter(buf, encoder)
_, err := writer.Write([]byte("hello world1\n"))
require.NoError(t, err)
_, err = writer.Write([]byte("\U00012345=Ra"))
require.NoError(t, err)
_, err = writer.Write([]byte("\nhello world2\n"))
require.NoError(t, err)
require.NoError(t, writer.Close())

env.mustWriteToFile(testlogName, buf.Bytes())

ctx, cancelInput := context.WithCancel(t.Context())
env.startInput(ctx, id, inp)

env.waitUntilEventCount(3)
messages := env.getOutputMessages()
require.Equal(t, "hello world2", messages[2])

cancelInput()
env.waitUntilInputStops()

env.requireRegistryEntryCount(1)
}

// test_debug_reader from test_harvester.py
func TestFilestreamDebugReader(t *testing.T) {
env := newInputTestingEnvironment(t)
l, err := logp.ConfigureWithCoreLocal(logp.Config{Level: logp.DebugLevel, Selectors: []string{"*"}}, env.testLogger.Core())
if err != nil {
t.Fatalf("failed to configure logger: %+v", err)
}
env.testLogger.Logger = l

testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

lines := [][]byte{
[]byte("hello world1"),
[]byte("\n"),
{0, 0, 0, 0},
[]byte("\n"),
[]byte("hello world2"),
[]byte("\n"),
{0, 0, 0, 0},
[]byte("Hello World\n"),
bytes.Repeat([]byte("A"), 16*1024),
}

var fileContents []byte
for _, line := range lines {
fileContents = append(fileContents, line...)
}
env.mustWriteToFile(testlogName, fileContents)

ctx, cancelInput := context.WithCancel(t.Context())
env.startInput(ctx, id, inp)

env.WaitLogsContains("Matching null byte found at offset", 10*time.Second)

cancelInput()
env.waitUntilInputStops()
}

func TestGlobalIDCannotBeUsed(t *testing.T) {
env := newInputTestingEnvironment(t)
testlogName := "test.log"
Expand Down Expand Up @@ -1111,7 +1294,7 @@
}
n := 0
for n <= iterations {
f.Write([]byte(fmt.Sprintf("hello world %d\n", r*iterations+n)))

Check failure on line 1297 in filebeat/input/filestream/input_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Error return value of `f.Write` is not checked (errcheck)

Check failure on line 1297 in filebeat/input/filestream/input_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (windows-latest)

Error return value of `f.Write` is not checked (errcheck)

Check failure on line 1297 in filebeat/input/filestream/input_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value of `f.Write` is not checked (errcheck)
n += 1
time.Sleep(100 * time.Millisecond)
}
Expand Down
Loading
Loading