Skip to content

Commit bd7b5f5

Browse files
authored
Addressing some tech debt on otlp-metrics (#1094)
- Add some missing error handling - Add missing unit-test - Remove unnecessary fields from EncodeOtlpMetrics
1 parent 4f87308 commit bd7b5f5

File tree

6 files changed

+139
-80
lines changed

6 files changed

+139
-80
lines changed

pkg/pipeline/encode/encode_prom.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (e *Prometheus) Encode(metricRecord config.GenericMap) {
5555
e.checkConfUpdate()
5656
}
5757

58-
func (e *Prometheus) ProcessCounter(m interface{}, labels map[string]string, value float64) error {
58+
func (e *Prometheus) ProcessCounter(m interface{}, _ string, labels map[string]string, value float64) error {
5959
counter := m.(*prometheus.CounterVec)
6060
mm, err := counter.GetMetricWith(labels)
6161
if err != nil {
@@ -75,7 +75,7 @@ func (e *Prometheus) ProcessGauge(m interface{}, _ string, labels map[string]str
7575
return nil
7676
}
7777

78-
func (e *Prometheus) ProcessHist(m interface{}, labels map[string]string, value float64) error {
78+
func (e *Prometheus) ProcessHist(m interface{}, _ string, labels map[string]string, value float64) error {
7979
hist := m.(*prometheus.HistogramVec)
8080
mm, err := hist.GetMetricWith(labels)
8181
if err != nil {
@@ -85,7 +85,7 @@ func (e *Prometheus) ProcessHist(m interface{}, labels map[string]string, value
8585
return nil
8686
}
8787

88-
func (e *Prometheus) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error {
88+
func (e *Prometheus) ProcessAggHist(m interface{}, _ string, labels map[string]string, values []float64) error {
8989
hist := m.(*prometheus.HistogramVec)
9090
mm, err := hist.GetMetricWith(labels)
9191
if err != nil {

pkg/pipeline/encode/metrics_common.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ type MetricsCommonStruct struct {
5151

5252
type MetricsCommonInterface interface {
5353
GetCacheEntry(entryLabels map[string]string, m interface{}) interface{}
54-
ProcessCounter(m interface{}, labels map[string]string, value float64) error
54+
ProcessCounter(m interface{}, name string, labels map[string]string, value float64) error
5555
ProcessGauge(m interface{}, name string, labels map[string]string, value float64, lvs []string) error
56-
ProcessHist(m interface{}, labels map[string]string, value float64) error
57-
ProcessAggHist(m interface{}, labels map[string]string, value []float64) error
56+
ProcessHist(m interface{}, name string, labels map[string]string, value float64) error
57+
ProcessAggHist(m interface{}, name string, labels map[string]string, value []float64) error
5858
}
5959

6060
var (
@@ -114,7 +114,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
114114
continue
115115
}
116116
for _, labels := range labelSets {
117-
err := mci.ProcessCounter(mInfo.genericMetric, labels.lMap, value)
117+
err := mci.ProcessCounter(mInfo.genericMetric, mInfo.info.Name, labels.lMap, value)
118118
if err != nil {
119119
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
120120
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
@@ -148,7 +148,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
148148
continue
149149
}
150150
for _, labels := range labelSets {
151-
err := mci.ProcessHist(mInfo.genericMetric, labels.lMap, value)
151+
err := mci.ProcessHist(mInfo.genericMetric, mInfo.info.Name, labels.lMap, value)
152152
if err != nil {
153153
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
154154
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
@@ -165,7 +165,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
165165
continue
166166
}
167167
for _, labels := range labelSets {
168-
err := mci.ProcessAggHist(mInfo.genericMetric, labels.lMap, values)
168+
err := mci.ProcessAggHist(mInfo.genericMetric, mInfo.info.Name, labels.lMap, values)
169169
if err != nil {
170170
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
171171
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()

pkg/pipeline/encode/opentelemetry/encode_otlp_test.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -175,27 +175,3 @@ func Test_EncodeOtlpTraces(t *testing.T) {
175175
require.NoError(t, err)
176176
require.NotNil(t, newEncode)
177177
}
178-
179-
func Test_EncodeOtlpMetrics(t *testing.T) {
180-
cfg := config.StageParam{
181-
Encode: &config.Encode{
182-
OtlpMetrics: &api.EncodeOtlpMetrics{
183-
OtlpConnectionInfo: &api.OtlpConnectionInfo{
184-
Address: "1.2.3.4",
185-
Port: 999,
186-
ConnectionType: "grpc",
187-
Headers: nil,
188-
},
189-
Prefix: "flp_test",
190-
Metrics: []api.MetricsItem{
191-
{Name: "metric1", Type: "counter", Labels: []string{"label11", "label12"}},
192-
{Name: "metric2", Type: "gauge", Labels: []string{"label21", "label22"}},
193-
{Name: "metric3", Type: "counter", Labels: []string{"label31", "label32"}},
194-
},
195-
}},
196-
}
197-
newEncode, err := NewEncodeOtlpMetrics(operational.NewMetrics(&config.MetricsSettings{}), cfg)
198-
require.NoError(t, err)
199-
require.NotNil(t, newEncode)
200-
// TODO: add more tests
201-
}

pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go

Lines changed: 45 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package opentelemetry
1919

2020
import (
2121
"context"
22+
"fmt"
2223
"strings"
2324
"time"
2425

@@ -31,8 +32,6 @@ import (
3132
"go.opentelemetry.io/otel"
3233
"go.opentelemetry.io/otel/attribute"
3334
"go.opentelemetry.io/otel/metric"
34-
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
35-
"go.opentelemetry.io/otel/sdk/resource"
3635
)
3736

3837
const defaultExpiryTime = time.Duration(2 * time.Minute)
@@ -41,9 +40,6 @@ const flpMeterName = "flp_meter"
4140
type EncodeOtlpMetrics struct {
4241
cfg api.EncodeOtlpMetrics
4342
ctx context.Context
44-
res *resource.Resource
45-
mp *sdkmetric.MeterProvider
46-
meter metric.Meter
4743
metricCommon *encode.MetricsCommonStruct
4844
}
4945

@@ -57,12 +53,14 @@ func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) {
5753
e.metricCommon.MetricCommonEncode(e, metricRecord)
5854
}
5955

60-
func (e *EncodeOtlpMetrics) ProcessCounter(m interface{}, labels map[string]string, value float64) error {
61-
counter := m.(metric.Float64Counter)
62-
// set attributes using the labels
63-
attributes := obtainAttributesFromLabels(labels)
64-
counter.Add(e.ctx, value, metric.WithAttributes(attributes...))
65-
return nil
56+
func (e *EncodeOtlpMetrics) ProcessCounter(m interface{}, name string, labels map[string]string, value float64) error {
57+
if counter, ok := m.(metric.Float64Counter); ok {
58+
// set attributes using the labels
59+
attributes := obtainAttributesFromLabels(labels)
60+
counter.Add(e.ctx, value, metric.WithAttributes(attributes...))
61+
return nil
62+
}
63+
return fmt.Errorf("wrong Otlp Counter type for %s: %T; expecting Float64Counter", name, m)
6664
}
6765

6866
func createKey(name string, keys []string) string {
@@ -77,30 +75,36 @@ func createKey(name string, keys []string) string {
7775
}
7876

7977
func (e *EncodeOtlpMetrics) ProcessGauge(m interface{}, name string, labels map[string]string, value float64, lvs []string) error {
80-
obs := m.(Float64Gauge)
81-
// set attributes using the labels
82-
attributes := obtainAttributesFromLabels(labels)
83-
key := createKey(name, lvs)
84-
obs.Set(key, value, attributes)
85-
return nil
78+
if obs, ok := m.(Float64Gauge); ok {
79+
// set attributes using the labels
80+
attributes := obtainAttributesFromLabels(labels)
81+
key := createKey(name, lvs)
82+
obs.Set(key, value, attributes)
83+
return nil
84+
}
85+
return fmt.Errorf("wrong Otlp Gauge type for %s: %T; expecting Float64Gauge", name, m)
8686
}
8787

88-
func (e *EncodeOtlpMetrics) ProcessHist(m interface{}, labels map[string]string, value float64) error {
89-
histo := m.(metric.Float64Histogram)
90-
// set attributes using the labels
91-
attributes := obtainAttributesFromLabels(labels)
92-
histo.Record(e.ctx, value, metric.WithAttributes(attributes...))
93-
return nil
88+
func (e *EncodeOtlpMetrics) ProcessHist(m interface{}, name string, labels map[string]string, value float64) error {
89+
if histo, ok := m.(metric.Float64Histogram); ok {
90+
// set attributes using the labels
91+
attributes := obtainAttributesFromLabels(labels)
92+
histo.Record(e.ctx, value, metric.WithAttributes(attributes...))
93+
return nil
94+
}
95+
return fmt.Errorf("wrong Otlp Histogram type for %s: %T; expecting Float64Histogram", name, m)
9496
}
9597

96-
func (e *EncodeOtlpMetrics) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error {
97-
histo := m.(metric.Float64Histogram)
98-
// set attributes using the labels
99-
attributes := obtainAttributesFromLabels(labels)
100-
for _, v := range values {
101-
histo.Record(e.ctx, v, metric.WithAttributes(attributes...))
98+
func (e *EncodeOtlpMetrics) ProcessAggHist(m interface{}, name string, labels map[string]string, values []float64) error {
99+
if histo, ok := m.(metric.Float64Histogram); ok {
100+
// set attributes using the labels
101+
attributes := obtainAttributesFromLabels(labels)
102+
for _, v := range values {
103+
histo.Record(e.ctx, v, metric.WithAttributes(attributes...))
104+
}
105+
return nil
102106
}
103-
return nil
107+
return fmt.Errorf("wrong Otlp Histogram type for %s: %T; expecting Float64Histogram", name, m)
104108
}
105109

106110
func (e *EncodeOtlpMetrics) GetCacheEntry(entryLabels map[string]string, _ interface{}) interface{} {
@@ -118,30 +122,28 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
118122
ctx := context.Background()
119123
res := newResource()
120124

121-
mp, err := NewOtlpMetricsProvider(ctx, params, res)
125+
mp, err := NewOtlpMetricsProvider(ctx, &cfg, res)
122126
if err != nil {
123127
return nil, err
124128
}
125-
meter := mp.Meter(
126-
flpMeterName,
127-
)
129+
meter := mp.Meter(flpMeterName)
130+
return newEncodeOtlpMetricsWithMeter(ctx, params.Name, opMetrics, &cfg, meter)
131+
}
132+
133+
func newEncodeOtlpMetricsWithMeter(ctx context.Context, stageName string, opMetrics *operational.Metrics, cfg *api.EncodeOtlpMetrics, meter metric.Meter) (encode.Encoder, error) {
134+
meterFactory := otel.Meter(flpMeterName)
128135

129136
expiryTime := cfg.ExpiryTime
130137
if expiryTime.Duration == 0 {
131138
expiryTime.Duration = defaultExpiryTime
132139
}
133140

134-
meterFactory := otel.Meter(flpMeterName)
135-
136141
w := &EncodeOtlpMetrics{
137-
cfg: cfg,
138-
ctx: ctx,
139-
res: res,
140-
mp: mp,
141-
meter: meterFactory,
142+
cfg: *cfg,
143+
ctx: ctx,
142144
}
143145

144-
metricCommon := encode.NewMetricsCommonStruct(opMetrics, 0, params.Name, expiryTime, nil)
146+
metricCommon := encode.NewMetricsCommonStruct(opMetrics, 0, stageName, expiryTime, nil)
145147
w.metricCommon = metricCommon
146148

147149
for i := range cfg.Metrics {
@@ -172,6 +174,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
172174
metricCommon.AddGauge(fullMetricName, gauge, mInfo)
173175
case api.MetricHistogram:
174176
var histo metric.Float64Histogram
177+
var err error
175178
if len(mCfg.Buckets) == 0 {
176179
histo, err = meter.Float64Histogram(fullMetricName)
177180
} else {
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package opentelemetry
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
8+
"github.com/netobserv/flowlogs-pipeline/pkg/config"
9+
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/otel/metric"
13+
)
14+
15+
type wrappedMeter struct {
16+
metric.Meter
17+
back metric.Meter
18+
counters map[string]*wrappedCounter
19+
}
20+
21+
func newWrappedMeter(ctx context.Context, cfg *api.EncodeOtlpMetrics) (*wrappedMeter, error) {
22+
res := newResource()
23+
mp, err := NewOtlpMetricsProvider(ctx, cfg, res)
24+
if err != nil {
25+
return nil, err
26+
}
27+
return &wrappedMeter{back: mp.Meter("test"), counters: make(map[string]*wrappedCounter)}, nil
28+
}
29+
30+
func (wm *wrappedMeter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
31+
c, err := wm.back.Float64Counter(name, options...)
32+
wc := &wrappedCounter{back: c}
33+
wm.counters[name] = wc
34+
return wc, err
35+
}
36+
37+
type wrappedCounter struct {
38+
metric.Float64Counter
39+
back metric.Float64Counter
40+
sum float64
41+
}
42+
43+
func (wc *wrappedCounter) Add(ctx context.Context, incr float64, options ...metric.AddOption) {
44+
wc.sum += incr
45+
wc.back.Add(ctx, incr, options...)
46+
}
47+
48+
func Test_EncodeOtlpMetrics(t *testing.T) {
49+
cfg := &api.EncodeOtlpMetrics{
50+
OtlpConnectionInfo: &api.OtlpConnectionInfo{
51+
Address: "1.2.3.4",
52+
Port: 999,
53+
ConnectionType: "grpc",
54+
Headers: nil,
55+
},
56+
Prefix: "flp_test",
57+
Metrics: []api.MetricsItem{
58+
{Name: "metric1", Type: "counter", ValueKey: "value", Labels: []string{"destination.k8s.kind", "DstSubnetLabel"}},
59+
{Name: "metric2", Type: "gauge", Labels: []string{"DstSubnetLabel"}},
60+
{Name: "metric3", Type: "counter", Labels: []string{"destination.k8s.kind", "source.k8s.kind"}},
61+
},
62+
}
63+
ctx := context.Background()
64+
wm, err := newWrappedMeter(ctx, cfg)
65+
require.NoError(t, err)
66+
encoder, err := newEncodeOtlpMetricsWithMeter(ctx, "otlp-encode", operational.NewMetrics(&config.MetricsSettings{}), cfg, wm)
67+
require.NoError(t, err)
68+
require.NotNil(t, encoder)
69+
70+
assert.Len(t, wm.counters, 2)
71+
assert.Equal(t, float64(0), wm.counters["flp_testmetric1"].sum)
72+
assert.Equal(t, float64(0), wm.counters["flp_testmetric3"].sum)
73+
74+
// Test empty
75+
encoder.Encode(config.GenericMap{})
76+
assert.Len(t, wm.counters, 2)
77+
assert.Equal(t, float64(0), wm.counters["flp_testmetric1"].sum)
78+
assert.Equal(t, float64(1), wm.counters["flp_testmetric3"].sum)
79+
80+
encoder.Encode(config.GenericMap{"destination.k8s.kind": "foo", "DstSubnetLabel": "bar", "value": 5})
81+
assert.Len(t, wm.counters, 2)
82+
assert.Equal(t, float64(5), wm.counters["flp_testmetric1"].sum)
83+
assert.Equal(t, float64(2), wm.counters["flp_testmetric3"].sum)
84+
}

pkg/pipeline/encode/opentelemetry/opentelemetry.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,7 @@ func NewOtlpTracerProvider(ctx context.Context, params config.StageParam, res *r
122122
return traceProvider, nil
123123
}
124124

125-
func NewOtlpMetricsProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdkmetric.MeterProvider, error) {
126-
cfg := api.EncodeOtlpMetrics{}
127-
if params.Encode != nil && params.Encode.OtlpMetrics != nil {
128-
cfg = *params.Encode.OtlpMetrics
129-
}
125+
func NewOtlpMetricsProvider(ctx context.Context, cfg *api.EncodeOtlpMetrics, res *resource.Resource) (*sdkmetric.MeterProvider, error) {
130126
timeInterval := cfg.PushTimeInterval
131127
if timeInterval.Duration == 0 {
132128
timeInterval.Duration = defaultTimeInterval

0 commit comments

Comments
 (0)