diff --git a/internal/test/integration/configs/obi-config-go-otel-grpc.yml b/internal/test/integration/configs/obi-config-go-otel-grpc.yml index 28336efcb..ac5d6a366 100644 --- a/internal/test/integration/configs/obi-config-go-otel-grpc.yml +++ b/internal/test/integration/configs/obi-config-go-otel-grpc.yml @@ -2,11 +2,12 @@ routes: unmatched: path prometheus_export: port: 8999 - features: - - application - - application_process otel_traces_export: endpoint: http://jaeger:4318 +meter_provider: + obi_features: + - application + - application_process attributes: kubernetes: cluster_name: obi-k8s-test-cluster diff --git a/internal/test/integration/k8s/manifests/06-obi-netolly-multizone.yml b/internal/test/integration/k8s/manifests/06-obi-netolly-multizone.yml index 8bcc6351a..7c4538ecc 100644 --- a/internal/test/integration/k8s/manifests/06-obi-netolly-multizone.yml +++ b/internal/test/integration/k8s/manifests/06-obi-netolly-multizone.yml @@ -5,9 +5,10 @@ metadata: data: obi-config.yml: | log_level: debug + meter_provider: + obi_features: ["network", "network_inter_zone"] otel_metrics_export: endpoint: http://otelcol.default:4317 - features: ["network", "network_inter_zone"] network: protocols: - TCP diff --git a/pkg/appolly/instrumenter.go b/pkg/appolly/instrumenter.go index b14b26f1b..88fecec90 100644 --- a/pkg/appolly/instrumenter.go +++ b/pkg/appolly/instrumenter.go @@ -124,14 +124,13 @@ func newGraphBuilder( // some nodes (ipNodesFilter, span name limiter...) are only passed to the metrics export nodes. // Nodes directly handling raw traces will still get the unfiltered exportableSpans queue. // If no metrics exporter is configured, we will not start the metrics subpipeline to save resources. - exportingMetrics := config.Metrics.Enabled() || - config.Metrics.ServiceGraphMetricsEnabled() || - config.Prometheus.Enabled() + exportingMetrics := config.MeterProvider.Features.AnyAppO11yMetric() && + (config.Metrics.EndpointEnabled() || config.Prometheus.EndpointEnabled()) if exportingMetrics { setupMetricsSubPipeline(config, ctxInfo, swi, exportableSpans, selectorCfg, processEventsCh) } - swi.Add(prom.BPFMetrics(ctxInfo, &config.Prometheus), + swi.Add(prom.BPFMetrics(ctxInfo, &config.Prometheus, &config.MeterProvider), swarm.WithID("BPFMetrics")) // The returned builder later invokes its "Build" function that, given @@ -155,9 +154,10 @@ func setupMetricsSubPipeline( spanNameAggregatedMetrics := newQueue("spanNameAggregatedMetrics") swi.Add(transform.SpanNameLimiter(transform.SpanNameLimiterConfig{ - Limit: config.Attributes.MetricSpanNameAggregationLimit, - OTEL: &config.Metrics, - Prom: &config.Prometheus, + Limit: config.Attributes.MetricSpanNameAggregationLimit, + OTEL: &config.Metrics, + Prom: &config.Prometheus, + MeterProvider: &config.MeterProvider, }, exportableSpans, spanNameAggregatedMetrics)) unresolvedCfg := request.UnresolvedNames{ @@ -169,6 +169,7 @@ func setupMetricsSubPipeline( swi.Add(otel.ReportMetrics( ctxInfo, &config.Metrics, + &config.MeterProvider, selectorCfg, unresolvedCfg, spanNameAggregatedMetrics, @@ -178,6 +179,7 @@ func setupMetricsSubPipeline( swi.Add(otel.ReportSvcGraphMetrics( ctxInfo, &config.Metrics, + &config.MeterProvider, unresolvedCfg, spanNameAggregatedMetrics, processEventsCh, @@ -186,6 +188,7 @@ func setupMetricsSubPipeline( swi.Add(prom.PrometheusEndpoint( ctxInfo, &config.Prometheus, + &config.MeterProvider, selectorCfg, unresolvedCfg, spanNameAggregatedMetrics, diff --git a/pkg/appolly/instrumenter_test.go b/pkg/appolly/instrumenter_test.go index e4973b418..bf34df651 100644 --- a/pkg/appolly/instrumenter_test.go +++ b/pkg/appolly/instrumenter_test.go @@ -29,6 +29,7 @@ import ( attr "go.opentelemetry.io/obi/pkg/export/attributes/names" "go.opentelemetry.io/obi/pkg/export/imetrics" "go.opentelemetry.io/obi/pkg/export/instrumentations" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" "go.opentelemetry.io/obi/pkg/filter" "go.opentelemetry.io/obi/pkg/internal/testutil" @@ -56,6 +57,10 @@ var allMetrics = attributes.Selection{ "*": attributes.InclusionLists{Include: []string{"*"}}, } +var mpConfig = decfg.MeterProvider{ + Features: export.FeatureApplicationRED, +} + func allMetricsBut(patterns ...string) attributes.Selection { return attributes.Selection{ attributes.HTTPServerDuration.Section: attributes.InclusionLists{ @@ -74,7 +79,6 @@ func TestBasicPipeline(t *testing.T) { tracesInput := msg.NewQueue[[]request.Span](msg.ChannelBufferLen(10)) processEvents := msg.NewQueue[exec.ProcessEvent](msg.ChannelBufferLen(20)) cfg := otelcfg.MetricsConfig{ - Features: export.FeatureApplication, MetricsEndpoint: tc.ServerEndpoint, Interval: 10 * time.Millisecond, ReportersCacheLen: 16, TTL: 5 * time.Minute, @@ -83,9 +87,10 @@ func TestBasicPipeline(t *testing.T) { }, } gb := newGraphBuilder(&obi.Config{ - NameResolver: obi.DefaultConfig.NameResolver, - Metrics: cfg, - Attributes: obi.Attributes{Select: allMetrics, InstanceID: config.InstanceIDConfig{OverrideHostname: "the-host"}}, + NameResolver: obi.DefaultConfig.NameResolver, + MeterProvider: mpConfig, + Metrics: cfg, + Attributes: obi.Attributes{Select: allMetrics, InstanceID: config.InstanceIDConfig{OverrideHostname: "the-host"}}, }, gctx(0, &cfg), tracesInput, processEvents) // Override eBPF tracer to send some fake data @@ -193,7 +198,6 @@ func TestMergedMetricsTracePipeline(t *testing.T) { tracesInput := msg.NewQueue[[]request.Span](msg.ChannelBufferLen(10)) processEvents := msg.NewQueue[exec.ProcessEvent](msg.ChannelBufferLen(20)) mCfg := otelcfg.MetricsConfig{ - Features: export.FeatureApplication, MetricsEndpoint: tc.ServerEndpoint, Interval: 10 * time.Millisecond, ReportersCacheLen: 16, TTL: 5 * time.Minute, @@ -210,8 +214,9 @@ func TestMergedMetricsTracePipeline(t *testing.T) { } gb := newGraphBuilder(&obi.Config{ - Metrics: mCfg, - Traces: tCfg, + Metrics: mCfg, + Traces: tCfg, + MeterProvider: mpConfig, Attributes: obi.Attributes{ Select: allMetrics, InstanceID: config.InstanceIDConfig{OverrideHostname: "the-host"}, @@ -282,7 +287,6 @@ func TestRouteConsolidation(t *testing.T) { cfg := otelcfg.MetricsConfig{ SDKLogLevel: "debug", - Features: export.FeatureApplication, MetricsEndpoint: tc.ServerEndpoint, Interval: 10 * time.Millisecond, ReportersCacheLen: 16, TTL: 5 * time.Minute, @@ -291,9 +295,10 @@ func TestRouteConsolidation(t *testing.T) { }, } gb := newGraphBuilder(&obi.Config{ - Metrics: cfg, - Routes: &transform.RoutesConfig{Patterns: []string{"/user/{id}", "/products/{id}/push"}}, - Attributes: obi.Attributes{Select: allMetricsBut("client.address", "url.path"), InstanceID: config.InstanceIDConfig{OverrideHostname: "the-host"}}, + Metrics: cfg, + MeterProvider: mpConfig, + Routes: &transform.RoutesConfig{Patterns: []string{"/user/{id}", "/products/{id}/push"}}, + Attributes: obi.Attributes{Select: allMetricsBut("client.address", "url.path"), InstanceID: config.InstanceIDConfig{OverrideHostname: "the-host"}}, }, gctx(attributes.GroupHTTPRoutes, &cfg), tracesInput, processEvents) // Override eBPF tracer to send some fake data tracesInput.Send(newRequest("svc-1", "/user/1234", 200)) @@ -416,7 +421,6 @@ func TestGRPCPipeline(t *testing.T) { processEvents := msg.NewQueue[exec.ProcessEvent](msg.ChannelBufferLen(20)) cfg := otelcfg.MetricsConfig{ - Features: export.FeatureApplication, MetricsEndpoint: tc.ServerEndpoint, Interval: time.Millisecond, ReportersCacheLen: 16, TTL: 5 * time.Minute, @@ -425,8 +429,9 @@ func TestGRPCPipeline(t *testing.T) { }, } gb := newGraphBuilder(&obi.Config{ - Metrics: cfg, - Attributes: obi.Attributes{Select: allMetrics, InstanceID: config.InstanceIDConfig{OverrideHostname: "the-host"}}, + Metrics: cfg, + MeterProvider: mpConfig, + Attributes: obi.Attributes{Select: allMetrics, InstanceID: config.InstanceIDConfig{OverrideHostname: "the-host"}}, }, gctx(0, &cfg), tracesInput, processEvents) // Override eBPF tracer to send some fake data tracesInput.Send(newGRPCRequest("grpc-svc", "/foo/bar", 3)) @@ -514,7 +519,6 @@ func TestBasicPipelineInfo(t *testing.T) { tracesInput := msg.NewQueue[[]request.Span](msg.ChannelBufferLen(10)) processEvents := msg.NewQueue[exec.ProcessEvent](msg.ChannelBufferLen(20)) cfg := otelcfg.MetricsConfig{ - Features: export.FeatureApplication, MetricsEndpoint: tc.ServerEndpoint, Interval: 10 * time.Millisecond, ReportersCacheLen: 16, TTL: 5 * time.Minute, @@ -523,7 +527,8 @@ func TestBasicPipelineInfo(t *testing.T) { }, } gb := newGraphBuilder(&obi.Config{ - Metrics: cfg, + Metrics: cfg, + MeterProvider: mpConfig, Attributes: obi.Attributes{ Select: allMetrics, InstanceID: config.InstanceIDConfig{OverrideHostname: "the-host"}, @@ -609,14 +614,14 @@ func TestSpanAttributeFilterNode(t *testing.T) { processEvents := msg.NewQueue[exec.ProcessEvent](msg.ChannelBufferLen(20)) cfg := otelcfg.MetricsConfig{ SDKLogLevel: "debug", - Features: export.FeatureApplication, MetricsEndpoint: tc.ServerEndpoint, Interval: 10 * time.Millisecond, ReportersCacheLen: 16, TTL: 5 * time.Minute, Instrumentations: []instrumentations.Instrumentation{instrumentations.InstrumentationALL}, } gb := newGraphBuilder(&obi.Config{ - Metrics: cfg, + Metrics: cfg, + MeterProvider: mpConfig, Filters: filter.AttributesConfig{ Application: map[string]filter.MatchDefinition{"url.path": {Match: "/user/*"}}, }, diff --git a/pkg/export/feature.go b/pkg/export/feature.go index 92b388c00..c3c8a9083 100644 --- a/pkg/export/feature.go +++ b/pkg/export/feature.go @@ -19,8 +19,8 @@ type Features maps.Bits const ( FeatureNetwork Features = 1 << iota FeatureNetworkInterZone - FeatureApplication - FeatureSpan + FeatureApplicationRED + FeatureSpanLegacy FeatureSpanOTel FeatureSpanSizes FeatureGraph @@ -33,8 +33,8 @@ const ( var featureMapper = map[string]maps.Bits{ "network": maps.Bits(FeatureNetwork), "network_inter_zone": maps.Bits(FeatureNetworkInterZone), - "application": maps.Bits(FeatureApplication), - "application_span": maps.Bits(FeatureSpan), + "application": maps.Bits(FeatureApplicationRED), + "application_span": maps.Bits(FeatureSpanLegacy), "application_span_otel": maps.Bits(FeatureSpanOTel), "application_span_sizes": maps.Bits(FeatureSpanSizes), "application_service_graph": maps.Bits(FeatureGraph), @@ -49,11 +49,11 @@ func LoadFeatures(features []string) Features { return Features(maps.MappedBits(features, featureMapper)) } -func (f Features) Has(feature Features) bool { +func (f Features) has(feature Features) bool { return maps.Bits(f).Has(maps.Bits(feature)) } -func (f Features) Any(feature Features) bool { +func (f Features) any(feature Features) bool { return maps.Bits(f).Any(maps.Bits(feature)) } @@ -77,3 +77,71 @@ func (f *Features) UnmarshalText(text []byte) error { *f = LoadFeatures(strings.Split(string(text), ",")) return nil } + +func (f Features) AnyAppO11yMetric() bool { + return f.any( + FeatureApplicationRED | + FeatureSpanLegacy | + FeatureSpanOTel | + FeatureSpanSizes | + FeatureGraph | + FeatureProcess | + FeatureApplicationHost) +} + +func (f Features) SpanMetrics() bool { + return f.any(FeatureSpanLegacy | FeatureSpanOTel) +} + +func (f Features) AnySpanMetrics() bool { + return f.any(FeatureSpanLegacy | FeatureSpanOTel | FeatureSpanSizes) +} + +func (f Features) AnyNetwork() bool { + return f.any(FeatureNetwork | FeatureNetworkInterZone) +} + +func (f Features) AppOrSpan() bool { + return f.any(FeatureApplicationRED | + FeatureSpanSizes | + FeatureApplicationHost | + FeatureSpanLegacy | + FeatureSpanOTel) +} + +func (f Features) LegacySpanMetrics() bool { + return f.any(FeatureSpanLegacy) +} + +func (f Features) ServiceGraph() bool { + return f.any(FeatureGraph) +} + +func (f Features) AppHost() bool { + return f.any(FeatureApplicationHost) +} + +func (f Features) AppRED() bool { + return f.any(FeatureApplicationRED) +} + +func (f Features) SpanSizes() bool { + return f.any(FeatureSpanSizes) +} + +func (f Features) NetworkBytes() bool { + return f.any(FeatureNetwork) +} + +func (f Features) NetworkInterZone() bool { + return f.any(FeatureNetworkInterZone) +} + +func (f Features) BPF() bool { + return f.any(FeatureEBPF) +} + +// InvalidSpanMetricsConfig is used to make sure that you can't define both legacy and OTEL span metrics at the same time +func (f Features) InvalidSpanMetricsConfig() bool { + return f.has(FeatureSpanLegacy | FeatureSpanOTel) +} diff --git a/pkg/export/feature_test.go b/pkg/export/feature_test.go index 5f136e617..fd0f063fd 100644 --- a/pkg/export/feature_test.go +++ b/pkg/export/feature_test.go @@ -19,12 +19,12 @@ func TestFeatureYAML(t *testing.T) { require.NoError(t, yaml.Unmarshal([]byte(`features: [application, application_span_otel]`), &doc)) - assert.True(t, doc.Features.Has(FeatureApplication)) - assert.True(t, doc.Features.Has(FeatureSpanOTel)) - assert.True(t, doc.Features.Has(FeatureApplication|FeatureSpanOTel)) - assert.False(t, doc.Features.Has(FeatureProcess)) - assert.False(t, doc.Features.Has(FeatureApplication|FeatureProcess)) - assert.False(t, doc.Features.Has(FeatureAll)) + assert.True(t, doc.Features.has(FeatureApplicationRED)) + assert.True(t, doc.Features.has(FeatureSpanOTel)) + assert.True(t, doc.Features.has(FeatureApplicationRED|FeatureSpanOTel)) + assert.False(t, doc.Features.has(FeatureProcess)) + assert.False(t, doc.Features.has(FeatureApplicationRED|FeatureProcess)) + assert.False(t, doc.Features.has(FeatureAll)) } func TestFeatureEnv(t *testing.T) { @@ -34,10 +34,10 @@ func TestFeatureEnv(t *testing.T) { t.Setenv("FOO", "network") require.NoError(t, env.Parse(&doc)) - assert.True(t, doc.Features.Has(FeatureNetwork)) - assert.False(t, doc.Features.Has(FeatureSpanOTel)) - assert.False(t, doc.Features.Has(FeatureProcess)) - assert.False(t, doc.Features.Has(FeatureAll)) + assert.True(t, doc.Features.has(FeatureNetwork)) + assert.False(t, doc.Features.has(FeatureSpanOTel)) + assert.False(t, doc.Features.has(FeatureProcess)) + assert.False(t, doc.Features.has(FeatureAll)) } func TestFeatureEnv_Separator(t *testing.T) { @@ -47,10 +47,10 @@ func TestFeatureEnv_Separator(t *testing.T) { t.Setenv("FOO", "network,application,application_span_otel") require.NoError(t, env.Parse(&doc)) - assert.True(t, doc.Features.Has(FeatureNetwork)) - assert.True(t, doc.Features.Has(FeatureApplication|FeatureSpanOTel)) - assert.False(t, doc.Features.Has(FeatureProcess)) - assert.False(t, doc.Features.Has(FeatureAll)) + assert.True(t, doc.Features.has(FeatureNetwork)) + assert.True(t, doc.Features.has(FeatureApplicationRED|FeatureSpanOTel)) + assert.False(t, doc.Features.has(FeatureProcess)) + assert.False(t, doc.Features.has(FeatureAll)) } func TestFeatureEnv_All(t *testing.T) { @@ -60,10 +60,10 @@ func TestFeatureEnv_All(t *testing.T) { t.Setenv("FOO", "all") require.NoError(t, env.Parse(&doc)) - assert.True(t, doc.Features.Has(FeatureNetwork)) - assert.True(t, doc.Features.Has(FeatureApplication|FeatureSpanOTel)) - assert.True(t, doc.Features.Has(FeatureProcess)) - assert.True(t, doc.Features.Has(FeatureAll)) + assert.True(t, doc.Features.has(FeatureNetwork)) + assert.True(t, doc.Features.has(FeatureApplicationRED|FeatureSpanOTel)) + assert.True(t, doc.Features.has(FeatureProcess)) + assert.True(t, doc.Features.has(FeatureAll)) } func TestFeatureYAML_All(t *testing.T) { @@ -73,10 +73,10 @@ func TestFeatureYAML_All(t *testing.T) { require.NoError(t, yaml.Unmarshal([]byte(`features: ["*"]`), &doc)) - assert.True(t, doc.Features.Has(FeatureApplication)) - assert.True(t, doc.Features.Has(FeatureSpanOTel)) - assert.True(t, doc.Features.Has(FeatureProcess)) - assert.True(t, doc.Features.Has(FeatureAll)) + assert.True(t, doc.Features.has(FeatureApplicationRED)) + assert.True(t, doc.Features.has(FeatureSpanOTel)) + assert.True(t, doc.Features.has(FeatureProcess)) + assert.True(t, doc.Features.has(FeatureAll)) } func TestFeatureYAML_Error(t *testing.T) { diff --git a/pkg/export/otel/decfg/decfg.go b/pkg/export/otel/decfg/decfg.go new file mode 100644 index 000000000..dab280987 --- /dev/null +++ b/pkg/export/otel/decfg/decfg.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package decfg is a placeholder for the future global and per-service support +// of the OpenTelemetry Declarative configuration format. +// https://github.com/open-telemetry/opentelemetry-configuration/tree/main/schema +package decfg + +import "go.opentelemetry.io/obi/pkg/export" + +// MeterProvider is a placeholder for the progressive support of global and per-service +// definition of the meter_provider section of the OpenTelemetry configuration format: +// https://github.com/open-telemetry/opentelemetry-configuration/blob/main/schema/meter_provider.yaml +// Due to the nature of OBI, it might contain some fields that are exclusive to OBI. They are prefixed with "obi_" +type MeterProvider struct { + // Features of metrics that can be exported. Accepted values: application, network, application_process, + // application_span, application_service_graph, ... + // envDefault is provided to avoid breaking changes + Features export.Features `yaml:"obi_features" env:"OTEL_EBPF_METRICS_FEATURES,expand" envDefault:"${OTEL_EBPF_METRIC_FEATURES}" envSeparator:","` +} diff --git a/pkg/export/otel/expirer_test.go b/pkg/export/otel/expirer_test.go index 01ed9205e..d6d32ca8f 100644 --- a/pkg/export/otel/expirer_test.go +++ b/pkg/export/otel/expirer_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/obi/pkg/export/attributes" attr "go.opentelemetry.io/obi/pkg/export/attributes/names" "go.opentelemetry.io/obi/pkg/export/instrumentations" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" "go.opentelemetry.io/obi/pkg/internal/netolly/ebpf" "go.opentelemetry.io/obi/pkg/pipe/global" @@ -28,6 +29,8 @@ import ( const timeout = 20 * time.Second +var mpConfig = decfg.MeterProvider{Features: export.FeatureAll} + func TestNetMetricsExpiration(t *testing.T) { defer otelcfg.RestoreEnvAfterExecution()() ctx := t.Context() @@ -43,7 +46,6 @@ func TestNetMetricsExpiration(t *testing.T) { Interval: 50 * time.Millisecond, CommonEndpoint: otlp.ServerEndpoint, MetricsProtocol: otelcfg.ProtocolHTTPProtobuf, - Features: export.FeatureNetwork, TTL: 3 * time.Minute, Instrumentations: []instrumentations.Instrumentation{ instrumentations.InstrumentationALL, @@ -60,6 +62,7 @@ func TestNetMetricsExpiration(t *testing.T) { }, }, }, + MeterProvider: &decfg.MeterProvider{Features: export.FeatureNetwork}, }, metrics)(ctx) require.NoError(t, err) @@ -168,7 +171,6 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) { Interval: 50 * time.Millisecond, CommonEndpoint: otlp.ServerEndpoint, MetricsProtocol: otelcfg.ProtocolHTTPProtobuf, - Features: export.FeatureApplication, TTL: 3 * time.Minute, ReportersCacheLen: 100, Instrumentations: []instrumentations.Instrumentation{ @@ -179,7 +181,7 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) { &global.ContextInfo{ MetricAttributeGroups: g, OTELMetricsExporter: &otelcfg.MetricsExporterInstancer{Cfg: cfg}, - }, cfg, &attributes.SelectorConfig{ + }, cfg, &mpConfig, &attributes.SelectorConfig{ SelectionCfg: attributes.Selection{ attributes.HTTPServerDuration.Section: attributes.InclusionLists{ Include: []string{"url.path", "k8s.app.version"}, @@ -307,7 +309,6 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) { Interval: 50 * time.Millisecond, CommonEndpoint: otlp.ServerEndpoint, MetricsProtocol: otelcfg.ProtocolHTTPProtobuf, - Features: export.FeatureApplication, TTL: 3 * time.Minute, ReportersCacheLen: 100, Instrumentations: []instrumentations.Instrumentation{ @@ -316,7 +317,7 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) { } otelExporter, err := ReportMetrics( &global.ContextInfo{OTELMetricsExporter: &otelcfg.MetricsExporterInstancer{Cfg: cfg}}, - cfg, + cfg, &mpConfig, &attributes.SelectorConfig{ SelectionCfg: attributes.Selection{ attributes.HTTPServerDuration.Section: attributes.InclusionLists{ diff --git a/pkg/export/otel/metrics.go b/pkg/export/otel/metrics.go index af1bc4557..fd124dd1b 100644 --- a/pkg/export/otel/metrics.go +++ b/pkg/export/otel/metrics.go @@ -18,11 +18,11 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/appolly/app/svc" "go.opentelemetry.io/obi/pkg/appolly/discover/exec" - "go.opentelemetry.io/obi/pkg/export" "go.opentelemetry.io/obi/pkg/export/attributes" attr "go.opentelemetry.io/obi/pkg/export/attributes/names" "go.opentelemetry.io/obi/pkg/export/imetrics" "go.opentelemetry.io/obi/pkg/export/instrumentations" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/otel/metric" instrument "go.opentelemetry.io/obi/pkg/export/otel/metric/api/metric" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" @@ -71,6 +71,7 @@ var MetricTypes = []string{ type MetricsReporter struct { ctx context.Context cfg *otelcfg.MetricsConfig + meterProvider *decfg.MeterProvider hostID string attributes *attributes.AttrSelector exporter sdkmetric.Exporter @@ -155,13 +156,14 @@ type TargetMetrics struct { func ReportMetrics( ctxInfo *global.ContextInfo, cfg *otelcfg.MetricsConfig, + mcfg *decfg.MeterProvider, selectorCfg *attributes.SelectorConfig, unresolved request.UnresolvedNames, input *msg.Queue[[]request.Span], processEventCh *msg.Queue[exec.ProcessEvent], ) swarm.InstanceFunc { return func(ctx context.Context) (swarm.RunFunc, error) { - if !cfg.Enabled() { + if !(cfg.EndpointEnabled() && mcfg.Features.AppOrSpan()) { return swarm.EmptyRunFunc() } otelcfg.SetupInternalOTELSDKLogger(cfg.SDKLogLevel) @@ -170,6 +172,7 @@ func ReportMetrics( ctx, ctxInfo, cfg, + mcfg, selectorCfg, unresolved, input, @@ -187,6 +190,7 @@ func newMetricsReporter( ctx context.Context, ctxInfo *global.ContextInfo, cfg *otelcfg.MetricsConfig, + meterProvider *decfg.MeterProvider, selectorCfg *attributes.SelectorConfig, unresolved request.UnresolvedNames, input *msg.Queue[[]request.Span], @@ -204,6 +208,7 @@ func newMetricsReporter( mr := MetricsReporter{ ctx: ctx, cfg: cfg, + meterProvider: meterProvider, is: is, targetMetrics: map[svc.UID]*TargetMetrics{}, attributes: attribProvider, @@ -304,7 +309,7 @@ func newMetricsReporter( systemMetrics := mr.newMetricsInstance(nil) systemMeter := systemMetrics.provider.Meter(reporterName) - if cfg.HostMetricsEnabled() { + if meterProvider.Features.AppHost() { err := mr.setupHostInfoMeter(systemMeter) if err != nil { return nil, fmt.Errorf("setting up host metrics: %w", err) @@ -324,7 +329,7 @@ func newMetricsReporter( func (mr *MetricsReporter) otelMetricOptions(mlog *slog.Logger) []metric.Option { var opts []metric.Option - if !mr.cfg.OTelMetricsEnabled() { + if !mr.meterProvider.Features.AppRED() { return opts } @@ -365,7 +370,7 @@ func (mr *MetricsReporter) otelMetricOptions(mlog *slog.Logger) []metric.Option } func (mr *MetricsReporter) usesLegacySpanNames() bool { - return mr.cfg.Features.Has(export.FeatureSpan) + return mr.meterProvider.Features.LegacySpanMetrics() } func (mr *MetricsReporter) spanMetricsLatencyName() string { @@ -377,7 +382,7 @@ func (mr *MetricsReporter) spanMetricsLatencyName() string { } func (mr *MetricsReporter) spanMetricOptions(mlog *slog.Logger) []metric.Option { - if !mr.cfg.SpanMetricsEnabled() { + if !mr.meterProvider.Features.SpanMetrics() { return []metric.Option{} } @@ -400,7 +405,7 @@ func (mr *MetricsReporter) setupTargetInfo(meter instrument.Meter) error { // nolint: cyclop func (mr *MetricsReporter) setupOtelMeters(m *Metrics, meter instrument.Meter) error { - if !mr.cfg.OTelMetricsEnabled() { + if !mr.meterProvider.Features.AppRED() { return nil } @@ -547,7 +552,7 @@ func (mr *MetricsReporter) spanMetricsCallsName() string { } func (mr *MetricsReporter) setupSpanSizeMeters(m *Metrics, meter instrument.Meter) error { - if !mr.cfg.SpanMetricsSizesEnabled() { + if !mr.meterProvider.Features.SpanSizes() { return nil } @@ -584,7 +589,7 @@ func (mr *MetricsReporter) setupTracesTargetInfo(meter instrument.Meter) error { } func (mr *MetricsReporter) setupSpanMeters(m *Metrics, meter instrument.Meter) error { - if !mr.cfg.SpanMetricsEnabled() { + if !mr.meterProvider.Features.SpanMetrics() { return nil } @@ -671,21 +676,21 @@ func (mr *MetricsReporter) newMetricSet(service *svc.Attrs) (*Metrics, error) { meter := m.provider.Meter(reporterName) var err error - if mr.cfg.OTelMetricsEnabled() { + if mr.meterProvider.Features.AppRED() { err = mr.setupOtelMeters(&m, meter) if err != nil { return nil, err } } - if mr.cfg.SpanMetricsEnabled() { + if mr.meterProvider.Features.SpanMetrics() { err = mr.setupSpanMeters(&m, meter) if err != nil { return nil, err } } - if mr.cfg.SpanMetricsSizesEnabled() { + if mr.meterProvider.Features.SpanSizes() { err = mr.setupSpanSizeMeters(&m, meter) if err != nil { return nil, err @@ -808,11 +813,12 @@ func (mr *MetricsReporter) spanMetricAttributes() []attributes.Field[*request.Sp } func otelMetricsAccepted(span *request.Span, mr *MetricsReporter) bool { - return mr.cfg.OTelMetricsEnabled() && !span.Service.ExportsOTelMetrics() + return mr.meterProvider.Features.AppRED() && !span.Service.ExportsOTelMetrics() } func otelSpanMetricsAccepted(span *request.Span, mr *MetricsReporter) bool { - return mr.cfg.AnySpanMetricsEnabled() && !span.Service.ExportsOTelMetricsSpan() + return mr.meterProvider.Features.AnySpanMetrics() && + !span.Service.ExportsOTelMetricsSpan() } //nolint:cyclop @@ -909,7 +915,7 @@ func (r *Metrics) record(span *request.Span, mr *MetricsReporter) { } } - if mr.cfg.SpanMetricsEnabled() { + if mr.meterProvider.Features.SpanMetrics() { sml, attrs := r.spanMetricsLatency.ForRecord(span) sml.Record(ctx, duration, instrument.WithAttributeSet(attrs)) @@ -917,7 +923,7 @@ func (r *Metrics) record(span *request.Span, mr *MetricsReporter) { smct.Add(ctx, 1, instrument.WithAttributeSet(attrs)) } - if mr.cfg.SpanMetricsSizesEnabled() { + if mr.meterProvider.Features.SpanSizes() { smst, attrs := r.spanMetricsRequestSizeTotal.ForRecord(span) smst.Add(ctx, float64(span.RequestBodyLength()), instrument.WithAttributeSet(attrs)) @@ -946,7 +952,7 @@ func (mr *MetricsReporter) deleteTargetInfo(attrs *attribute.Set) { } func (mr *MetricsReporter) createTracesTargetInfo(attrs *attribute.Set) { - if !mr.cfg.AnySpanMetricsEnabled() { + if !mr.meterProvider.Features.AnySpanMetrics() { return } @@ -958,7 +964,7 @@ func (mr *MetricsReporter) createTracesTargetInfo(attrs *attribute.Set) { } func (mr *MetricsReporter) deleteTracesTargetInfo(attrs *attribute.Set) { - if attrs == nil || !mr.cfg.AnySpanMetricsEnabled() { + if attrs == nil || !mr.meterProvider.Features.AnySpanMetrics() { return } @@ -1022,7 +1028,7 @@ func (mr *MetricsReporter) ensureTargetMetrics(service *svc.Attrs) *TargetMetric targetMetrics.resourceAttributes = attribute.NewSet(mr.resourceAttrsForService(service)...) - if mr.cfg.AnySpanMetricsEnabled() { + if mr.meterProvider.Features.AnySpanMetrics() { targetMetrics.tracesResourceAttributes = mr.tracesResourceAttributes(service) } else { targetMetrics.tracesResourceAttributes = *attribute.EmptySet() @@ -1108,7 +1114,7 @@ func (mr *MetricsReporter) onProcessEvent(pe *exec.ProcessEvent) { mr.deleteTargetMetrics(&origUID) - if mr.cfg.HostMetricsEnabled() && mr.pidTracker.Count() == 0 { + if mr.meterProvider.Features.AppHost() && mr.pidTracker.Count() == 0 { mlog().Debug("No more PIDs tracked, expiring host info metric") mr.hostInfo.RemoveAllMetrics(mr.ctx) } @@ -1137,7 +1143,7 @@ func (mr *MetricsReporter) onSpan(spans []request.Span) { } reporter.record(s, mr) - if mr.cfg.HostMetricsEnabled() { + if mr.meterProvider.Features.AppHost() { hostInfo, attrs := mr.hostInfo.ForRecord(s) hostInfo.Record(mr.ctx, 1, instrument.WithAttributeSet(attrs)) } diff --git a/pkg/export/otel/metrics_net.go b/pkg/export/otel/metrics_net.go index 8c281b0aa..509e7ea20 100644 --- a/pkg/export/otel/metrics_net.go +++ b/pkg/export/otel/metrics_net.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/obi/pkg/export/attributes" attr "go.opentelemetry.io/obi/pkg/export/attributes/names" "go.opentelemetry.io/obi/pkg/export/expire" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/otel/metric" metric2 "go.opentelemetry.io/obi/pkg/export/otel/metric/api/metric" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" @@ -29,14 +30,14 @@ import ( // NetMetricsConfig extends MetricsConfig for Network Metrics type NetMetricsConfig struct { - Metrics *otelcfg.MetricsConfig - SelectorCfg *attributes.SelectorConfig - // Deprecated: to be removed in Beyla 3.0 with OTEL_EBPF_NETWORK_METRICS bool flag - GloballyEnabled bool + Metrics *otelcfg.MetricsConfig + MeterProvider *decfg.MeterProvider + SelectorCfg *attributes.SelectorConfig } -func (mc NetMetricsConfig) Enabled() bool { - return mc.Metrics != nil && mc.Metrics.EndpointEnabled() && (mc.Metrics.NetworkMetricsEnabled() || mc.GloballyEnabled) +func (mc *NetMetricsConfig) Enabled() bool { + return mc.Metrics != nil && mc.Metrics.EndpointEnabled() && + mc.MeterProvider.Features.AnyNetwork() } func nmlog() *slog.Logger { @@ -79,7 +80,9 @@ type netMetricsExporter struct { } func NetMetricsExporterProvider( - ctxInfo *global.ContextInfo, cfg *NetMetricsConfig, input *msg.Queue[[]*ebpf.Record], + ctxInfo *global.ContextInfo, + cfg *NetMetricsConfig, + input *msg.Queue[[]*ebpf.Record], ) swarm.InstanceFunc { return func(ctx context.Context) (swarm.RunFunc, error) { if !cfg.Enabled() { @@ -98,7 +101,10 @@ func NetMetricsExporterProvider( } func newMetricsExporter( - ctx context.Context, ctxInfo *global.ContextInfo, cfg *NetMetricsConfig, input *msg.Queue[[]*ebpf.Record], + ctx context.Context, + ctxInfo *global.ContextInfo, + cfg *NetMetricsConfig, + input *msg.Queue[[]*ebpf.Record], ) (*netMetricsExporter, error) { log := nmlog() log.Debug("instantiating network metrics exporter provider") @@ -124,7 +130,7 @@ func newMetricsExporter( clock: clock, expireTTL: cfg.Metrics.TTL, } - if cfg.GloballyEnabled || cfg.Metrics.NetworkFlowBytesEnabled() { + if cfg.MeterProvider.Features.NetworkBytes() { log := log.With("metricFamily", "FlowBytes") bytesMetric, err := ebpfEvents.Int64Counter(attributes.NetworkFlow.OTEL, metric2.WithDescription("total bytes_sent value of network flows observed by probe since its launch"), @@ -143,7 +149,7 @@ func newMetricsExporter( nme.flowBytes = NewExpirer[*ebpf.Record, metric2.Int64Counter, float64](ctx, bytesMetric, attrs, clock.Time, cfg.Metrics.TTL) } - if cfg.Metrics.NetworkInterzoneMetricsEnabled() { + if cfg.MeterProvider.Features.NetworkInterZone() { log := log.With("metricFamily", "InterZoneBytes") bytesMetric, err := ebpfEvents.Int64Counter(attributes.NetworkInterZone.OTEL, metric2.WithDescription("total bytes_sent value between Cloud availability zones"), diff --git a/pkg/export/otel/metrics_net_test.go b/pkg/export/otel/metrics_net_test.go index f07a3878d..7730990b5 100644 --- a/pkg/export/otel/metrics_net_test.go +++ b/pkg/export/otel/metrics_net_test.go @@ -12,7 +12,6 @@ import ( "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/obi/pkg/export" "go.opentelemetry.io/obi/pkg/export/attributes" attr "go.opentelemetry.io/obi/pkg/export/attributes/names" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" @@ -49,16 +48,19 @@ func TestMetricAttributes(t *testing.T) { Interval: 10 * time.Millisecond, ReportersCacheLen: 100, TTL: 5 * time.Minute, - Features: export.FeatureNetwork | export.FeatureNetworkInterZone, } me, err := newMetricsExporter(t.Context(), &global.ContextInfo{ MetricAttributeGroups: attributes.GroupKubernetes, OTELMetricsExporter: &otelcfg.MetricsExporterInstancer{Cfg: mcfg}, - }, &NetMetricsConfig{SelectorCfg: &attributes.SelectorConfig{ - SelectionCfg: map[attributes.Section]attributes.InclusionLists{ - attributes.NetworkFlow.Section: {Include: []string{"*"}}, + }, &NetMetricsConfig{ + SelectorCfg: &attributes.SelectorConfig{ + SelectionCfg: map[attributes.Section]attributes.InclusionLists{ + attributes.NetworkFlow.Section: {Include: []string{"*"}}, + }, }, - }, Metrics: mcfg}, msg.NewQueue[[]*ebpf.Record]()) + Metrics: mcfg, + MeterProvider: &mpConfig, + }, msg.NewQueue[[]*ebpf.Record]()) require.NoError(t, err) _, reportedAttributes := me.flowBytes.ForRecord(in) @@ -108,21 +110,24 @@ func TestMetricAttributes_Filter(t *testing.T) { MetricsEndpoint: "http://foo", Interval: 10 * time.Millisecond, ReportersCacheLen: 100, - Features: export.FeatureNetwork | export.FeatureNetworkInterZone, } me, err := newMetricsExporter(t.Context(), &global.ContextInfo{ MetricAttributeGroups: attributes.GroupKubernetes, OTELMetricsExporter: &otelcfg.MetricsExporterInstancer{Cfg: mcfg}, }, - &NetMetricsConfig{SelectorCfg: &attributes.SelectorConfig{ - SelectionCfg: map[attributes.Section]attributes.InclusionLists{ - attributes.NetworkFlow.Section: {Include: []string{ - "src.address", - "k8s.src.name", - "k8s.dst.name", - }}, + &NetMetricsConfig{ + SelectorCfg: &attributes.SelectorConfig{ + SelectionCfg: map[attributes.Section]attributes.InclusionLists{ + attributes.NetworkFlow.Section: {Include: []string{ + "src.address", + "k8s.src.name", + "k8s.dst.name", + }}, + }, }, - }, Metrics: mcfg}, msg.NewQueue[[]*ebpf.Record]()) + Metrics: mcfg, + MeterProvider: &mpConfig, + }, msg.NewQueue[[]*ebpf.Record]()) require.NoError(t, err) _, reportedAttributes := me.flowBytes.ForRecord(in) @@ -146,27 +151,6 @@ func TestMetricAttributes_Filter(t *testing.T) { } } -func TestNetMetricsConfig_Enabled(t *testing.T) { - assert.True(t, NetMetricsConfig{Metrics: &otelcfg.MetricsConfig{ - Features: export.FeatureApplication | export.FeatureNetwork, CommonEndpoint: "foo", - }}.Enabled()) - assert.True(t, NetMetricsConfig{Metrics: &otelcfg.MetricsConfig{ - Features: export.FeatureNetwork | export.FeatureProcess, MetricsEndpoint: "foo", - }}.Enabled()) -} - -func TestNetMetricsConfig_Disabled(t *testing.T) { - fa := export.FeatureApplication - fn := export.FeatureNetwork - assert.False(t, NetMetricsConfig{Metrics: &otelcfg.MetricsConfig{Features: fn}}.Enabled()) - assert.False(t, NetMetricsConfig{Metrics: &otelcfg.MetricsConfig{Features: fn}}.Enabled()) - assert.False(t, NetMetricsConfig{Metrics: &otelcfg.MetricsConfig{Features: fn}}.Enabled()) - // network feature is not enabled - assert.False(t, NetMetricsConfig{Metrics: &otelcfg.MetricsConfig{CommonEndpoint: "foo"}}.Enabled()) - assert.False(t, NetMetricsConfig{Metrics: &otelcfg.MetricsConfig{MetricsEndpoint: "foo", Features: fa}}.Enabled()) - assert.False(t, NetMetricsConfig{Metrics: &otelcfg.MetricsConfig{}}.Enabled()) -} - func TestGetFilteredNetworkResourceAttrs(t *testing.T) { hostID := "test-host-id" attrSelector := attributes.Selection{ diff --git a/pkg/export/otel/metrics_svc_graph.go b/pkg/export/otel/metrics_svc_graph.go index 860b2afb9..35d86158c 100644 --- a/pkg/export/otel/metrics_svc_graph.go +++ b/pkg/export/otel/metrics_svc_graph.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/obi/pkg/export/attributes" attr "go.opentelemetry.io/obi/pkg/export/attributes/names" "go.opentelemetry.io/obi/pkg/export/instrumentations" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/otel/metric" instrument "go.opentelemetry.io/obi/pkg/export/otel/metric/api/metric" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" @@ -39,7 +40,7 @@ const ( ServiceGraphTotal = "traces_service_graph_request_total" ) -// MetricsReporter implements the graph node that receives request.Span +// SvcGraphMetricsReporter implements the graph node that receives request.Span // instances and forwards them as OTEL metrics. type SvcGraphMetricsReporter struct { ctx context.Context @@ -57,7 +58,7 @@ type SvcGraphMetricsReporter struct { log *slog.Logger } -// SvcGraphMetrics is a set of metrics associated to a given OTEL MeterProvider. +// SvcGraphMetrics is a set of metrics associated with a given OTEL MeterProvider. // There is a Metrics instance for each service/process instrumented by OBI. type SvcGraphMetrics struct { ctx context.Context @@ -75,12 +76,13 @@ type SvcGraphMetrics struct { func ReportSvcGraphMetrics( ctxInfo *global.ContextInfo, cfg *otelcfg.MetricsConfig, + mpCfg *decfg.MeterProvider, unresolved request.UnresolvedNames, input *msg.Queue[[]request.Span], processEvents *msg.Queue[exec.ProcessEvent], ) swarm.InstanceFunc { return func(ctx context.Context) (swarm.RunFunc, error) { - if !cfg.EndpointEnabled() || !cfg.ServiceGraphMetricsEnabled() { + if !cfg.EndpointEnabled() || !mpCfg.Features.ServiceGraph() { return swarm.EmptyRunFunc() } otelcfg.SetupInternalOTELSDKLogger(cfg.SDKLogLevel) @@ -313,9 +315,9 @@ func ClientSpanToUninstrumentedService(tracker *PidServiceTracker, span *request n := svc.ServiceNameNamespace{Name: span.HostName, Namespace: span.OtherNamespace} return !tracker.IsTrackingServerService(n) } - // If we haven't resolved a hostname, don't add this node to the service graph - // it will appear only in client requests. Essentially, in this case we have no - // idea if the service is instrumented or not, therefore we take the conservative + // If we haven't resolved a hostname, don't add this node to the service graph. + // It will appear only in client requests. Essentially, in this case we have no + // idea if the service is instrumented or not; therefore, we take the conservative // approach to avoid double counting. return false } diff --git a/pkg/export/otel/metrics_svc_graph_test.go b/pkg/export/otel/metrics_svc_graph_test.go index 0695b3535..76c780b79 100644 --- a/pkg/export/otel/metrics_svc_graph_test.go +++ b/pkg/export/otel/metrics_svc_graph_test.go @@ -17,7 +17,6 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/appolly/app/svc" "go.opentelemetry.io/obi/pkg/appolly/discover/exec" - "go.opentelemetry.io/obi/pkg/export" "go.opentelemetry.io/obi/pkg/export/instrumentations" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" "go.opentelemetry.io/obi/pkg/pipe/global" @@ -87,14 +86,13 @@ func makeSvcGraphExporter( Interval: 50 * time.Millisecond, CommonEndpoint: otlp.ServerEndpoint, MetricsProtocol: otelcfg.ProtocolHTTPProtobuf, - Features: export.FeatureGraph, TTL: 30 * time.Minute, ReportersCacheLen: 100, Instrumentations: []instrumentations.Instrumentation{instrumentations.InstrumentationALL}, } otelExporter, err := ReportSvcGraphMetrics( &global.ContextInfo{OTELMetricsExporter: &otelcfg.MetricsExporterInstancer{Cfg: mcfg}}, - mcfg, request.UnresolvedNames{}, input, processEvents)(ctx) + mcfg, &mpConfig, request.UnresolvedNames{}, input, processEvents)(ctx) require.NoError(t, err) return otelExporter diff --git a/pkg/export/otel/metrics_test.go b/pkg/export/otel/metrics_test.go index dcd88f1c7..ac8ade9b6 100644 --- a/pkg/export/otel/metrics_test.go +++ b/pkg/export/otel/metrics_test.go @@ -30,6 +30,7 @@ import ( attr "go.opentelemetry.io/obi/pkg/export/attributes/names" "go.opentelemetry.io/obi/pkg/export/imetrics" "go.opentelemetry.io/obi/pkg/export/instrumentations" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" "go.opentelemetry.io/obi/pkg/pipe/global" "go.opentelemetry.io/obi/pkg/pipe/msg" @@ -57,12 +58,12 @@ func TestMetrics_InternalInstrumentation(t *testing.T) { internalMetrics := &fakeInternalMetrics{} mcfg := &otelcfg.MetricsConfig{ CommonEndpoint: coll.URL, Interval: 10 * time.Millisecond, ReportersCacheLen: 16, - Features: export.FeatureApplication, Instrumentations: []instrumentations.Instrumentation{instrumentations.InstrumentationHTTP}, + Instrumentations: []instrumentations.Instrumentation{instrumentations.InstrumentationHTTP}, } reporter, err := ReportMetrics(&global.ContextInfo{ Metrics: internalMetrics, OTELMetricsExporter: &otelcfg.MetricsExporterInstancer{Cfg: mcfg}, - }, mcfg, &attributes.SelectorConfig{}, request.UnresolvedNames{}, exportMetrics, processEvents, + }, mcfg, &mpConfig, &attributes.SelectorConfig{}, request.UnresolvedNames{}, exportMetrics, processEvents, )(t.Context()) require.NoError(t, err) go reporter(t.Context()) @@ -241,7 +242,7 @@ func TestAppMetrics_ByInstrumentation(t *testing.T) { metrics := msg.NewQueue[[]request.Span](msg.ChannelBufferLen(20)) processEvents := msg.NewQueue[exec.ProcessEvent](msg.ChannelBufferLen(20)) - otelExporter := makeMetricsReporter(ctx, t, tt.instr, export.FeatureApplication, otlp, metrics, processEvents).reportMetrics + otelExporter := makeMetricsReporter(ctx, t, tt.instr, export.FeatureApplicationRED, otlp, metrics, processEvents).reportMetrics require.NoError(t, err) go otelExporter(ctx) @@ -307,7 +308,7 @@ func TestAppMetrics_ResourceAttributes(t *testing.T) { metrics := msg.NewQueue[[]request.Span](msg.ChannelBufferLen(20)) processEvents := msg.NewQueue[exec.ProcessEvent](msg.ChannelBufferLen(20)) - otelExporter := makeMetricsReporter(ctx, t, []instrumentations.Instrumentation{instrumentations.InstrumentationHTTP}, export.FeatureApplication, otlp, metrics, processEvents).reportMetrics + otelExporter := makeMetricsReporter(ctx, t, []instrumentations.Instrumentation{instrumentations.InstrumentationHTTP}, export.FeatureApplicationRED, otlp, metrics, processEvents).reportMetrics go otelExporter(ctx) metrics.Send([]request.Span{ @@ -322,11 +323,9 @@ func TestAppMetrics_ResourceAttributes(t *testing.T) { } func TestMetricsDiscarded(t *testing.T) { - mc := otelcfg.MetricsConfig{ - Features: export.FeatureApplication, - } mr := MetricsReporter{ - cfg: &mc, + cfg: &otelcfg.MetricsConfig{}, + meterProvider: &decfg.MeterProvider{Features: export.FeatureApplicationRED}, } svcNoExport := svc.Attrs{} @@ -367,11 +366,9 @@ func TestMetricsDiscarded(t *testing.T) { } func TestSpanMetricsDiscarded(t *testing.T) { - mc := otelcfg.MetricsConfig{ - Features: export.FeatureSpan, - } mr := MetricsReporter{ - cfg: &mc, + cfg: &otelcfg.MetricsConfig{}, + meterProvider: &decfg.MeterProvider{Features: export.FeatureSpanLegacy}, } svcNoExport := svc.Attrs{} @@ -412,11 +409,9 @@ func TestSpanMetricsDiscarded(t *testing.T) { } func TestSpanMetricsDiscardedGraph(t *testing.T) { - mc := otelcfg.MetricsConfig{ - Features: export.FeatureGraph, - } mr := MetricsReporter{ - cfg: &mc, + cfg: &otelcfg.MetricsConfig{}, + meterProvider: &decfg.MeterProvider{Features: export.FeatureSpanLegacy}, } svcNoExport := svc.Attrs{} @@ -457,12 +452,10 @@ func TestSpanMetricsDiscardedGraph(t *testing.T) { } func TestProcessPIDEvents(t *testing.T) { - mc := otelcfg.MetricsConfig{ - Features: export.FeatureApplication, - } mr := MetricsReporter{ - cfg: &mc, - pidTracker: NewPidServiceTracker(), + cfg: &otelcfg.MetricsConfig{}, + meterProvider: &decfg.MeterProvider{Features: export.FeatureApplicationRED}, + pidTracker: NewPidServiceTracker(), } svcA := svc.Attrs{ @@ -545,7 +538,6 @@ func makeMetricsReporter( Interval: 50 * time.Millisecond, CommonEndpoint: otlp.ServerEndpoint, MetricsProtocol: otelcfg.ProtocolHTTPProtobuf, - Features: features, TTL: 30 * time.Minute, ReportersCacheLen: 100, Instrumentations: instrumentations, @@ -553,7 +545,7 @@ func makeMetricsReporter( mr, err := newMetricsReporter( ctx, &global.ContextInfo{OTELMetricsExporter: &otelcfg.MetricsExporterInstancer{Cfg: mcfg}}, - mcfg, + mcfg, &decfg.MeterProvider{Features: features}, &attributes.SelectorConfig{ SelectionCfg: attributes.Selection{ attributes.HTTPServerDuration.Section: attributes.InclusionLists{ @@ -580,7 +572,7 @@ func TestAppMetrics_TracesHostInfo(t *testing.T) { metrics := msg.NewQueue[[]request.Span](msg.ChannelBufferLen(20)) processEvents := msg.NewQueue[exec.ProcessEvent](msg.ChannelBufferLen(20)) - mr := makeMetricsReporter(ctx, t, []instrumentations.Instrumentation{instrumentations.InstrumentationHTTP}, export.FeatureApplication|export.FeatureApplicationHost, otlp, metrics, processEvents) + mr := makeMetricsReporter(ctx, t, []instrumentations.Instrumentation{instrumentations.InstrumentationHTTP}, export.FeatureApplicationRED|export.FeatureApplicationHost, otlp, metrics, processEvents) otelExporter := mr.reportMetrics go otelExporter(ctx) @@ -1109,6 +1101,7 @@ func TestHandleProcessEventCreated(t *testing.T) { reporter := &MetricsReporter{ cfg: &otelcfg.MetricsConfig{}, log: slog.Default(), + meterProvider: &decfg.MeterProvider{Features: export.FeatureApplicationRED}, targetMetrics: make(map[svc.UID]*TargetMetrics), pidTracker: NewPidServiceTracker(), createEventMetrics: mockEventsStore.createEventMetrics, @@ -1153,6 +1146,7 @@ func TestHandleProcessEventCreated_EdgeCases(t *testing.T) { reporter := &MetricsReporter{ cfg: &otelcfg.MetricsConfig{}, log: slog.Default(), + meterProvider: &decfg.MeterProvider{Features: export.FeatureProcess}, targetMetrics: make(map[svc.UID]*TargetMetrics), pidTracker: NewPidServiceTracker(), createEventMetrics: mockEventsStore.createEventMetrics, @@ -1187,6 +1181,7 @@ func TestHandleProcessEventCreated_EdgeCases(t *testing.T) { reporter := &MetricsReporter{ cfg: &otelcfg.MetricsConfig{}, log: slog.Default(), + meterProvider: &decfg.MeterProvider{Features: export.FeatureProcess}, targetMetrics: make(map[svc.UID]*TargetMetrics), pidTracker: NewPidServiceTracker(), createEventMetrics: mockEventsStore.createEventMetrics, diff --git a/pkg/export/otel/otelcfg/config_metrics.go b/pkg/export/otel/otelcfg/config_metrics.go index e065db1a3..639e888e9 100644 --- a/pkg/export/otel/otelcfg/config_metrics.go +++ b/pkg/export/otel/otelcfg/config_metrics.go @@ -47,7 +47,8 @@ type MetricsConfig struct { // Features of metrics that can be exported. Accepted values: application, network, application_process, // application_span, application_service_graph, ... // envDefault is provided to avoid breaking changes - Features export.Features `yaml:"features" env:"OTEL_EBPF_METRICS_FEATURES,expand" envDefault:"${OTEL_EBPF_METRIC_FEATURES}" envSeparator:","` + // Deprecated: use top-level MeterProvider.Features instead. + DeprFeatures export.Features `yaml:"features"` // Allows configuration of which instrumentations should be enabled, e.g. http, grpc, sql... Instrumentations []instrumentations.Instrumentation `yaml:"instrumentations" env:"OTEL_EBPF_METRICS_INSTRUMENTATIONS" envSeparator:","` @@ -128,50 +129,6 @@ func (m *MetricsConfig) EndpointEnabled() bool { return ep != "" } -func (m *MetricsConfig) AnySpanMetricsEnabled() bool { - return m.SpanMetricsEnabled() || m.SpanMetricsSizesEnabled() || m.ServiceGraphMetricsEnabled() -} - -func (m *MetricsConfig) SpanMetricsSizesEnabled() bool { - return m.Features.Has(export.FeatureSpanSizes) -} - -func (m *MetricsConfig) SpanMetricsEnabled() bool { - return m.Features.Any(export.FeatureSpan | export.FeatureSpanOTel) -} - -func (m *MetricsConfig) InvalidSpanMetricsConfig() bool { - return m.Features.Has(export.FeatureSpan | export.FeatureSpanOTel) -} - -func (m *MetricsConfig) HostMetricsEnabled() bool { - return m.Features.Has(export.FeatureApplicationHost) -} - -func (m *MetricsConfig) ServiceGraphMetricsEnabled() bool { - return m.Features.Has(export.FeatureGraph) -} - -func (m *MetricsConfig) OTelMetricsEnabled() bool { - return m.Features.Has(export.FeatureApplication) -} - -func (m *MetricsConfig) NetworkMetricsEnabled() bool { - return m.NetworkFlowBytesEnabled() || m.NetworkInterzoneMetricsEnabled() -} - -func (m *MetricsConfig) NetworkFlowBytesEnabled() bool { - return m.Features.Has(export.FeatureNetwork) -} - -func (m *MetricsConfig) NetworkInterzoneMetricsEnabled() bool { - return m.Features.Has(export.FeatureNetworkInterZone) -} - -func (m *MetricsConfig) Enabled() bool { - return m.EndpointEnabled() && (m.OTelMetricsEnabled() || m.AnySpanMetricsEnabled() || m.NetworkMetricsEnabled()) -} - func httpMetricEndpointOptions(cfg *MetricsConfig) (OTLPOptions, error) { opts := OTLPOptions{Headers: map[string]string{}} log := mlog().With("transport", "http") diff --git a/pkg/export/otel/otelcfg/config_metrics_test.go b/pkg/export/otel/otelcfg/config_metrics_test.go index 50b28ee90..5df2af12c 100644 --- a/pkg/export/otel/otelcfg/config_metrics_test.go +++ b/pkg/export/otel/otelcfg/config_metrics_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/obi/pkg/export" "go.opentelemetry.io/obi/pkg/export/instrumentations" ) @@ -208,33 +207,6 @@ func TestMetricSetupHTTP_DoNotOverrideEnv(t *testing.T) { }) } -func TestMetricsConfig_Enabled(t *testing.T) { - assert.True(t, (&MetricsConfig{Features: export.FeatureApplication | export.FeatureNetwork, CommonEndpoint: "foo"}).Enabled()) - assert.True(t, (&MetricsConfig{Features: export.FeatureApplication, MetricsEndpoint: "foo"}).Enabled()) - assert.True(t, (&MetricsConfig{MetricsEndpoint: "foo", Features: export.FeatureNetwork}).Enabled()) - assert.True(t, (&MetricsConfig{ - Features: export.FeatureNetwork, - OTLPEndpointProvider: func() (string, bool) { return "something", false }, - }).Enabled()) - assert.True(t, (&MetricsConfig{ - Features: export.FeatureNetwork, - OTLPEndpointProvider: func() (string, bool) { return "something", true }, - }).Enabled()) -} - -func TestMetricsConfig_Disabled(t *testing.T) { - assert.False(t, (&MetricsConfig{Features: export.FeatureApplication}).Enabled()) - assert.False(t, (&MetricsConfig{Features: export.FeatureNetwork | export.FeatureApplication}).Enabled()) - assert.False(t, (&MetricsConfig{Features: export.FeatureNetwork}).Enabled()) - // application feature is not enabled - assert.False(t, (&MetricsConfig{CommonEndpoint: "foo"}).Enabled()) - assert.False(t, (&MetricsConfig{}).Enabled()) - assert.False(t, (&MetricsConfig{ - Features: export.FeatureApplication, - OTLPEndpointProvider: func() (string, bool) { return "", false }, - }).Enabled()) -} - func TestMetricsInterval(t *testing.T) { cfg := MetricsConfig{ OTELIntervalMS: 60_000, diff --git a/pkg/export/otel/traces_test.go b/pkg/export/otel/traces_test.go index 1e4b6d281..93283d1c8 100644 --- a/pkg/export/otel/traces_test.go +++ b/pkg/export/otel/traces_test.go @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/appolly/app/svc" - "go.opentelemetry.io/obi/pkg/export" "go.opentelemetry.io/obi/pkg/export/attributes" attr "go.opentelemetry.io/obi/pkg/export/attributes/names" "go.opentelemetry.io/obi/pkg/export/instrumentations" @@ -565,11 +564,7 @@ func TestTraceSkipSpanMetrics(t *testing.T) { } t.Run("test with span metrics on", func(t *testing.T) { - mc := otelcfg.MetricsConfig{ - Features: export.FeatureSpan, - } - - receiver := makeTracesTestReceiverWithSpanMetrics(mc.AnySpanMetricsEnabled(), []instrumentations.Instrumentation{instrumentations.InstrumentationHTTP}) + receiver := makeTracesTestReceiverWithSpanMetrics(true, []instrumentations.Instrumentation{instrumentations.InstrumentationHTTP}) sampler := sdktrace.AlwaysSample() attrs, err := receiver.getConstantAttributes() diff --git a/pkg/export/prom/prom.go b/pkg/export/prom/prom.go index 20c50afa9..dc5958ea8 100644 --- a/pkg/export/prom/prom.go +++ b/pkg/export/prom/prom.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/obi/pkg/export/expire" "go.opentelemetry.io/obi/pkg/export/instrumentations" "go.opentelemetry.io/obi/pkg/export/otel" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/pipe/global" "go.opentelemetry.io/obi/pkg/pipe/msg" "go.opentelemetry.io/obi/pkg/pipe/swarm" @@ -115,7 +116,9 @@ type PrometheusConfig struct { // Features of metrics that can be exported. Accepted values: application, network, application_process, // application_span, application_service_graph, ... - Features export.Features `yaml:"features" env:"OTEL_EBPF_PROMETHEUS_FEATURES" envSeparator:","` + // Deprecated: use top-level MeterProvider.Features instead. + DeprFeatures export.Features `yaml:"features" env:"OTEL_EBPF_PROMETHEUS_FEATURES" envSeparator:","` + // Allows configuration of which instrumentations should be enabled, e.g. http, grpc, sql... Instrumentations []instrumentations.Instrumentation `yaml:"instrumentations" env:"OTEL_EBPF_PROMETHEUS_INSTRUMENTATIONS" envSeparator:","` @@ -147,61 +150,13 @@ func mlog() *slog.Logger { return slog.With("component", "prom.MetricsReporter") } -func (p *PrometheusConfig) AnySpanMetricsEnabled() bool { - return p.SpanMetricsEnabled() || p.SpanMetricsSizesEnabled() || p.ServiceGraphMetricsEnabled() -} - -func (p *PrometheusConfig) SpanMetricsSizesEnabled() bool { - return p.Features.Has(export.FeatureSpanSizes) -} - -func (p *PrometheusConfig) SpanMetricsEnabled() bool { - return p.Features.Any(export.FeatureSpan | export.FeatureSpanOTel) -} - -func (p *PrometheusConfig) InvalidSpanMetricsConfig() bool { - return p.Features.Has(export.FeatureSpan | export.FeatureSpanOTel) -} - -func (p *PrometheusConfig) HostMetricsEnabled() bool { - return p.Features.Has(export.FeatureApplicationHost) -} - -func (p *PrometheusConfig) OTelMetricsEnabled() bool { - return p.Features.Has(export.FeatureApplication) -} - -func (p *PrometheusConfig) ServiceGraphMetricsEnabled() bool { - return p.Features.Has(export.FeatureGraph) -} - -func (p *PrometheusConfig) NetworkMetricsEnabled() bool { - return p.NetworkFlowBytesEnabled() || p.NetworkInterzoneMetricsEnabled() -} - -func (p *PrometheusConfig) NetworkFlowBytesEnabled() bool { - return p.Features.Has(export.FeatureNetwork) -} - -func (p *PrometheusConfig) NetworkInterzoneMetricsEnabled() bool { - return p.Features.Has(export.FeatureNetworkInterZone) -} - -func (p *PrometheusConfig) EBPFEnabled() bool { - return p.Features.Has(export.FeatureEBPF) -} - func (p *PrometheusConfig) EndpointEnabled() bool { return p.Port != 0 || p.Registry != nil } -// Enabled returns whether the node needs to be activated -func (p *PrometheusConfig) Enabled() bool { - return p.EndpointEnabled() && (p.OTelMetricsEnabled() || p.AnySpanMetricsEnabled() || p.NetworkMetricsEnabled()) -} - type metricsReporter struct { cfg *PrometheusConfig + meterProvider *decfg.MeterProvider extraMetadataLabels []attr.Name extraSpanMetadataLabels []attr.Name @@ -287,16 +242,17 @@ type metricsReporter struct { func PrometheusEndpoint( ctxInfo *global.ContextInfo, cfg *PrometheusConfig, + mcfg *decfg.MeterProvider, selectorCfg *attributes.SelectorConfig, unresolved request.UnresolvedNames, input *msg.Queue[[]request.Span], processEventCh *msg.Queue[exec.ProcessEvent], ) swarm.InstanceFunc { return func(_ context.Context) (swarm.RunFunc, error) { - if !cfg.Enabled() { + if !cfg.EndpointEnabled() || !mcfg.Features.AppOrSpan() { return swarm.EmptyRunFunc() } - reporter, err := newReporter(ctxInfo, cfg, selectorCfg, unresolved, input, processEventCh) + reporter, err := newReporter(ctxInfo, cfg, mcfg, selectorCfg, unresolved, input, processEventCh) if err != nil { return nil, fmt.Errorf("instantiating Prometheus endpoint: %w", err) } @@ -307,19 +263,17 @@ func PrometheusEndpoint( } } -func (p *PrometheusConfig) spanMetricsLatencyName() string { - if p.Features.Has(export.FeatureSpan) { +func spanMetricsLatencyName(mp *decfg.MeterProvider) string { + if mp.Features.LegacySpanMetrics() { return SpanMetricsLatency } - return SpanMetricsLatencyOTel } -func (p *PrometheusConfig) spanMetricsCallsName() string { - if p.Features.Has(export.FeatureSpan) { +func spanMetricsCallsName(mp *decfg.MeterProvider) string { + if mp.Features.LegacySpanMetrics() { return SpanMetricsCalls } - return SpanMetricsCallsOTel } @@ -327,6 +281,7 @@ func (p *PrometheusConfig) spanMetricsCallsName() string { func newReporter( ctxInfo *global.ContextInfo, cfg *PrometheusConfig, + meterProvider *decfg.MeterProvider, selectorCfg *attributes.SelectorConfig, unresolved request.UnresolvedNames, input *msg.Queue[[]request.Span], @@ -412,7 +367,7 @@ func newReporter( attrsProvider.For(attributes.DNSLookupDuration)) } - if cfg.ServiceGraphMetricsEnabled() { + if meterProvider.Features.ServiceGraph() { attrSvcGraph = attributes.PrometheusGetters(attributeGetters, []attr.Name{attr.Client, attr.ClientNamespace, attr.Server, attr.ServerNamespace, attr.Source}) } @@ -429,6 +384,7 @@ func newReporter( pidsTracker: otel.NewPidServiceTracker(), ctxInfo: ctxInfo, cfg: cfg, + meterProvider: meterProvider, kubeEnabled: kubeEnabled, extraMetadataLabels: extraMetadataLabels, extraSpanMetadataLabels: extraSpanMetadataLabels, @@ -577,9 +533,9 @@ func newReporter( NativeHistogramMinResetDuration: defaultHistogramMinResetDuration, }, labelNames(attrHTTPClientResponseSize)).MetricVec, clock.Time, cfg.TTL) }), - spanMetricsLatency: optionalHistogramProvider(cfg.SpanMetricsEnabled(), func() *Expirer[prometheus.Histogram] { + spanMetricsLatency: optionalHistogramProvider(meterProvider.Features.SpanMetrics(), func() *Expirer[prometheus.Histogram] { return NewExpirer[prometheus.Histogram](prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: cfg.spanMetricsLatencyName(), + Name: spanMetricsLatencyName(meterProvider), Help: "duration of service calls (client and server), in seconds, in trace span metrics format", Buckets: cfg.Buckets.DurationHistogram, NativeHistogramBucketFactor: defaultHistogramBucketFactor, @@ -587,37 +543,37 @@ func newReporter( NativeHistogramMinResetDuration: defaultHistogramMinResetDuration, }, labelNamesSpans(extraSpanMetadataLabels)).MetricVec, clock.Time, cfg.TTL) }), - spanMetricsCallsTotal: optionalCounterProvider(cfg.SpanMetricsEnabled(), func() *Expirer[prometheus.Counter] { + spanMetricsCallsTotal: optionalCounterProvider(meterProvider.Features.SpanMetrics(), func() *Expirer[prometheus.Counter] { return NewExpirer[prometheus.Counter](prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: cfg.spanMetricsCallsName(), + Name: spanMetricsCallsName(meterProvider), Help: "number of service calls in trace span metrics format", }, labelNamesSpans(extraSpanMetadataLabels)).MetricVec, clock.Time, cfg.TTL) }), - spanMetricsRequestSizeTotal: optionalCounterProvider(cfg.SpanMetricsSizesEnabled(), func() *Expirer[prometheus.Counter] { + spanMetricsRequestSizeTotal: optionalCounterProvider(meterProvider.Features.SpanSizes(), func() *Expirer[prometheus.Counter] { return NewExpirer[prometheus.Counter](prometheus.NewCounterVec(prometheus.CounterOpts{ Name: SpanMetricsRequestSizes, Help: "size of service calls, in bytes, in trace span metrics format", }, labelNamesSpans(extraSpanMetadataLabels)).MetricVec, clock.Time, cfg.TTL) }), - spanMetricsResponseSizeTotal: optionalCounterProvider(cfg.SpanMetricsSizesEnabled(), func() *Expirer[prometheus.Counter] { + spanMetricsResponseSizeTotal: optionalCounterProvider(meterProvider.Features.SpanSizes(), func() *Expirer[prometheus.Counter] { return NewExpirer[prometheus.Counter](prometheus.NewCounterVec(prometheus.CounterOpts{ Name: SpanMetricsResponseSizes, Help: "size of service responses, in bytes, in trace span metrics format", }, labelNamesSpans(extraSpanMetadataLabels)).MetricVec, clock.Time, cfg.TTL) }), - tracesTargetInfo: optionalDirectGaugeProvider(cfg.AnySpanMetricsEnabled(), func() *prometheus.GaugeVec { + tracesTargetInfo: optionalDirectGaugeProvider(meterProvider.Features.AnySpanMetrics(), func() *prometheus.GaugeVec { return prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: TracesTargetInfo, Help: "target service information in trace span metric format", }, labelNamesTargetInfo(kubeEnabled, extraMetadataLabels)) }), - tracesHostInfo: optionalGaugeProvider(cfg.HostMetricsEnabled(), func() *Expirer[prometheus.Gauge] { + tracesHostInfo: optionalGaugeProvider(meterProvider.Features.AppHost(), func() *Expirer[prometheus.Gauge] { return NewExpirer[prometheus.Gauge](prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: TracesHostInfo, Help: "A metric with a constant '1' value labeled by the host id ", }, hostInfoLabelNames).MetricVec, clock.Time, cfg.TTL) }), - serviceGraphClient: optionalHistogramProvider(cfg.ServiceGraphMetricsEnabled(), func() *Expirer[prometheus.Histogram] { + serviceGraphClient: optionalHistogramProvider(meterProvider.Features.ServiceGraph(), func() *Expirer[prometheus.Histogram] { return NewExpirer[prometheus.Histogram](prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: ServiceGraphClient, Help: "duration of client service calls, in seconds, in trace service graph metrics format", @@ -627,7 +583,7 @@ func newReporter( NativeHistogramMinResetDuration: defaultHistogramMinResetDuration, }, labelNames(attrSvcGraph)).MetricVec, clock.Time, cfg.TTL) }), - serviceGraphServer: optionalHistogramProvider(cfg.ServiceGraphMetricsEnabled(), func() *Expirer[prometheus.Histogram] { + serviceGraphServer: optionalHistogramProvider(meterProvider.Features.ServiceGraph(), func() *Expirer[prometheus.Histogram] { return NewExpirer[prometheus.Histogram](prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: ServiceGraphServer, Help: "duration of server service calls, in seconds, in trace service graph metrics format", @@ -637,13 +593,13 @@ func newReporter( NativeHistogramMinResetDuration: defaultHistogramMinResetDuration, }, labelNames(attrSvcGraph)).MetricVec, clock.Time, cfg.TTL) }), - serviceGraphFailed: optionalCounterProvider(cfg.ServiceGraphMetricsEnabled(), func() *Expirer[prometheus.Counter] { + serviceGraphFailed: optionalCounterProvider(meterProvider.Features.ServiceGraph(), func() *Expirer[prometheus.Counter] { return NewExpirer[prometheus.Counter](prometheus.NewCounterVec(prometheus.CounterOpts{ Name: ServiceGraphFailed, Help: "number of failed service calls in trace service graph metrics format", }, labelNames(attrSvcGraph)).MetricVec, clock.Time, cfg.TTL) }), - serviceGraphTotal: optionalCounterProvider(cfg.ServiceGraphMetricsEnabled(), func() *Expirer[prometheus.Counter] { + serviceGraphTotal: optionalCounterProvider(meterProvider.Features.ServiceGraph(), func() *Expirer[prometheus.Counter] { return NewExpirer[prometheus.Counter](prometheus.NewCounterVec(prometheus.CounterOpts{ Name: ServiceGraphTotal, Help: "number of service calls in trace service graph metrics format", @@ -717,7 +673,7 @@ func newReporter( registeredMetrics = append(registeredMetrics, mr.beylaInfo) } - if cfg.OTelMetricsEnabled() { + if meterProvider.Features.AppRED() { if is.HTTPEnabled() { registeredMetrics = append(registeredMetrics, mr.httpClientRequestSize, @@ -754,21 +710,21 @@ func newReporter( } } - if cfg.SpanMetricsEnabled() { + if meterProvider.Features.SpanMetrics() { registeredMetrics = append(registeredMetrics, mr.spanMetricsLatency, mr.spanMetricsCallsTotal, ) } - if cfg.SpanMetricsSizesEnabled() { + if meterProvider.Features.SpanSizes() { registeredMetrics = append(registeredMetrics, mr.spanMetricsRequestSizeTotal, mr.spanMetricsResponseSizeTotal, ) } - if cfg.ServiceGraphMetricsEnabled() { + if meterProvider.Features.ServiceGraph() { registeredMetrics = append(registeredMetrics, mr.serviceGraphClient, mr.serviceGraphServer, @@ -777,11 +733,11 @@ func newReporter( ) } - if cfg.AnySpanMetricsEnabled() { + if meterProvider.Features.AnySpanMetrics() { registeredMetrics = append(registeredMetrics, mr.tracesTargetInfo) } - if cfg.HostMetricsEnabled() { + if meterProvider.Features.AppHost() { registeredMetrics = append(registeredMetrics, mr.tracesHostInfo) } @@ -864,11 +820,11 @@ func (r *metricsReporter) collectMetrics(ctx context.Context) { } func (r *metricsReporter) otelMetricsObserved(span *request.Span) bool { - return r.cfg.OTelMetricsEnabled() && !span.Service.ExportsOTelMetrics() + return r.meterProvider.Features.AppRED() && !span.Service.ExportsOTelMetrics() } func (r *metricsReporter) otelSpanMetricsObserved(span *request.Span) bool { - return r.cfg.AnySpanMetricsEnabled() && !span.Service.ExportsOTelMetricsSpan() + return r.meterProvider.Features.AnySpanMetrics() && !span.Service.ExportsOTelMetricsSpan() } func (r *metricsReporter) otelSpanFiltered(span *request.Span) bool { @@ -885,7 +841,7 @@ func (r *metricsReporter) observe(span *request.Span) { } t := span.Timings() r.beylaInfo.WithLabelValues(span.Service.SDKLanguage.String()).Metric.Set(1.0) - if r.cfg.HostMetricsEnabled() { + if r.meterProvider.Features.AppHost() { r.tracesHostInfo.WithLabelValues(r.hostID).Metric.Set(1.0) } duration := t.End.Sub(t.RequestStart).Seconds() @@ -981,19 +937,19 @@ func (r *metricsReporter) observe(span *request.Span) { } if r.otelSpanMetricsObserved(span) { - if r.cfg.SpanMetricsEnabled() { + if r.meterProvider.Features.SpanMetrics() { lv := r.labelValuesSpans(span) r.spanMetricsLatency.WithLabelValues(lv...).Metric.Observe(duration) r.spanMetricsCallsTotal.WithLabelValues(lv...).Metric.Add(1) } - if r.cfg.SpanMetricsSizesEnabled() { + if r.meterProvider.Features.SpanSizes() { lv := r.labelValuesSpans(span) r.spanMetricsRequestSizeTotal.WithLabelValues(lv...).Metric.Add(float64(span.RequestBodyLength())) r.spanMetricsResponseSizeTotal.WithLabelValues(lv...).Metric.Add(float64(span.ResponseBodyLength())) } - if r.cfg.ServiceGraphMetricsEnabled() { + if r.meterProvider.Features.ServiceGraph() { if !span.IsSelfReferenceSpan() || r.cfg.AllowServiceGraphSelfReferences { lvg := labelValues(span, r.attrSvcGraph) @@ -1170,7 +1126,7 @@ func (r *metricsReporter) createTargetInfo(service *svc.Attrs) { } func (r *metricsReporter) createTracesTargetInfo(service *svc.Attrs) { - if !r.cfg.AnySpanMetricsEnabled() { + if !r.meterProvider.Features.AnySpanMetrics() { return } targetInfoLabelValues := r.labelValuesTargetInfo(service) @@ -1191,7 +1147,7 @@ func (r *metricsReporter) deleteTargetInfoMetric(service *svc.Attrs) { } func (r *metricsReporter) deleteTracesTargetInfoMetric(service *svc.Attrs) { - if !r.cfg.AnySpanMetricsEnabled() { + if !r.meterProvider.Features.AnySpanMetrics() { return } targetInfoLabelValues := r.labelValuesTargetInfo(service) @@ -1256,7 +1212,7 @@ func (r *metricsReporter) handleProcessEvent(pe exec.ProcessEvent, log *slog.Log if deleted, origUID := r.disassociatePIDFromService(pe.File.Pid); deleted { mlog().Debug("deleting infos for", "pid", pe.File.Pid, "attrs", pe.File.Service.UID) r.deleteTargetInfos(origUID, &pe.File.Service) - if r.cfg.HostMetricsEnabled() && r.pidsTracker.Count() == 0 { + if r.meterProvider.Features.AppHost() && r.pidsTracker.Count() == 0 { mlog().Debug("No more PIDs tracked, expiring host info metric") r.tracesHostInfo.entries.DeleteAll() } diff --git a/pkg/export/prom/prom_bpf.go b/pkg/export/prom/prom_bpf.go index b08c6fa5a..a955a4afb 100644 --- a/pkg/export/prom/prom_bpf.go +++ b/pkg/export/prom/prom_bpf.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/obi/pkg/export/connector" "go.opentelemetry.io/obi/pkg/export/imetrics" "go.opentelemetry.io/obi/pkg/export/otel" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/pipe/global" "go.opentelemetry.io/obi/pkg/pipe/swarm" ) @@ -23,6 +24,7 @@ import ( // BPFCollector implements prometheus.Collector for collecting metrics about currently loaded eBPF programs. type BPFCollector struct { promCfg *PrometheusConfig + meterProvider *decfg.MeterProvider internalMetrics imetrics.Reporter promConnect *connector.PrometheusManager ctxInfo *global.ContextInfo @@ -61,12 +63,13 @@ type BpfMapMetrics struct { func BPFMetrics( ctxInfo *global.ContextInfo, cfg *PrometheusConfig, + mpCfg *decfg.MeterProvider, ) swarm.InstanceFunc { return func(_ context.Context) (swarm.RunFunc, error) { - if !bpfCollectorEnabled(cfg, ctxInfo.Metrics) { + if !bpfCollectorEnabled(cfg, mpCfg, ctxInfo.Metrics) { return swarm.EmptyRunFunc() } - collector := newBPFCollector(ctxInfo, cfg) + collector := newBPFCollector(ctxInfo, cfg, mpCfg) return collector.start, nil } } @@ -76,17 +79,18 @@ func internalMetricsOTELEnabled(internalMetrics imetrics.Reporter) bool { return ok } -func promMetricsEnabled(cfg *PrometheusConfig) bool { - return cfg.EndpointEnabled() && cfg.EBPFEnabled() +func promMetricsEnabled(cfg *PrometheusConfig, mpCfg *decfg.MeterProvider) bool { + return cfg.EndpointEnabled() && mpCfg.Features.BPF() } -func bpfCollectorEnabled(cfg *PrometheusConfig, internalMetrics imetrics.Reporter) bool { - return promMetricsEnabled(cfg) || internalMetricsOTELEnabled(internalMetrics) +func bpfCollectorEnabled(cfg *PrometheusConfig, mpCfg *decfg.MeterProvider, internalMetrics imetrics.Reporter) bool { + return promMetricsEnabled(cfg, mpCfg) || internalMetricsOTELEnabled(internalMetrics) } -func newBPFCollector(ctxInfo *global.ContextInfo, cfg *PrometheusConfig) *BPFCollector { +func newBPFCollector(ctxInfo *global.ContextInfo, cfg *PrometheusConfig, mpCfg *decfg.MeterProvider) *BPFCollector { c := &BPFCollector{ promCfg: cfg, + meterProvider: mpCfg, internalMetrics: ctxInfo.Metrics, log: slog.With("component", "prom.BPFCollector"), ctxInfo: ctxInfo, @@ -105,7 +109,7 @@ func newBPFCollector(ctxInfo *global.ContextInfo, cfg *PrometheusConfig) *BPFCol nil, ), } - if promMetricsEnabled(cfg) { + if promMetricsEnabled(cfg, mpCfg) { // Register the collector c.promConnect.Register(cfg.Port, cfg.Path, c) } @@ -113,7 +117,7 @@ func newBPFCollector(ctxInfo *global.ContextInfo, cfg *PrometheusConfig) *BPFCol } func (bc *BPFCollector) start(ctx context.Context) { - if promMetricsEnabled(bc.promCfg) { + if promMetricsEnabled(bc.promCfg, bc.meterProvider) { bc.reportMetrics(ctx) } else { go bc.collectInternalMetrics(ctx) diff --git a/pkg/export/prom/prom_net.go b/pkg/export/prom/prom_net.go index ae8aad5b4..858cbe8b2 100644 --- a/pkg/export/prom/prom_net.go +++ b/pkg/export/prom/prom_net.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/obi/pkg/export/attributes" "go.opentelemetry.io/obi/pkg/export/connector" "go.opentelemetry.io/obi/pkg/export/expire" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/internal/netolly/ebpf" "go.opentelemetry.io/obi/pkg/pipe/global" "go.opentelemetry.io/obi/pkg/pipe/msg" @@ -23,15 +24,14 @@ import ( // NetPrometheusConfig for network metrics just wraps the global prom.NetPrometheusConfig as provided by the user type NetPrometheusConfig struct { - Config *PrometheusConfig - SelectorCfg *attributes.SelectorConfig - // Deprecated: to be removed in Beyla 3.0 with OTEL_EBPF_NETWORK_METRICS bool flag - GloballyEnabled bool + Config *PrometheusConfig + SelectorCfg *attributes.SelectorConfig + MeterProvider *decfg.MeterProvider } // Enabled returns whether the node needs to be activated func (p NetPrometheusConfig) Enabled() bool { - return p.Config != nil && p.Config.EndpointEnabled() && (p.Config.NetworkMetricsEnabled() || p.GloballyEnabled) + return p.Config != nil && p.Config.EndpointEnabled() && (p.MeterProvider.Features.AnyNetwork()) } type netMetricsReporter struct { @@ -97,7 +97,7 @@ func newNetReporter( var register []prometheus.Collector log := slog.With("component", "prom.NetworkEndpoint") - if cfg.GloballyEnabled || mr.cfg.NetworkFlowBytesEnabled() { + if cfg.MeterProvider.Features.NetworkBytes() { log.Debug("registering network flow bytes metric") mr.flowAttrs = attributes.PrometheusGetters( ebpf.RecordStringGetters, @@ -110,7 +110,7 @@ func newNetReporter( register = append(register, mr.flowBytes) } - if mr.cfg.NetworkInterzoneMetricsEnabled() { + if cfg.MeterProvider.Features.NetworkInterZone() { log.Debug("registering network inter-zone metric") mr.interZoneAttrs = attributes.PrometheusGetters( ebpf.RecordStringGetters, diff --git a/pkg/export/prom/prom_net_test.go b/pkg/export/prom/prom_net_test.go index b4b6c2a89..fec8f818d 100644 --- a/pkg/export/prom/prom_net_test.go +++ b/pkg/export/prom/prom_net_test.go @@ -15,11 +15,14 @@ import ( "go.opentelemetry.io/obi/pkg/export" "go.opentelemetry.io/obi/pkg/export/attributes" "go.opentelemetry.io/obi/pkg/export/connector" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/internal/netolly/ebpf" "go.opentelemetry.io/obi/pkg/pipe/global" "go.opentelemetry.io/obi/pkg/pipe/msg" ) +var mpConfig = &decfg.MeterProvider{Features: export.FeatureNetwork | export.FeatureNetworkInterZone} + func TestMetricsExpiration(t *testing.T) { t.Skip("fails regularly with port already in use or data race condition") now := syncedClock{now: time.Now()} @@ -35,19 +38,22 @@ func TestMetricsExpiration(t *testing.T) { metrics := msg.NewQueue[[]*ebpf.Record](msg.ChannelBufferLen(20)) exporter, err := NetPrometheusEndpoint( &global.ContextInfo{Prometheus: &connector.PrometheusManager{}}, - &NetPrometheusConfig{Config: &PrometheusConfig{ - Port: openPort, - Path: "/metrics", - TTL: 3 * time.Minute, - SpanMetricsServiceCacheSize: 10, - Features: export.FeatureNetwork, - }, SelectorCfg: &attributes.SelectorConfig{ - SelectionCfg: attributes.Selection{ - attributes.NetworkFlow.Section: attributes.InclusionLists{ - Include: []string{"src_name", "dst_name"}, + &NetPrometheusConfig{ + Config: &PrometheusConfig{ + Port: openPort, + Path: "/metrics", + TTL: 3 * time.Minute, + SpanMetricsServiceCacheSize: 10, + }, + SelectorCfg: &attributes.SelectorConfig{ + SelectionCfg: attributes.Selection{ + attributes.NetworkFlow.Section: attributes.InclusionLists{ + Include: []string{"src_name", "dst_name"}, + }, }, }, - }}, metrics)(ctx) + MeterProvider: mpConfig, + }, metrics)(ctx) require.NoError(t, err) go exporter(ctx) diff --git a/pkg/export/prom/prom_test.go b/pkg/export/prom/prom_test.go index 13bfbd59c..c79845009 100644 --- a/pkg/export/prom/prom_test.go +++ b/pkg/export/prom/prom_test.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/obi/pkg/export/connector" "go.opentelemetry.io/obi/pkg/export/instrumentations" "go.opentelemetry.io/obi/pkg/export/otel" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/pipe/global" "go.opentelemetry.io/obi/pkg/pipe/msg" "go.opentelemetry.io/obi/pkg/pipe/swarm" @@ -65,9 +66,9 @@ func TestAppMetricsExpiration(t *testing.T) { Path: "/metrics", TTL: 3 * time.Minute, SpanMetricsServiceCacheSize: 10, - Features: export.FeatureApplication | export.FeatureApplicationHost, Instrumentations: []instrumentations.Instrumentation{instrumentations.InstrumentationALL}, }, + &decfg.MeterProvider{Features: export.FeatureApplicationRED | export.FeatureApplicationHost}, &attributes.SelectorConfig{ SelectionCfg: attributes.Selection{ attributes.HTTPServerDuration.Section: attributes.InclusionLists{ @@ -389,11 +390,9 @@ func TestAppMetrics_ByInstrumentation(t *testing.T) { } func TestMetricsDiscarded(t *testing.T) { - mc := PrometheusConfig{ - Features: export.FeatureApplication, - } mr := metricsReporter{ - cfg: &mc, + cfg: &PrometheusConfig{}, + meterProvider: &decfg.MeterProvider{Features: export.FeatureApplicationRED}, } svcNoExport := svc.Attrs{} @@ -443,11 +442,9 @@ func TestMetricsDiscarded(t *testing.T) { } func TestSpanMetricsDiscarded(t *testing.T) { - mc := PrometheusConfig{ - Features: export.FeatureSpanOTel, - } mr := metricsReporter{ - cfg: &mc, + cfg: &PrometheusConfig{}, + meterProvider: &decfg.MeterProvider{Features: export.FeatureSpanOTel}, } svcNoExport := svc.Attrs{} @@ -489,11 +486,9 @@ func TestSpanMetricsDiscarded(t *testing.T) { } func TestSpanMetricsDiscardedGraph(t *testing.T) { - mc := PrometheusConfig{ - Features: export.FeatureGraph, - } mr := metricsReporter{ - cfg: &mc, + cfg: &PrometheusConfig{}, + meterProvider: &decfg.MeterProvider{Features: export.FeatureSpanLegacy}, } svcNoExport := svc.Attrs{} @@ -579,13 +574,11 @@ func TestTerminatesOnBadPromPort(t *testing.T) { } func TestProcessPIDEvents(t *testing.T) { - mc := PrometheusConfig{ - Features: export.FeatureApplication, - } mr := metricsReporter{ - cfg: &mc, - serviceMap: map[svc.UID]svc.Attrs{}, - pidsTracker: otel.NewPidServiceTracker(), + cfg: &PrometheusConfig{}, + meterProvider: &decfg.MeterProvider{Features: export.FeatureApplicationRED}, + serviceMap: map[svc.UID]svc.Attrs{}, + pidsTracker: otel.NewPidServiceTracker(), } svcA := svc.Attrs{ @@ -670,9 +663,9 @@ func makePromExporter( Path: "/metrics", TTL: 300 * time.Minute, SpanMetricsServiceCacheSize: 10, - Features: export.FeatureApplication, Instrumentations: instrumentations, }, + &decfg.MeterProvider{Features: export.FeatureApplicationRED}, &attributes.SelectorConfig{ SelectionCfg: attributes.Selection{ attributes.HTTPServerDuration.Section: attributes.InclusionLists{ diff --git a/pkg/instrumenter/instrumenter.go b/pkg/instrumenter/instrumenter.go index 27d4488a5..f7444fce8 100644 --- a/pkg/instrumenter/instrumenter.go +++ b/pkg/instrumenter/instrumenter.go @@ -31,8 +31,6 @@ func Run( ctx context.Context, cfg *obi.Config, opts ...Option, ) error { - normalizeConfig(cfg) - ctxInfo, err := buildCommonContextInfo(ctx, cfg) if err != nil { return fmt.Errorf("can't build common context info: %w", err) @@ -72,11 +70,6 @@ func Run( return nil } -// normalizeConfig normalizes user input to a common set of assumptions that are global to OBI -func normalizeConfig(cfg *obi.Config) { - cfg.Attributes.Select.Normalize() -} - func setupAppO11y(ctx context.Context, ctxInfo *global.ContextInfo, config *obi.Config) error { slog.Info("starting Application Observability mode") diff --git a/pkg/instrumenter/instrumenter_test_linux.go b/pkg/instrumenter/instrumenter_test_linux.go index 61681c981..1ade26c41 100644 --- a/pkg/instrumenter/instrumenter_test_linux.go +++ b/pkg/instrumenter/instrumenter_test_linux.go @@ -26,7 +26,7 @@ func TestRunDontPanic(t *testing.T) { description: "otel endpoint but feature excluded", configProvider: func() obi.Config { cfg := obi.DefaultConfig - cfg.Metrics.Features = export.FeatureApplication + cfg.MeterProvider.Features = export.FeatureApplicationRED cfg.NetworkFlows.Enable = true cfg.Metrics.CommonEndpoint = "http://localhost" return cfg @@ -35,7 +35,7 @@ func TestRunDontPanic(t *testing.T) { description: "prom endpoint but feature excluded", configProvider: func() obi.Config { cfg := obi.DefaultConfig - cfg.Prometheus.Features = export.FeatureApplication + cfg.MeterProvider.Features = export.FeatureApplicationRED cfg.NetworkFlows.Enable = true cfg.Prometheus.Port = 9090 return cfg @@ -44,7 +44,7 @@ func TestRunDontPanic(t *testing.T) { description: "otel endpoint, otel feature excluded, but prom enabled", configProvider: func() obi.Config { cfg := obi.DefaultConfig - cfg.Metrics.Features = export.FeatureApplication + cfg.MeterProvider.Features = export.FeatureApplicationRED cfg.NetworkFlows.Enable = true cfg.Metrics.CommonEndpoint = "http://localhost" cfg.Prometheus.Port = 9090 @@ -56,9 +56,8 @@ func TestRunDontPanic(t *testing.T) { cfg := obi.DefaultConfig cfg.NetworkFlows.Enable = true cfg.Prometheus.Port = 9090 - cfg.Prometheus.Features = export.FeatureApplication cfg.Metrics.CommonEndpoint = "http://localhost" - cfg.Metrics.Features = export.FeatureApplication + cfg.MeterProvider.Features = export.FeatureApplicationRED return cfg }, }} diff --git a/pkg/internal/otelsdk/sdk_inject.go b/pkg/internal/otelsdk/sdk_inject.go index 8577f0b73..dc9aad259 100644 --- a/pkg/internal/otelsdk/sdk_inject.go +++ b/pkg/internal/otelsdk/sdk_inject.go @@ -68,7 +68,7 @@ func (i *SDKInjector) findTempDir(root string, ie *ebpf.Instrumentable) (string, } func (i *SDKInjector) Enabled() bool { - return i.cfg.EBPF.UseOTelSDKForJava && (i.cfg.Traces.Enabled() || i.cfg.Metrics.Enabled()) + return i.cfg.EBPF.UseOTelSDKForJava && (i.cfg.Traces.Enabled() || i.cfg.Metrics.EndpointEnabled()) } func (i *SDKInjector) NewExecutable(ie *ebpf.Instrumentable) error { @@ -158,7 +158,7 @@ func otlpOptions(cfg *obi.Config) (map[string]string, error) { options["otel.traces.exporter"] = "none" } - if cfg.Metrics.Enabled() { + if cfg.Metrics.EndpointEnabled() { options["otel.exporter.otlp.metrics.endpoint"] = metricsEndpoint options["otel.exporter.otlp.metrics.protocol"] = string(cfg.Metrics.GetProtocol()) options["otel.metric.export.interval"] = strconv.Itoa(int(cfg.Metrics.GetInterval().Milliseconds())) diff --git a/pkg/netolly/agent/pipeline.go b/pkg/netolly/agent/pipeline.go index 3d33c9d8d..1615cfaa8 100644 --- a/pkg/netolly/agent/pipeline.go +++ b/pkg/netolly/agent/pipeline.go @@ -104,15 +104,15 @@ func (f *Flows) buildPipeline(ctx context.Context) (*swarm.Runner, error) { // Not all the nodes are mandatory here. Is the responsibility of each Provider function to decide // whether each node is going to be instantiated or just ignored. swi.Add(otel.NetMetricsExporterProvider(f.ctxInfo, &otel.NetMetricsConfig{ - Metrics: &f.cfg.Metrics, - SelectorCfg: selectorCfg, - GloballyEnabled: f.cfg.NetworkFlows.Enable, + Metrics: &f.cfg.Metrics, + SelectorCfg: selectorCfg, + MeterProvider: &f.cfg.MeterProvider, }, filteredFlows), swarm.WithID("OTelExporter")) swi.Add(prom.NetPrometheusEndpoint(f.ctxInfo, &prom.NetPrometheusConfig{ - Config: &f.cfg.Prometheus, - SelectorCfg: selectorCfg, - GloballyEnabled: f.cfg.NetworkFlows.Enable, + Config: &f.cfg.Prometheus, + SelectorCfg: selectorCfg, + MeterProvider: &f.cfg.MeterProvider, }, filteredFlows), swarm.WithID("PrometheusExporter")) swi.Add(swarm.DirectInstance(export.FlowPrinterProvider(f.cfg.NetworkFlows.Print, filteredFlows)), diff --git a/pkg/netolly/agent/pipeline_test.go b/pkg/netolly/agent/pipeline_test.go index 0e1e47d71..b42e638e4 100644 --- a/pkg/netolly/agent/pipeline_test.go +++ b/pkg/netolly/agent/pipeline_test.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/obi/pkg/export" "go.opentelemetry.io/obi/pkg/export/attributes" "go.opentelemetry.io/obi/pkg/export/connector" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/prom" "go.opentelemetry.io/obi/pkg/filter" "go.opentelemetry.io/obi/pkg/internal/netolly/ebpf" @@ -44,11 +45,11 @@ func TestFilter(t *testing.T) { }, cfg: &obi.Config{ Prometheus: prom.PrometheusConfig{ - Path: "/metrics", - Port: promPort, - Features: export.FeatureNetwork, - TTL: time.Hour, + Path: "/metrics", + Port: promPort, + TTL: time.Hour, }, + MeterProvider: decfg.MeterProvider{Features: export.FeatureNetwork}, Filters: filter.AttributesConfig{ Network: map[string]filter.MatchDefinition{"transport": {Match: "TCP"}}, }, diff --git a/pkg/obi/config.go b/pkg/obi/config.go index ec5063123..327e2265b 100644 --- a/pkg/obi/config.go +++ b/pkg/obi/config.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/obi/pkg/export/imetrics" "go.opentelemetry.io/obi/pkg/export/instrumentations" "go.opentelemetry.io/obi/pkg/export/otel" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" "go.opentelemetry.io/obi/pkg/export/prom" "go.opentelemetry.io/obi/pkg/filter" @@ -122,6 +123,9 @@ var DefaultConfig = Config{ CacheLen: 1024, CacheTTL: 5 * time.Minute, }, + MeterProvider: decfg.MeterProvider{ + Features: export.FeatureApplicationRED, + }, Metrics: otelcfg.MetricsConfig{ Protocol: otelcfg.ProtocolUnset, MetricsProtocol: otelcfg.ProtocolUnset, @@ -130,7 +134,6 @@ var DefaultConfig = Config{ Buckets: export.DefaultBuckets, ReportersCacheLen: ReporterLRUSize, HistogramAggregation: otel.AggregationExplicit, - Features: export.FeatureApplication, Instrumentations: []instrumentations.Instrumentation{ instrumentations.InstrumentationALL, }, @@ -153,9 +156,8 @@ var DefaultConfig = Config{ }, }, Prometheus: prom.PrometheusConfig{ - Path: "/metrics", - Buckets: export.DefaultBuckets, - Features: export.FeatureApplication, + Path: "/metrics", + Buckets: export.DefaultBuckets, Instrumentations: []instrumentations.Instrumentation{ instrumentations.InstrumentationALL, }, @@ -266,6 +268,9 @@ type Config struct { // as this is a reminiscence of past times when we only supported one executable per instance. ServiceNamespace string `yaml:"service_namespace" env:"OTEL_EBPF_SERVICE_NAMESPACE"` + // MeterProvider is a placeholder for the progressive support of the OTEL declarative configuration. + MeterProvider decfg.MeterProvider `yaml:"meter_provider"` + // Discovery configuration Discovery services.DiscoveryConfig `yaml:"discovery"` @@ -363,10 +368,9 @@ func (c *Config) Validate() error { if !c.Enabled(FeatureNetO11y) && !c.Enabled(FeatureAppO11y) { return ConfigError("at least one of 'network' or 'application' features must be enabled. " + - "Enable OpenTelemetry export features using the 'OTEL_EBPF_METRIC_FEATURES=network,application' environment variable " + - "or 'otel_metrics_export: { features: [network,application] }' in the YAML configuration file. " + - "Enable Prometheus export features using the 'OTEL_EBPF_PROMETHEUS_FEATURES=network,application' environment variable " + - "or 'prometheus_export: { features: [network,application] }' in the YAML configuration file.") + "Enable an OpenTelemetry or Prometheus metrics export, then enable any of the network* or application*" + + "features using the 'OTEL_EBPF_METRICS_FEATURES=network,application' environment variable " + + "or 'meter_provicer: { features: [network,application] }' in the YAML configuration file. ") } if c.willUseTC() { @@ -379,8 +383,8 @@ func (c *Config) Validate() error { return ConfigError("OTEL_EBPF_KUBE_INFORMERS_SYNC_TIMEOUT duration must be greater than 0s") } - if c.Enabled(FeatureNetO11y) && !c.Metrics.Enabled() && - !c.Prometheus.Enabled() && !c.NetworkFlows.Print { + if c.Enabled(FeatureNetO11y) && !c.Metrics.EndpointEnabled() && + !c.Prometheus.EndpointEnabled() && !c.NetworkFlows.Print { return ConfigError("enabling network metrics requires to enable at least the OpenTelemetry" + " metrics exporter: otel_metrics_export or prometheus_export sections in the YAML configuration file; or the" + " OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_ENDPOINT or OTEL_EBPF_PROMETHEUS_PORT environment variables. For debugging" + @@ -392,15 +396,14 @@ func (c *Config) Validate() error { } if c.Enabled(FeatureAppO11y) && !c.TracePrinter.Enabled() && - !c.Metrics.Enabled() && !c.Traces.Enabled() && - !c.Prometheus.Enabled() && !c.TracePrinter.Enabled() { + !c.Metrics.EndpointEnabled() && !c.Traces.Enabled() && + !c.Prometheus.EndpointEnabled() && !c.TracePrinter.Enabled() { return ConfigError("you need to define at least one exporter: trace_printer," + " otel_metrics_export, otel_traces_export or prometheus_export") } if c.Enabled(FeatureAppO11y) && - ((c.Prometheus.Enabled() && c.Prometheus.InvalidSpanMetricsConfig()) || - (c.Metrics.Enabled() && c.Metrics.InvalidSpanMetricsConfig())) { + ((c.Prometheus.EndpointEnabled() || c.Metrics.EndpointEnabled()) && c.MeterProvider.Features.InvalidSpanMetricsConfig()) { return ConfigError("you can only enable one format of span metrics," + " application_span or application_span_otel") } @@ -412,7 +415,7 @@ func (c *Config) Validate() error { if c.InternalMetrics.Exporter == imetrics.InternalMetricsExporterOTEL && c.InternalMetrics.Prometheus.Port != 0 { return ConfigError("you can't enable both OTEL and Prometheus internal metrics") } - if c.InternalMetrics.Exporter == imetrics.InternalMetricsExporterOTEL && !c.Metrics.Enabled() { + if c.InternalMetrics.Exporter == imetrics.InternalMetricsExporterOTEL && !c.Metrics.EndpointEnabled() { return ConfigError("you can't enable OTEL internal metrics without enabling OTEL metrics") } @@ -420,11 +423,11 @@ func (c *Config) Validate() error { } func (c *Config) promNetO11yEnabled() bool { - return c.Prometheus.Enabled() && c.Prometheus.NetworkMetricsEnabled() + return c.Prometheus.EndpointEnabled() && c.MeterProvider.Features.AnyNetwork() } func (c *Config) otelNetO11yEnabled() bool { - return c.Metrics.Enabled() && c.Metrics.NetworkMetricsEnabled() + return c.Metrics.EndpointEnabled() && c.MeterProvider.Features.AnyNetwork() } func (c *Config) willUseTC() bool { @@ -445,10 +448,8 @@ func (c *Config) Enabled(feature Feature) bool { } func (c *Config) SpanMetricsEnabledForTraces() bool { - otelSpanMetricsEnabled := c.Metrics.Enabled() && c.Metrics.AnySpanMetricsEnabled() - promSpanMetricsEnabled := c.Prometheus.Enabled() && c.Prometheus.AnySpanMetricsEnabled() - - return otelSpanMetricsEnabled || promSpanMetricsEnabled + return c.MeterProvider.Features.AnySpanMetrics() && + (c.Metrics.EndpointEnabled() || c.Prometheus.EndpointEnabled()) } // ExternalLogger sets the logging capabilities of OBI. @@ -487,6 +488,8 @@ func LoadConfig(file io.Reader) (*Config, error) { return nil, fmt.Errorf("reading env vars: %w", err) } + cfg.normalize() + return &cfg, nil } @@ -498,3 +501,21 @@ func registerCustomValidations(validate *validator.Validate, customValidations C } return nil } + +// normalizeConfig normalizes user input to a common set of assumptions that are global to OBI +func (c *Config) normalize() { + c.Attributes.Select.Normalize() + // backwards compatibility assumptions for the deprecated Metric feature sections in OTEL and Prom metrics config. + // Old, deprecated properties would take precedence over meter_provider > features, to avoid breaking changes. + if c.Metrics.EndpointEnabled() && c.Metrics.DeprFeatures != 0 { + // if the user has overridden otel_metrics_export > features + c.MeterProvider.Features = c.Metrics.DeprFeatures + } else if c.Prometheus.EndpointEnabled() && c.Prometheus.DeprFeatures != 0 { + // if the user has overridden prometheus_export > features + c.MeterProvider.Features = c.Prometheus.DeprFeatures + } + // Deprecated: to be removed together with OTEL_EBPF_NETWORK_METRICS bool flag + if c.NetworkFlows.Enable { + c.MeterProvider.Features |= export.FeatureNetwork + } +} diff --git a/pkg/obi/config_test.go b/pkg/obi/config_test.go index 08ff6aacb..ad9bd1104 100644 --- a/pkg/obi/config_test.go +++ b/pkg/obi/config_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/obi/pkg/export/debug" "go.opentelemetry.io/obi/pkg/export/imetrics" "go.opentelemetry.io/obi/pkg/export/instrumentations" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" "go.opentelemetry.io/obi/pkg/export/prom" "go.opentelemetry.io/obi/pkg/kube" @@ -144,6 +145,10 @@ discovery: KafkaTopicUUIDCacheSize: 1024, }, NetworkFlows: nc, + MeterProvider: decfg.MeterProvider{ + // after normalization, network feature is added from network > enable: true + Features: export.FeatureApplicationRED | export.FeatureNetwork, + }, Metrics: otelcfg.MetricsConfig{ OTELIntervalMS: 60_000, CommonEndpoint: "localhost:3131", @@ -155,7 +160,6 @@ discovery: RequestSizeHistogram: export.DefaultBuckets.RequestSizeHistogram, ResponseSizeHistogram: export.DefaultBuckets.ResponseSizeHistogram, }, - Features: export.FeatureApplication, Instrumentations: []instrumentations.Instrumentation{ instrumentations.InstrumentationALL, }, @@ -180,8 +184,7 @@ discovery: }, }, Prometheus: prom.PrometheusConfig{ - Path: "/metrics", - Features: export.FeatureApplication, + Path: "/metrics", Instrumentations: []instrumentations.Instrumentation{ instrumentations.InstrumentationALL, }, @@ -466,7 +469,7 @@ func TestConfig_NetworkImplicitProm(t *testing.T) { // OTEL_GO_AUTO_TARGET_EXE is an alias to OTEL_EBPF_EXECUTABLE_PATH // (Compatibility with OpenTelemetry) t.Setenv("OTEL_EBPF_PROMETHEUS_PORT", "9090") - t.Setenv("OTEL_EBPF_PROMETHEUS_FEATURES", "network") + t.Setenv("OTEL_EBPF_METRICS_FEATURES", "network") cfg, err := LoadConfig(bytes.NewReader(nil)) require.NoError(t, err) assert.True(t, cfg.Enabled(FeatureNetO11y)) // Net o11y should be on @@ -606,6 +609,7 @@ func TestConfig_SpanMetricsEnabledForTraces(t *testing.T) { name string metrics otelcfg.MetricsConfig prometheus prom.PrometheusConfig + mp decfg.MeterProvider wantEnabled bool }{ { @@ -618,46 +622,43 @@ func TestConfig_SpanMetricsEnabledForTraces(t *testing.T) { name: "otel metrics enabled, but not spans", metrics: otelcfg.MetricsConfig{ MetricsEndpoint: "http://localhost:4318/v1/metrics", - Features: export.FeatureApplication, }, prometheus: prom.PrometheusConfig{}, + mp: decfg.MeterProvider{Features: export.FeatureApplicationRED}, wantEnabled: false, }, { name: "otel metrics enabled with spans", metrics: otelcfg.MetricsConfig{ MetricsEndpoint: "http://localhost:4318/v1/metrics", - Features: export.FeatureSpanOTel, }, prometheus: prom.PrometheusConfig{}, + mp: decfg.MeterProvider{Features: export.FeatureSpanOTel}, wantEnabled: true, }, { name: "prometheus metrics enabled, but not spans", metrics: otelcfg.MetricsConfig{}, prometheus: prom.PrometheusConfig{ - Port: 9090, - Features: export.FeatureApplication, + Port: 9090, }, + mp: decfg.MeterProvider{Features: export.FeatureApplicationRED}, wantEnabled: false, }, { name: "prometheus span metrics enabled", metrics: otelcfg.MetricsConfig{}, prometheus: prom.PrometheusConfig{ - Features: export.FeatureGraph, - Port: 9090, + Port: 9090, }, + mp: decfg.MeterProvider{Features: export.FeatureSpanOTel}, wantEnabled: true, }, { - name: "both have features, but not enabled", - metrics: otelcfg.MetricsConfig{ - Features: export.FeatureApplication, - }, - prometheus: prom.PrometheusConfig{ - Features: export.FeatureGraph, - }, + name: "both have features, but not enabled", + metrics: otelcfg.MetricsConfig{}, + prometheus: prom.PrometheusConfig{}, + mp: decfg.MeterProvider{Features: export.FeatureApplicationRED | export.FeatureGraph}, wantEnabled: false, }, } @@ -665,8 +666,9 @@ func TestConfig_SpanMetricsEnabledForTraces(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { cfg := &Config{ - Metrics: tc.metrics, - Prometheus: tc.prometheus, + Metrics: tc.metrics, + Prometheus: tc.prometheus, + MeterProvider: tc.mp, } got := cfg.SpanMetricsEnabledForTraces() assert.Equal(t, tc.wantEnabled, got) @@ -682,3 +684,69 @@ func loadConfig(t *testing.T, env envMap) *Config { require.NoError(t, err) return cfg } + +func TestNormalizeConfig_MeterProvider(t *testing.T) { + type testCase struct { + name string + expected export.Features + cfg Config + } + testCases := []testCase{{ + name: "default global meter provider", + expected: export.FeatureApplicationRED, + cfg: Config{ + Metrics: otelcfg.MetricsConfig{DeprFeatures: export.FeatureEBPF}, + Prometheus: prom.PrometheusConfig{DeprFeatures: export.FeatureNetwork}, + MeterProvider: decfg.MeterProvider{Features: export.FeatureApplicationRED}, + }, + }, { + name: "OTEL endpoint and legacy features are defined", + expected: export.FeatureEBPF, + cfg: Config{ + Metrics: otelcfg.MetricsConfig{MetricsEndpoint: "http://foo", DeprFeatures: export.FeatureEBPF}, + Prometheus: prom.PrometheusConfig{DeprFeatures: export.FeatureNetwork}, + MeterProvider: decfg.MeterProvider{Features: export.FeatureApplicationRED}, + }, + }, { + name: "OTEL endpoint defined but legacy features are not", + expected: export.FeatureApplicationRED, + cfg: Config{ + Metrics: otelcfg.MetricsConfig{MetricsEndpoint: "http://foo"}, + Prometheus: prom.PrometheusConfig{DeprFeatures: export.FeatureNetwork}, + MeterProvider: decfg.MeterProvider{Features: export.FeatureApplicationRED}, + }, + }, { + name: "Prom endpoint and legacy features are defined", + expected: export.FeatureNetwork, + cfg: Config{ + Metrics: otelcfg.MetricsConfig{DeprFeatures: export.FeatureEBPF}, + Prometheus: prom.PrometheusConfig{Port: 8080, DeprFeatures: export.FeatureNetwork}, + MeterProvider: decfg.MeterProvider{Features: export.FeatureApplicationRED}, + }, + }, { + name: "Prom endpoint defined but legacy features are not", + expected: export.FeatureApplicationRED, + cfg: Config{ + Metrics: otelcfg.MetricsConfig{MetricsEndpoint: "http://foo"}, + Prometheus: prom.PrometheusConfig{Port: 8080}, + MeterProvider: decfg.MeterProvider{Features: export.FeatureApplicationRED}, + }, + }} + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg := tc.cfg + cfg.normalize() + assert.Equal(t, tc.expected, cfg.MeterProvider.Features) + }) + } +} + +func TestNormalizeConfig_Network(t *testing.T) { + obi := Config{ + NetworkFlows: NetworkConfig{Enable: true}, + MeterProvider: decfg.MeterProvider{Features: export.FeatureApplicationRED}, + } + obi.normalize() + assert.Equal(t, export.FeatureApplicationRED|export.FeatureNetwork, + obi.MeterProvider.Features) +} diff --git a/pkg/obi/network_cfg.go b/pkg/obi/network_cfg.go index 8cef63733..422e7f51f 100644 --- a/pkg/obi/network_cfg.go +++ b/pkg/obi/network_cfg.go @@ -44,7 +44,7 @@ const ( type NetworkConfig struct { // Enable network metrics. // Default value is false (disabled) - // Deprecated: add "network" to OTEL_EBPF_METRIC_FEATURES or OTEL_EBPF_PROMETHEUS_FEATURES + // Deprecated: add "network" or "network_inter_zone" to OTEL_EBPF_METRICS_FEATURES // TODO OBI 3.0: remove Enable bool `yaml:"enable" env:"OTEL_EBPF_NETWORK_METRICS" validate:"boolean"` diff --git a/pkg/transform/span_name_limiter.go b/pkg/transform/span_name_limiter.go index 8dbcbc23f..3bf3102b9 100644 --- a/pkg/transform/span_name_limiter.go +++ b/pkg/transform/span_name_limiter.go @@ -10,7 +10,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/appolly/app/svc" - "go.opentelemetry.io/obi/pkg/export" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" "go.opentelemetry.io/obi/pkg/export/prom" "go.opentelemetry.io/obi/pkg/internal/helpers/cache" @@ -38,9 +38,10 @@ type routesCount struct { } type SpanNameLimiterConfig struct { - Limit int - OTEL *otelcfg.MetricsConfig - Prom *prom.PrometheusConfig + Limit int + MeterProvider *decfg.MeterProvider + OTEL *otelcfg.MetricsConfig + Prom *prom.PrometheusConfig } // SpanNameLimiter applies only to metrics. If span metrics are enabled and @@ -69,8 +70,7 @@ func SpanNameLimiter(cfg SpanNameLimiterConfig, input, output *msg.Queue[[]reque func enabled(cfg *SpanNameLimiterConfig) bool { return cfg.Limit > 0 && - (cfg.OTEL.Features.Has(export.FeatureSpan) || - cfg.Prom.Features.Has(export.FeatureSpan)) + cfg.MeterProvider.Features.SpanMetrics() } func (l *spanNameLimiter) doLimit(ctx context.Context) { diff --git a/pkg/transform/span_name_limiter_test.go b/pkg/transform/span_name_limiter_test.go index 7e6be149d..38f446602 100644 --- a/pkg/transform/span_name_limiter_test.go +++ b/pkg/transform/span_name_limiter_test.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/appolly/app/svc" "go.opentelemetry.io/obi/pkg/export" + "go.opentelemetry.io/obi/pkg/export/otel/decfg" "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" "go.opentelemetry.io/obi/pkg/export/prom" "go.opentelemetry.io/obi/pkg/internal/testutil" @@ -31,9 +32,10 @@ func TestSpanNameLimiter(t *testing.T) { output := msg.NewQueue[[]request.Span](msg.ChannelBufferLen(10)) outCh := output.Subscribe() runSpanNameLimiter, err := SpanNameLimiter(SpanNameLimiterConfig{ - Limit: maxCardinalityBeforeAggregation, - OTEL: &otelcfg.MetricsConfig{Features: export.FeatureSpan, TTL: time.Minute}, - Prom: &prom.PrometheusConfig{Features: export.FeatureSpan, TTL: time.Minute}, + Limit: maxCardinalityBeforeAggregation, + OTEL: &otelcfg.MetricsConfig{TTL: time.Minute}, + Prom: &prom.PrometheusConfig{TTL: time.Minute}, + MeterProvider: &decfg.MeterProvider{Features: export.FeatureSpanLegacy}, }, input, output)(t.Context()) require.NoError(t, err) @@ -116,9 +118,10 @@ func TestSpanNameLimiter_ExpireOld(t *testing.T) { output := msg.NewQueue[[]request.Span](msg.ChannelBufferLen(10)) outCh := output.Subscribe() runSpanNameLimiter, err := SpanNameLimiter(SpanNameLimiterConfig{ - Limit: maxCardinalityBeforeAggregation, - OTEL: &otelcfg.MetricsConfig{Features: export.FeatureSpan, TTL: time.Minute}, - Prom: &prom.PrometheusConfig{Features: export.FeatureSpan, TTL: time.Minute}, + Limit: maxCardinalityBeforeAggregation, + OTEL: &otelcfg.MetricsConfig{TTL: time.Minute}, + Prom: &prom.PrometheusConfig{TTL: time.Minute}, + MeterProvider: &decfg.MeterProvider{Features: export.FeatureSpanLegacy}, }, input, output)(t.Context()) require.NoError(t, err) @@ -180,9 +183,10 @@ func TestSpanNameLimiter_CopiesOutput(t *testing.T) { output := msg.NewQueue[[]request.Span](msg.ChannelBufferLen(10)) outCh := output.Subscribe() runSpanNameLimiter, err := SpanNameLimiter(SpanNameLimiterConfig{ - Limit: 3, - OTEL: &otelcfg.MetricsConfig{Features: export.FeatureSpan, TTL: time.Minute}, - Prom: &prom.PrometheusConfig{Features: export.FeatureSpan, TTL: time.Minute}, + Limit: 3, + OTEL: &otelcfg.MetricsConfig{TTL: time.Minute}, + Prom: &prom.PrometheusConfig{TTL: time.Minute}, + MeterProvider: &decfg.MeterProvider{Features: export.FeatureSpanLegacy}, }, input, output)(t.Context()) require.NoError(t, err)