diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index e1b61f202..aee43a4f4 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -55,7 +55,7 @@ func (e *Prometheus) Encode(metricRecord config.GenericMap) { e.checkConfUpdate() } -func (e *Prometheus) ProcessCounter(m interface{}, labels map[string]string, value float64) error { +func (e *Prometheus) ProcessCounter(m interface{}, _ string, labels map[string]string, value float64) error { counter := m.(*prometheus.CounterVec) mm, err := counter.GetMetricWith(labels) if err != nil { @@ -75,7 +75,7 @@ func (e *Prometheus) ProcessGauge(m interface{}, _ string, labels map[string]str return nil } -func (e *Prometheus) ProcessHist(m interface{}, labels map[string]string, value float64) error { +func (e *Prometheus) ProcessHist(m interface{}, _ string, labels map[string]string, value float64) error { hist := m.(*prometheus.HistogramVec) mm, err := hist.GetMetricWith(labels) if err != nil { @@ -85,7 +85,7 @@ func (e *Prometheus) ProcessHist(m interface{}, labels map[string]string, value return nil } -func (e *Prometheus) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error { +func (e *Prometheus) ProcessAggHist(m interface{}, _ string, labels map[string]string, values []float64) error { hist := m.(*prometheus.HistogramVec) mm, err := hist.GetMetricWith(labels) if err != nil { diff --git a/pkg/pipeline/encode/metrics_common.go b/pkg/pipeline/encode/metrics_common.go index 425fd6aa8..adaf5e512 100644 --- a/pkg/pipeline/encode/metrics_common.go +++ b/pkg/pipeline/encode/metrics_common.go @@ -51,10 +51,10 @@ type MetricsCommonStruct struct { type MetricsCommonInterface interface { GetCacheEntry(entryLabels map[string]string, m interface{}) interface{} - ProcessCounter(m interface{}, labels map[string]string, value float64) error + ProcessCounter(m interface{}, name string, labels map[string]string, value float64) error ProcessGauge(m interface{}, name string, labels map[string]string, value float64, lvs []string) error - ProcessHist(m interface{}, labels map[string]string, value float64) error - ProcessAggHist(m interface{}, labels map[string]string, value []float64) error + ProcessHist(m interface{}, name string, labels map[string]string, value float64) error + ProcessAggHist(m interface{}, name string, labels map[string]string, value []float64) error } var ( @@ -114,7 +114,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met continue } for _, labels := range labelSets { - err := mci.ProcessCounter(mInfo.genericMetric, labels.lMap, value) + err := mci.ProcessCounter(mInfo.genericMetric, mInfo.info.Name, labels.lMap, value) if err != nil { log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() @@ -148,7 +148,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met continue } for _, labels := range labelSets { - err := mci.ProcessHist(mInfo.genericMetric, labels.lMap, value) + err := mci.ProcessHist(mInfo.genericMetric, mInfo.info.Name, labels.lMap, value) if err != nil { log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() @@ -165,7 +165,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met continue } for _, labels := range labelSets { - err := mci.ProcessAggHist(mInfo.genericMetric, labels.lMap, values) + err := mci.ProcessAggHist(mInfo.genericMetric, mInfo.info.Name, labels.lMap, values) if err != nil { log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go b/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go index 12a2ce80f..26d95112c 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go @@ -175,27 +175,3 @@ func Test_EncodeOtlpTraces(t *testing.T) { require.NoError(t, err) require.NotNil(t, newEncode) } - -func Test_EncodeOtlpMetrics(t *testing.T) { - cfg := config.StageParam{ - Encode: &config.Encode{ - OtlpMetrics: &api.EncodeOtlpMetrics{ - OtlpConnectionInfo: &api.OtlpConnectionInfo{ - Address: "1.2.3.4", - Port: 999, - ConnectionType: "grpc", - Headers: nil, - }, - Prefix: "flp_test", - Metrics: []api.MetricsItem{ - {Name: "metric1", Type: "counter", Labels: []string{"label11", "label12"}}, - {Name: "metric2", Type: "gauge", Labels: []string{"label21", "label22"}}, - {Name: "metric3", Type: "counter", Labels: []string{"label31", "label32"}}, - }, - }}, - } - newEncode, err := NewEncodeOtlpMetrics(operational.NewMetrics(&config.MetricsSettings{}), cfg) - require.NoError(t, err) - require.NotNil(t, newEncode) - // TODO: add more tests -} diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go index dac8fb322..7bfd7bfef 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go @@ -19,6 +19,7 @@ package opentelemetry import ( "context" + "fmt" "strings" "time" @@ -31,8 +32,6 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" ) const defaultExpiryTime = time.Duration(2 * time.Minute) @@ -41,9 +40,6 @@ const flpMeterName = "flp_meter" type EncodeOtlpMetrics struct { cfg api.EncodeOtlpMetrics ctx context.Context - res *resource.Resource - mp *sdkmetric.MeterProvider - meter metric.Meter metricCommon *encode.MetricsCommonStruct } @@ -57,12 +53,14 @@ func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) { e.metricCommon.MetricCommonEncode(e, metricRecord) } -func (e *EncodeOtlpMetrics) ProcessCounter(m interface{}, labels map[string]string, value float64) error { - counter := m.(metric.Float64Counter) - // set attributes using the labels - attributes := obtainAttributesFromLabels(labels) - counter.Add(e.ctx, value, metric.WithAttributes(attributes...)) - return nil +func (e *EncodeOtlpMetrics) ProcessCounter(m interface{}, name string, labels map[string]string, value float64) error { + if counter, ok := m.(metric.Float64Counter); ok { + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + counter.Add(e.ctx, value, metric.WithAttributes(attributes...)) + return nil + } + return fmt.Errorf("wrong Otlp Counter type for %s: %T; expecting Float64Counter", name, m) } func createKey(name string, keys []string) string { @@ -77,30 +75,36 @@ func createKey(name string, keys []string) string { } func (e *EncodeOtlpMetrics) ProcessGauge(m interface{}, name string, labels map[string]string, value float64, lvs []string) error { - obs := m.(Float64Gauge) - // set attributes using the labels - attributes := obtainAttributesFromLabels(labels) - key := createKey(name, lvs) - obs.Set(key, value, attributes) - return nil + if obs, ok := m.(Float64Gauge); ok { + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + key := createKey(name, lvs) + obs.Set(key, value, attributes) + return nil + } + return fmt.Errorf("wrong Otlp Gauge type for %s: %T; expecting Float64Gauge", name, m) } -func (e *EncodeOtlpMetrics) ProcessHist(m interface{}, labels map[string]string, value float64) error { - histo := m.(metric.Float64Histogram) - // set attributes using the labels - attributes := obtainAttributesFromLabels(labels) - histo.Record(e.ctx, value, metric.WithAttributes(attributes...)) - return nil +func (e *EncodeOtlpMetrics) ProcessHist(m interface{}, name string, labels map[string]string, value float64) error { + if histo, ok := m.(metric.Float64Histogram); ok { + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + histo.Record(e.ctx, value, metric.WithAttributes(attributes...)) + return nil + } + return fmt.Errorf("wrong Otlp Histogram type for %s: %T; expecting Float64Histogram", name, m) } -func (e *EncodeOtlpMetrics) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error { - histo := m.(metric.Float64Histogram) - // set attributes using the labels - attributes := obtainAttributesFromLabels(labels) - for _, v := range values { - histo.Record(e.ctx, v, metric.WithAttributes(attributes...)) +func (e *EncodeOtlpMetrics) ProcessAggHist(m interface{}, name string, labels map[string]string, values []float64) error { + if histo, ok := m.(metric.Float64Histogram); ok { + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + for _, v := range values { + histo.Record(e.ctx, v, metric.WithAttributes(attributes...)) + } + return nil } - return nil + return fmt.Errorf("wrong Otlp Histogram type for %s: %T; expecting Float64Histogram", name, m) } func (e *EncodeOtlpMetrics) GetCacheEntry(entryLabels map[string]string, _ interface{}) interface{} { @@ -118,30 +122,28 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar ctx := context.Background() res := newResource() - mp, err := NewOtlpMetricsProvider(ctx, params, res) + mp, err := NewOtlpMetricsProvider(ctx, &cfg, res) if err != nil { return nil, err } - meter := mp.Meter( - flpMeterName, - ) + meter := mp.Meter(flpMeterName) + return newEncodeOtlpMetricsWithMeter(ctx, params.Name, opMetrics, &cfg, meter) +} + +func newEncodeOtlpMetricsWithMeter(ctx context.Context, stageName string, opMetrics *operational.Metrics, cfg *api.EncodeOtlpMetrics, meter metric.Meter) (encode.Encoder, error) { + meterFactory := otel.Meter(flpMeterName) expiryTime := cfg.ExpiryTime if expiryTime.Duration == 0 { expiryTime.Duration = defaultExpiryTime } - meterFactory := otel.Meter(flpMeterName) - w := &EncodeOtlpMetrics{ - cfg: cfg, - ctx: ctx, - res: res, - mp: mp, - meter: meterFactory, + cfg: *cfg, + ctx: ctx, } - metricCommon := encode.NewMetricsCommonStruct(opMetrics, 0, params.Name, expiryTime, nil) + metricCommon := encode.NewMetricsCommonStruct(opMetrics, 0, stageName, expiryTime, nil) w.metricCommon = metricCommon for i := range cfg.Metrics { @@ -172,6 +174,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar metricCommon.AddGauge(fullMetricName, gauge, mInfo) case api.MetricHistogram: var histo metric.Float64Histogram + var err error if len(mCfg.Buckets) == 0 { histo, err = meter.Float64Histogram(fullMetricName) } else { diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics_test.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics_test.go new file mode 100644 index 000000000..b47fcba97 --- /dev/null +++ b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics_test.go @@ -0,0 +1,84 @@ +package opentelemetry + +import ( + "context" + "testing" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" +) + +type wrappedMeter struct { + metric.Meter + back metric.Meter + counters map[string]*wrappedCounter +} + +func newWrappedMeter(ctx context.Context, cfg *api.EncodeOtlpMetrics) (*wrappedMeter, error) { + res := newResource() + mp, err := NewOtlpMetricsProvider(ctx, cfg, res) + if err != nil { + return nil, err + } + return &wrappedMeter{back: mp.Meter("test"), counters: make(map[string]*wrappedCounter)}, nil +} + +func (wm *wrappedMeter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) { + c, err := wm.back.Float64Counter(name, options...) + wc := &wrappedCounter{back: c} + wm.counters[name] = wc + return wc, err +} + +type wrappedCounter struct { + metric.Float64Counter + back metric.Float64Counter + sum float64 +} + +func (wc *wrappedCounter) Add(ctx context.Context, incr float64, options ...metric.AddOption) { + wc.sum += incr + wc.back.Add(ctx, incr, options...) +} + +func Test_EncodeOtlpMetrics(t *testing.T) { + cfg := &api.EncodeOtlpMetrics{ + OtlpConnectionInfo: &api.OtlpConnectionInfo{ + Address: "1.2.3.4", + Port: 999, + ConnectionType: "grpc", + Headers: nil, + }, + Prefix: "flp_test", + Metrics: []api.MetricsItem{ + {Name: "metric1", Type: "counter", ValueKey: "value", Labels: []string{"destination.k8s.kind", "DstSubnetLabel"}}, + {Name: "metric2", Type: "gauge", Labels: []string{"DstSubnetLabel"}}, + {Name: "metric3", Type: "counter", Labels: []string{"destination.k8s.kind", "source.k8s.kind"}}, + }, + } + ctx := context.Background() + wm, err := newWrappedMeter(ctx, cfg) + require.NoError(t, err) + encoder, err := newEncodeOtlpMetricsWithMeter(ctx, "otlp-encode", operational.NewMetrics(&config.MetricsSettings{}), cfg, wm) + require.NoError(t, err) + require.NotNil(t, encoder) + + assert.Len(t, wm.counters, 2) + assert.Equal(t, float64(0), wm.counters["flp_testmetric1"].sum) + assert.Equal(t, float64(0), wm.counters["flp_testmetric3"].sum) + + // Test empty + encoder.Encode(config.GenericMap{}) + assert.Len(t, wm.counters, 2) + assert.Equal(t, float64(0), wm.counters["flp_testmetric1"].sum) + assert.Equal(t, float64(1), wm.counters["flp_testmetric3"].sum) + + encoder.Encode(config.GenericMap{"destination.k8s.kind": "foo", "DstSubnetLabel": "bar", "value": 5}) + assert.Len(t, wm.counters, 2) + assert.Equal(t, float64(5), wm.counters["flp_testmetric1"].sum) + assert.Equal(t, float64(2), wm.counters["flp_testmetric3"].sum) +} diff --git a/pkg/pipeline/encode/opentelemetry/opentelemetry.go b/pkg/pipeline/encode/opentelemetry/opentelemetry.go index 2aebf1463..9fca0b9d6 100644 --- a/pkg/pipeline/encode/opentelemetry/opentelemetry.go +++ b/pkg/pipeline/encode/opentelemetry/opentelemetry.go @@ -122,11 +122,7 @@ func NewOtlpTracerProvider(ctx context.Context, params config.StageParam, res *r return traceProvider, nil } -func NewOtlpMetricsProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdkmetric.MeterProvider, error) { - cfg := api.EncodeOtlpMetrics{} - if params.Encode != nil && params.Encode.OtlpMetrics != nil { - cfg = *params.Encode.OtlpMetrics - } +func NewOtlpMetricsProvider(ctx context.Context, cfg *api.EncodeOtlpMetrics, res *resource.Resource) (*sdkmetric.MeterProvider, error) { timeInterval := cfg.PushTimeInterval if timeInterval.Duration == 0 { timeInterval.Duration = defaultTimeInterval