Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Following is the supported API format for prometheus encode:
valueKey: entry key from which to resolve metric value
labels: labels to be associated with the metric
remap: optional remapping of labels
flatten: list fields to be flattened
buckets: histogram buckets
valueScale: scale factor of the value (MetricVal := FlowVal / Scale)
prefix: prefix added to each metric name
Expand Down Expand Up @@ -444,6 +445,7 @@ Following is the supported API format for writing metrics to an OpenTelemetry co
valueKey: entry key from which to resolve metric value
labels: labels to be associated with the metric
remap: optional remapping of labels
flatten: list fields to be flattened
buckets: histogram buckets
valueScale: scale factor of the value (MetricVal := FlowVal / Scale)
pushTimeInterval: how often should metrics be sent to collector:
Expand Down
1 change: 1 addition & 0 deletions pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type MetricsItem struct {
ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"`
Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"`
Remap map[string]string `yaml:"remap" json:"remap" doc:"optional remapping of labels"`
Flatten []string `yaml:"flatten" json:"flatten" doc:"list fields to be flattened"`
Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"`
ValueScale float64 `yaml:"valueScale,omitempty" json:"valueScale,omitempty" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"`
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/confgen/confgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func Test_RunShortConfGen(t *testing.T) {
ValueKey: "test_aggregates_value",
Labels: []string{"groupByKeys", "aggregate"},
Remap: map[string]string{},
Flatten: []string{},
Buckets: []float64{},
}},
}, out.Parameters[3].Encode.Prom)
Expand Down Expand Up @@ -234,6 +235,7 @@ func Test_RunConfGenNoAgg(t *testing.T) {
ValueKey: "Bytes",
Labels: []string{"service"},
Remap: map[string]string{},
Flatten: []string{},
Buckets: []float64{},
}},
}, out.Parameters[2].Encode.Prom)
Expand Down Expand Up @@ -339,6 +341,7 @@ func Test_RunLongConfGen(t *testing.T) {
ValueKey: "test_aggregates_value",
Labels: []string{"groupByKeys", "aggregate"},
Remap: map[string]string{},
Flatten: []string{},
Buckets: []float64{},
}, {
Name: "test_histo",
Expand All @@ -347,6 +350,7 @@ func Test_RunLongConfGen(t *testing.T) {
ValueKey: "test_aggregates_value",
Labels: []string{"groupByKeys", "aggregate"},
Remap: map[string]string{},
Flatten: []string{},
Buckets: []float64{},
}},
}, out.Parameters[4].Encode.Prom)
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func TestKafkaPromPipeline(t *testing.T) {
ValueKey: "recent_count",
Labels: []string{"by", "aggregate"},
Remap: map[string]string{},
Flatten: []string{},
Buckets: []float64{},
}},
Prefix: "flp_",
Expand Down Expand Up @@ -171,7 +172,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[4])
require.NoError(t, err)
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"remap":{},"buckets":[]}],"prefix":"flp_"}}}`, string(b))
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"flatten":[],"remap":{},"buckets":[]}],"prefix":"flp_"}}}`, string(b))
}

func TestForkPipeline(t *testing.T) {
Expand Down
15 changes: 8 additions & 7 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode/metrics"
promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -114,25 +115,25 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
cleanupFunc.(func())()
}

func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
func (e *EncodeProm) addCounter(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
return counter
}

func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
func (e *EncodeProm) addGauge(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
return gauge
}

func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
return histogram
}

func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
return agghistogram
Expand Down Expand Up @@ -176,10 +177,10 @@ func (e *EncodeProm) cleanDeletedMetrics(newCfg api.PromEncode) {
}

