Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (e *Prometheus) Encode(metricRecord config.GenericMap) {
e.checkConfUpdate()
}

func (e *Prometheus) ProcessCounter(m interface{}, labels map[string]string, value float64) error {
func (e *Prometheus) ProcessCounter(m interface{}, _ string, labels map[string]string, value float64) error {
counter := m.(*prometheus.CounterVec)
mm, err := counter.GetMetricWith(labels)
if err != nil {
Expand All @@ -75,7 +75,7 @@ func (e *Prometheus) ProcessGauge(m interface{}, _ string, labels map[string]str
return nil
}

func (e *Prometheus) ProcessHist(m interface{}, labels map[string]string, value float64) error {
func (e *Prometheus) ProcessHist(m interface{}, _ string, labels map[string]string, value float64) error {
hist := m.(*prometheus.HistogramVec)
mm, err := hist.GetMetricWith(labels)
if err != nil {
Expand All @@ -85,7 +85,7 @@ func (e *Prometheus) ProcessHist(m interface{}, labels map[string]string, value
return nil
}

func (e *Prometheus) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error {
func (e *Prometheus) ProcessAggHist(m interface{}, _ string, labels map[string]string, values []float64) error {
hist := m.(*prometheus.HistogramVec)
mm, err := hist.GetMetricWith(labels)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/pipeline/encode/metrics_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ type MetricsCommonStruct struct {

type MetricsCommonInterface interface {
GetCacheEntry(entryLabels map[string]string, m interface{}) interface{}
ProcessCounter(m interface{}, labels map[string]string, value float64) error
ProcessCounter(m interface{}, name string, labels map[string]string, value float64) error
ProcessGauge(m interface{}, name string, labels map[string]string, value float64, lvs []string) error
ProcessHist(m interface{}, labels map[string]string, value float64) error
ProcessAggHist(m interface{}, labels map[string]string, value []float64) error
ProcessHist(m interface{}, name string, labels map[string]string, value float64) error
ProcessAggHist(m interface{}, name string, labels map[string]string, value []float64) error
}

var (
Expand Down Expand Up @@ -114,7 +114,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
continue
}
for _, labels := range labelSets {
err := mci.ProcessCounter(mInfo.genericMetric, labels.lMap, value)
err := mci.ProcessCounter(mInfo.genericMetric, mInfo.info.Name, labels.lMap, value)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
Expand Down Expand Up @@ -148,7 +148,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
continue
}
for _, labels := range labelSets {
err := mci.ProcessHist(mInfo.genericMetric, labels.lMap, value)
err := mci.ProcessHist(mInfo.genericMetric, mInfo.info.Name, labels.lMap, value)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
Expand All @@ -165,7 +165,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
continue
}
for _, labels := range labelSets {
err := mci.ProcessAggHist(mInfo.genericMetric, labels.lMap, values)
err := mci.ProcessAggHist(mInfo.genericMetric, mInfo.info.Name, labels.lMap, values)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
Expand Down
24 changes: 0 additions & 24 deletions pkg/pipeline/encode/opentelemetry/encode_otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,27 +175,3 @@ func Test_EncodeOtlpTraces(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, newEncode)
}

func Test_EncodeOtlpMetrics(t *testing.T) {
cfg := config.StageParam{
Encode: &config.Encode{
OtlpMetrics: &api.EncodeOtlpMetrics{
OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
ConnectionType: "grpc",
Headers: nil,
},
Prefix: "flp_test",
Metrics: []api.MetricsItem{
{Name: "metric1", Type: "counter", Labels: []string{"label11", "label12"}},
{Name: "metric2", Type: "gauge", Labels: []string{"label21", "label22"}},
{Name: "metric3", Type: "counter", Labels: []string{"label31", "label32"}},
},
}},
}
newEncode, err := NewEncodeOtlpMetrics(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
// TODO: add more tests
}
87 changes: 45 additions & 42 deletions pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package opentelemetry

import (
"context"
"fmt"
"strings"
"time"

Expand All @@ -31,8 +32,6 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

const defaultExpiryTime = time.Duration(2 * time.Minute)
Expand All @@ -41,9 +40,6 @@ const flpMeterName = "flp_meter"
type EncodeOtlpMetrics struct {
cfg api.EncodeOtlpMetrics
ctx context.Context
res *resource.Resource
mp *sdkmetric.MeterProvider
meter metric.Meter
metricCommon *encode.MetricsCommonStruct
}

Expand All @@ -57,12 +53,14 @@ func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) {
e.metricCommon.MetricCommonEncode(e, metricRecord)
}

func (e *EncodeOtlpMetrics) ProcessCounter(m interface{}, labels map[string]string, value float64) error {
counter := m.(metric.Float64Counter)
// set attributes using the labels
attributes := obtainAttributesFromLabels(labels)
counter.Add(e.ctx, value, metric.WithAttributes(attributes...))
return nil
func (e *EncodeOtlpMetrics) ProcessCounter(m interface{}, name string, labels map[string]string, value float64) error {
if counter, ok := m.(metric.Float64Counter); ok {
// set attributes using the labels
attributes := obtainAttributesFromLabels(labels)
counter.Add(e.ctx, value, metric.WithAttributes(attributes...))
return nil
}
return fmt.Errorf("wrong Otlp Counter type for %s: %T; expecting Float64Counter", name, m)
}

func createKey(name string, keys []string) string {
Expand All @@ -77,30 +75,36 @@ func createKey(name string, keys []string) string {
}

func (e *EncodeOtlpMetrics) ProcessGauge(m interface{}, name string, labels map[string]string, value float64, lvs []string) error {
obs := m.(Float64Gauge)
// set attributes using the labels
attributes := obtainAttributesFromLabels(labels)
key := createKey(name, lvs)
obs.Set(key, value, attributes)
return nil
if obs, ok := m.(Float64Gauge); ok {
// set attributes using the labels
attributes := obtainAttributesFromLabels(labels)
key := createKey(name, lvs)
obs.Set(key, value, attributes)
return nil
}
return fmt.Errorf("wrong Otlp Gauge type for %s: %T; expecting Float64Gauge", name, m)
}

func (e *EncodeOtlpMetrics) ProcessHist(m interface{}, labels map[string]string, value float64) error {
histo := m.(metric.Float64Histogram)
// set attributes using the labels
attributes := obtainAttributesFromLabels(labels)
histo.Record(e.ctx, value, metric.WithAttributes(attributes...))
return nil
func (e *EncodeOtlpMetrics) ProcessHist(m interface{}, name string, labels map[string]string, value float64) error {
if histo, ok := m.(metric.Float64Histogram); ok {
// set attributes using the labels
attributes := obtainAttributesFromLabels(labels)
histo.Record(e.ctx, value, metric.WithAttributes(attributes...))
return nil
}
return fmt.Errorf("wrong Otlp Histogram type for %s: %T; expecting Float64Histogram", name, m)
}

func (e *EncodeOtlpMetrics) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error {
histo := m.(metric.Float64Histogram)
// set attributes using the labels
attributes := obtainAttributesFromLabels(labels)
for _, v := range values {
histo.Record(e.ctx, v, metric.WithAttributes(attributes...))
func (e *EncodeOtlpMetrics) ProcessAggHist(m interface{}, name string, labels map[string]string, values []float64) error {
if histo, ok := m.(metric.Float64Histogram); ok {
// set attributes using the labels
attributes := obtainAttributesFromLabels(labels)
for _, v := range values {
histo.Record(e.ctx, v, metric.WithAttributes(attributes...))
}
return nil
}
return nil
return fmt.Errorf("wrong Otlp Histogram type for %s: %T; expecting Float64Histogram", name, m)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) Change the message so it's not exactly the same as line 95, such as:
return fmt.Errorf("wrong Otlp Aggregated Histogram type for %s: %T; expecting Float64Histogram", name, m)

}

func (e *EncodeOtlpMetrics) GetCacheEntry(entryLabels map[string]string, _ interface{}) interface{} {
Expand All @@ -118,30 +122,28 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
ctx := context.Background()
res := newResource()

mp, err := NewOtlpMetricsProvider(ctx, params, res)
mp, err := NewOtlpMetricsProvider(ctx, &cfg, res)
if err != nil {
return nil, err
}
meter := mp.Meter(
flpMeterName,
)
meter := mp.Meter(flpMeterName)
return newEncodeOtlpMetricsWithMeter(ctx, params.Name, opMetrics, &cfg, meter)
}

func newEncodeOtlpMetricsWithMeter(ctx context.Context, stageName string, opMetrics *operational.Metrics, cfg *api.EncodeOtlpMetrics, meter metric.Meter) (encode.Encoder, error) {
meterFactory := otel.Meter(flpMeterName)

expiryTime := cfg.ExpiryTime
if expiryTime.Duration == 0 {
expiryTime.Duration = defaultExpiryTime
}

meterFactory := otel.Meter(flpMeterName)

w := &EncodeOtlpMetrics{
cfg: cfg,
ctx: ctx,
res: res,
mp: mp,
meter: meterFactory,
cfg: *cfg,
ctx: ctx,
}

metricCommon := encode.NewMetricsCommonStruct(opMetrics, 0, params.Name, expiryTime, nil)
metricCommon := encode.NewMetricsCommonStruct(opMetrics, 0, stageName, expiryTime, nil)
w.metricCommon = metricCommon

for i := range cfg.Metrics {
Expand Down Expand Up @@ -172,6 +174,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
metricCommon.AddGauge(fullMetricName, gauge, mInfo)
case api.MetricHistogram:
var histo metric.Float64Histogram
var err error
if len(mCfg.Buckets) == 0 {
histo, err = meter.Float64Histogram(fullMetricName)
} else {
Expand Down
84 changes: 84 additions & 0 deletions pkg/pipeline/encode/opentelemetry/encode_otlpmetrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package opentelemetry

import (
"context"
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric"
)

type wrappedMeter struct {
metric.Meter
back metric.Meter
counters map[string]*wrappedCounter
}

func newWrappedMeter(ctx context.Context, cfg *api.EncodeOtlpMetrics) (*wrappedMeter, error) {
res := newResource()
mp, err := NewOtlpMetricsProvider(ctx, cfg, res)
if err != nil {
return nil, err
}
return &wrappedMeter{back: mp.Meter("test"), counters: make(map[string]*wrappedCounter)}, nil
}

func (wm *wrappedMeter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
c, err := wm.back.Float64Counter(name, options...)
wc := &wrappedCounter{back: c}
wm.counters[name] = wc
return wc, err
}

type wrappedCounter struct {
metric.Float64Counter
back metric.Float64Counter
sum float64
}

func (wc *wrappedCounter) Add(ctx context.Context, incr float64, options ...metric.AddOption) {
wc.sum += incr
wc.back.Add(ctx, incr, options...)
}

func Test_EncodeOtlpMetrics(t *testing.T) {
cfg := &api.EncodeOtlpMetrics{
OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
ConnectionType: "grpc",
Headers: nil,
},
Prefix: "flp_test",
Metrics: []api.MetricsItem{
{Name: "metric1", Type: "counter", ValueKey: "value", Labels: []string{"destination.k8s.kind", "DstSubnetLabel"}},
{Name: "metric2", Type: "gauge", Labels: []string{"DstSubnetLabel"}},
{Name: "metric3", Type: "counter", Labels: []string{"destination.k8s.kind", "source.k8s.kind"}},
},
}
ctx := context.Background()
wm, err := newWrappedMeter(ctx, cfg)
require.NoError(t, err)
encoder, err := newEncodeOtlpMetricsWithMeter(ctx, "otlp-encode", operational.NewMetrics(&config.MetricsSettings{}), cfg, wm)
require.NoError(t, err)
require.NotNil(t, encoder)

assert.Len(t, wm.counters, 2)
assert.Equal(t, float64(0), wm.counters["flp_testmetric1"].sum)
assert.Equal(t, float64(0), wm.counters["flp_testmetric3"].sum)

// Test empty
encoder.Encode(config.GenericMap{})
assert.Len(t, wm.counters, 2)
assert.Equal(t, float64(0), wm.counters["flp_testmetric1"].sum)
assert.Equal(t, float64(1), wm.counters["flp_testmetric3"].sum)

encoder.Encode(config.GenericMap{"destination.k8s.kind": "foo", "DstSubnetLabel": "bar", "value": 5})
assert.Len(t, wm.counters, 2)
assert.Equal(t, float64(5), wm.counters["flp_testmetric1"].sum)
assert.Equal(t, float64(2), wm.counters["flp_testmetric3"].sum)
}
6 changes: 1 addition & 5 deletions pkg/pipeline/encode/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,7 @@ func NewOtlpTracerProvider(ctx context.Context, params config.StageParam, res *r
return traceProvider, nil
}

func NewOtlpMetricsProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdkmetric.MeterProvider, error) {
cfg := api.EncodeOtlpMetrics{}
if params.Encode != nil && params.Encode.OtlpMetrics != nil {
cfg = *params.Encode.OtlpMetrics
}
func NewOtlpMetricsProvider(ctx context.Context, cfg *api.EncodeOtlpMetrics, res *resource.Resource) (*sdkmetric.MeterProvider, error) {
timeInterval := cfg.PushTimeInterval
if timeInterval.Duration == 0 {
timeInterval.Duration = defaultTimeInterval
Expand Down