Skip to content

Commit 400d3d8

Browse files
github-actions[bot]Copilotmauri870
authored andcommitted
filebeat: port remaining harvester tests to filestream (#49706)
* filebeat: port remaining harvester tests to filestream Port remaining coverage from test_harvester.py into filestream input integration tests and remove the redundant Python test file. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * fix linter issues * fix test context * fix issue with logger with no selectors --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Mauri de Souza Meneguzzo <mauri870@gmail.com> (cherry picked from commit f558378)
1 parent 1058ebd commit 400d3d8

File tree

2 files changed

+183
-863
lines changed

2 files changed

+183
-863
lines changed

filebeat/input/filestream/input_integration_test.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"golang.org/x/text/transform"
3939

4040
"github.com/elastic/beats/v7/libbeat/tests/integration"
41+
"github.com/elastic/elastic-agent-libs/logp"
4142
)
4243

4344
// test_close_renamed from test_harvester.py
@@ -299,6 +300,37 @@ func TestFilestreamEmptyLinesOnly(t *testing.T) {
299300
env.requireNoEntryInRegistry(testlogName, id)
300301
}
301302

303+
// test_exceed_buffer from test_harvester.py
304+
func TestFilestreamExceedBuffer(t *testing.T) {
305+
env := newInputTestingEnvironment(t)
306+
307+
testlogName := "test.log"
308+
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
309+
inp := env.mustCreateInput(map[string]interface{}{
310+
"id": id,
311+
"paths": []string{env.abspath(testlogName)},
312+
"buffer_size": 10,
313+
"prospector.scanner.check_interval": "1ms",
314+
"prospector.scanner.fingerprint.enabled": false,
315+
"file_identity.native": map[string]any{},
316+
})
317+
318+
ctx, cancelInput := context.WithCancel(t.Context())
319+
env.startInput(ctx, id, inp)
320+
321+
message := "This exceeds the buffer"
322+
testlines := []byte(message + "\n")
323+
env.mustWriteToFile(testlogName, testlines)
324+
325+
env.waitUntilEventCount(1)
326+
env.requireEventsReceived([]string{message})
327+
328+
cancelInput()
329+
env.waitUntilInputStops()
330+
331+
env.requireOffsetInRegistry(testlogName, id, len(testlines))
332+
}
333+
302334
// test_bom_utf8 from test_harvester.py
303335
func TestFilestreamBOMUTF8(t *testing.T) {
304336
env := newInputTestingEnvironment(t)
@@ -842,6 +874,38 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
842874
env.waitUntilInputStops()
843875
}
844876

