From 3e8064149a8518e192279f5696ce8d71a7262626 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 9 Oct 2025 11:41:39 +0300 Subject: [PATCH 1/2] feat: utilise continue_on_err in beatsauthextension (#10343) * feat: rework elasticsearch output translation to otel config to exclude validation errors * ci: add integration test (cherry picked from commit 0c0dada005589778334dff33753b2b945269a652) # Conflicts: # internal/pkg/otel/translate/otelconfig.go --- internal/pkg/otel/translate/otelconfig.go | 30 +- .../pkg/otel/translate/otelconfig_test.go | 1 + .../otel/translate/output_elasticsearch.go | 263 +++++++++++++ .../translate/output_elasticsearch_test.go | 361 ++++++++++++++++++ testing/integration/ess/otel_test.go | 135 +++++++ 5 files changed, 786 insertions(+), 4 deletions(-) create mode 100644 internal/pkg/otel/translate/output_elasticsearch.go create mode 100644 internal/pkg/otel/translate/output_elasticsearch_test.go diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index 63b7305182f..46559066410 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -11,8 +11,12 @@ import ( "slices" "strings" +<<<<<<< HEAD "github.com/elastic/elastic-agent-libs/logp" +======= + "github.com/go-viper/mapstructure/v2" +>>>>>>> 0c0dada00 (feat: utilise continue_on_err in beatsauthextension (#10343)) koanfmaps "github.com/knadh/koanf/maps" componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component" @@ -22,7 +26,6 @@ import ( "go.opentelemetry.io/collector/pipeline" "golang.org/x/exp/maps" - elasticsearchtranslate "github.com/elastic/beats/v7/libbeat/otelbeat/oteltranslate/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver" "github.com/elastic/beats/v7/x-pack/libbeat/management" @@ -500,7 +503,7 @@ func getDefaultDatastreamTypeForComponent(comp *component.Component) (string, er // translateEsOutputToExporter translates an elasticsearch output configuration to an elasticsearch exporter configuration. func translateEsOutputToExporter(cfg *config.C, logger *logp.Logger) (map[string]any, error) { - esConfig, err := elasticsearchtranslate.ToOTelConfig(cfg, logger) + esConfig, err := ToOTelConfig(cfg, logger) if err != nil { return nil, err } @@ -520,13 +523,28 @@ func BeatDataPath(componentId string) string { // getBeatsAuthExtensionConfig sets http transport settings on beatsauth // currently this is only supported for elasticsearch output -func getBeatsAuthExtensionConfig(cfg *config.C) (map[string]any, error) { +func getBeatsAuthExtensionConfig(outputCfg *config.C) (map[string]any, error) { defaultTransportSettings := elasticsearch.ESDefaultTransportSettings() - err := cfg.Unpack(&defaultTransportSettings) + + var resultMap map[string]any + if err := outputCfg.Unpack(&resultMap); err != nil { + return nil, err + } + + decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + Result: &defaultTransportSettings, + TagName: "config", + SquashTagOption: "inline", + DecodeHook: cfgDecodeHookFunc(), + }) if err != nil { return nil, err } + if err = decoder.Decode(&resultMap); err != nil { + return nil, err + } + newConfig, err := config.NewConfigFrom(defaultTransportSettings) if err != nil { return nil, err @@ -538,5 +556,9 @@ func getBeatsAuthExtensionConfig(cfg *config.C) (map[string]any, error) { return nil, err } + // required to make the extension not cause the collector to fail and exit + // on startup + newMap["continue_on_error"] = true + return newMap, nil } diff --git a/internal/pkg/otel/translate/otelconfig_test.go b/internal/pkg/otel/translate/otelconfig_test.go index 8bc6a53118a..d03048d86b6 100644 --- a/internal/pkg/otel/translate/otelconfig_test.go +++ b/internal/pkg/otel/translate/otelconfig_test.go @@ -236,6 +236,7 @@ func TestGetOtelConfig(t *testing.T) { expectedExtensionConfig := func(extra ...extraParams) map[string]any { finalOutput := map[string]any{ + "continue_on_error": true, "idle_connection_timeout": "3s", "proxy_disable": false, "ssl": map[string]interface{}{ diff --git a/internal/pkg/otel/translate/output_elasticsearch.go b/internal/pkg/otel/translate/output_elasticsearch.go new file mode 100644 index 00000000000..f3b6960431d --- /dev/null +++ b/internal/pkg/otel/translate/output_elasticsearch.go @@ -0,0 +1,263 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package translate + +import ( + "encoding/base64" + "errors" + "fmt" + "reflect" + "strings" + "time" + + "github.com/go-viper/mapstructure/v2" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" +) + +type esToOTelOptions struct { + elasticsearch.ElasticsearchConfig `config:",inline"` + outputs.HostWorkerCfg `config:",inline"` + + Index string `config:"index"` + Pipeline string `config:"pipeline"` + Preset string `config:"preset"` +} + +var defaultOptions = esToOTelOptions{ + ElasticsearchConfig: elasticsearch.DefaultConfig(), + + Index: "", // Dynamic routing is disabled if index is set + Pipeline: "", + Preset: "custom", // default is custom if not set + HostWorkerCfg: outputs.HostWorkerCfg{ + Workers: 1, + }, +} + +// ToOTelConfig converts a Beat config into OTel elasticsearch exporter config +// Ensure cloudid is handled before calling this method +// Note: This method may override output queue settings defined by user. +func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error) { + escfg := defaultOptions + + // check for unsupported config + err := checkUnsupportedConfig(output, logger) + if err != nil { + return nil, err + } + + // apply preset here + // It is important to apply preset before unpacking the config, as preset can override output fields + preset, err := output.String("preset", -1) + if err == nil { + // Performance preset is present, apply it and log any fields that + // were overridden + overriddenFields, presetConfig, err := elasticsearch.ApplyPreset(preset, output) + if err != nil { + return nil, err + } + logger.Infof("Applying performance preset '%v': %v", + preset, config.DebugString(presetConfig, false)) + logger.Warnf("Performance preset '%v' overrides user setting for field(s): %s", + preset, strings.Join(overriddenFields, ",")) + } + + unpackedMap := make(map[string]any) + // unpack and validate ES config + if err := output.Unpack(&unpackedMap); err != nil { + return nil, fmt.Errorf("failed unpacking config. %w", err) + } + + decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + Result: &escfg, + TagName: "config", + SquashTagOption: "inline", + DecodeHook: cfgDecodeHookFunc(), + }) + if err != nil { + return nil, fmt.Errorf("failed creating decoder. %w", err) + } + + err = decoder.Decode(&unpackedMap) + if err != nil { + return nil, fmt.Errorf("failed decoding config. %w", err) + } + + if err := escfg.Validate(); err != nil { + return nil, err + } + + // Create url using host name, protocol and path + hosts := []string{} + for _, h := range escfg.Hosts { + esURL, err := common.MakeURL(escfg.Protocol, escfg.Path, h, 9200) + if err != nil { + return nil, fmt.Errorf("cannot generate ES URL from host %w", err) + } + hosts = append(hosts, esURL) + } + + otelYAMLCfg := map[string]any{ + "endpoints": hosts, // hosts, protocol, path, port + + // max_conns_per_host is a "hard" limit on number of open connections. + // Ideally, escfg.NumWorkers() should map to num_consumer, but we had a bug in upstream + // where it could spin as many goroutines as it liked. + // Given that batcher implementation can change and it has a history of such changes, + // let's keep max_conns_per_host setting for now and remove it once exporterhelper is stable. + "max_conns_per_host": escfg.NumWorkers(), + + // Retry + "retry": map[string]any{ + "enabled": true, + "initial_interval": escfg.Backoff.Init, // backoff.init + "max_interval": escfg.Backoff.Max, // backoff.max + "max_retries": escfg.MaxRetries, // max_retries + }, + + "sending_queue": map[string]any{ + "batch": map[string]any{ + "flush_timeout": "10s", + "max_size": escfg.BulkMaxSize, // bulk_max_size + "min_size": 0, // 0 means immediately trigger a flush + "sizer": "items", + }, + "enabled": true, + "queue_size": getQueueSize(logger, output), + "block_on_overflow": true, + "wait_for_result": true, + "num_consumers": escfg.NumWorkers(), + }, + + "mapping": map[string]any{ + "mode": "bodymap", + }, + } + + // Compression + otelYAMLCfg["compression"] = "none" + if escfg.CompressionLevel > 0 { + otelYAMLCfg["compression"] = "gzip" + otelYAMLCfg["compression_params"] = map[string]any{ + "level": escfg.CompressionLevel, + } + } + + // Authentication + setIfNotNil(otelYAMLCfg, "user", escfg.Username) // username + setIfNotNil(otelYAMLCfg, "password", escfg.Password) // password + setIfNotNil(otelYAMLCfg, "api_key", base64.StdEncoding.EncodeToString([]byte(escfg.APIKey))) // api_key + + setIfNotNil(otelYAMLCfg, "headers", escfg.Headers) // headers + setIfNotNil(otelYAMLCfg, "pipeline", escfg.Pipeline) // pipeline + // Dynamic routing is disabled if output.elasticsearch.index is set + setIfNotNil(otelYAMLCfg, "logs_index", escfg.Index) // index + + // idle_connection_timeout, timeout, ssl block, + // proxy_url, proxy_headers, proxy_disable are handled by beatsauthextension https://github.com/elastic/opentelemetry-collector-components/tree/main/extension/beatsauthextension + // caller of this method should take care of integrating the extension + + return otelYAMLCfg, nil +} + +// log warning for unsupported config +func checkUnsupportedConfig(cfg *config.C, logger *logp.Logger) error { + if cfg.HasField("indices") { + return fmt.Errorf("indices is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("pipelines") { + return fmt.Errorf("pipelines is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("parameters") { + return fmt.Errorf("parameters is currently not supported: %w", errors.ErrUnsupported) + } else if value, err := cfg.Bool("allow_older_versions", -1); err == nil && !value { + return fmt.Errorf("allow_older_versions:false is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("loadbalance") { + return fmt.Errorf("loadbalance is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("non_indexable_policy") { + return fmt.Errorf("non_indexable_policy is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("escape_html") { + return fmt.Errorf("escape_html is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("kerberos") { + return fmt.Errorf("kerberos is currently not supported: %w", errors.ErrUnsupported) + } + + return nil +} + +// Helper function to check if a struct is empty +func isStructEmpty(s any) bool { + return reflect.DeepEqual(s, reflect.Zero(reflect.TypeOf(s)).Interface()) +} + +// Helper function to conditionally add fields to the map +func setIfNotNil(m map[string]any, key string, value any) { + if value == nil { + return + } + + v := reflect.ValueOf(value) + + switch v.Kind() { + case reflect.String: + if v.String() != "" { + m[key] = value + } + case reflect.Map, reflect.Slice: + if v.Len() > 0 { + m[key] = value + } + case reflect.Struct: + if !isStructEmpty(value) { + m[key] = value + } + default: + m[key] = value + } +} + +func getQueueSize(logger *logp.Logger, output *config.C) int { + size, err := output.Int("queue.mem.events", -1) + if err != nil { + logger.Debugf("Failed to get queue size: %v", err) + return memqueue.DefaultEvents // return default queue.mem.events for sending_queue in case of an errr + } + return int(size) +} + +func cfgDecodeHookFunc() mapstructure.DecodeHookFunc { + return func( + f reflect.Type, + t reflect.Type, + data any, + ) (any, error) { + if f.Kind() != reflect.String { + return data, nil + } + + switch { + case t == reflect.TypeOf(time.Duration(5)): + d, err := time.ParseDuration(data.(string)) + if err != nil { + return d, fmt.Errorf("failed parsing duration: %w", err) + } else { + return d, nil + } + case t == reflect.TypeOf(tlscommon.TLSVerificationMode(0)): + verificationMode := tlscommon.TLSVerificationMode(0) + if err := verificationMode.Unpack(data); err != nil { + return nil, fmt.Errorf("failed parsing TLS verification mode: %w", err) + } + return verificationMode, nil + default: + return data, nil + } + } +} diff --git a/internal/pkg/otel/translate/output_elasticsearch_test.go b/internal/pkg/otel/translate/output_elasticsearch_test.go new file mode 100644 index 00000000000..68f5941c0f5 --- /dev/null +++ b/internal/pkg/otel/translate/output_elasticsearch_test.go @@ -0,0 +1,361 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package translate + +import ( + "bytes" + _ "embed" + "fmt" + "testing" + "text/template" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" +) + +func TestToOtelConfig(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") + + t.Run("basic config translation", func(t *testing.T) { + beatCfg := ` +hosts: + - localhost:9200 + - localhost:9300 +protocol: http +path: /foo/bar +username: elastic +password: changeme +index: "some-index" +pipeline: "some-ingest-pipeline" +backoff: + init: 42s + max: 420s +workers: 30 +headers: + X-Header-1: foo + X-Bar-Header: bar` + + OTelCfg := ` +endpoints: + - http://localhost:9200/foo/bar + - http://localhost:9300/foo/bar +logs_index: some-index +max_conns_per_host: 30 +password: changeme +pipeline: some-ingest-pipeline +retry: + enabled: true + initial_interval: 42s + max_interval: 7m0s + max_retries: 3 +sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 30 + queue_size: 3200 + wait_for_result: true +user: elastic +headers: + X-Header-1: foo + X-Bar-Header: bar +mapping: + mode: bodymap +compression: gzip +compression_params: + level: 1 + ` + cfg := config.MustNewConfigFrom(beatCfg) + got, err := ToOTelConfig(cfg, logger) + require.NoError(t, err, "error translating elasticsearch output to ES exporter config") + expOutput := newFromYamlString(t, OTelCfg) + compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) + + }) + + t.Run("test api key is encoded before mapping to es-exporter", func(t *testing.T) { + beatCfg := ` +hosts: + - localhost:9200 +index: "some-index" +api_key: "TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA" +` + + OTelCfg := ` +endpoints: + - http://localhost:9200 +logs_index: some-index +retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 +sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 1 + queue_size: 3200 + wait_for_result: true +mapping: + mode: bodymap +max_conns_per_host: 1 +api_key: VGlOQUdHNEJhYU1kYUgxdFJmdVU6S25SNnlFNDFSclNvd2Iwa1EwSFdvQQ== +compression: gzip +compression_params: + level: 1 + ` + cfg := config.MustNewConfigFrom(beatCfg) + got, err := ToOTelConfig(cfg, logger) + require.NoError(t, err, "error translating elasticsearch output to ES exporter config ") + expOutput := newFromYamlString(t, OTelCfg) + compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) + + }) + + // when preset is configured, we only test worker, bulk_max_size + // idle_connection_timeout should be correctly configured on beatsauthextension + // es-exporter sets compression level to 1 by default + t.Run("check preset config translation", func(t *testing.T) { + commonBeatCfg := ` +hosts: + - localhost:9200 +index: "some-index" +username: elastic +password: changeme +preset: %s +` + + commonOTelCfg := ` +endpoints: + - http://localhost:9200 +retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 +logs_index: some-index +password: changeme +user: elastic +mapping: + mode: bodymap +compression: gzip +compression_params: + level: 1 +` + + tests := []struct { + presetName string + output string + }{ + { + presetName: "balanced", + output: commonOTelCfg + ` +max_conns_per_host: 1 +sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 1 + queue_size: 3200 + wait_for_result: true + `, + }, + { + presetName: "throughput", + output: commonOTelCfg + ` +max_conns_per_host: 4 +sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 4 + queue_size: 12800 + wait_for_result: true + `, + }, + { + presetName: "scale", + output: ` +endpoints: + - http://localhost:9200 +retry: + enabled: true + initial_interval: 5s + max_interval: 5m0s + max_retries: 3 +logs_index: some-index +password: changeme +user: elastic +max_conns_per_host: 1 +sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 1 + queue_size: 3200 + wait_for_result: true +mapping: + mode: bodymap +compression: gzip +compression_params: + level: 1 + `, + }, + { + presetName: "latency", + output: commonOTelCfg + ` +max_conns_per_host: 1 +sending_queue: + batch: + flush_timeout: 10s + max_size: 50 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 1 + queue_size: 4100 + wait_for_result: true + `, + }, + { + presetName: "custom", + output: commonOTelCfg + ` +max_conns_per_host: 1 +sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 1 + queue_size: 3200 + wait_for_result: true + `, + }, + } + + for _, test := range tests { + t.Run("config translation w/"+test.presetName, func(t *testing.T) { + cfg := config.MustNewConfigFrom(fmt.Sprintf(commonBeatCfg, test.presetName)) + got, err := ToOTelConfig(cfg, logger) + require.NoError(t, err, "error translating elasticsearch output to OTel ES exporter type") + expOutput := newFromYamlString(t, test.output) + compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) + }) + } + + }) + +} + +func TestCompressionConfig(t *testing.T) { + compressionConfig := ` +hosts: + - localhost:9200 + - localhost:9300 +protocol: http +path: /foo/bar +username: elastic +password: changeme +index: "some-index" +compression_level: %d` + + otelConfig := ` +endpoints: + - http://localhost:9200/foo/bar + - http://localhost:9300/foo/bar +logs_index: some-index +password: changeme +retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 +max_conns_per_host: 1 +user: elastic +sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 1 + queue_size: 3200 + wait_for_result: true +mapping: + mode: bodymap +{{ if gt . 0 }} +compression: gzip +compression_params: + level: {{ . }} +{{ else }} +compression: none +{{ end }}` + + for level := range 9 { + t.Run(fmt.Sprintf("compression-level-%d", level), func(t *testing.T) { + cfg := config.MustNewConfigFrom(fmt.Sprintf(compressionConfig, level)) + got, err := ToOTelConfig(cfg, logp.NewNopLogger()) + require.NoError(t, err, "error translating elasticsearch output to ES exporter config") + var otelBuffer bytes.Buffer + require.NoError(t, template.Must(template.New("config").Parse(otelConfig)).Execute(&otelBuffer, level)) + expOutput := newFromYamlString(t, otelBuffer.String()) + compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) + }) + } + +} + +func newFromYamlString(t *testing.T, input string) *confmap.Conf { + t.Helper() + var rawConf map[string]any + err := yaml.Unmarshal([]byte(input), &rawConf) + require.NoError(t, err) + + return confmap.NewFromStringMap(rawConf) +} + +func compareAndAssert(t *testing.T, expectedOutput *confmap.Conf, gotOutput *confmap.Conf) { + t.Helper() + // convert it to a common type + want, err := yaml.Marshal(expectedOutput.ToStringMap()) + require.NoError(t, err) + got, err := yaml.Marshal(gotOutput.ToStringMap()) + require.NoError(t, err) + + assert.Equal(t, string(want), string(got)) +} diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 09bea912d07..50b8e84ac19 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -2081,3 +2081,138 @@ service: cancel() } + +func TestOtelBeatsAuthExtensionInvalidCertificates(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + OS: []define.OS{ + // {Type: define.Windows}, we don't support otel on Windows yet + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + + // Create the otel configuration file + type otelConfigOptions struct { + ESEndpoint string + ESApiKey string + Index string + } + esEndpoint, err := integration.GetESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + index := "logs-integration-" + info.Namespace + + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx) + require.NoError(t, err) + + otelConfigTemplate := ` +extensions: + beatsauth: + continue_on_error: true + ssl: + enabled: true + verification_mode: none + certificate: /nonexistent.pem + key: /nonexistent.key + key_passphrase: null + key_passphrase_path: null + verification_mode: none +receivers: + metricbeatreceiver: + metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu + output: + otelconsumer: + queue.mem.flush.timeout: 0s +exporters: + elasticsearch/log: + endpoints: + - {{.ESEndpoint}} + api_key: {{.ESApiKey}} + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + min_size: 1 + auth: + authenticator: beatsauth + mapping: + mode: bodymap +service: + extensions: [beatsauth] + pipelines: + logs: + receivers: + - metricbeatreceiver + exporters: + - elasticsearch/log +` + var otelConfigBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer, + otelConfigOptions{ + ESEndpoint: esEndpoint, + ESApiKey: esApiKey.Encoded, + Index: index, + })) + + // configure elastic-agent.yml + err = fixture.Configure(ctx, otelConfigBuffer.Bytes()) + + // prepare agent command + cmd, err := fixture.PrepareAgentCommand(ctx, nil) + require.NoError(t, err, "cannot prepare Elastic-Agent command: %w", err) + + output := strings.Builder{} + cmd.Stderr = &output + cmd.Stdout = &output + + // start elastic-agent + err = cmd.Start() + require.NoError(t, err) + + t.Cleanup(func() { + if t.Failed() { + t.Log("Elastic-Agent output:") + t.Log(output.String()) + } + }) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + var statusErr error + status, statusErr := fixture.ExecStatus(ctx) + assert.NoError(collect, statusErr) + require.NotNil(collect, status.Collector) + require.NotNil(collect, status.Collector.ComponentStatusMap) + + pipelines, exists := status.Collector.ComponentStatusMap["pipeline:logs"] + require.True(collect, exists) + + receiver, exists := pipelines.ComponentStatusMap["receiver:metricbeatreceiver"] + require.True(collect, exists) + require.EqualValues(collect, receiver.Status, cproto.State_HEALTHY) + + exporter, exists := pipelines.ComponentStatusMap["exporter:elasticsearch/log"] + require.True(collect, exists) + require.EqualValues(collect, exporter.Status, cproto.State_DEGRADED) + }, 2*time.Minute, 5*time.Second) + + cancel() +} From 78ba0f14f94d713b4944b63d62a1b00bf635cdd1 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Tue, 14 Oct 2025 11:31:36 +0300 Subject: [PATCH 2/2] fix: resolve conflicts --- internal/pkg/otel/translate/otelconfig.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index 46559066410..b12257910a0 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -11,14 +11,11 @@ import ( "slices" "strings" -<<<<<<< HEAD - "github.com/elastic/elastic-agent-libs/logp" - -======= "github.com/go-viper/mapstructure/v2" ->>>>>>> 0c0dada00 (feat: utilise continue_on_err in beatsauthextension (#10343)) koanfmaps "github.com/knadh/koanf/maps" + "github.com/elastic/elastic-agent-libs/logp" + componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component" otelcomponent "go.opentelemetry.io/collector/component"