Skip to content

Commit f78e474

Browse files
committed
Optimizations on prom-encode and loki-write
- Pre-allocate slices and maps to avoid dynamic reallocations.
- In the Loki writer, split labels and lines in a single pass rather than copying the whole record and then deleting the labels from the lines.
1 parent 7dd0b17 commit f78e474

File tree

8 files changed

+69
-59
lines changed

8 files changed

+69
-59
lines changed

pkg/pipeline/encode/encode_prom.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ func (e *EncodeProm) ProcessAggHist(m interface{}, labels map[string]string, val
9898
return nil
9999
}
100100

101-
func (e *EncodeProm) GetChacheEntry(entryLabels map[string]string, m interface{}) interface{} {
101+
func (e *EncodeProm) GetCacheEntry(entryLabels map[string]string, m interface{}) interface{} {
102+
// In prom_encode, the metrics cache just contains cleanup callbacks
102103
switch mv := m.(type) {
103104
case *prometheus.CounterVec:
104105
return func() { mv.Delete(entryLabels) }

pkg/pipeline/encode/metrics_common.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type MetricsCommonStruct struct {
4242
histos map[string]mInfoStruct
4343
aggHistos map[string]mInfoStruct
4444
mCache *putils.TimedCache
45-
mChacheLenMetric prometheus.Gauge
45+
mCacheLenMetric prometheus.Gauge
4646
metricsProcessed prometheus.Counter
4747
metricsDropped prometheus.Counter
4848
errorsCounter *prometheus.CounterVec
@@ -51,7 +51,7 @@ type MetricsCommonStruct struct {
5151
}
5252

5353
type MetricsCommonInterface interface {
54-
GetChacheEntry(entryLabels map[string]string, m interface{}) interface{}
54+
GetCacheEntry(entryLabels map[string]string, m interface{}) interface{}
5555
ProcessCounter(m interface{}, labels map[string]string, value float64) error
5656
ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error
5757
ProcessHist(m interface{}, labels map[string]string, value float64) error
@@ -198,13 +198,14 @@ func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow con
198198
}
199199

200200
labelSets := extractLabels(flow, flatParts, info)
201-
var lkms []labelsKeyAndMap
201+
lkms := make([]labelsKeyAndMap, 0, len(labelSets))
202202
for _, ls := range labelSets {
203203
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
204204
lkm := ls.toKeyAndMap(info)
205205
lkms = append(lkms, lkm)
206-
cacheEntry := mci.GetChacheEntry(lkm.lMap, mv)
207-
ok := m.mCache.UpdateCacheEntry(lkm.key, cacheEntry)
206+
ok := m.mCache.UpdateCacheEntry(lkm.key, func() interface{} {
207+
return mci.GetCacheEntry(lkm.lMap, mv)
208+
})
208209
if !ok {
209210
m.metricsDropped.Inc()
210211
return nil, 0
@@ -231,13 +232,14 @@ func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow c
231232
}
232233

233234
labelSets := extractLabels(flow, flatParts, info)
234-
var lkms []labelsKeyAndMap
235+
lkms := make([]labelsKeyAndMap, 0, len(labelSets))
235236
for _, ls := range labelSets {
236237
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
237238
lkm := ls.toKeyAndMap(info)
238239
lkms = append(lkms, lkm)
239-
cacheEntry := mci.GetChacheEntry(lkm.lMap, mc)
240-
ok := m.mCache.UpdateCacheEntry(lkm.key, cacheEntry)
240+
ok := m.mCache.UpdateCacheEntry(lkm.key, func() interface{} {
241+
return mci.GetCacheEntry(lkm.lMap, mc)
242+
})
241243
if !ok {
242244
m.metricsDropped.Inc()
243245
return nil, nil
@@ -273,9 +275,10 @@ type labelsKeyAndMap struct {
273275

274276
func (l labelSet) toKeyAndMap(info *metrics.Preprocessed) labelsKeyAndMap {
275277
key := strings.Builder{}
278+
key.Grow(256) // pre-allocate a decent buffer
276279
key.WriteString(info.Name)
277280
key.WriteRune('|')
278-
m := map[string]string{}
281+
m := make(map[string]string, len(l))
279282
for _, kv := range l {
280283
key.WriteString(kv.value)
281284
key.WriteRune('|')
@@ -302,7 +305,7 @@ func extractLabels(flow config.GenericMap, flatParts []config.GenericMap, info *
302305
}
303306

304307
func newLabelSet(part config.GenericMap, labels []metrics.MappedLabel) labelSet {
305-
var ls labelSet
308+
ls := make(labelSet, 0, len(labels))
306309
for _, t := range labels {
307310
label := label{key: t.Target, value: ""}
308311
if v, ok := part[t.Source]; ok {
@@ -337,7 +340,7 @@ func NewMetricsCommonStruct(opMetrics *operational.Metrics, maxCacheEntries int,
337340
mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, name)
338341
m := &MetricsCommonStruct{
339342
mCache: putils.NewTimedCache(maxCacheEntries, mChacheLenMetric),
340-
mChacheLenMetric: mChacheLenMetric,
343+
mCacheLenMetric: mChacheLenMetric,
341344
metricsProcessed: opMetrics.NewCounter(&metricsProcessed, name),
342345
metricsDropped: opMetrics.NewCounter(&metricsDropped, name),
343346
errorsCounter: opMetrics.NewCounterVec(&encodePromErrors),

pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (e *EncodeOtlpMetrics) ProcessAggHist(m interface{}, labels map[string]stri
9090
return nil
9191
}
9292

93-
func (e *EncodeOtlpMetrics) GetChacheEntry(entryLabels map[string]string, _ interface{}) interface{} {
93+
func (e *EncodeOtlpMetrics) GetCacheEntry(entryLabels map[string]string, _ interface{}) interface{} {
9494
return entryLabels
9595
}
9696

pkg/pipeline/extract/aggregate/aggregate.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ func (aggregate *Aggregate) UpdateByEntry(entry config.GenericMap, normalizedVal
140140
} else {
141141
groupState = oldEntry.(*GroupState)
142142
}
143-
aggregate.cache.UpdateCacheEntry(string(normalizedValues), groupState)
143+
aggregate.cache.UpdateCacheEntry(string(normalizedValues), func() interface{} {
144+
return groupState
145+
})
144146

145147
// update value
146148
operationKey := aggregate.definition.OperationKey

pkg/pipeline/transform/transform_network.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
130130
lbl, ok := n.ipLabelCache.GetCacheEntry(strIP)
131131
if !ok {
132132
lbl = n.applySubnetLabel(strIP)
133-
n.ipLabelCache.UpdateCacheEntry(strIP, lbl)
133+
n.ipLabelCache.UpdateCacheEntry(strIP, func() interface{} { return lbl })
134134
}
135135
if lbl != "" {
136136
outputEntry[rule.AddSubnetLabel.Output] = lbl

pkg/pipeline/utils/timed_cache.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ func (tc *TimedCache) GetCacheEntry(key string) (interface{}, bool) {
6565

6666
var uclog = log.WithField("method", "UpdateCacheEntry")
6767

68-
// If cache entry exists, update it and return it; if it does not exist, create it if there is room.
68+
// If cache entry exists, update its timestamp; if it does not exist, create it if there is room.
6969
// If we exceed the size of the cache, then do not allocate new entry
70-
func (tc *TimedCache) UpdateCacheEntry(key string, entry interface{}) bool {
70+
func (tc *TimedCache) UpdateCacheEntry(key string, entryProvider func() interface{}) bool {
7171
nowInSecs := time.Now()
7272
tc.mu.Lock()
7373
defer tc.mu.Unlock()
@@ -85,7 +85,7 @@ func (tc *TimedCache) UpdateCacheEntry(key string, entry interface{}) bool {
8585
cEntry = &cacheEntry{
8686
lastUpdatedTime: nowInSecs,
8787
key: key,
88-
SourceEntry: entry,
88+
SourceEntry: entryProvider(),
8989
}
9090
uclog.Tracef("adding entry: %#v", cEntry)
9191
// place at end of list

pkg/pipeline/write/write_loki.go

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type Loki struct {
5656
apiConfig api.WriteLoki
5757
timestampScale float64
5858
saneLabels map[string]model.LabelName
59+
ignoreList map[string]any
5960
client emitter
6061
timeNow func() time.Time
6162
exitChan <-chan struct{}
@@ -107,36 +108,51 @@ func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
107108
}
108109

109110
func (l *Loki) ProcessRecord(in config.GenericMap) error {
110-
// copy record before process to avoid alteration on parallel stages
111-
out := in.Copy()
112-
labels := model.LabelSet{}
111+
labels, lines := l.splitLabelsLines(in)
113112

114-
// Add static labels from config
115-
for k, v := range l.apiConfig.StaticLabels {
116-
labels[k] = v
117-
}
118-
l.addLabels(in, labels)
119-
120-
// Remove labels and configured ignore list from record
121-
ignoreList := l.apiConfig.IgnoreList
122-
ignoreList = append(ignoreList, l.apiConfig.Labels...)
123-
for _, label := range ignoreList {
124-
delete(out, label)
125-
}
126-
127-
js, err := jsonEncodingConfig.Marshal(out)
113+
js, err := jsonEncodingConfig.Marshal(lines)
128114
if err != nil {
129115
return err
130116
}
131117

132-
timestamp := l.extractTimestamp(out)
118+
timestamp := l.extractTimestamp(lines)
133119
err = l.client.Handle(labels, timestamp, string(js))
134120
if err == nil {
135121
l.metrics.recordsWritten.Inc()
136122
}
137123
return err
138124
}
139125

126+
func (l *Loki) splitLabelsLines(in config.GenericMap) (model.LabelSet, config.GenericMap) {
127+
// Split the input GenericMap into one map for labels and another for lines / payload
128+
nLabels := len(l.apiConfig.StaticLabels) + len(l.saneLabels)
129+
labels := make(model.LabelSet, nLabels)
130+
lines := make(config.GenericMap, len(in))
131+
132+
// Add static labels from config
133+
for k, v := range l.apiConfig.StaticLabels {
134+
labels[k] = v
135+
}
136+
137+
for k, v := range in {
138+
if _, ignored := l.ignoreList[k]; ignored {
139+
continue
140+
}
141+
if sanitized, isLabel := l.saneLabels[k]; isLabel {
142+
lv := model.LabelValue(utils.ConvertToString(v))
143+
if !lv.IsValid() {
144+
log.WithFields(logrus.Fields{"key": k, "value": v}).Debug("Invalid label value. Ignoring it")
145+
continue
146+
}
147+
labels[sanitized] = lv
148+
} else {
149+
lines[k] = v
150+
}
151+
}
152+
153+
return labels, lines
154+
}
155+
140156
func (l *Loki) extractTimestamp(record map[string]interface{}) time.Time {
141157
if l.apiConfig.TimestampLabel == "" {
142158
return l.timeNow()
@@ -163,27 +179,6 @@ func (l *Loki) extractTimestamp(record map[string]interface{}) time.Time {
163179
return time.Unix(tsNanos/int64(time.Second), tsNanos%int64(time.Second))
164180
}
165181

166-
func (l *Loki) addLabels(record config.GenericMap, labels model.LabelSet) {
167-
// Add non-static labels from record
168-
for _, label := range l.apiConfig.Labels {
169-
val, ok := record[label]
170-
if !ok {
171-
continue
172-
}
173-
sanitized, ok := l.saneLabels[label]
174-
if !ok {
175-
continue
176-
}
177-
lv := model.LabelValue(utils.ConvertToString(val))
178-
if !lv.IsValid() {
179-
log.WithFields(logrus.Fields{"key": label, "value": val}).
180-
Debug("Invalid label value. Ignoring it")
181-
continue
182-
}
183-
labels[sanitized] = lv
184-
}
185-
}
186-
187182
func getFloat64(timestamp interface{}) (ft float64, ok bool) {
188183
switch i := timestamp.(type) {
189184
case float64:
@@ -255,11 +250,18 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
255250
}
256251
}
257252

253+
// Ignore list to map
254+
ignoreList := make(map[string]any, len(lokiConfigIn.IgnoreList))
255+
for _, label := range lokiConfigIn.IgnoreList {
256+
ignoreList[label] = nil
257+
}
258+
258259
l := &Loki{
259260
lokiConfig: lokiConfig,
260261
apiConfig: lokiConfigIn,
261262
timestampScale: float64(timestampScale),
262263
saneLabels: saneLabels,
264+
ignoreList: ignoreList,
263265
client: client,
264266
timeNow: time.Now,
265267
exitChan: pUtils.ExitChannel(),

pkg/pipeline/write/write_loki_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ parameters:
289289
loki.client = &fe
290290

291291
require.NoError(t, loki.ProcessRecord(map[string]interface{}{
292-
"ba/z": "isBaz", "fo.o": "isFoo", "ba-r": "isBar", "ignored?": "yes!"}))
292+
"ba/z": "isBaz", "fo.o": "isFoo", "ba-r": "isBar", "ignored?": "yes!", "md": "md"}))
293293

294294
fe.AssertCalled(t, "Handle", model.LabelSet{
295295
"ba_r": "isBar",
@@ -336,6 +336,7 @@ func buildFlow(t time.Time) config.GenericMap {
336336
"bytes": rand.Intn(100),
337337
"packets": rand.Intn(10),
338338
"latency": rand.Float64(),
339+
"toIgnore": "--",
339340
}
340341
}
341342

@@ -359,9 +360,10 @@ func BenchmarkWriteLoki(b *testing.B) {
359360
StaticLabels: model.LabelSet{
360361
"app": "flp-benchmark",
361362
},
362-
Labels: []string{"srcIP", "dstIP"},
363+
Labels: []string{"srcIP", "dstIP", "toIgnore"},
363364
TimestampLabel: "timestamp",
364365
TimestampScale: "1ms",
366+
IgnoreList: []string{"toIgnore"},
365367
}
366368

367369
loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Write: &config.Write{Loki: &params}})

0 commit comments

Comments
 (0)