// returns true if a registry restart is needed
func (e *EncodeProm) checkMetricUpdate(prefix string, apiItem *api.MetricsItem, store map[string]mInfoStruct, createMetric func(string, *MetricInfo) prometheus.Collector) bool {
func (e *EncodeProm) checkMetricUpdate(prefix string, apiItem *api.MetricsItem, store map[string]mInfoStruct, createMetric func(string, *metrics.Preprocessed) prometheus.Collector) bool {
fullMetricName := prefix + apiItem.Name
plog.Debugf("Checking metric: %s", fullMetricName)
mInfo := CreateMetricInfo(apiItem)
mInfo := metrics.Preprocess(apiItem)
if oldMetric, ok := store[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.TargetLabels(), oldMetric.info.TargetLabels()) {
plog.Debug("Changes detected in labels")
Expand Down Expand Up @@ -257,7 +258,7 @@ func (e *EncodeProm) resetRegistry() {
for i := range e.cfg.Metrics {
mCfg := &e.cfg.Metrics[i]
fullMetricName := e.cfg.Prefix + mCfg.Name
mInfo := CreateMetricInfo(mCfg)
mInfo := metrics.Preprocess(mCfg)
plog.Debugf("Create metric: %s, Labels: %v", fullMetricName, mInfo.TargetLabels())
var m prometheus.Collector
switch mCfg.Type {
Expand Down
213 changes: 198 additions & 15 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,6 @@ func Test_MissingLabels(t *testing.T) {
},
}
params := api.PromEncode{
ExpiryTime: api.Duration{
Duration: time.Duration(60 * time.Second),
},
Metrics: []api.MetricsItem{
{
Name: "my_counter",
Expand Down Expand Up @@ -588,9 +585,6 @@ func Test_Remap(t *testing.T) {
},
}
params := api.PromEncode{
ExpiryTime: api.Duration{
Duration: time.Duration(60 * time.Second),
},
Metrics: []api.MetricsItem{
{
Name: "my_counter",
Expand All @@ -616,6 +610,204 @@ func Test_Remap(t *testing.T) {
require.Contains(t, exposed, `my_counter{ip="10.0.0.3",namespace="B"} 4`)
}

func Test_WithListField(t *testing.T) {
metrics := []config.GenericMap{
{
"namespace": "A",
"interfaces": []string{"eth0", "123456"},
"bytes": 7,
},
{
"namespace": "A",
"interfaces": []any{"eth0", "abcdef"},
"bytes": 1,
},
{
"namespace": "A",
"interfaces": []any{"eth0", "xyz"},
"bytes": 10,
},
{
"namespace": "B",
"bytes": 2,
},
{
"namespace": "C",
"interfaces": []string{},
"bytes": 3,
},
}
params := api.PromEncode{
Metrics: []api.MetricsItem{
{
Name: "my_counter",
Type: "counter",
ValueKey: "bytes",
Filters: []api.MetricsFilter{
{Key: "interfaces", Value: "xyz", Type: api.MetricFilterNotEqual},
},
Flatten: []string{"interfaces"},
Labels: []string{"namespace", "interfaces"},
Remap: map[string]string{"interfaces": "interface"},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t, encodeProm.server)

require.Contains(t, exposed, `my_counter{interface="eth0",namespace="A"} 18`)
require.Contains(t, exposed, `my_counter{interface="123456",namespace="A"} 7`)
require.Contains(t, exposed, `my_counter{interface="abcdef",namespace="A"} 1`)
require.Contains(t, exposed, `my_counter{interface="",namespace="B"} 2`)
require.Contains(t, exposed, `my_counter{interface="",namespace="C"} 3`)
require.NotContains(t, exposed, `"xyz"`)
}

func Test_WithObjectListField(t *testing.T) {
metrics := []config.GenericMap{
{
"namespace": "A",
"events": []any{
config.GenericMap{"type": "acl", "name": "my_policy"},
},
"bytes": 1,
},
{
"namespace": "A",
"events": []any{
config.GenericMap{"type": "egress", "name": "my_egress"},
config.GenericMap{"type": "acl", "name": "my_policy"},
},
"bytes": 10,
},
{
"namespace": "B",
"bytes": 2,
},
{
"namespace": "C",
"events": []string{},
"bytes": 3,
},
}
params := api.PromEncode{
Metrics: []api.MetricsItem{
{
Name: "policy_counter",
Type: "counter",
ValueKey: "bytes",
Filters: []api.MetricsFilter{
{Key: "events>type", Value: "acl", Type: api.MetricFilterEqual},
},
Labels: []string{"namespace", "events>name"},
Flatten: []string{"events"},
Remap: map[string]string{"events>name": "name"},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t, encodeProm.server)

require.Contains(t, exposed, `policy_counter{name="my_policy",namespace="A"} 11`)
require.NotContains(t, exposed, `"my_egress"`)
require.NotContains(t, exposed, `"B"`)
require.NotContains(t, exposed, `"C"`)
}

func Test_WithObjectListField_bis(t *testing.T) {
metrics := []config.GenericMap{
{
"namespace": "A",
"events": []any{
config.GenericMap{"type": "egress", "name": "my_egress"},
config.GenericMap{"type": "acl", "name": "my_policy"},
},
"bytes": 10,
},
}
params := api.PromEncode{
Metrics: []api.MetricsItem{
{
Name: "policy_counter",
Type: "counter",
ValueKey: "bytes",
Filters: []api.MetricsFilter{
{Key: "events>type", Value: "acl", Type: api.MetricFilterEqual},
},
Flatten: []string{"events"},
Labels: []string{"namespace"},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t, encodeProm.server)

require.Contains(t, exposed, `policy_counter{namespace="A"} 10`)
require.NotContains(t, exposed, `"my_egress"`)
require.NotContains(t, exposed, `"B"`)
require.NotContains(t, exposed, `"C"`)
}

func Test_WithObjectListField_ter(t *testing.T) {
metrics := []config.GenericMap{
{
"namespace": "A",
"events": []any{
config.GenericMap{"type": "egress", "name": "my_egress"},
config.GenericMap{"type": "acl", "name": "my_policy"},
},
"bytes": 10,
},
}
params := api.PromEncode{
Metrics: []api.MetricsItem{
{
Name: "policy_counter",
Type: "counter",
ValueKey: "bytes",
Labels: []string{"namespace", "events>name"},
Flatten: []string{"events"},
Remap: map[string]string{"events>name": "name"},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t, encodeProm.server)

require.Contains(t, exposed, `policy_counter{name="my_policy",namespace="A"} 10`)
require.Contains(t, exposed, `policy_counter{name="my_egress",namespace="A"} 10`)
require.NotContains(t, exposed, `"B"`)
require.NotContains(t, exposed, `"C"`)
}

func buildFlow() config.GenericMap {
return config.GenericMap{
"srcIP": "10.0.0." + strconv.Itoa(rand.Intn(20)),
Expand Down Expand Up @@ -754,12 +946,3 @@ func Test_MultipleProm(t *testing.T) {

// TODO: Add test for different addresses, but need to deal with StartPromServer (ListenAndServe)
}

func Test_Filters_extractVarLookups(t *testing.T) {
variables := extractVarLookups("$(abc)--$(def)")

require.Equal(t, [][]string{{"$(abc)", "abc"}, {"$(def)", "def"}}, variables)

variables = extractVarLookups("")
require.Empty(t, variables)
}
28 changes: 28 additions & 0 deletions pkg/pipeline/encode/metrics/filtering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package metrics

import "github.com/netobserv/flowlogs-pipeline/pkg/config"

func (p *Preprocessed) ApplyFilters(flow config.GenericMap, flatParts []config.GenericMap) (bool, []config.GenericMap) {
filteredParts := flatParts
for _, filter := range p.filters {
if filter.useFlat {
filteredParts = filter.filterFlatParts(filteredParts)
if len(filteredParts) == 0 {
return false, nil
}
} else if !filter.predicate(flow) {
return false, nil
}
}
return true, filteredParts
}

func (pf *preprocessedFilter) filterFlatParts(flatParts []config.GenericMap) []config.GenericMap {
var filteredParts []config.GenericMap
for _, part := range flatParts {
if pf.predicate(part) {
filteredParts = append(filteredParts, part)
}
}
return filteredParts
}
Loading
Loading