
Commit 4aee357

conntrack: Add operational metrics on errors for hash computation and aggregators (#416)

* Add operational metrics to hash computation and aggregators
* Update docs
* Fix rebase
* Add conntrack_end_connections operational metric

1 parent 381890d, commit 4aee357

File tree: 10 files changed, +158 -47 lines changed

docs/operational-metrics.md

Lines changed: 28 additions & 4 deletions
@@ -7,34 +7,58 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
 
 
 
+### conntrack_aggregator_errors
+| **Name** | conntrack_aggregator_errors |
+|:---|:---|
+| **Description** | The total number of errors during aggregation |
+| **Type** | counter |
+| **Labels** | error, field |
+
+
+### conntrack_end_connections
+| **Name** | conntrack_end_connections |
+|:---|:---|
+| **Description** | The total number of connections ended per group and reason |
+| **Type** | counter |
+| **Labels** | group, reason |
+
+
+### conntrack_hash_errors
+| **Name** | conntrack_hash_errors |
+|:---|:---|
+| **Description** | The total number of errors during hash computation |
+| **Type** | counter |
+| **Labels** | error, field |
+
 ### conntrack_input_records
 | **Name** | conntrack_input_records |
 |:---|:---|
-| **Description** | The total number of input records per classification. |
+| **Description** | The total number of input records per classification |
 | **Type** | counter |
 | **Labels** | classification |
 
 
 ### conntrack_memory_connections
 | **Name** | conntrack_memory_connections |
 |:---|:---|
-| **Description** | The total number of tracked connections in memory per group and phase. |
+| **Description** | The total number of tracked connections in memory per group and phase |
 | **Type** | gauge |
 | **Labels** | group, phase |
 
 
 ### conntrack_output_records
 | **Name** | conntrack_output_records |
 |:---|:---|
-| **Description** | The total number of output records. |
+| **Description** | The total number of output records |
 | **Type** | counter |
 | **Labels** | type |
 
 
 ### conntrack_tcp_flags
 | **Name** | conntrack_tcp_flags |
 |:---|:---|
-| **Description** | The total number of actions taken based on TCP flags. |
+| **Description** | The total number of actions taken based on TCP flags |
 | **Type** | counter |
 | **Labels** | action |
 
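The three new counters documented above are defined in this package's metrics.go, which this commit also touches but which is not shown in this excerpt. As a hedged sketch of what backs them, assuming plain client_golang counter vectors rather than the repo's operational.Metrics helpers, the names, help strings, and label sets follow directly from the tables:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// Illustrative only: the real definitions go through the operational.Metrics
// helpers in metrics.go, which this excerpt does not show.
var (
	aggregatorErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "conntrack_aggregator_errors",
		Help: "The total number of errors during aggregation",
	}, []string{"error", "field"})

	hashErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "conntrack_hash_errors",
		Help: "The total number of errors during hash computation",
	}, []string{"error", "field"})

	endConnections = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "conntrack_end_connections",
		Help: "The total number of connections ended per group and reason",
	}, []string{"group", "reason"})
)

func main() {
	prometheus.MustRegister(aggregatorErrors, hashErrors, endConnections)
	// The error paths in the diffs below increment them like this:
	aggregatorErrors.WithLabelValues("MissingFieldError", "Bytes").Inc()
}
```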

pkg/pipeline/extract/conntrack/aggregator.go

Lines changed: 10 additions & 3 deletions
@@ -41,6 +41,7 @@ type aggregateBase struct {
 	outputField string
 	splitAB     bool
 	initVal     interface{}
+	metrics     *metricsType
 }
 
 type aSum struct{ aggregateBase }
@@ -53,7 +54,7 @@ type aLast struct{ aggregateBase }
 // TODO: think of adding a more complex operation such as Average Packet Size which involves 2 input fields: Bytes/Packets
 
 // newAggregator returns a new aggregator depending on the output field operation
-func newAggregator(of api.OutputField) (aggregator, error) {
+func newAggregator(of api.OutputField, metrics *metricsType) (aggregator, error) {
 	if of.Name == "" {
 		return nil, fmt.Errorf("empty name %v", of)
 	}
@@ -63,7 +64,7 @@ func newAggregator(of api.OutputField) (aggregator, error) {
 	} else {
 		inputField = of.Name
 	}
-	aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB}
+	aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB, metrics: metrics}
 	var agg aggregator
 	switch of.Operation {
 	case api.ConnTrackOperationName("Sum"):
@@ -108,11 +109,17 @@ func (agg *aggregateBase) getOutputField(d direction) string {
 func (agg *aggregateBase) getInputFieldValue(flowLog config.GenericMap) (float64, error) {
 	rawValue, ok := flowLog[agg.inputField]
 	if !ok {
+		if agg.metrics != nil {
+			agg.metrics.aggregatorErrors.WithLabelValues("MissingFieldError", agg.inputField).Inc()
+		}
 		return 0, fmt.Errorf("missing field %v", agg.inputField)
 	}
 	floatValue, err := utils.ConvertToFloat64(rawValue)
 	if err != nil {
-		return 0, fmt.Errorf("cannot convert %v to float64: %w", rawValue, err)
+		if agg.metrics != nil {
+			agg.metrics.aggregatorErrors.WithLabelValues("Float64ConversionError", agg.inputField).Inc()
+		}
+		return 0, fmt.Errorf("cannot convert %q to float64: %w", rawValue, err)
 	}
 	return floatValue, nil
 }
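Note the nil guard around every metrics access: newAggregator now takes a *metricsType, and unit tests pass nil so they can build aggregators without touching a Prometheus registry. A self-contained sketch of that pattern, where the type and helper below are stand-ins rather than the real flowlogs-pipeline API:

```go
package main

import "fmt"

// metricsSink stands in for the package's *metricsType.
type metricsSink struct{ errorCount int }

// getField mirrors getInputFieldValue's shape: bump the error counter only
// when instrumentation is present, but always return the error itself.
func getField(m *metricsSink, flowLog map[string]interface{}, field string) (float64, error) {
	raw, ok := flowLog[field]
	if !ok {
		if m != nil { // nil metrics (as in most unit tests) must not panic
			m.errorCount++
		}
		return 0, fmt.Errorf("missing field %v", field)
	}
	f, ok := raw.(float64)
	if !ok {
		if m != nil {
			m.errorCount++
		}
		return 0, fmt.Errorf("cannot convert %v to float64", raw)
	}
	return f, nil
}

func main() {
	_, err := getField(nil, map[string]interface{}{}, "Bytes")
	fmt.Println(err) // "missing field Bytes", with no registry involved
}
```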

pkg/pipeline/extract/conntrack/aggregator_test.go

Lines changed: 48 additions & 15 deletions
@@ -23,6 +23,7 @@ import (
 
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/netobserv/flowlogs-pipeline/pkg/test"
 	"github.com/stretchr/testify/require"
 )
 
@@ -34,7 +35,7 @@ func TestNewAggregator_Invalid(t *testing.T) {
 		Operation: "sum",
 		SplitAB:   true,
 		Input:     "Input",
-	})
+	}, nil)
 	require.NotNil(t, err)
 
 	// unknown OperationType
@@ -43,15 +44,15 @@ func TestNewAggregator_Invalid(t *testing.T) {
 		Operation: "unknown",
 		SplitAB:   true,
 		Input:     "Input",
-	})
+	}, nil)
 	require.NotNil(t, err)
 
 	// invalid first agg
 	_, err = newAggregator(api.OutputField{
 		Operation: "first",
 		SplitAB:   true,
 		Input:     "Input",
-	})
+	}, nil)
 	require.NotNil(t, err)
 }
 
@@ -64,58 +65,58 @@ func TestNewAggregator_Valid(t *testing.T) {
 		{
 			name:        "Default SplitAB",
 			outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
-			expected:    &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
+			expected:    &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
 		},
 		{
 			name:        "Default input",
 			outputField: api.OutputField{Name: "MyAgg", Operation: "sum", SplitAB: true},
-			expected:    &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0)}},
+			expected:    &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0), nil}},
 		},
 		{
 			name:        "Custom input",
 			outputField: api.OutputField{Name: "MyAgg", Operation: "sum", Input: "MyInput"},
-			expected:    &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0)}},
+			expected:    &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0), nil}},
 		},
 		{
 			name:        "OperationType sum",
 			outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
-			expected:    &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
+			expected:    &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
 		},
 		{
 			name:        "OperationType count",
 			outputField: api.OutputField{Name: "MyAgg", Operation: "count"},
-			expected:    &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
+			expected:    &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
 		},
 		{
 			name:        "OperationType max",
 			outputField: api.OutputField{Name: "MyAgg", Operation: "max"},
-			expected:    &aMax{aggregateBase{"MyAgg", "MyAgg", false, -math.MaxFloat64}},
+			expected:    &aMax{aggregateBase{"MyAgg", "MyAgg", false, -math.MaxFloat64, nil}},
 		},
 		{
 			name:        "OperationType min",
 			outputField: api.OutputField{Name: "MyAgg", Operation: "min"},
-			expected:    &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64}},
+			expected:    &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64, nil}},
 		},
 		{
 			name:        "Default first",
 			outputField: api.OutputField{Name: "MyCp", Operation: "first"},
-			expected:    &aFirst{aggregateBase{"MyCp", "MyCp", false, nil}},
+			expected:    &aFirst{aggregateBase{"MyCp", "MyCp", false, nil, nil}},
 		},
 		{
 			name:        "Custom input first",
 			outputField: api.OutputField{Name: "MyCp", Operation: "first", Input: "MyInput"},
-			expected:    &aFirst{aggregateBase{"MyInput", "MyCp", false, nil}},
+			expected:    &aFirst{aggregateBase{"MyInput", "MyCp", false, nil, nil}},
 		},
 		{
 			name:        "Default last",
 			outputField: api.OutputField{Name: "MyCp", Operation: "last"},
-			expected:    &aLast{aggregateBase{"MyCp", "MyCp", false, nil}},
+			expected:    &aLast{aggregateBase{"MyCp", "MyCp", false, nil, nil}},
 		},
 	}
 
 	for _, test := range table {
 		t.Run(test.name, func(t *testing.T) {
-			agg, err := newAggregator(test.outputField)
+			agg, err := newAggregator(test.outputField, nil)
 			require.NoError(t, err)
 			require.Equal(t, test.expected, agg)
 		})
@@ -134,7 +135,7 @@ func TestAddField_and_Update(t *testing.T) {
 	}
 	var aggs []aggregator
 	for _, of := range ofs {
-		agg, err := newAggregator(of)
+		agg, err := newAggregator(of, nil)
 		require.NoError(t, err)
 		aggs = append(aggs, agg)
 	}
@@ -183,3 +184,35 @@ func TestAddField_and_Update(t *testing.T) {
 		})
 	}
 }
+
+func TestMissingFieldError(t *testing.T) {
+	test.ResetPromRegistry()
+	metrics := newMetrics(opMetrics)
+	agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true}, metrics)
+	require.NoError(t, err)
+
+	conn := NewConnBuilder(metrics).Build()
+	agg.addField(conn)
+
+	flowLog := config.GenericMap{}
+	agg.update(conn, flowLog, dirAB, true)
+
+	exposed := test.ReadExposedMetrics(t)
+	require.Contains(t, exposed, `conntrack_aggregator_errors{error="MissingFieldError",field="Bytes"} 1`)
+}
+
+func TestFloat64ConversionError(t *testing.T) {
+	test.ResetPromRegistry()
+	metrics := newMetrics(opMetrics)
+	agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true}, metrics)
+	require.NoError(t, err)
+
+	conn := NewConnBuilder(metrics).Build()
+	agg.addField(conn)
+
+	flowLog := config.GenericMap{"Bytes": "float64 inconvertible value"}
+	agg.update(conn, flowLog, dirAB, true)
+
+	exposed := test.ReadExposedMetrics(t)
+	require.Contains(t, exposed, `conntrack_aggregator_errors{error="Float64ConversionError",field="Bytes"} 1`)
+}

pkg/pipeline/extract/conntrack/conntrack.go

Lines changed: 4 additions & 3 deletions
@@ -61,7 +61,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM
 
 	var outputRecords []config.GenericMap
 	for _, fl := range flowLogs {
-		computedHash, err := ComputeHash(fl, ct.config.KeyDefinition, ct.hashProvider())
+		computedHash, err := ComputeHash(fl, ct.config.KeyDefinition, ct.hashProvider(), ct.metrics)
 		if err != nil {
 			log.Warningf("skipping flow log %v: %v", fl, err)
 			ct.metrics.inputRecords.WithLabelValues("rejected").Inc()
@@ -220,9 +220,11 @@ func NewConnectionTrack(opMetrics *operational.Metrics, params config.StageParam
 		return nil, fmt.Errorf("ConnectionTrack config is invalid: %w", err)
 	}
 
+	metrics := newMetrics(opMetrics)
+
 	var aggregators []aggregator
 	for _, of := range cfg.OutputFields {
-		agg, err := newAggregator(of)
+		agg, err := newAggregator(of, metrics)
 		if err != nil {
 			return nil, fmt.Errorf("error creating aggregator: %w", err)
 		}
@@ -248,7 +250,6 @@ func NewConnectionTrack(opMetrics *operational.Metrics, params config.StageParam
 	}
 
 	endpointAFields, endpointBFields := cfg.GetABFields()
-	metrics := newMetrics(opMetrics)
 	conntrack := &conntrackImpl{
 		clock:     clock,
 		connStore: newConnectionStore(cfg.Scheduling, metrics, clock.Now),
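The subtle part of this hunk is ordering: newMetrics(opMetrics) moves above the aggregator loop, and the old call further down is deleted, because each aggregator captures the metrics pointer when it is constructed. A toy illustration of that dependency, with illustrative names rather than the real types:

```go
package main

import "fmt"

type metricsType struct{ errors int } // stand-in for the real struct

type aggregator struct{ metrics *metricsType }

func newAggregator(m *metricsType) aggregator { return aggregator{metrics: m} }

func main() {
	metrics := &metricsType{}     // must exist before the aggregator loop...
	agg := newAggregator(metrics) // ...because the pointer is captured here
	agg.metrics.errors++          // shared instance, safely non-nil
	fmt.Println(metrics.errors)   // prints 1
}
```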

pkg/pipeline/extract/conntrack/conntrack_test.go

Lines changed: 4 additions & 0 deletions
@@ -898,6 +898,8 @@ func TestScheduling(t *testing.T) {
 			assertStoreConsistency(t, ct)
 		})
 	}
+	exposed := test.ReadExposedMetrics(t)
+	require.Contains(t, exposed, `conntrack_end_connections{group="0: Proto=1, ",reason="timeout"} 1`)
 }
 
 func assertStoreConsistency(t *testing.T, extractor extract.Extractor) {
@@ -1265,4 +1267,6 @@ func TestExpiringConnection(t *testing.T) {
 			assertStoreConsistency(t, ct)
 		})
 	}
+	exposed := test.ReadExposedMetrics(t)
+	require.Contains(t, exposed, `conntrack_end_connections{group="0: DEFAULT",reason="FIN_flag"} 1`)
 }

pkg/pipeline/extract/conntrack/hash.go

Lines changed: 6 additions & 3 deletions
@@ -38,12 +38,12 @@ type totalHashType struct {
 
 // ComputeHash computes the hash of a flow log according to keyDefinition.
 // Two flow logs will have the same hash if they belong to the same connection.
-func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, hasher hash.Hash64) (totalHashType, error) {
+func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, hasher hash.Hash64, metrics *metricsType) (totalHashType, error) {
 	fieldGroup2hash := make(map[string]uint64)
 
 	// Compute the hash of each field group
 	for _, fg := range keyDefinition.FieldGroups {
-		h, err := computeHashFields(flowLog, fg.Fields, hasher)
+		h, err := computeHashFields(flowLog, fg.Fields, hasher, metrics)
 		if err != nil {
 			return totalHashType{}, fmt.Errorf("compute hash: %w", err)
 		}
@@ -72,12 +72,15 @@ func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, has
 	return th, nil
 }
 
-func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash64) (uint64, error) {
+func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash64, metrics *metricsType) (uint64, error) {
 	hasher.Reset()
 	for _, fn := range fieldNames {
 		f, ok := flowLog[fn]
 		if !ok {
 			log.Warningf("Missing field %v", fn)
+			if metrics != nil {
+				metrics.hashErrors.WithLabelValues("MissingFieldError", fn).Inc()
+			}
 			continue
 		}
 		bytes, err := toBytes(f)
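Note the asymmetry with the aggregator path: a key field missing from a flow log does not fail ComputeHash. It is logged, counted in conntrack_hash_errors, and skipped, so the flow log is still hashed on whatever fields remain. A stdlib-only sketch of that policy; the fmt-based encoding here is a stand-in for the package's toBytes:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"log"
)

// hashFields sketches computeHashFields' recoverable-error policy:
// warn, count, and keep hashing instead of rejecting the flow log.
func hashFields(flowLog map[string]interface{}, fieldNames []string) uint64 {
	hasher := fnv.New64a()
	for _, fn := range fieldNames {
		f, ok := flowLog[fn]
		if !ok {
			log.Printf("Missing field %v", fn) // counted as conntrack_hash_errors
			continue                           // hash whatever fields remain
		}
		fmt.Fprintf(hasher, "%v", f) // stand-in for toBytes + hasher.Write
	}
	return hasher.Sum64()
}

func main() {
	fl := map[string]interface{}{"SrcAddr": "10.0.0.1"}
	// "DstAddr" is absent, yet the flow log still gets a usable hash.
	fmt.Printf("%x\n", hashFields(fl, []string{"SrcAddr", "DstAddr"}))
}
```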

pkg/pipeline/extract/conntrack/hash_test.go

Lines changed: 10 additions & 5 deletions
@@ -23,6 +23,7 @@ import (
 
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/netobserv/flowlogs-pipeline/pkg/test"
 	"github.com/stretchr/testify/require"
 )
 
@@ -102,8 +103,8 @@ func TestComputeHash_Unidirectional(t *testing.T) {
 	}
 	for _, test := range table {
 		t.Run(test.name, func(t *testing.T) {
-			h1, err1 := ComputeHash(test.flowLog1, keyDefinition, testHasher)
-			h2, err2 := ComputeHash(test.flowLog2, keyDefinition, testHasher)
+			h1, err1 := ComputeHash(test.flowLog1, keyDefinition, testHasher, nil)
+			h2, err2 := ComputeHash(test.flowLog2, keyDefinition, testHasher, nil)
 			require.NoError(t, err1)
 			require.NoError(t, err2)
 			if test.sameHash {
@@ -191,8 +192,8 @@ func TestComputeHash_Bidirectional(t *testing.T) {
 	}
 	for _, test := range table {
 		t.Run(test.name, func(t *testing.T) {
-			h1, err1 := ComputeHash(test.flowLog1, keyDefinition, testHasher)
-			h2, err2 := ComputeHash(test.flowLog2, keyDefinition, testHasher)
+			h1, err1 := ComputeHash(test.flowLog1, keyDefinition, testHasher, nil)
+			h2, err2 := ComputeHash(test.flowLog2, keyDefinition, testHasher, nil)
 			require.NoError(t, err1)
 			require.NoError(t, err2)
 			if test.sameHash {
@@ -205,6 +206,7 @@
 }
 
 func TestComputeHash_MissingField(t *testing.T) {
+	test.ResetPromRegistry()
 	keyDefinition := api.KeyDefinition{
 		FieldGroups: []api.FieldGroup{
 			{
@@ -229,7 +231,10 @@ func TestComputeHash_MissingField(t *testing.T) {
 
 	fl := newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false)
 
-	h, err := ComputeHash(fl, keyDefinition, testHasher)
+	metrics := newMetrics(opMetrics)
+	h, err := ComputeHash(fl, keyDefinition, testHasher, metrics)
 	require.NoError(t, err)
 	require.NotNil(t, h)
+	exposed := test.ReadExposedMetrics(t)
+	require.Contains(t, exposed, `conntrack_hash_errors{error="MissingFieldError",field="Missing"} 1`)
 }
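These tests lean on two helpers from pkg/test: ResetPromRegistry, so that per-test metric registration does not collide between tests, and ReadExposedMetrics, which returns the exposition text that require.Contains matches against. A minimal sketch of what such a scrape helper could look like, assuming the default client_golang registry (the real helper may differ):

```go
package main

import (
	"fmt"
	"io"
	"net/http/httptest"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// readExposedMetrics scrapes the default Prometheus registry through an
// in-memory HTTP round trip and returns the raw exposition text.
func readExposedMetrics() (string, error) {
	req := httptest.NewRequest("GET", "/metrics", nil)
	rec := httptest.NewRecorder()
	promhttp.Handler().ServeHTTP(rec, req)
	body, err := io.ReadAll(rec.Result().Body)
	return string(body), err
}

func main() {
	out, err := readExposedMetrics()
	if err != nil {
		panic(err)
	}
	// Lines look exactly like the strings asserted above, e.g.:
	// conntrack_hash_errors{error="MissingFieldError",field="Missing"} 1
	fmt.Println(len(out), "bytes of exposition text")
}
```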
