From 2a975fc1a4dd3f548cb3156a624a5feab6e5ea47 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 15 Oct 2025 19:42:34 +0530 Subject: [PATCH] Support proxy_url on otel translation logic (#10454) * Support proxy_url on otel translation logic * address comments (cherry picked from commit 0e533344c831c769d4ecf845978d7074d6149d43) # Conflicts: # internal/pkg/otel/configtranslate/otelconfig.go # internal/pkg/otel/translate/otelconfig_test.go # internal/pkg/otel/translate/output_elasticsearch.go --- .../pkg/otel/configtranslate/otelconfig.go | 58 + .../pkg/otel/translate/otelconfig_test.go | 1250 +++++++++++++++++ .../otel/translate/output_elasticsearch.go | 271 ++++ 3 files changed, 1579 insertions(+) create mode 100644 internal/pkg/otel/translate/otelconfig_test.go create mode 100644 internal/pkg/otel/translate/output_elasticsearch.go diff --git a/internal/pkg/otel/configtranslate/otelconfig.go b/internal/pkg/otel/configtranslate/otelconfig.go index 47242c63c8d..cd44810aa9f 100644 --- a/internal/pkg/otel/configtranslate/otelconfig.go +++ b/internal/pkg/otel/configtranslate/otelconfig.go @@ -411,3 +411,61 @@ func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) { esConfig["mapping"] = map[string]any{"mode": "bodymap"} return esConfig, nil } +<<<<<<< HEAD:internal/pkg/otel/configtranslate/otelconfig.go +======= + +func BeatDataPath(componentId string) string { + return filepath.Join(paths.Run(), componentId) +} + +// getBeatsAuthExtensionConfig sets http transport settings on beatsauth +// currently this is only supported for elasticsearch output +func getBeatsAuthExtensionConfig(outputCfg *config.C) (map[string]any, error) { + defaultTransportSettings := elasticsearch.ESDefaultTransportSettings() + + var resultMap map[string]any + if err := outputCfg.Unpack(&resultMap); err != nil { + return nil, err + } + + decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + Result: &defaultTransportSettings, + TagName: "config", + SquashTagOption: "inline", + DecodeHook: cfgDecodeHookFunc(), + }) + if err != nil { + return nil, err + } + + if err = decoder.Decode(&resultMap); err != nil { + return nil, err + } + + newConfig, err := config.NewConfigFrom(defaultTransportSettings) + if err != nil { + return nil, err + } + + // proxy_url on newConfig is of type url.URL. Beatsauth extension expects it to be of string type instead + // this logic here converts url.URL to string type similar to what a user would set on filebeat config + if defaultTransportSettings.Proxy.URL != nil { + err = newConfig.SetString("proxy_url", -1, defaultTransportSettings.Proxy.URL.String()) + if err != nil { + return nil, fmt.Errorf("error settingg proxy url:%w ", err) + } + } + + var newMap map[string]any + err = newConfig.Unpack(&newMap) + if err != nil { + return nil, err + } + + // required to make the extension not cause the collector to fail and exit + // on startup + newMap["continue_on_error"] = true + + return newMap, nil +} +>>>>>>> 0e533344c (Support proxy_url on otel translation logic (#10454)):internal/pkg/otel/translate/otelconfig.go diff --git a/internal/pkg/otel/translate/otelconfig_test.go b/internal/pkg/otel/translate/otelconfig_test.go new file mode 100644 index 00000000000..258ba3eda65 --- /dev/null +++ b/internal/pkg/otel/translate/otelconfig_test.go @@ -0,0 +1,1250 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package translate + +import ( + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/confmap" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pipeline" + + "github.com/elastic/elastic-agent/pkg/component" +) + +func TestBeatNameToDefaultDatastreamType(t *testing.T) { + tests := []struct { + beatName string + expectedType string + expectedError error + }{ + { + beatName: "filebeat", + expectedType: "logs", + }, + { + beatName: "metricbeat", + expectedType: "metrics", + }, + { + beatName: "cloudbeat", + expectedError: fmt.Errorf("input type not supported by Otel: "), + }, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("%v", tt.beatName), func(t *testing.T) { + comp := component.Component{ + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{tt.beatName}, + }, + }, + }, + } + actualType, actualError := getDefaultDatastreamTypeForComponent(&comp) + assert.Equal(t, tt.expectedType, actualType) + + if tt.expectedError != nil { + assert.Error(t, actualError) + assert.EqualError(t, actualError, tt.expectedError.Error()) + } else { + assert.NoError(t, actualError) + } + }) + } +} + +func TestGetSignalForComponent(t *testing.T) { + tests := []struct { + name string + component component.Component + expectedSignal pipeline.Signal + expectedError error + }{ + { + name: "no input spec", + component: component.Component{InputType: "test"}, + expectedError: fmt.Errorf("unknown otel signal for input type: %s", "test"), + }, + { + name: "not agentbeat", + component: component.Component{ + InputType: "test", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "cloudbeat", + }, + }, + expectedError: fmt.Errorf("unknown otel signal for input type: %s", "test"), + }, + { + name: "filebeat", + component: component.Component{ + InputType: "filestream", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + }, + expectedSignal: pipeline.SignalLogs, + }, + { + name: "metricbeat", + component: component.Component{ + InputType: "filestream", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"metricbeat"}, + }, + }, + }, + }, + expectedSignal: pipeline.SignalLogs, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualSignal, actualError := getSignalForComponent(&tt.component) + assert.Equal(t, tt.expectedSignal, actualSignal) + + if tt.expectedError != nil { + assert.Error(t, actualError) + assert.EqualError(t, actualError, tt.expectedError.Error()) + } else { + assert.NoError(t, actualError) + } + }) + } +} + +func TestGetOtelConfig(t *testing.T) { + agentInfo := &info.AgentInfo{} + fileStreamConfig := map[string]any{ + "id": "test", + "use_output": "default", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "paths": []any{ + "/var/log/*.log", + }, + }, + map[string]any{ + "id": "test-2", + "data_stream": map[string]any{ + "dataset": "generic-2", + }, + "paths": []any{ + "/var/log/*.log", + }, + }, + }, + } + beatMetricsConfig := map[string]any{ + "id": "test", + "use_output": "default", + "type": "beat/metrics", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "hosts": "http://localhost:5066", + "metricsets": []interface{}{"stats"}, + "period": "60s", + }, + }, + } + systemMetricsConfig := map[string]any{ + "id": "test", + "use_output": "default", + "type": "system/metrics", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "metricsets": map[string]any{ + "cpu": map[string]any{ + "data_stream.dataset": "system.cpu", + }, + "memory": map[string]any{ + "data_stream.dataset": "system.memory", + }, + "network": map[string]any{ + "data_stream.dataset": "system.network", + }, + "filesystem": map[string]any{ + "data_stream.dataset": "system.filesystem", + }, + }, + }, + }, + } + + type extraParams struct { + key string + value any + } + // pass ssl params as extra args to this method + esOutputConfig := func(extra ...extraParams) map[string]any { + finalOutput := map[string]any{ + "type": "elasticsearch", + "hosts": []any{"localhost:9200"}, + "username": "elastic", + "password": "password", + "preset": "balanced", + "queue.mem.events": 3200, + "ssl.enabled": true, + "proxy_url": "https://example.com", + } + + for _, v := range extra { + finalOutput[v.key] = v.value + } + return finalOutput + } + + expectedExtensionConfig := func(extra ...extraParams) map[string]any { + finalOutput := map[string]any{ + "continue_on_error": true, + "idle_connection_timeout": "3s", + "proxy_disable": false, + "proxy_url": "https://example.com", + "ssl": map[string]interface{}{ + "ca_sha256": []interface{}{}, + "ca_trusted_fingerprint": "", + "certificate": "", + "certificate_authorities": []interface{}{}, + "cipher_suites": []interface{}{}, + "curve_types": []interface{}{}, + "enabled": true, + "key": "", + "key_passphrase": "", + "key_passphrase_path": "", + "renegotiation": int64(0), + "supported_protocols": []interface{}{}, + "verification_mode": uint64(0), + }, + "timeout": "1m30s", + } + for _, v := range extra { + // accepts one level deep parameters to replace + if _, ok := v.value.(map[string]any); ok { + for newkey, newvalue := range v.value.(map[string]any) { + // this is brittle - it is expected that developers will pass expected params correctly here + finalOutput[v.key].(map[string]any)[newkey] = newvalue + } + continue + } + finalOutput[v.key] = v.value + } + return finalOutput + } + + expectedESConfig := func(outputName string) map[string]any { + return map[string]any{ + "compression": "gzip", + "compression_params": map[string]any{ + "level": 1, + }, + "mapping": map[string]any{ + "mode": "bodymap", + }, + "endpoints": []string{"http://localhost:9200"}, + "password": "password", + "user": "elastic", + "max_conns_per_host": 1, + "retry": map[string]any{ + "enabled": true, + "initial_interval": 1 * time.Second, + "max_interval": 1 * time.Minute, + "max_retries": 3, + }, + "sending_queue": map[string]any{ + "enabled": true, + "num_consumers": 1, + "queue_size": 3200, + "block_on_overflow": true, + "wait_for_result": true, + "batch": map[string]any{ + "flush_timeout": "10s", + "max_size": 1600, + "min_size": 0, + "sizer": "items", + }, + }, + "logs_dynamic_id": map[string]any{ + "enabled": true, + }, + "telemetry": map[string]any{ + "log_failed_docs_input": true, + }, + "auth": map[string]any{ + "authenticator": "beatsauth/_agent-component/" + outputName, + }, + } + } + + defaultProcessors := func(streamId, dataset string, namespace string) []any { + return []any{ + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "input_id": "test", + }, + "target": "@metadata", + }, + }, + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "dataset": dataset, + "namespace": "default", + "type": namespace, + }, + "target": "data_stream", + }, + }, + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "dataset": dataset, + }, + "target": "event", + }, + }, + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "stream_id": streamId, + }, + "target": "@metadata", + }, + }, + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "id": agentInfo.AgentID(), + "snapshot": agentInfo.Snapshot(), + "version": agentInfo.Version(), + }, + "target": "elastic_agent", + }, + }, + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "id": agentInfo.AgentID(), + }, + "target": "agent", + }, + }, + } + } + + // expects input id + expectedFilestreamConfig := func(id string) map[string]any { + return map[string]any{ + "filebeat": map[string]any{ + "inputs": []map[string]any{ + { + "id": "test-1", + "type": "filestream", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "paths": []any{ + "/var/log/*.log", + }, + "index": "logs-generic-1-default", + "processors": defaultProcessors("test-1", "generic-1", "logs"), + }, + { + "id": "test-2", + "type": "filestream", + "data_stream": map[string]any{ + "dataset": "generic-2", + }, + "paths": []any{ + "/var/log/*.log", + }, + "index": "logs-generic-2-default", + "processors": defaultProcessors("test-2", "generic-2", "logs"), + }, + }, + }, + "output": map[string]any{ + "otelconsumer": map[string]any{}, + }, + "path": map[string]any{ + "data": filepath.Join(paths.Run(), id), + }, + "queue": map[string]any{ + "mem": map[string]any{ + "events": uint64(3200), + "flush": map[string]any{ + "min_events": uint64(1600), + "timeout": "10s", + }, + }, + }, + "logging": map[string]any{ + "with_fields": map[string]any{ + "component": map[string]any{ + "binary": "filebeat", + "dataset": "elastic_agent.filebeat", + "type": "filestream", + "id": id, + }, + "log": map[string]any{ + "source": id, + }, + }, + }, + "http": map[string]any{ + "enabled": true, + "host": "localhost", + }, + "management.otel.enabled": true, + } + } + + getBeatMonitoringConfig := func(_, _ string) map[string]any { + return map[string]any{ + "http": map[string]any{ + "enabled": true, + "host": "localhost", + }, + } + } + + tests := []struct { + name string + model *component.Model + expectedConfig *confmap.Conf + expectedError error + }{ + { + name: "no supported components", + model: &component.Model{ + Components: []component.Component{ + { + InputType: "test", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "cloudbeat", + }, + }, + }, + }, + }, + { + name: "filestream", + model: &component.Model{ + Components: []component.Component{ + { + ID: "filestream-default", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filestream-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(fileStreamConfig), + }, + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig()), + }, + }, + }, + }, + }, + expectedConfig: confmap.NewFromStringMap(map[string]any{ + "exporters": map[string]any{ + "elasticsearch/_agent-component/default": expectedESConfig("default"), + }, + "extensions": map[string]any{ + "beatsauth/_agent-component/default": expectedExtensionConfig(), + }, + "receivers": map[string]any{ + "filebeatreceiver/_agent-component/filestream-default": expectedFilestreamConfig("filestream-default"), + }, + "service": map[string]any{ + "extensions": []any{"beatsauth/_agent-component/default"}, + "pipelines": map[string]any{ + "logs/_agent-component/filestream-default": map[string][]string{ + "exporters": {"elasticsearch/_agent-component/default"}, + "receivers": {"filebeatreceiver/_agent-component/filestream-default"}, + }, + }, + }, + }), + }, + { + name: "multiple filestream inputs and output types", + model: &component.Model{ + Components: []component.Component{ + { + ID: "filestream-primaryOutput", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filestream-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(fileStreamConfig), + }, + { + ID: "filestream-primaryOutput", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig(extraParams{"ssl.verification_mode", "certificate"})), + }, + }, + }, + { + ID: "filestream-secondaryOutput", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filestream-unit-2", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(fileStreamConfig), + }, + { + ID: "filestream-secondaryOutput", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig(extraParams{"ssl.ca_trusted_fingerprint", "b9a10bbe64ee9826abeda6546fc988c8bf798b41957c33d05db736716513dc9c"})), + }, + }, + }, + }, + }, + expectedConfig: confmap.NewFromStringMap(map[string]any{ + "exporters": map[string]any{ + "elasticsearch/_agent-component/primaryOutput": expectedESConfig("primaryOutput"), + "elasticsearch/_agent-component/secondaryOutput": expectedESConfig("secondaryOutput"), + }, + "extensions": map[string]any{ + "beatsauth/_agent-component/primaryOutput": expectedExtensionConfig(extraParams{"ssl", map[string]any{"verification_mode": uint64(2)}}), + "beatsauth/_agent-component/secondaryOutput": expectedExtensionConfig(extraParams{"ssl", map[string]any{"ca_trusted_fingerprint": "b9a10bbe64ee9826abeda6546fc988c8bf798b41957c33d05db736716513dc9c"}}), + }, + "receivers": map[string]any{ + "filebeatreceiver/_agent-component/filestream-primaryOutput": expectedFilestreamConfig("filestream-primaryOutput"), + "filebeatreceiver/_agent-component/filestream-secondaryOutput": expectedFilestreamConfig("filestream-secondaryOutput"), + }, + "service": map[string]any{ + "extensions": []any{"beatsauth/_agent-component/primaryOutput", "beatsauth/_agent-component/secondaryOutput"}, + "pipelines": map[string]any{ + "logs/_agent-component/filestream-primaryOutput": map[string][]string{ + "exporters": {"elasticsearch/_agent-component/primaryOutput"}, + "receivers": {"filebeatreceiver/_agent-component/filestream-primaryOutput"}, + }, + "logs/_agent-component/filestream-secondaryOutput": map[string][]string{ + "exporters": {"elasticsearch/_agent-component/secondaryOutput"}, + "receivers": {"filebeatreceiver/_agent-component/filestream-secondaryOutput"}, + }, + }, + }, + }), + }, + { + name: "multiple filestream inputs and one output", + model: &component.Model{ + Components: []component.Component{ + { + ID: "filestream1-default", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filestream-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(fileStreamConfig), + }, + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig()), + }, + }, + }, + { + ID: "filestream2-default", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filestream-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(fileStreamConfig), + }, + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig()), + }, + }, + }, + }, + }, + expectedConfig: confmap.NewFromStringMap(map[string]any{ + "exporters": map[string]any{ + "elasticsearch/_agent-component/default": expectedESConfig("default"), + }, + "extensions": map[string]any{ + "beatsauth/_agent-component/default": expectedExtensionConfig(), + }, + "receivers": map[string]any{ + "filebeatreceiver/_agent-component/filestream1-default": expectedFilestreamConfig("filestream1-default"), + "filebeatreceiver/_agent-component/filestream2-default": expectedFilestreamConfig("filestream2-default"), + }, + "service": map[string]any{ + "extensions": []any{"beatsauth/_agent-component/default"}, + "pipelines": map[string]any{ + "logs/_agent-component/filestream1-default": map[string][]string{ + "exporters": {"elasticsearch/_agent-component/default"}, + "receivers": {"filebeatreceiver/_agent-component/filestream1-default"}, + }, + "logs/_agent-component/filestream2-default": map[string][]string{ + "exporters": {"elasticsearch/_agent-component/default"}, + "receivers": {"filebeatreceiver/_agent-component/filestream2-default"}, + }, + }, + }, + }), + }, + { + name: "beat/metrics", + model: &component.Model{ + Components: []component.Component{ + { + ID: "beat-metrics-monitoring", + InputType: "beat/metrics", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"metricbeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "beat/metrics-monitoring", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(beatMetricsConfig), + }, + { + ID: "beat/metrics-default", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig()), + }, + }, + }, + }, + }, + expectedConfig: confmap.NewFromStringMap(map[string]any{ + "exporters": map[string]any{ + "elasticsearch/_agent-component/default": expectedESConfig("default"), + }, + "extensions": map[string]any{ + "beatsauth/_agent-component/default": expectedExtensionConfig(), + }, + "receivers": map[string]any{ + "metricbeatreceiver/_agent-component/beat-metrics-monitoring": map[string]any{ + "metricbeat": map[string]any{ + "modules": []map[string]any{ + { + "data_stream": map[string]any{"dataset": "generic-1"}, + "hosts": "http://localhost:5066", + "id": "test-1", + "index": "metrics-generic-1-default", + "metricsets": []interface{}{"stats"}, + "period": "60s", + "processors": defaultProcessors("test-1", "generic-1", "metrics"), + "module": "beat", + }, + }, + }, + "output": map[string]any{ + "otelconsumer": map[string]any{}, + }, + "path": map[string]any{ + "data": filepath.Join(paths.Run(), "beat-metrics-monitoring"), + }, + "queue": map[string]any{ + "mem": map[string]any{ + "events": uint64(3200), + "flush": map[string]any{ + "min_events": uint64(1600), + "timeout": "10s", + }, + }, + }, + "logging": map[string]any{ + "with_fields": map[string]any{ + "component": map[string]any{ + "binary": "metricbeat", + "dataset": "elastic_agent.metricbeat", + "type": "beat/metrics", + "id": "beat-metrics-monitoring", + }, + "log": map[string]any{ + "source": "beat-metrics-monitoring", + }, + }, + }, + "http": map[string]any{ + "enabled": true, + "host": "localhost", + }, + "management.otel.enabled": true, + }, + }, + "service": map[string]any{ + "extensions": []any{"beatsauth/_agent-component/default"}, + "pipelines": map[string]any{ + "logs/_agent-component/beat-metrics-monitoring": map[string][]string{ + "exporters": {"elasticsearch/_agent-component/default"}, + "receivers": {"metricbeatreceiver/_agent-component/beat-metrics-monitoring"}, + }, + }, + }, + }), + }, + { + name: "system/metrics", + model: &component.Model{ + Components: []component.Component{ + { + ID: "system-metrics", + InputType: "system/metrics", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"metricbeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "system/metrics", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(systemMetricsConfig), + }, + { + ID: "system/metrics-default", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig()), + }, + }, + }, + }, + }, + expectedConfig: confmap.NewFromStringMap(map[string]any{ + "exporters": map[string]any{ + "elasticsearch/_agent-component/default": expectedESConfig("default"), + }, + "extensions": map[string]any{ + "beatsauth/_agent-component/default": expectedExtensionConfig(), + }, + "receivers": map[string]any{ + "metricbeatreceiver/_agent-component/system-metrics": map[string]any{ + "metricbeat": map[string]any{ + "modules": []map[string]any{ + { + "module": "system", + "data_stream": map[string]any{"dataset": "generic-1"}, + "id": "test-1", + "index": "metrics-generic-1-default", + "metricsets": map[string]any{ + "cpu": map[string]any{ + "data_stream.dataset": "system.cpu", + }, + "memory": map[string]any{ + "data_stream.dataset": "system.memory", + }, + "network": map[string]any{ + "data_stream.dataset": "system.network", + }, + "filesystem": map[string]any{ + "data_stream.dataset": "system.filesystem", + }, + }, + "processors": defaultProcessors("test-1", "generic-1", "metrics"), + }, + }, + }, + "output": map[string]any{ + "otelconsumer": map[string]any{}, + }, + "path": map[string]any{ + "data": filepath.Join(paths.Run(), "system-metrics"), + }, + "queue": map[string]any{ + "mem": map[string]any{ + "events": uint64(3200), + "flush": map[string]any{ + "min_events": uint64(1600), + "timeout": "10s", + }, + }, + }, + "logging": map[string]any{ + "with_fields": map[string]any{ + "component": map[string]any{ + "binary": "metricbeat", + "dataset": "elastic_agent.metricbeat", + "type": "system/metrics", + "id": "system-metrics", + }, + "log": map[string]any{ + "source": "system-metrics", + }, + }, + }, + "http": map[string]any{ + "enabled": true, + "host": "localhost", + }, + "management.otel.enabled": true, + }, + }, + "service": map[string]any{ + "extensions": []any{"beatsauth/_agent-component/default"}, + "pipelines": map[string]any{ + "logs/_agent-component/system-metrics": map[string][]string{ + "exporters": {"elasticsearch/_agent-component/default"}, + "receivers": {"metricbeatreceiver/_agent-component/system-metrics"}, + }, + }, + }, + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualConf, actualError := GetOtelConfig(tt.model, agentInfo, getBeatMonitoringConfig, logp.NewNopLogger()) + if actualConf == nil || tt.expectedConfig == nil { + assert.Equal(t, tt.expectedConfig, actualConf) + } else { // this gives a nicer diff + assert.Equal(t, tt.expectedConfig.ToStringMap(), actualConf.ToStringMap()) + } + + if tt.expectedError != nil { + assert.Error(t, actualError) + assert.EqualError(t, actualError, tt.expectedError.Error()) + } else { + assert.NoError(t, actualError) + } + }) + } +} + +func TestGetReceiversConfigForComponent(t *testing.T) { + testAgentInfo := &info.AgentInfo{} + mockBeatMonitoringConfigGetter := func(componentID, beatName string) map[string]any { + return nil // Behavior when self-monitoring is disabled + } + + customBeatMonitoringConfigGetter := func(componentID, beatName string) map[string]any { + return map[string]any{ + "http": map[string]any{ + "enabled": true, + "host": "custom-host:5067", + "port": 5067, + }, + } + } + + // Create proper component configurations that match existing test patterns + filebeatComponent := &component.Component{ + ID: "filebeat-test-id", + InputType: "filestream", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Name: "filestream", + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filebeat-test-id-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(map[string]any{ + "id": "test", + "use_output": "default", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "paths": []any{ + "/var/log/*.log", + }, + }, + }, + }), + }, + }, + } + + metricbeatComponent := &component.Component{ + ID: "metricbeat-test-id", + InputType: "system/metrics", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Name: "system/metrics", + Command: &component.CommandSpec{ + Args: []string{"metricbeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "metricbeat-test-id-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(map[string]any{ + "id": "test", + "use_output": "default", + "type": "system/metrics", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "metricsets": map[string]any{ + "cpu": map[string]any{ + "data_stream.dataset": "system.cpu", + }, + }, + }, + }, + }), + }, + }, + } + + tests := []struct { + name string + component *component.Component + outputQueueConfig map[string]any + beatMonitoringConfigGetter BeatMonitoringConfigGetter + expectedError string + expectedReceiverType string + expectedBeatName string + }{ + { + name: "filebeat component with default monitoring", + component: filebeatComponent, + outputQueueConfig: nil, + beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, + expectedReceiverType: "filebeatreceiver", + expectedBeatName: "filebeat", + }, + { + name: "metricbeat component with custom monitoring and queue config", + component: metricbeatComponent, + outputQueueConfig: map[string]any{ + "type": "memory", + "size": 1000, + }, + beatMonitoringConfigGetter: customBeatMonitoringConfigGetter, + expectedReceiverType: "metricbeatreceiver", + expectedBeatName: "metricbeat", + }, + { + name: "component with no input units", + component: &component.Component{ + ID: "no-inputs-test-id", + InputType: "filestream", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Name: "filestream", + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "output-unit", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(map[string]any{ + "type": "elasticsearch", + }), + }, + }, + }, + outputQueueConfig: nil, + beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, + expectedReceiverType: "filebeatreceiver", + expectedBeatName: "filebeat", + }, + { + name: "unsupported component type", + component: &component.Component{ + ID: "unsupported-test-id", + InputType: "unsupported", + }, + outputQueueConfig: nil, + beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, + expectedError: "unknown otel receiver type for input type: unsupported", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := getReceiversConfigForComponent( + tt.component, + testAgentInfo, + tt.outputQueueConfig, + tt.beatMonitoringConfigGetter, + ) + + if tt.expectedError != "" { + assert.Error(t, err) + assert.ErrorContains(t, err, tt.expectedError) + assert.Nil(t, result) + return + } + + require.NoError(t, err) + assert.NotNil(t, result) + + // Verify the receiver ID is present + receiverID := fmt.Sprintf("%s/_agent-component/%s", tt.expectedReceiverType, tt.component.ID) + assert.Contains(t, result, receiverID) + + receiverConfig, ok := result[receiverID].(map[string]any) + assert.True(t, ok, "receiver config should be a map") + + // Verify configuration section presence + assert.Contains(t, receiverConfig, "output", "output config should be present") + assert.Contains(t, receiverConfig, "path", "path config should be present") + assert.Contains(t, receiverConfig, "logging", "logging config should be present") + assert.Contains(t, receiverConfig, tt.expectedBeatName, fmt.Sprintf("%s config should be present", tt.expectedBeatName)) + + // Verify queue configuration presence + if tt.outputQueueConfig != nil { + assert.Contains(t, receiverConfig, "queue", "queue config should be present") + } else { + assert.NotContains(t, receiverConfig, "queue", "queue config should not be present") + } + + // Verify monitoring configuration is present (http section should exist) + assert.Contains(t, receiverConfig, "http", "http monitoring config should be present") + expectedMonitoringConfig := tt.beatMonitoringConfigGetter(tt.component.ID, tt.component.InputSpec.BinaryName) + // If the monitoring getter is not nil, verify the http section is the same + if expectedMonitoringConfig != nil { + assert.Equal(t, expectedMonitoringConfig["http"], receiverConfig["http"]) + } + }) + } +} + +func TestVerifyComponentIsOtelSupported(t *testing.T) { + tests := []struct { + name string + component *component.Component + expectedError string + }{ + { + name: "supported component", + component: &component.Component{ + ID: "supported-comp", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filestream-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(map[string]any{ + "streams": []any{ + map[string]any{ + "paths": []any{"/var/log/*.log"}, + }, + }, + }), + }, + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(map[string]any{ + "type": "elasticsearch", + "hosts": []any{"localhost:9200"}, + }), + }, + }, + }, + }, + { + name: "unsupported output type - kafka", + component: &component.Component{ + ID: "unsupported-output", + InputType: "filestream", + OutputType: "kafka", // unsupported + }, + expectedError: "unsupported output type: kafka", + }, + { + name: "unsupported output type - logstash", + component: &component.Component{ + ID: "unsupported-output", + InputType: "filestream", + OutputType: "logstash", // unsupported + }, + expectedError: "unsupported output type: logstash", + }, + { + name: "unsupported input type", + component: &component.Component{ + ID: "unsupported-input", + InputType: "log", // unsupported + OutputType: "elasticsearch", + }, + expectedError: "unsupported input type: log", + }, + { + name: "unsupported configuration", + component: &component.Component{ + ID: "unsupported-config", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filestream-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(map[string]any{ + "streams": []any{ + map[string]any{ + "paths": []any{"/var/log/*.log"}, + }, + }, + }), + }, + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(map[string]any{ + "type": "elasticsearch", + "hosts": []any{"localhost:9200"}, + "indices": []any{}, + }), + }, + }, + }, + expectedError: "unsupported configuration for unsupported-config: error translating config for output: default, unit: filestream-default, error: indices is currently not supported: unsupported operation", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := VerifyComponentIsOtelSupported(tt.component) + if tt.expectedError != "" { + require.Error(t, err) + assert.Equal(t, err.Error(), tt.expectedError) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/internal/pkg/otel/translate/output_elasticsearch.go b/internal/pkg/otel/translate/output_elasticsearch.go new file mode 100644 index 00000000000..4b3c1df0020 --- /dev/null +++ b/internal/pkg/otel/translate/output_elasticsearch.go @@ -0,0 +1,271 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package translate + +import ( + "encoding/base64" + "errors" + "fmt" + "net/url" + "reflect" + "strings" + "time" + + "github.com/go-viper/mapstructure/v2" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" +) + +type esToOTelOptions struct { + elasticsearch.ElasticsearchConfig `config:",inline"` + outputs.HostWorkerCfg `config:",inline"` + + Index string `config:"index"` + Pipeline string `config:"pipeline"` + Preset string `config:"preset"` +} + +var defaultOptions = esToOTelOptions{ + ElasticsearchConfig: elasticsearch.DefaultConfig(), + + Index: "", // Dynamic routing is disabled if index is set + Pipeline: "", + Preset: "custom", // default is custom if not set + HostWorkerCfg: outputs.HostWorkerCfg{ + Workers: 1, + }, +} + +// ToOTelConfig converts a Beat config into OTel elasticsearch exporter config +// Ensure cloudid is handled before calling this method +// Note: This method may override output queue settings defined by user. +func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error) { + escfg := defaultOptions + + // check for unsupported config + err := checkUnsupportedConfig(output, logger) + if err != nil { + return nil, err + } + + // apply preset here + // It is important to apply preset before unpacking the config, as preset can override output fields + preset, err := output.String("preset", -1) + if err == nil { + // Performance preset is present, apply it and log any fields that + // were overridden + overriddenFields, presetConfig, err := elasticsearch.ApplyPreset(preset, output) + if err != nil { + return nil, err + } + logger.Infof("Applying performance preset '%v': %v", + preset, config.DebugString(presetConfig, false)) + logger.Warnf("Performance preset '%v' overrides user setting for field(s): %s", + preset, strings.Join(overriddenFields, ",")) + } + + unpackedMap := make(map[string]any) + // unpack and validate ES config + if err := output.Unpack(&unpackedMap); err != nil { + return nil, fmt.Errorf("failed unpacking config. %w", err) + } + + decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + Result: &escfg, + TagName: "config", + SquashTagOption: "inline", + DecodeHook: cfgDecodeHookFunc(), + }) + if err != nil { + return nil, fmt.Errorf("failed creating decoder. %w", err) + } + + err = decoder.Decode(&unpackedMap) + if err != nil { + return nil, fmt.Errorf("failed decoding config. %w", err) + } + + if err := escfg.Validate(); err != nil { + return nil, err + } + + // Create url using host name, protocol and path + hosts := []string{} + for _, h := range escfg.Hosts { + esURL, err := common.MakeURL(escfg.Protocol, escfg.Path, h, 9200) + if err != nil { + return nil, fmt.Errorf("cannot generate ES URL from host %w", err) + } + hosts = append(hosts, esURL) + } + + otelYAMLCfg := map[string]any{ + "endpoints": hosts, // hosts, protocol, path, port + + // max_conns_per_host is a "hard" limit on number of open connections. + // Ideally, escfg.NumWorkers() should map to num_consumer, but we had a bug in upstream + // where it could spin as many goroutines as it liked. + // Given that batcher implementation can change and it has a history of such changes, + // let's keep max_conns_per_host setting for now and remove it once exporterhelper is stable. + "max_conns_per_host": escfg.NumWorkers(), + + // Retry + "retry": map[string]any{ + "enabled": true, + "initial_interval": escfg.Backoff.Init, // backoff.init + "max_interval": escfg.Backoff.Max, // backoff.max + "max_retries": escfg.MaxRetries, // max_retries + }, + + "sending_queue": map[string]any{ + "batch": map[string]any{ + "flush_timeout": "10s", + "max_size": escfg.BulkMaxSize, // bulk_max_size + "min_size": 0, // 0 means immediately trigger a flush + "sizer": "items", + }, + "enabled": true, + "queue_size": getQueueSize(logger, output), + "block_on_overflow": true, + "wait_for_result": true, + "num_consumers": escfg.NumWorkers(), + }, + + "mapping": map[string]any{ + "mode": "bodymap", + }, + } + + // Compression + otelYAMLCfg["compression"] = "none" + if escfg.CompressionLevel > 0 { + otelYAMLCfg["compression"] = "gzip" + otelYAMLCfg["compression_params"] = map[string]any{ + "level": escfg.CompressionLevel, + } + } + + // Authentication + setIfNotNil(otelYAMLCfg, "user", escfg.Username) // username + setIfNotNil(otelYAMLCfg, "password", escfg.Password) // password + setIfNotNil(otelYAMLCfg, "api_key", base64.StdEncoding.EncodeToString([]byte(escfg.APIKey))) // api_key + + setIfNotNil(otelYAMLCfg, "headers", escfg.Headers) // headers + setIfNotNil(otelYAMLCfg, "pipeline", escfg.Pipeline) // pipeline + // Dynamic routing is disabled if output.elasticsearch.index is set + setIfNotNil(otelYAMLCfg, "logs_index", escfg.Index) // index + + // idle_connection_timeout, timeout, ssl block, + // proxy_url, proxy_headers, proxy_disable are handled by beatsauthextension https://github.com/elastic/opentelemetry-collector-components/tree/main/extension/beatsauthextension + // caller of this method should take care of integrating the extension + + return otelYAMLCfg, nil +} + +// log warning for unsupported config +func checkUnsupportedConfig(cfg *config.C, logger *logp.Logger) error { + if cfg.HasField("indices") { + return fmt.Errorf("indices is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("pipelines") { + return fmt.Errorf("pipelines is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("parameters") { + return fmt.Errorf("parameters is currently not supported: %w", errors.ErrUnsupported) + } else if value, err := cfg.Bool("allow_older_versions", -1); err == nil && !value { + return fmt.Errorf("allow_older_versions:false is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("loadbalance") { + return fmt.Errorf("loadbalance is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("non_indexable_policy") { + return fmt.Errorf("non_indexable_policy is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("escape_html") { + return fmt.Errorf("escape_html is currently not supported: %w", errors.ErrUnsupported) + } else if cfg.HasField("kerberos") { + return fmt.Errorf("kerberos is currently not supported: %w", errors.ErrUnsupported) + } + + return nil +} + +// Helper function to check if a struct is empty +func isStructEmpty(s any) bool { + return reflect.DeepEqual(s, reflect.Zero(reflect.TypeOf(s)).Interface()) +} + +// Helper function to conditionally add fields to the map +func setIfNotNil(m map[string]any, key string, value any) { + if value == nil { + return + } + + v := reflect.ValueOf(value) + + switch v.Kind() { + case reflect.String: + if v.String() != "" { + m[key] = value + } + case reflect.Map, reflect.Slice: + if v.Len() > 0 { + m[key] = value + } + case reflect.Struct: + if !isStructEmpty(value) { + m[key] = value + } + default: + m[key] = value + } +} + +func getQueueSize(logger *logp.Logger, output *config.C) int { + size, err := output.Int("queue.mem.events", -1) + if err != nil { + logger.Debugf("Failed to get queue size: %v", err) + return memqueue.DefaultEvents // return default queue.mem.events for sending_queue in case of an errr + } + return int(size) +} + +func cfgDecodeHookFunc() mapstructure.DecodeHookFunc { + return func( + f reflect.Type, + t reflect.Type, + data any, + ) (any, error) { + if f.Kind() != reflect.String { + return data, nil + } + + switch { + case t == reflect.TypeOf(time.Duration(5)): + d, err := time.ParseDuration(data.(string)) + if err != nil { + return d, fmt.Errorf("failed parsing duration: %w", err) + } else { + return d, nil + } + case t == reflect.TypeOf(tlscommon.TLSVerificationMode(0)): + verificationMode := tlscommon.TLSVerificationMode(0) + if err := verificationMode.Unpack(data); err != nil { + return nil, fmt.Errorf("failed parsing TLS verification mode: %w", err) + } + return verificationMode, nil + case t == reflect.TypeOf(httpcommon.ProxyURI(url.URL{})): + proxyURL := httpcommon.ProxyURI(url.URL{}) + if err := proxyURL.Unpack(data.(string)); err != nil { + return nil, fmt.Errorf("failed parsing proxy_url: %w", err) + } + return proxyURL, nil + default: + return data, nil + } + } +}