Skip to content

Commit e9391df

Browse files
committed
Optimization: use uint64 hash instead of string for cache key
In timed_cache, use a uint64 hash (FNV-64a) instead of a string builder for the cache key. Uses the same implementation as in the Prometheus Go client.
1 parent 90e6395 commit e9391df

File tree

9 files changed

+114
-61
lines changed

9 files changed

+114
-61
lines changed

pkg/pipeline/encode/encode_prom.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (e *EncodeProm) ProcessCounter(m interface{}, labels map[string]string, val
6666
return nil
6767
}
6868

69-
func (e *EncodeProm) ProcessGauge(m interface{}, labels map[string]string, value float64, _ string) error {
69+
func (e *EncodeProm) ProcessGauge(m interface{}, _ string, labels map[string]string, value float64, _ []string) error {
7070
gauge := m.(*prometheus.GaugeVec)
7171
mm, err := gauge.GetMetricWith(labels)
7272
if err != nil {

pkg/pipeline/encode/encode_prom_test.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -819,14 +819,6 @@ func buildFlow() config.GenericMap {
819819
}
820820
}
821821

822-
func thousandsFlows() []config.GenericMap {
823-
flows := make([]config.GenericMap, 1000)
824-
for i := 0; i < 1000; i++ {
825-
flows[i] = buildFlow()
826-
}
827-
return flows
828-
}
829-
830822
func BenchmarkPromEncode(b *testing.B) {
831823
var expiryTimeDuration api.Duration
832824
expiryTimeDuration.Duration = time.Duration(60 * time.Second)
@@ -869,9 +861,8 @@ func BenchmarkPromEncode(b *testing.B) {
869861
require.NoError(b, err)
870862

871863
for i := 0; i < b.N; i++ {
872-
for _, metric := range thousandsFlows() {
873-
prom.Encode(metric)
874-
}
864+
m := buildFlow()
865+
prom.Encode(m)
875866
}
876867
}
877868

pkg/pipeline/encode/metrics_common.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package encode
1919

2020
import (
21-
"strings"
2221
"time"
2322

2423
"github.com/netobserv/flowlogs-pipeline/pkg/api"
@@ -53,7 +52,7 @@ type MetricsCommonStruct struct {
5352
type MetricsCommonInterface interface {
5453
GetCacheEntry(entryLabels map[string]string, m interface{}) interface{}
5554
ProcessCounter(m interface{}, labels map[string]string, value float64) error
56-
ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error
55+
ProcessGauge(m interface{}, name string, labels map[string]string, value float64, lvs []string) error
5756
ProcessHist(m interface{}, labels map[string]string, value float64) error
5857
ProcessAggHist(m interface{}, labels map[string]string, value []float64) error
5958
}
@@ -132,7 +131,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
132131
continue
133132
}
134133
for _, labels := range labelSets {
135-
err := mci.ProcessGauge(mInfo.genericMetric, labels.lMap, value, labels.key)
134+
err := mci.ProcessGauge(mInfo.genericMetric, mInfo.info.Name, labels.lMap, value, labels.values)
136135
if err != nil {
137136
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
138137
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
@@ -201,9 +200,9 @@ func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow con
201200
lkms := make([]labelsKeyAndMap, 0, len(labelSets))
202201
for _, ls := range labelSets {
203202
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
204-
lkm := ls.toKeyAndMap(info)
203+
lkm := ls.toKeysAndMap(info)
205204
lkms = append(lkms, lkm)
206-
ok := m.mCache.UpdateCacheEntry(lkm.key, func() interface{} {
205+
ok := m.mCache.UpdateCacheEntry(lkm.values, func() interface{} {
207206
return mci.GetCacheEntry(lkm.lMap, mv)
208207
})
209208
if !ok {
@@ -235,9 +234,9 @@ func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow c
235234
lkms := make([]labelsKeyAndMap, 0, len(labelSets))
236235
for _, ls := range labelSets {
237236
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
238-
lkm := ls.toKeyAndMap(info)
237+
lkm := ls.toKeysAndMap(info)
239238
lkms = append(lkms, lkm)
240-
ok := m.mCache.UpdateCacheEntry(lkm.key, func() interface{} {
239+
ok := m.mCache.UpdateCacheEntry(lkm.values, func() interface{} {
241240
return mci.GetCacheEntry(lkm.lMap, mc)
242241
})
243242
if !ok {
@@ -269,22 +268,19 @@ type label struct {
269268
type labelSet []label
270269

271270
type labelsKeyAndMap struct {
272-
key string
273-
lMap map[string]string
271+
values []string
272+
lMap map[string]string
274273
}
275274

276-
func (l labelSet) toKeyAndMap(info *metrics.Preprocessed) labelsKeyAndMap {
277-
key := strings.Builder{}
278-
key.Grow(256) // pre-allocate a decent buffer
279-
key.WriteString(info.Name)
280-
key.WriteRune('|')
275+
func (l labelSet) toKeysAndMap(info *metrics.Preprocessed) labelsKeyAndMap {
276+
values := make([]string, 0, len(l)+1)
277+
values = append(values, info.Name)
281278
m := make(map[string]string, len(l))
282279
for _, kv := range l {
283-
key.WriteString(kv.value)
284-
key.WriteRune('|')
280+
values = append(values, kv.value)
285281
m[kv.key] = kv.value
286282
}
287-
return labelsKeyAndMap{key: key.String(), lMap: m}
283+
return labelsKeyAndMap{values: values, lMap: m}
288284
}
289285

290286
// extractLabels takes the flow and a single metric definition as input.

pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package opentelemetry
1919

2020
import (
2121
"context"
22+
"strings"
2223
"time"
2324

2425
"github.com/netobserv/flowlogs-pipeline/pkg/api"
@@ -64,10 +65,22 @@ func (e *EncodeOtlpMetrics) ProcessCounter(m interface{}, labels map[string]stri
6465
return nil
6566
}
6667

67-
func (e *EncodeOtlpMetrics) ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error {
68+
func createKey(name string, keys []string) string {
69+
key := strings.Builder{}
70+
key.WriteString(name)
71+
key.WriteRune('|')
72+
for _, k := range keys {
73+
key.WriteString(k)
74+
key.WriteRune('|')
75+
}
76+
return key.String()
77+
}
78+
79+
func (e *EncodeOtlpMetrics) ProcessGauge(m interface{}, name string, labels map[string]string, value float64, lvs []string) error {
6880
obs := m.(Float64Gauge)
6981
// set attributes using the labels
7082
attributes := obtainAttributesFromLabels(labels)
83+
key := createKey(name, lvs)
7184
obs.Set(key, value, attributes)
7285
return nil
7386
}

pkg/pipeline/extract/aggregate/aggregate.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ const (
4343
)
4444

4545
type Labels map[string]string
46-
type NormalizedValues string
46+
type NormalizedValues []string
4747

4848
type Aggregate struct {
4949
definition *api.AggregateDefinition
@@ -78,7 +78,7 @@ func (aggregate *Aggregate) LabelsFromEntry(entry config.GenericMap) (Labels, bo
7878
}
7979

8080
func (labels Labels) getNormalizedValues() NormalizedValues {
81-
var normalizedAsString string
81+
var normalized NormalizedValues
8282

8383
keys := make([]string, 0, len(labels))
8484
for k := range labels {
@@ -87,20 +87,16 @@ func (labels Labels) getNormalizedValues() NormalizedValues {
8787
sort.Strings(keys)
8888

8989
for _, k := range keys {
90-
normalizedAsString += labels[k] + ","
90+
normalized = append(normalized, labels[k])
9191
}
9292

93-
if len(normalizedAsString) > 0 {
94-
normalizedAsString = normalizedAsString[:len(normalizedAsString)-1]
95-
}
96-
97-
return NormalizedValues(normalizedAsString)
93+
return normalized
9894
}
9995

10096
func (aggregate *Aggregate) filterEntry(entry config.GenericMap) (NormalizedValues, Labels, error) {
10197
labels, allLabelsFound := aggregate.LabelsFromEntry(entry)
10298
if !allLabelsFound {
103-
return "", nil, fmt.Errorf("missing keys in entry")
99+
return nil, nil, fmt.Errorf("missing keys in entry")
104100
}
105101

106102
normalizedValues := labels.getNormalizedValues()
@@ -128,7 +124,7 @@ func (aggregate *Aggregate) UpdateByEntry(entry config.GenericMap, normalizedVal
128124
defer aggregate.mutex.Unlock()
129125

130126
var groupState *GroupState
131-
oldEntry, ok := aggregate.cache.GetCacheEntry(string(normalizedValues))
127+
oldEntry, ok := aggregate.cache.GetCacheEntry(normalizedValues)
132128
if !ok {
133129
groupState = &GroupState{normalizedValues: normalizedValues, labels: labels}
134130
initVal := getInitValue(string(aggregate.definition.OperationType))
@@ -140,7 +136,7 @@ func (aggregate *Aggregate) UpdateByEntry(entry config.GenericMap, normalizedVal
140136
} else {
141137
groupState = oldEntry.(*GroupState)
142138
}
143-
aggregate.cache.UpdateCacheEntry(string(normalizedValues), func() interface{} {
139+
aggregate.cache.UpdateCacheEntry(normalizedValues, func() interface{} {
144140
return groupState
145141
})
146142

@@ -212,20 +208,21 @@ func (aggregate *Aggregate) GetMetrics() []config.GenericMap {
212208
var metrics []config.GenericMap
213209

214210
// iterate over the items in the cache
215-
aggregate.cache.Iterate(func(_ string, value interface{}) {
211+
aggregate.cache.Iterate(func(_ uint64, value interface{}) {
216212
group := value.(*GroupState)
213+
nv := strings.Join(group.normalizedValues, ",")
217214
newEntry := config.GenericMap{
218215
"name": aggregate.definition.Name,
219216
"operation_type": aggregate.definition.OperationType,
220217
"operation_key": aggregate.definition.OperationKey,
221218
"by": strings.Join(aggregate.definition.GroupByKeys, ","),
222-
"aggregate": string(group.normalizedValues),
219+
"aggregate": nv,
223220
"total_value": group.totalValue,
224221
"total_count": group.totalCount,
225222
"recent_raw_values": group.recentRawValues,
226223
"recent_op_value": group.recentOpValue,
227224
"recent_count": group.recentCount,
228-
strings.Join(aggregate.definition.GroupByKeys, "_"): string(group.normalizedValues),
225+
strings.Join(aggregate.definition.GroupByKeys, "_"): nv,
229226
}
230227
// add the items in aggregate.definition.GroupByKeys individually to the entry
231228
for _, key := range aggregate.definition.GroupByKeys {

pkg/pipeline/extract/aggregate/aggregate_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func getMockLabels(reverseOrder bool) Labels {
6262
}
6363

6464
func Test_getNormalizedValues(t *testing.T) {
65-
expectedLabels := NormalizedValues("20.0.0.2,10.0.0.1")
65+
expectedLabels := NormalizedValues{"20.0.0.2", "10.0.0.1"}
6666

6767
labels := getMockLabels(false)
6868

@@ -103,7 +103,7 @@ func Test_FilterEntry(t *testing.T) {
103103
normalizedLabels, labels, err := aggregate.filterEntry(entry)
104104
require.Equal(t, err, nil)
105105
require.Equal(t, Labels{"srcIP": "10.0.0.1", "dstIP": "20.0.0.2"}, labels)
106-
require.Equal(t, NormalizedValues("20.0.0.2,10.0.0.1"), normalizedLabels)
106+
require.Equal(t, NormalizedValues{"20.0.0.2", "10.0.0.1"}, normalizedLabels)
107107

108108
entry = test.GetIngestMockEntry(true)
109109

@@ -125,7 +125,7 @@ func Test_Evaluate(t *testing.T) {
125125

126126
require.Equal(t, nil, err)
127127
require.Equal(t, 1, aggregate.cache.GetCacheLen())
128-
cacheEntry, found := aggregate.cache.GetCacheEntry(string(normalizedValues))
128+
cacheEntry, found := aggregate.cache.GetCacheEntry(normalizedValues)
129129
gState := cacheEntry.(*GroupState)
130130
require.Equal(t, true, found)
131131
require.Equal(t, 2, gState.totalCount)

pkg/pipeline/transform/transform_network.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,11 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
127127
}
128128
if anyIP, ok := outputEntry[rule.AddSubnetLabel.Input]; ok {
129129
if strIP, ok := anyIP.(string); ok {
130-
lbl, ok := n.ipLabelCache.GetCacheEntry(strIP)
130+
keys := []string{strIP}
131+
lbl, ok := n.ipLabelCache.GetCacheEntry(keys)
131132
if !ok {
132133
lbl = n.applySubnetLabel(strIP)
133-
n.ipLabelCache.UpdateCacheEntry(strIP, func() interface{} { return lbl })
134+
n.ipLabelCache.UpdateCacheEntry(keys, func() interface{} { return lbl })
134135
}
135136
if lbl != "" {
136137
outputEntry[rule.AddSubnetLabel.Output] = lbl

pkg/pipeline/utils/fnv.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2018 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
// Borrowed from https://github.com/prometheus/client_golang/blob/7c924b7c82b5a61e1e7e2be165bff781700b5365/prometheus/fnv.go
15+
16+
package utils
17+
18+
// Inline and byte-free variant of hash/fnv's fnv64a.
19+
20+
const (
21+
offset64 = 14695981039346656037
22+
prime64 = 1099511628211
23+
)
24+
25+
// hashNew initializes a new fnv64a hash value.
26+
func hashNew() uint64 {
27+
return offset64
28+
}
29+
30+
// hashAdd adds a string to a fnv64a hash value, returning the updated hash.
31+
func hashAdd(h uint64, s string) uint64 {
32+
for i := 0; i < len(s); i++ {
33+
h ^= uint64(s[i])
34+
h *= prime64
35+
}
36+
return h
37+
}
38+
39+
// hashAddByte adds a byte to a fnv64a hash value, returning the updated hash.
40+
func hashAddByte(h uint64, b byte) uint64 {
41+
h ^= uint64(b)
42+
h *= prime64
43+
return h
44+
}

0 commit comments

Comments
 (0)