77package integration
88
99import (
10+ "bytes"
1011 "context"
1112 "crypto/tls"
13+ "encoding/json"
1214 "fmt"
1315 "net/http"
16+ "net/http/httptest"
1417 "os"
1518 "path/filepath"
19+ "regexp"
20+ "slices"
21+ "strconv"
1622 "strings"
23+ "sync"
1724 "testing"
25+ "text/template"
1826 "time"
1927
2028 "github.com/google/go-cmp/cmp"
2129 "github.com/stretchr/testify/assert"
2230 "github.com/stretchr/testify/require"
31+ "go.opentelemetry.io/otel/sdk/metric"
32+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
2333
2434 "github.com/gofrs/uuid/v5"
2535
@@ -28,6 +38,7 @@ import (
2838 "github.com/elastic/elastic-agent-libs/mapstr"
2939 "github.com/elastic/elastic-agent-libs/testing/estools"
3040 "github.com/elastic/go-elasticsearch/v8"
41+ "github.com/elastic/mock-es/pkg/api"
3142)
3243
3344var beatsCfgFile = `
@@ -309,3 +320,255 @@ processors:
309320 require .Contains (collect , out , expectedService )
310321 }, 10 * time .Second , 500 * time .Millisecond , "failed to get output of inspect command" )
311322}
323+
324+ func TestFilebeatOTelDocumentLevelRetries (t * testing.T ) {
325+ tests := []struct {
326+ name string
327+ maxRetries int
328+ failuresPerEvent int
329+ bulkErrorCode string
330+ eventIDsToFail []int
331+ expectedIngestedEventIDs []int
332+ }{
333+ {
334+ name : "bulk 429 with retries" ,
335+ maxRetries : 3 ,
336+ failuresPerEvent : 2 , // Fail 2 times, succeed on 3rd attempt
337+ bulkErrorCode : "429" , // retryable error
338+ eventIDsToFail : []int {1 , 3 , 5 , 7 },
339+ expectedIngestedEventIDs : []int {0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 }, // All events should eventually be ingested
340+ },
341+ {
342+ name : "bulk exhausts retries" ,
343+ maxRetries : 3 ,
344+ failuresPerEvent : 5 , // Fail more than max_retries
345+ bulkErrorCode : "429" ,
346+ eventIDsToFail : []int {2 , 4 , 6 , 8 },
347+ expectedIngestedEventIDs : []int {0 , 1 , 3 , 5 , 7 , 9 }, // Only non-failing events should be ingested
348+ },
349+ {
350+ name : "bulk with permanent mapping errors" ,
351+ maxRetries : 3 ,
352+ failuresPerEvent : 0 , // always fail
353+ bulkErrorCode : "400" , // never retried
354+ eventIDsToFail : []int {1 , 4 , 8 }, // Only specific events fail
355+ expectedIngestedEventIDs : []int {0 , 2 , 3 , 5 , 6 , 7 , 9 }, // Only non-failing events should be ingested
356+ },
357+ }
358+
359+ const numTestEvents = 10
360+ reEventLine := regexp .MustCompile (`"message":"Line (\d+)"` )
361+
362+ for _ , tt := range tests {
363+ t .Run (tt .name , func (t * testing.T ) {
364+ var ingestedTestEvents []string
365+ var mu sync.Mutex
366+ eventFailureCounts := make (map [string ]int )
367+
368+ deterministicHandler := func (action api.Action , event []byte ) int {
369+ // Handle non-bulk requests
370+ if action .Action != "create" {
371+ return http .StatusOK
372+ }
373+
374+ // Extract event ID from the event data
375+ if matches := reEventLine .FindSubmatch (event ); len (matches ) > 1 {
376+ eventIDStr := string (matches [1 ])
377+ eventID , err := strconv .Atoi (eventIDStr )
378+ if err != nil {
379+ return http .StatusInternalServerError
380+ }
381+
382+ eventKey := "Line " + eventIDStr
383+
384+ mu .Lock ()
385+ defer mu .Unlock ()
386+
387+ isFailingEvent := slices .Contains (tt .eventIDsToFail , eventID )
388+
389+ var shouldFail bool
390+ if isFailingEvent {
391+ // This event is configured to fail
392+ failureCount := eventFailureCounts [eventKey ]
393+
394+ switch tt .bulkErrorCode {
395+ case "400" :
396+ // Permanent errors always fail
397+ shouldFail = true
398+ case "429" :
399+ // Temporary errors fail until failuresPerEvent threshold
400+ shouldFail = failureCount < tt .failuresPerEvent
401+ }
402+ } else {
403+ // Events not in the fail list always succeed
404+ shouldFail = false
405+ }
406+
407+ if shouldFail {
408+ eventFailureCounts [eventKey ] = eventFailureCounts [eventKey ] + 1
409+ if tt .bulkErrorCode == "429" {
410+ return http .StatusTooManyRequests
411+ } else {
412+ return http .StatusBadRequest
413+ }
414+ }
415+
416+ // track ingested event
417+ found := false
418+ for _ , existing := range ingestedTestEvents {
419+ if existing == eventKey {
420+ found = true
421+ break
422+ }
423+ }
424+ if ! found {
425+ ingestedTestEvents = append (ingestedTestEvents , eventKey )
426+ }
427+ return http .StatusOK
428+ }
429+
430+ return http .StatusOK
431+ }
432+
433+ reader := metric .NewManualReader ()
434+ provider := metric .NewMeterProvider (metric .WithReader (reader ))
435+
436+ mux := http .NewServeMux ()
437+ mux .Handle ("/" , api .NewDeterministicAPIHandler (
438+ uuid .Must (uuid .NewV4 ()),
439+ "" ,
440+ provider ,
441+ time .Now ().Add (24 * time .Hour ),
442+ 0 ,
443+ 0 ,
444+ deterministicHandler ,
445+ ))
446+
447+ server := httptest .NewServer (mux )
448+ defer server .Close ()
449+
450+ filebeatOTel := integration .NewBeat (
451+ t ,
452+ "filebeat-otel" ,
453+ "../../filebeat.test" ,
454+ "otel" ,
455+ )
456+
457+ namespace := strings .ReplaceAll (uuid .Must (uuid .NewV4 ()).String (), "-" , "" )
458+ index := "logs-integration-" + namespace
459+
460+ beatsConfig := struct {
461+ Index string
462+ InputFile string
463+ ESEndpoint string
464+ MaxRetries int
465+ MonitoringPort int
466+ }{
467+ Index : index ,
468+ InputFile : filepath .Join (filebeatOTel .TempDir (), "log.log" ),
469+ ESEndpoint : server .URL ,
470+ MaxRetries : tt .maxRetries ,
471+ MonitoringPort : int (libbeattesting .MustAvailableTCP4Port (t )),
472+ }
473+
474+ cfg := `
475+ filebeat.inputs:
476+ - type: filestream
477+ id: filestream-input-id
478+ enabled: true
479+ file_identity.native: ~
480+ prospector.scanner.fingerprint.enabled: false
481+ paths:
482+ - {{.InputFile}}
483+ output:
484+ elasticsearch:
485+ hosts:
486+ - {{.ESEndpoint}}
487+ username: admin
488+ password: testing
489+ index: {{.Index}}
490+ compression_level: 0
491+ max_retries: {{.MaxRetries}}
492+ logging.level: debug
493+ queue.mem.flush.timeout: 0s
494+ setup.template.enabled: false
495+ http.enabled: true
496+ http.host: localhost
497+ http.port: {{.MonitoringPort}}
498+ `
499+ var configBuffer bytes.Buffer
500+ require .NoError (t ,
501+ template .Must (template .New ("config" ).Parse (cfg )).Execute (& configBuffer , beatsConfig ))
502+
503+ filebeatOTel .WriteConfigFile (configBuffer .String ())
504+ writeEventsToLogFile (t , beatsConfig .InputFile , numTestEvents )
505+ filebeatOTel .Start ()
506+ defer filebeatOTel .Stop ()
507+
508+ // Wait for file input to be fully read
509+ filebeatOTel .WaitStdErrContains (fmt .Sprintf ("End of file reached: %s; Backoff now." , beatsConfig .InputFile ), 30 * time .Second )
510+
511+ // Wait for expected events to be ingested
512+ require .EventuallyWithT (t , func (ct * assert.CollectT ) {
513+ mu .Lock ()
514+ defer mu .Unlock ()
515+
516+ // collect mock-es metrics
517+ rm := metricdata.ResourceMetrics {}
518+ err := reader .Collect (context .Background (), & rm )
519+ assert .NoError (ct , err , "failed to collect metrics from mock-es" )
520+ metrics := make (map [string ]int64 )
521+ for _ , sm := range rm .ScopeMetrics {
522+ for _ , m := range sm .Metrics {
523+ if sum , ok := m .Data .(metricdata.Sum [int64 ]); ok {
524+ var total int64
525+ for _ , dp := range sum .DataPoints {
526+ total += dp .Value
527+ }
528+ metrics [m .Name ] = total
529+ }
530+ }
531+ }
532+ assert .Equal (ct , int64 (len (tt .expectedIngestedEventIDs )), metrics ["bulk.create.ok" ], "expected bulk.create.ok metric to match ingested events" )
533+
534+ // If we have the right count, validate the specific events
535+ // Verify we have the correct events ingested
536+ for _ , expectedID := range tt .expectedIngestedEventIDs {
537+ expectedEventKey := fmt .Sprintf ("Line %d" , expectedID )
538+ found := false
539+ for _ , ingested := range ingestedTestEvents {
540+ if ingested == expectedEventKey {
541+ found = true
542+ break
543+ }
544+ }
545+ assert .True (ct , found , "expected _bulk event %s to be ingested" , expectedEventKey )
546+ }
547+
548+ // Verify we have valid line content for all ingested events
549+ for _ , ingested := range ingestedTestEvents {
550+ assert .Regexp (ct , `^Line \d+$` , ingested , "unexpected ingested event format: %s" , ingested )
551+ }
552+ }, 30 * time .Second , 1 * time .Second , "timed out waiting for expected event processing" )
553+
554+ // Confirm filebeat agreed with our accounting of ingested events
555+ require .EventuallyWithT (t , func (ct * assert.CollectT ) {
556+ address := fmt .Sprintf ("http://localhost:%d" , beatsConfig .MonitoringPort )
557+ r , err := http .Get (address + "/stats" ) //nolint:noctx,bodyclose // fine for tests
558+ assert .NoError (ct , err )
559+ assert .Equal (ct , http .StatusOK , r .StatusCode , "incorrect status code" )
560+ var m mapstr.M
561+ err = json .NewDecoder (r .Body ).Decode (& m )
562+ assert .NoError (ct , err )
563+
564+ m = m .Flatten ()
565+
566+ // Currently, otelconsumer either ACKs or fails the entire batch and has no visibility into individual event failures within the exporter.
567+ // From otelconsumer's perspective, the whole batch is considered successful as long as ConsumeLogs returns no error.
568+ assert .Equal (ct , float64 (numTestEvents ), m ["libbeat.output.events.total" ], "expected total events sent to output to match" )
569+ assert .Equal (ct , float64 (numTestEvents ), m ["libbeat.output.events.acked" ], "expected total events acked to match" )
570+ assert .Equal (ct , float64 (0 ), m ["libbeat.output.events.dropped" ], "expected total events dropped to match" )
571+ }, 10 * time .Second , 100 * time .Millisecond , "expected output stats to be available in monitoring endpoint" )
572+ })
573+ }
574+ }
0 commit comments