Skip to content

Commit 26f1cdb

Browse files
authored
NETOBSERV-2285: opt: use uint64 hash instead of string for cache key (#992)
* Optimization: use uint64 hash instead of string for cache key In timed_cache, use uint64 hash (fnv 64a) instead of string builder Uses the same implementation as in prom go client * Bump agent, with improved pb decode (MAC address to string was improved) * Optimization: reduce allocations in metrics encode Previously, labels per metric were computed in 2 passes: 1. to extract key/values 2. copying the map in prom-client suitable format and extracting the values to build a key Step 1 resulted in an ephemere map allocation Those two steps are now merged, avoiding intermediate allocations
1 parent 61fca20 commit 26f1cdb

File tree

14 files changed

+145
-105
lines changed

14 files changed

+145
-105
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ require (
1818
github.com/mitchellh/mapstructure v1.5.0
1919
github.com/netobserv/gopipes v0.3.0
2020
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847
21-
github.com/netobserv/netobserv-ebpf-agent v1.9.0-crc0.0.20250528064221-6f34e5b85d2c
21+
github.com/netobserv/netobserv-ebpf-agent v1.9.0-crc0.0.20250610144135-d64c5d99f2da
2222
github.com/netsampler/goflow2 v1.3.7
2323
github.com/pkg/errors v0.9.1
2424
github.com/prometheus/client_golang v1.22.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,8 @@ github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+
258258
github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU=
259259
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847 h1:hjzhVZSSKIOmAzHbGUV4JhVIPkgKs/UtrWDx6JSVKMw=
260260
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8=
261-
github.com/netobserv/netobserv-ebpf-agent v1.9.0-crc0.0.20250528064221-6f34e5b85d2c h1:HhObTHfakq06NAPkljppnT1UT9MV2o5obH2dPD0jxmk=
262-
github.com/netobserv/netobserv-ebpf-agent v1.9.0-crc0.0.20250528064221-6f34e5b85d2c/go.mod h1:sqvri+RCx8QefV2g+pbJktbNuO3ebB1TE1Zl7M2qqDo=
261+
github.com/netobserv/netobserv-ebpf-agent v1.9.0-crc0.0.20250610144135-d64c5d99f2da h1:fahbLlD/5BidF+7rcvydhro3Tu0pY81OUGOkxmaY07A=
262+
github.com/netobserv/netobserv-ebpf-agent v1.9.0-crc0.0.20250610144135-d64c5d99f2da/go.mod h1:IfxvtBeSfhJaCO/7ie3R7mna+j8sAer2vqtbwaBTlzA=
263263
github.com/netsampler/goflow2 v1.3.7 h1:XZaTy8kkMnGXpJ9hS3KbO1McyrFTpVNhVFEx9rNhMmc=
264264
github.com/netsampler/goflow2 v1.3.7/go.mod h1:4UZsVGVAs//iMCptUHn3WNScztJeUhZH7kDW2+/vDdQ=
265265
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=

pkg/pipeline/encode/encode_prom.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (e *EncodeProm) ProcessCounter(m interface{}, labels map[string]string, val
6666
return nil
6767
}
6868

69-
func (e *EncodeProm) ProcessGauge(m interface{}, labels map[string]string, value float64, _ string) error {
69+
func (e *EncodeProm) ProcessGauge(m interface{}, _ string, labels map[string]string, value float64, _ []string) error {
7070
gauge := m.(*prometheus.GaugeVec)
7171
mm, err := gauge.GetMetricWith(labels)
7272
if err != nil {

pkg/pipeline/encode/encode_prom_test.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -819,14 +819,6 @@ func buildFlow() config.GenericMap {
819819
}
820820
}
821821

822-
func thousandsFlows() []config.GenericMap {
823-
flows := make([]config.GenericMap, 1000)
824-
for i := 0; i < 1000; i++ {
825-
flows[i] = buildFlow()
826-
}
827-
return flows
828-
}
829-
830822
func BenchmarkPromEncode(b *testing.B) {
831823
var expiryTimeDuration api.Duration
832824
expiryTimeDuration.Duration = time.Duration(60 * time.Second)
@@ -869,9 +861,8 @@ func BenchmarkPromEncode(b *testing.B) {
869861
require.NoError(b, err)
870862

871863
for i := 0; i < b.N; i++ {
872-
for _, metric := range thousandsFlows() {
873-
prom.Encode(metric)
874-
}
864+
m := buildFlow()
865+
prom.Encode(m)
875866
}
876867
}
877868

pkg/pipeline/encode/metrics_common.go

Lines changed: 28 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package encode
1919

2020
import (
21-
"strings"
2221
"time"
2322

2423
"github.com/netobserv/flowlogs-pipeline/pkg/api"
@@ -53,7 +52,7 @@ type MetricsCommonStruct struct {
5352
type MetricsCommonInterface interface {
5453
GetCacheEntry(entryLabels map[string]string, m interface{}) interface{}
5554
ProcessCounter(m interface{}, labels map[string]string, value float64) error
56-
ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error
55+
ProcessGauge(m interface{}, name string, labels map[string]string, value float64, lvs []string) error
5756
ProcessHist(m interface{}, labels map[string]string, value float64) error
5857
ProcessAggHist(m interface{}, labels map[string]string, value []float64) error
5958
}
@@ -132,7 +131,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
132131
continue
133132
}
134133
for _, labels := range labelSets {
135-
err := mci.ProcessGauge(mInfo.genericMetric, labels.lMap, value, labels.key)
134+
err := mci.ProcessGauge(mInfo.genericMetric, mInfo.info.Name, labels.lMap, value, labels.values)
136135
if err != nil {
137136
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
138137
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
@@ -198,20 +197,17 @@ func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow con
198197
}
199198

200199
labelSets := extractLabels(flow, flatParts, info)
201-
lkms := make([]labelsKeyAndMap, 0, len(labelSets))
202200
for _, ls := range labelSets {
203201
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
204-
lkm := ls.toKeyAndMap(info)
205-
lkms = append(lkms, lkm)
206-
ok := m.mCache.UpdateCacheEntry(lkm.key, func() interface{} {
207-
return mci.GetCacheEntry(lkm.lMap, mv)
202+
ok := m.mCache.UpdateCacheEntry(ls.values, func() interface{} {
203+
return mci.GetCacheEntry(ls.lMap, mv)
208204
})
209205
if !ok {
210206
m.metricsDropped.Inc()
211207
return nil, 0
212208
}
213209
}
214-
return lkms, floatVal
210+
return labelSets, floatVal
215211
}
216212

217213
func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow config.GenericMap, info *metrics.Preprocessed, mc interface{}) ([]labelsKeyAndMap, []float64) {
@@ -232,20 +228,17 @@ func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow c
232228
}
233229

234230
labelSets := extractLabels(flow, flatParts, info)
235-
lkms := make([]labelsKeyAndMap, 0, len(labelSets))
236231
for _, ls := range labelSets {
237232
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
238-
lkm := ls.toKeyAndMap(info)
239-
lkms = append(lkms, lkm)
240-
ok := m.mCache.UpdateCacheEntry(lkm.key, func() interface{} {
241-
return mci.GetCacheEntry(lkm.lMap, mc)
233+
ok := m.mCache.UpdateCacheEntry(ls.values, func() interface{} {
234+
return mci.GetCacheEntry(ls.lMap, mc)
242235
})
243236
if !ok {
244237
m.metricsDropped.Inc()
245238
return nil, nil
246239
}
247240
}
248-
return lkms, values
241+
return labelSets, values
249242
}
250243

251244
func (m *MetricsCommonStruct) extractGenericValue(flow config.GenericMap, info *metrics.Preprocessed) interface{} {
@@ -261,59 +254,44 @@ func (m *MetricsCommonStruct) extractGenericValue(flow config.GenericMap, info *
261254
return val
262255
}
263256

264-
type label struct {
265-
key string
266-
value string
267-
}
268-
269-
type labelSet []label
270-
271257
type labelsKeyAndMap struct {
272-
key string
273-
lMap map[string]string
274-
}
275-
276-
func (l labelSet) toKeyAndMap(info *metrics.Preprocessed) labelsKeyAndMap {
277-
key := strings.Builder{}
278-
key.Grow(256) // pre-allocate a decent buffer
279-
key.WriteString(info.Name)
280-
key.WriteRune('|')
281-
m := make(map[string]string, len(l))
282-
for _, kv := range l {
283-
key.WriteString(kv.value)
284-
key.WriteRune('|')
285-
m[kv.key] = kv.value
286-
}
287-
return labelsKeyAndMap{key: key.String(), lMap: m}
258+
values []string
259+
lMap map[string]string
288260
}
289261

290262
// extractLabels takes the flow and a single metric definition as input.
291263
// It returns the flat labels maps (label names and values).
292264
// Most of the time it will return a single map; it may return several of them when the parsed flow fields are lists (e.g. "interfaces").
293-
func extractLabels(flow config.GenericMap, flatParts []config.GenericMap, info *metrics.Preprocessed) []labelSet {
294-
common := newLabelSet(flow, info.MappedLabels)
265+
func extractLabels(flow config.GenericMap, flatParts []config.GenericMap, info *metrics.Preprocessed) []labelsKeyAndMap {
266+
common := newLabelKeyAndMap(info.Name, flow, info.MappedLabels)
295267
if len(flatParts) == 0 {
296-
return []labelSet{common}
268+
return []labelsKeyAndMap{common}
297269
}
298-
var all []labelSet
270+
all := make([]labelsKeyAndMap, 0, len(flatParts))
299271
for _, fp := range flatParts {
300-
ls := newLabelSet(fp, info.FlattenedLabels)
301-
ls = append(ls, common...)
272+
ls := newLabelKeyAndMap(info.Name, fp, info.FlattenedLabels)
273+
ls.values = append(ls.values, common.values...)
274+
for k, v := range common.lMap {
275+
ls.lMap[k] = v
276+
}
302277
all = append(all, ls)
303278
}
304279
return all
305280
}
306281

307-
func newLabelSet(part config.GenericMap, labels []metrics.MappedLabel) labelSet {
308-
ls := make(labelSet, 0, len(labels))
282+
func newLabelKeyAndMap(name string, part config.GenericMap, labels []metrics.MappedLabel) labelsKeyAndMap {
283+
values := make([]string, 0, len(labels)+1)
284+
values = append(values, name)
285+
m := make(map[string]string, len(labels))
309286
for _, t := range labels {
310-
label := label{key: t.Target, value: ""}
287+
value := ""
311288
if v, ok := part[t.Source]; ok {
312-
label.value = utils.ConvertToString(v)
289+
value = utils.ConvertToString(v)
313290
}
314-
ls = append(ls, label)
291+
values = append(values, value)
292+
m[t.Target] = value
315293
}
316-
return ls
294+
return labelsKeyAndMap{values: values, lMap: m}
317295
}
318296

319297
func (m *MetricsCommonStruct) cleanupExpiredEntriesLoop(callback putils.CacheCallback) {

pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package opentelemetry
1919

2020
import (
2121
"context"
22+
"strings"
2223
"time"
2324

2425
"github.com/netobserv/flowlogs-pipeline/pkg/api"
@@ -64,10 +65,22 @@ func (e *EncodeOtlpMetrics) ProcessCounter(m interface{}, labels map[string]stri
6465
return nil
6566
}
6667

67-
func (e *EncodeOtlpMetrics) ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error {
68+
func createKey(name string, keys []string) string {
69+
key := strings.Builder{}
70+
key.WriteString(name)
71+
key.WriteRune('|')
72+
for _, k := range keys {
73+
key.WriteString(k)
74+
key.WriteRune('|')
75+
}
76+
return key.String()
77+
}
78+
79+
func (e *EncodeOtlpMetrics) ProcessGauge(m interface{}, name string, labels map[string]string, value float64, lvs []string) error {
6880
obs := m.(Float64Gauge)
6981
// set attributes using the labels
7082
attributes := obtainAttributesFromLabels(labels)
83+
key := createKey(name, lvs)
7184
obs.Set(key, value, attributes)
7285
return nil
7386
}

pkg/pipeline/extract/aggregate/aggregate.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ const (
4343
)
4444

4545
type Labels map[string]string
46-
type NormalizedValues string
46+
type NormalizedValues []string
4747

4848
type Aggregate struct {
4949
definition *api.AggregateDefinition
@@ -78,7 +78,7 @@ func (aggregate *Aggregate) LabelsFromEntry(entry config.GenericMap) (Labels, bo
7878
}
7979

8080
func (labels Labels) getNormalizedValues() NormalizedValues {
81-
var normalizedAsString string
81+
var normalized NormalizedValues
8282

8383
keys := make([]string, 0, len(labels))
8484
for k := range labels {
@@ -87,20 +87,16 @@ func (labels Labels) getNormalizedValues() NormalizedValues {
8787
sort.Strings(keys)
8888

8989
for _, k := range keys {
90-
normalizedAsString += labels[k] + ","
90+
normalized = append(normalized, labels[k])
9191
}
9292

93-
if len(normalizedAsString) > 0 {
94-
normalizedAsString = normalizedAsString[:len(normalizedAsString)-1]
95-
}
96-
97-
return NormalizedValues(normalizedAsString)
93+
return normalized
9894
}
9995

10096
func (aggregate *Aggregate) filterEntry(entry config.GenericMap) (NormalizedValues, Labels, error) {
10197
labels, allLabelsFound := aggregate.LabelsFromEntry(entry)
10298
if !allLabelsFound {
103-
return "", nil, fmt.Errorf("missing keys in entry")
99+
return nil, nil, fmt.Errorf("missing keys in entry")
104100
}
105101

106102
normalizedValues := labels.getNormalizedValues()
@@ -128,7 +124,7 @@ func (aggregate *Aggregate) UpdateByEntry(entry config.GenericMap, normalizedVal
128124
defer aggregate.mutex.Unlock()
129125

130126
var groupState *GroupState
131-
oldEntry, ok := aggregate.cache.GetCacheEntry(string(normalizedValues))
127+
oldEntry, ok := aggregate.cache.GetCacheEntry(normalizedValues)
132128
if !ok {
133129
groupState = &GroupState{normalizedValues: normalizedValues, labels: labels}
134130
initVal := getInitValue(string(aggregate.definition.OperationType))
@@ -140,7 +136,7 @@ func (aggregate *Aggregate) UpdateByEntry(entry config.GenericMap, normalizedVal
140136
} else {
141137
groupState = oldEntry.(*GroupState)
142138
}
143-
aggregate.cache.UpdateCacheEntry(string(normalizedValues), func() interface{} {
139+
aggregate.cache.UpdateCacheEntry(normalizedValues, func() interface{} {
144140
return groupState
145141
})
146142

@@ -212,20 +208,21 @@ func (aggregate *Aggregate) GetMetrics() []config.GenericMap {
212208
var metrics []config.GenericMap
213209

214210
// iterate over the items in the cache
215-
aggregate.cache.Iterate(func(_ string, value interface{}) {
211+
aggregate.cache.Iterate(func(_ uint64, value interface{}) {
216212
group := value.(*GroupState)
213+
nv := strings.Join(group.normalizedValues, ",")
217214
newEntry := config.GenericMap{
218215
"name": aggregate.definition.Name,
219216
"operation_type": aggregate.definition.OperationType,
220217
"operation_key": aggregate.definition.OperationKey,
221218
"by": strings.Join(aggregate.definition.GroupByKeys, ","),
222-
"aggregate": string(group.normalizedValues),
219+
"aggregate": nv,
223220
"total_value": group.totalValue,
224221
"total_count": group.totalCount,
225222
"recent_raw_values": group.recentRawValues,
226223
"recent_op_value": group.recentOpValue,
227224
"recent_count": group.recentCount,
228-
strings.Join(aggregate.definition.GroupByKeys, "_"): string(group.normalizedValues),
225+
strings.Join(aggregate.definition.GroupByKeys, "_"): nv,
229226
}
230227
// add the items in aggregate.definition.GroupByKeys individually to the entry
231228
for _, key := range aggregate.definition.GroupByKeys {

pkg/pipeline/extract/aggregate/aggregate_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func getMockLabels(reverseOrder bool) Labels {
6262
}
6363

6464
func Test_getNormalizedValues(t *testing.T) {
65-
expectedLabels := NormalizedValues("20.0.0.2,10.0.0.1")
65+
expectedLabels := NormalizedValues{"20.0.0.2", "10.0.0.1"}
6666

6767
labels := getMockLabels(false)
6868

@@ -103,7 +103,7 @@ func Test_FilterEntry(t *testing.T) {
103103
normalizedLabels, labels, err := aggregate.filterEntry(entry)
104104
require.Equal(t, err, nil)
105105
require.Equal(t, Labels{"srcIP": "10.0.0.1", "dstIP": "20.0.0.2"}, labels)
106-
require.Equal(t, NormalizedValues("20.0.0.2,10.0.0.1"), normalizedLabels)
106+
require.Equal(t, NormalizedValues{"20.0.0.2", "10.0.0.1"}, normalizedLabels)
107107

108108
entry = test.GetIngestMockEntry(true)
109109

@@ -125,7 +125,7 @@ func Test_Evaluate(t *testing.T) {
125125

126126
require.Equal(t, nil, err)
127127
require.Equal(t, 1, aggregate.cache.GetCacheLen())
128-
cacheEntry, found := aggregate.cache.GetCacheEntry(string(normalizedValues))
128+
cacheEntry, found := aggregate.cache.GetCacheEntry(normalizedValues)
129129
gState := cacheEntry.(*GroupState)
130130
require.Equal(t, true, found)
131131
require.Equal(t, 2, gState.totalCount)

pkg/pipeline/transform/transform_network.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,11 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
127127
}
128128
if anyIP, ok := outputEntry[rule.AddSubnetLabel.Input]; ok {
129129
if strIP, ok := anyIP.(string); ok {
130-
lbl, ok := n.ipLabelCache.GetCacheEntry(strIP)
130+
keys := []string{strIP}
131+
lbl, ok := n.ipLabelCache.GetCacheEntry(keys)
131132
if !ok {
132133
lbl = n.applySubnetLabel(strIP)
133-
n.ipLabelCache.UpdateCacheEntry(strIP, func() interface{} { return lbl })
134+
n.ipLabelCache.UpdateCacheEntry(keys, func() interface{} { return lbl })
134135
}
135136
if lbl != "" {
136137
outputEntry[rule.AddSubnetLabel.Output] = lbl

0 commit comments

Comments
 (0)