877+
// test_ignore_symlink from test_harvester.py
878+
func TestFilestreamIgnoreSymlink(t *testing.T) {
879+
env := newInputTestingEnvironment(t)
880+
881+
testlogName := "test.log"
882+
symlinkName := "test.log.symlink"
883+
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
884+
inp := env.mustCreateInput(map[string]interface{}{
885+
"id": id,
886+
"paths": []string{
887+
env.abspath(symlinkName),
888+
},
889+
"prospector.scanner.fingerprint.enabled": false,
890+
"file_identity.native": map[string]any{},
891+
})
892+
893+
line := []byte("first line\n")
894+
env.mustWriteToFile(testlogName, line)
895+
env.mustSymlink(testlogName, symlinkName)
896+
897+
ctx, cancelInput := context.WithCancel(t.Context())
898+
env.startInput(ctx, id, inp)
899+
900+
env.WaitLogsContains("is a symlink and they're disabled", 5*time.Second)
901+
require.Empty(t, env.getOutputMessages())
902+
903+
cancelInput()
904+
env.waitUntilInputStops()
905+
906+
env.requireRegistryEntryCount(0)
907+
}
908+
845909
// test_symlinks_enabled from test_harvester.py
846910
func TestFilestreamSymlinksEnabled(t *testing.T) {
847911
env := newInputTestingEnvironment(t)
@@ -974,6 +1038,40 @@ func TestFilestreamSymlinkRemoved(t *testing.T) {
9741038
env.requireRegistryEntryCount(1)
9751039
}
9761040

1041+
// test_symlink_and_file from test_harvester.py
1042+
func TestFilestreamSymlinkAndFile(t *testing.T) {
1043+
env := newInputTestingEnvironment(t)
1044+
1045+
testlogName := "test.log"
1046+
symlinkName := "test.log.symlink"
1047+
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
1048+
inp := env.mustCreateInput(map[string]interface{}{
1049+
"id": id,
1050+
"paths": []string{
1051+
env.abspath(testlogName),
1052+
env.abspath(symlinkName),
1053+
},
1054+
"prospector.scanner.symlinks": "true",
1055+
"prospector.scanner.fingerprint.enabled": false,
1056+
"file_identity.native": map[string]any{},
1057+
})
1058+
1059+
line := []byte("first line\n")
1060+
env.mustWriteToFile(testlogName, line)
1061+
env.mustSymlink(testlogName, symlinkName)
1062+
1063+
ctx, cancelInput := context.WithCancel(t.Context())
1064+
env.startInput(ctx, id, inp)
1065+
1066+
env.waitUntilEventCount(1)
1067+
env.requireEventsReceived([]string{"first line"})
1068+
1069+
cancelInput()
1070+
env.waitUntilInputStops()
1071+
1072+
env.requireRegistryEntryCount(1)
1073+
}
1074+
9771075
// test_truncate from test_harvester.py
9781076
func TestFilestreamTruncate(t *testing.T) {
9791077
env := newInputTestingEnvironment(t)
@@ -1063,6 +1161,91 @@ func TestFilestreamHarvestAllFilesWhenHarvesterLimitExceeded(t *testing.T) {
10631161
env.waitUntilInputStops()
10641162
}
10651163

1164+
// test_decode_error from test_harvester.py
1165+
func TestFilestreamDecodeError(t *testing.T) {
1166+
env := newInputTestingEnvironment(t)
1167+
1168+
testlogName := "test.log"
1169+
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
1170+
inp := env.mustCreateInput(map[string]interface{}{
1171+
"id": id,
1172+
"paths": []string{env.abspath(testlogName)},
1173+
"encoding": "utf-16be",
1174+
"prospector.scanner.fingerprint.enabled": false,
1175+
"file_identity.native": map[string]any{},
1176+
})
1177+
1178+
encoder := unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewEncoder()
1179+
buf := bytes.NewBuffer(nil)
1180+
writer := transform.NewWriter(buf, encoder)
1181+
_, err := writer.Write([]byte("hello world1\n"))
1182+
require.NoError(t, err)
1183+
_, err = writer.Write([]byte("\U00012345=Ra"))
1184+
require.NoError(t, err)
1185+
_, err = writer.Write([]byte("\nhello world2\n"))
1186+
require.NoError(t, err)
1187+
require.NoError(t, writer.Close())
1188+
1189+
env.mustWriteToFile(testlogName, buf.Bytes())
1190+
1191+
ctx, cancelInput := context.WithCancel(t.Context())
1192+
env.startInput(ctx, id, inp)
1193+
1194+
env.waitUntilEventCount(3)
1195+
messages := env.getOutputMessages()
1196+
require.Equal(t, "hello world2", messages[2])
1197+
1198+
cancelInput()
1199+
env.waitUntilInputStops()
1200+
1201+
env.requireRegistryEntryCount(1)
1202+
}
1203+
1204+
// test_debug_reader from test_harvester.py
1205+
func TestFilestreamDebugReader(t *testing.T) {
1206+
env := newInputTestingEnvironment(t)
1207+
l, err := logp.ConfigureWithCoreLocal(logp.Config{Level: logp.DebugLevel, Selectors: []string{"*"}}, env.testLogger.Core())
1208+
if err != nil {
1209+
t.Fatalf("failed to configure logger: %+v", err)
1210+
}
1211+
env.testLogger.Logger = l
1212+
1213+
testlogName := "test.log"
1214+
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
1215+
inp := env.mustCreateInput(map[string]interface{}{
1216+
"id": id,
1217+
"paths": []string{env.abspath(testlogName)},
1218+
"prospector.scanner.fingerprint.enabled": false,
1219+
"file_identity.native": map[string]any{},
1220+
})
1221+
1222+
lines := [][]byte{
1223+
[]byte("hello world1"),
1224+
[]byte("\n"),
1225+
{0, 0, 0, 0},
1226+
[]byte("\n"),
1227+
[]byte("hello world2"),
1228+
[]byte("\n"),
1229+
{0, 0, 0, 0},
1230+
[]byte("Hello World\n"),
1231+
bytes.Repeat([]byte("A"), 16*1024),
1232+
}
1233+
1234+
var fileContents []byte
1235+
for _, line := range lines {
1236+
fileContents = append(fileContents, line...)
1237+
}
1238+
env.mustWriteToFile(testlogName, fileContents)
1239+
1240+
ctx, cancelInput := context.WithCancel(t.Context())
1241+
env.startInput(ctx, id, inp)
1242+
1243+
env.WaitLogsContains("Matching null byte found at offset", 10*time.Second)
1244+
1245+
cancelInput()
1246+
env.waitUntilInputStops()
1247+
}
1248+
10661249
func TestGlobalIDCannotBeUsed(t *testing.T) {
10671250
env := newInputTestingEnvironment(t)
10681251
testlogName := "test.log"

0 commit comments

Comments
 (0)