@@ -38,6 +38,7 @@ import (
3838 "golang.org/x/text/transform"
3939
4040 "github.com/elastic/beats/v7/libbeat/tests/integration"
41+ "github.com/elastic/elastic-agent-libs/logp"
4142)
4243
4344// test_close_renamed from test_harvester.py
@@ -298,6 +299,37 @@ func TestFilestreamEmptyLinesOnly(t *testing.T) {
298299 env .requireNoEntryInRegistry (testlogName , id )
299300}
300301
302+ // test_exceed_buffer from test_harvester.py
303+ func TestFilestreamExceedBuffer (t * testing.T ) {
304+ env := newInputTestingEnvironment (t )
305+
306+ testlogName := "test.log"
307+ id := "fake-ID-" + uuid .Must (uuid .NewV4 ()).String ()
308+ inp := env .mustCreateInput (map [string ]interface {}{
309+ "id" : id ,
310+ "paths" : []string {env .abspath (testlogName )},
311+ "buffer_size" : 10 ,
312+ "prospector.scanner.check_interval" : "1ms" ,
313+ "prospector.scanner.fingerprint.enabled" : false ,
314+ "file_identity.native" : map [string ]any {},
315+ })
316+
317+ ctx , cancelInput := context .WithCancel (t .Context ())
318+ env .startInput (ctx , id , inp )
319+
320+ message := "This exceeds the buffer"
321+ testlines := []byte (message + "\n " )
322+ env .mustWriteToFile (testlogName , testlines )
323+
324+ env .waitUntilEventCount (1 )
325+ env .requireEventsReceived ([]string {message })
326+
327+ cancelInput ()
328+ env .waitUntilInputStops ()
329+
330+ env .requireOffsetInRegistry (testlogName , id , len (testlines ))
331+ }
332+
301333// test_bom_utf8 from test_harvester.py
302334func TestFilestreamBOMUTF8 (t * testing.T ) {
303335 env := newInputTestingEnvironment (t )
@@ -841,6 +873,38 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
841873 env .waitUntilInputStops ()
842874}
843875
876+ // test_ignore_symlink from test_harvester.py
877+ func TestFilestreamIgnoreSymlink (t * testing.T ) {
878+ env := newInputTestingEnvironment (t )
879+
880+ testlogName := "test.log"
881+ symlinkName := "test.log.symlink"
882+ id := "fake-ID-" + uuid .Must (uuid .NewV4 ()).String ()
883+ inp := env .mustCreateInput (map [string ]interface {}{
884+ "id" : id ,
885+ "paths" : []string {
886+ env .abspath (symlinkName ),
887+ },
888+ "prospector.scanner.fingerprint.enabled" : false ,
889+ "file_identity.native" : map [string ]any {},
890+ })
891+
892+ line := []byte ("first line\n " )
893+ env .mustWriteToFile (testlogName , line )
894+ env .mustSymlink (testlogName , symlinkName )
895+
896+ ctx , cancelInput := context .WithCancel (t .Context ())
897+ env .startInput (ctx , id , inp )
898+
899+ env .WaitLogsContains ("is a symlink and they're disabled" , 5 * time .Second )
900+ require .Empty (t , env .getOutputMessages ())
901+
902+ cancelInput ()
903+ env .waitUntilInputStops ()
904+
905+ env .requireRegistryEntryCount (0 )
906+ }
907+
844908// test_symlinks_enabled from test_harvester.py
845909func TestFilestreamSymlinksEnabled (t * testing.T ) {
846910 env := newInputTestingEnvironment (t )
@@ -973,6 +1037,40 @@ func TestFilestreamSymlinkRemoved(t *testing.T) {
9731037 env .requireRegistryEntryCount (1 )
9741038}
9751039
1040+ // test_symlink_and_file from test_harvester.py
1041+ func TestFilestreamSymlinkAndFile (t * testing.T ) {
1042+ env := newInputTestingEnvironment (t )
1043+
1044+ testlogName := "test.log"
1045+ symlinkName := "test.log.symlink"
1046+ id := "fake-ID-" + uuid .Must (uuid .NewV4 ()).String ()
1047+ inp := env .mustCreateInput (map [string ]interface {}{
1048+ "id" : id ,
1049+ "paths" : []string {
1050+ env .abspath (testlogName ),
1051+ env .abspath (symlinkName ),
1052+ },
1053+ "prospector.scanner.symlinks" : "true" ,
1054+ "prospector.scanner.fingerprint.enabled" : false ,
1055+ "file_identity.native" : map [string ]any {},
1056+ })
1057+
1058+ line := []byte ("first line\n " )
1059+ env .mustWriteToFile (testlogName , line )
1060+ env .mustSymlink (testlogName , symlinkName )
1061+
1062+ ctx , cancelInput := context .WithCancel (t .Context ())
1063+ env .startInput (ctx , id , inp )
1064+
1065+ env .waitUntilEventCount (1 )
1066+ env .requireEventsReceived ([]string {"first line" })
1067+
1068+ cancelInput ()
1069+ env .waitUntilInputStops ()
1070+
1071+ env .requireRegistryEntryCount (1 )
1072+ }
1073+
9761074// test_truncate from test_harvester.py
9771075func TestFilestreamTruncate (t * testing.T ) {
9781076 env := newInputTestingEnvironment (t )
@@ -1062,6 +1160,91 @@ func TestFilestreamHarvestAllFilesWhenHarvesterLimitExceeded(t *testing.T) {
10621160 env .waitUntilInputStops ()
10631161}
10641162
1163+ // test_decode_error from test_harvester.py
1164+ func TestFilestreamDecodeError (t * testing.T ) {
1165+ env := newInputTestingEnvironment (t )
1166+
1167+ testlogName := "test.log"
1168+ id := "fake-ID-" + uuid .Must (uuid .NewV4 ()).String ()
1169+ inp := env .mustCreateInput (map [string ]interface {}{
1170+ "id" : id ,
1171+ "paths" : []string {env .abspath (testlogName )},
1172+ "encoding" : "utf-16be" ,
1173+ "prospector.scanner.fingerprint.enabled" : false ,
1174+ "file_identity.native" : map [string ]any {},
1175+ })
1176+
1177+ encoder := unicode .UTF16 (unicode .LittleEndian , unicode .IgnoreBOM ).NewEncoder ()
1178+ buf := bytes .NewBuffer (nil )
1179+ writer := transform .NewWriter (buf , encoder )
1180+ _ , err := writer .Write ([]byte ("hello world1\n " ))
1181+ require .NoError (t , err )
1182+ _ , err = writer .Write ([]byte ("\U00012345 =Ra" ))
1183+ require .NoError (t , err )
1184+ _ , err = writer .Write ([]byte ("\n hello world2\n " ))
1185+ require .NoError (t , err )
1186+ require .NoError (t , writer .Close ())
1187+
1188+ env .mustWriteToFile (testlogName , buf .Bytes ())
1189+
1190+ ctx , cancelInput := context .WithCancel (t .Context ())
1191+ env .startInput (ctx , id , inp )
1192+
1193+ env .waitUntilEventCount (3 )
1194+ messages := env .getOutputMessages ()
1195+ require .Equal (t , "hello world2" , messages [2 ])
1196+
1197+ cancelInput ()
1198+ env .waitUntilInputStops ()
1199+
1200+ env .requireRegistryEntryCount (1 )
1201+ }
1202+
1203+ // test_debug_reader from test_harvester.py
1204+ func TestFilestreamDebugReader (t * testing.T ) {
1205+ env := newInputTestingEnvironment (t )
1206+ l , err := logp .ConfigureWithCoreLocal (logp.Config {Level : logp .DebugLevel , Selectors : []string {"*" }}, env .testLogger .Core ())
1207+ if err != nil {
1208+ t .Fatalf ("failed to configure logger: %+v" , err )
1209+ }
1210+ env .testLogger .Logger = l
1211+
1212+ testlogName := "test.log"
1213+ id := "fake-ID-" + uuid .Must (uuid .NewV4 ()).String ()
1214+ inp := env .mustCreateInput (map [string ]interface {}{
1215+ "id" : id ,
1216+ "paths" : []string {env .abspath (testlogName )},
1217+ "prospector.scanner.fingerprint.enabled" : false ,
1218+ "file_identity.native" : map [string ]any {},
1219+ })
1220+
1221+ lines := [][]byte {
1222+ []byte ("hello world1" ),
1223+ []byte ("\n " ),
1224+ {0 , 0 , 0 , 0 },
1225+ []byte ("\n " ),
1226+ []byte ("hello world2" ),
1227+ []byte ("\n " ),
1228+ {0 , 0 , 0 , 0 },
1229+ []byte ("Hello World\n " ),
1230+ bytes .Repeat ([]byte ("A" ), 16 * 1024 ),
1231+ }
1232+
1233+ var fileContents []byte
1234+ for _ , line := range lines {
1235+ fileContents = append (fileContents , line ... )
1236+ }
1237+ env .mustWriteToFile (testlogName , fileContents )
1238+
1239+ ctx , cancelInput := context .WithCancel (t .Context ())
1240+ env .startInput (ctx , id , inp )
1241+
1242+ env .WaitLogsContains ("Matching null byte found at offset" , 10 * time .Second )
1243+
1244+ cancelInput ()
1245+ env .waitUntilInputStops ()
1246+ }
1247+
10651248func TestGlobalIDCannotBeUsed (t * testing.T ) {
10661249 env := newInputTestingEnvironment (t )
10671250 testlogName := "test.log"
0 commit comments