Skip to content

Commit 1623dae

Browse files
committed
delete old prometheus entries from report
1 parent d649199 commit 1623dae

File tree

5 files changed

+242
-83
lines changed

5 files changed

+242
-83
lines changed

docs/api.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Following is the supported API format for prometheus encode:
1515
buckets: histogram buckets
1616
port: port number to expose "/metrics" endpoint
1717
prefix: prefix added to each metric name
18+
expirytime: seconds of no-flow to wait before deleting prometheus data item
1819
</pre>
1920
## Ingest collector API
2021
Following is the supported API format for the netflow collector:

pkg/api/encode_prom.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
package api
1919

2020
type PromEncode struct {
21-
Metrics PromMetricsItems `yaml:"metrics" doc:"list of prometheus metric definitions, each includes:"`
22-
Port int `yaml:"port" doc:"port number to expose \"/metrics\" endpoint"`
23-
Prefix string `yaml:"prefix" doc:"prefix added to each metric name"`
21+
Metrics PromMetricsItems `yaml:"metrics" doc:"list of prometheus metric definitions, each includes:"`
22+
Port int `yaml:"port" doc:"port number to expose \"/metrics\" endpoint"`
23+
Prefix string `yaml:"prefix" doc:"prefix added to each metric name"`
24+
ExpiryTime int `yaml:"expirytime" doc:"seconds of no-flow to wait before deleting prometheus data item"`
2425
}
2526

2627
type PromEncodeOperationEnum struct {

pkg/pipeline/encode/encode_prom.go

Lines changed: 160 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package encode
1919

2020
import (
21+
"container/list"
2122
"encoding/json"
2223
"fmt"
2324
"github.com/netobserv/flowlogs2metrics/pkg/api"
@@ -28,46 +29,54 @@ import (
2829
"net/http"
2930
"os"
3031
"strconv"
32+
"sync"
33+
"time"
3134
)
3235

33-
// TODO when do we clean up old entries?
36+
const defaultExpiryTime = 120
3437

35-
type gaugeInfo struct {
36-
input string
37-
tags []string
38-
promGauge *prometheus.GaugeVec
39-
}
40-
41-
type counterInfo struct {
38+
type genericMetricInfo struct {
4239
input string
4340
tags []string
41+
metricType string
42+
promGauge *prometheus.GaugeVec
4443
promCounter *prometheus.CounterVec
44+
promHist *prometheus.HistogramVec
4545
}
4646

47-
type histInfo struct {
48-
input string
49-
tags []string
50-
promHist *prometheus.HistogramVec
47+
// entrySignature identifies one reported series: the full (prefixed) metric
// name together with its label values. Its JSON encoding serves as the key
// of the metric cache, so the fields must stay exported for encoding/json.
type entrySignature struct {
	Name   string
	Labels map[string]string
}
5251

53-
type counterEntryInfo struct {
54-
counterName string
55-
counterValue float64
56-
labels map[string]string
52+
type entryInfo struct {
53+
eInfo entrySignature
54+
value float64
5755
}
5856

59-
type gaugeEntryInfo struct {
60-
gaugeName string
61-
gaugeValue float64
62-
labels map[string]string
57+
type metricCacheEntry struct {
58+
label prometheus.Labels
59+
timeStamp int64
60+
e *list.Element
61+
key string
62+
metricType string
63+
promGauge *prometheus.GaugeVec
64+
promCounter *prometheus.CounterVec
65+
promHist *prometheus.HistogramVec
6366
}
6467

68+
type metricCache map[string]*metricCacheEntry
69+
6570
type encodeProm struct {
71+
mu sync.Mutex
6672
port string
6773
prefix string
68-
counters map[string]counterInfo
69-
gauges map[string]gaugeInfo
70-
histograms map[string]histInfo
74+
counters map[string]genericMetricInfo
75+
gauges map[string]genericMetricInfo
76+
histograms map[string]genericMetricInfo
77+
expiryTime int64
78+
mList *list.List
79+
mCache metricCache
7180
}
7281

7382
func (e *encodeProm) EncodeCounter(metric config.GenericMap) []interface{} {
@@ -88,19 +97,22 @@ func (e *encodeProm) EncodeCounter(metric config.GenericMap) []interface{} {
8897
for _, t := range counterInfo.tags {
8998
entryLabels[t] = fmt.Sprintf("%v", metric[t])
9099
}
91-
entry := counterEntryInfo{
92-
counterName: e.prefix + counterName,
93-
counterValue: valueFloat,
94-
labels: entryLabels,
100+
entry := entryInfo{
101+
eInfo: entrySignature{
102+
Name: e.prefix + counterName,
103+
Labels: entryLabels,
104+
},
105+
value: valueFloat,
95106
}
96107
out = append(out, entry)
97108
// push the metric to prometheus
98-
// TODO - fix the types here
99109
if counterInfo.promCounter != nil {
100110
counterInfo.promCounter.With(entryLabels).Add(valueFloat)
101111
}
112+
cEntry := e.saveEntryInCache(entry, entryLabels)
113+
cEntry.metricType = api.PromEncodeOperationName("Counter")
114+
cEntry.promCounter = counterInfo.promCounter
102115
}
103-
104116
return out
105117
}
106118
func (e *encodeProm) EncodeGauge(metric config.GenericMap) []interface{} {
@@ -121,34 +133,116 @@ func (e *encodeProm) EncodeGauge(metric config.GenericMap) []interface{} {
121133
for _, t := range gaugeInfo.tags {
122134
entryLabels[t] = fmt.Sprintf("%v", metric[t])
123135
}
124-
entry := gaugeEntryInfo{
125-
gaugeName: e.prefix + gaugeName,
126-
gaugeValue: valueFloat,
127-
labels: entryLabels,
136+
entry := entryInfo{
137+
eInfo: entrySignature{
138+
Name: e.prefix + gaugeName,
139+
Labels: entryLabels,
140+
},
141+
value: valueFloat,
128142
}
129143
out = append(out, entry)
130144
// push the metric to prometheus
131-
// TODO - fix the types here
132145
if gaugeInfo.promGauge != nil {
133146
gaugeInfo.promGauge.With(entryLabels).Set(valueFloat)
134147
}
135-
}
136148

149+
cEntry := e.saveEntryInCache(entry, entryLabels)
150+
cEntry.metricType = api.PromEncodeOperationName("Gauge")
151+
cEntry.promGauge = gaugeInfo.promGauge
152+
}
137153
return out
138154
}
139155

156+
func (e *encodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]string) *metricCacheEntry {
157+
// save item in cache; use eInfo as key to the cache
158+
var cEntry *metricCacheEntry
159+
secs := time.Now().Unix()
160+
eInfoBytes, _ := json.Marshal(&entry.eInfo)
161+
eInfoString := string(eInfoBytes)
162+
cEntry, ok := e.mCache[eInfoString]
163+
if ok {
164+
// item already exists in cache; update the element and move to end of list
165+
cEntry.timeStamp = secs
166+
// move to end of list
167+
e.mList.MoveToBack(cEntry.e)
168+
} else {
169+
// create new entry for cache
170+
cEntry = &metricCacheEntry{
171+
label: entryLabels,
172+
timeStamp: secs,
173+
key: eInfoString,
174+
}
175+
// place at end of list
176+
log.Debugf("adding entry = %v", cEntry)
177+
cEntry.e = e.mList.PushBack(cEntry)
178+
e.mCache[eInfoString] = cEntry
179+
log.Debugf("mlist = %v", e.mList)
180+
}
181+
return cEntry
182+
}
183+
140184
// Encode encodes a flow before being stored
141185
func (e *encodeProm) Encode(metrics []config.GenericMap) []interface{} {
186+
log.Debugf("entering encodeProm Encode")
187+
e.mu.Lock()
188+
defer e.mu.Unlock()
142189
out := make([]interface{}, 0)
143190
for _, metric := range metrics {
144191
gaugeOut := e.EncodeGauge(metric)
145192
out = append(out, gaugeOut...)
146193
counterOut := e.EncodeCounter(metric)
147194
out = append(out, counterOut...)
148195
}
196+
log.Debugf("cache = %v", e.mCache)
197+
log.Debugf("list = %v", e.mList)
149198
return out
150199
}
151200

201+
func (e *encodeProm) cleanupExpiredEntriesLoop() {
202+
for {
203+
e.cleanupExpiredEntries()
204+
time.Sleep(time.Duration(e.expiryTime) * time.Second)
205+
}
206+
}
207+
208+
// cleanupExpiredEntries - any entry that has expired should be removed from the prometheus reporting and cache
209+
func (e *encodeProm) cleanupExpiredEntries() {
210+
log.Debugf("entering cleanupExpiredEntries")
211+
e.mu.Lock()
212+
defer e.mu.Unlock()
213+
log.Debugf("cache = %v", e.mCache)
214+
log.Debugf("list = %v", e.mList)
215+
secs := time.Now().Unix()
216+
expireTime := secs - e.expiryTime
217+
// go through the list until we reach recently used connections
218+
for {
219+
entry := e.mList.Front()
220+
if entry == nil {
221+
return
222+
}
223+
c := entry.Value.(*metricCacheEntry)
224+
log.Debugf("timeStamp = %d, expireTime = %d", c.timeStamp, expireTime)
225+
log.Debugf("c = %v", c)
226+
if c.timeStamp > expireTime {
227+
// no more expired items
228+
return
229+
}
230+
231+
// clean up the entry
232+
log.Debugf("secs = %d, deleting %s", secs, c.label)
233+
switch c.metricType {
234+
case api.PromEncodeOperationName("Gauge"):
235+
c.promGauge.Delete(c.label)
236+
case api.PromEncodeOperationName("Counter"):
237+
c.promCounter.Delete(c.label)
238+
case api.PromEncodeOperationName("Histogram"):
239+
c.promHist.Delete(c.label)
240+
}
241+
delete(e.mCache, c.key)
242+
e.mList.Remove(entry)
243+
}
244+
}
245+
152246
// startPrometheusInterface listens for prometheus resource usage requests
153247
func startPrometheusInterface(w *encodeProm) {
154248
log.Debugf("entering startPrometheusInterface")
@@ -176,15 +270,20 @@ func NewEncodeProm() (Encoder, error) {
176270

177271
portNum := jsonEncodeProm.Port
178272
promPrefix := jsonEncodeProm.Prefix
273+
expiryTime := int64(jsonEncodeProm.ExpiryTime)
274+
if expiryTime == 0 {
275+
expiryTime = defaultExpiryTime
276+
}
277+
log.Debugf("expiryTime = %d", expiryTime)
179278

180-
counters := make(map[string]counterInfo)
181-
gauges := make(map[string]gaugeInfo)
182-
histograms := make(map[string]histInfo)
279+
counters := make(map[string]genericMetricInfo)
280+
gauges := make(map[string]genericMetricInfo)
281+
histograms := make(map[string]genericMetricInfo)
183282
for _, metricInfo := range jsonEncodeProm.Metrics {
184283
fullMetricName := promPrefix + metricInfo.Name
185284
labels := metricInfo.Labels
186285
log.Debugf("fullMetricName = %v", fullMetricName)
187-
log.Debugf("labels = %v", labels)
286+
log.Debugf("Labels = %v", labels)
188287
switch metricInfo.Type {
189288
case api.PromEncodeOperationName("Counter"):
190289
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels)
@@ -193,15 +292,25 @@ func NewEncodeProm() (Encoder, error) {
193292
log.Errorf("error during prometheus.Register: %v", err)
194293
return nil, err
195294
}
196-
counters[metricInfo.Name] = counterInfo{metricInfo.ValueKey, labels, counter}
295+
counters[metricInfo.Name] = genericMetricInfo{
296+
input: metricInfo.ValueKey,
297+
tags: labels,
298+
metricType: api.PromEncodeOperationName("Counter"),
299+
promCounter: counter,
300+
}
197301
case api.PromEncodeOperationName("Gauge"):
198302
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
199303
err := prometheus.Register(gauge)
200304
if err != nil {
201305
log.Errorf("error during prometheus.Register: %v", err)
202306
return nil, err
203307
}
204-
gauges[metricInfo.Name] = gaugeInfo{metricInfo.ValueKey, labels, gauge}
308+
gauges[metricInfo.Name] = genericMetricInfo{
309+
input: metricInfo.ValueKey,
310+
tags: labels,
311+
metricType: api.PromEncodeOperationName("Gauge"),
312+
promGauge: gauge,
313+
}
205314
case api.PromEncodeOperationName("Histogram"):
206315
log.Debugf("buckets = %v", metricInfo.Buckets)
207316
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: metricInfo.Buckets}, labels)
@@ -210,7 +319,12 @@ func NewEncodeProm() (Encoder, error) {
210319
log.Errorf("error during prometheus.Register: %v", err)
211320
return nil, err
212321
}
213-
histograms[metricInfo.Name] = histInfo{metricInfo.ValueKey, labels, hist}
322+
histograms[metricInfo.Name] = genericMetricInfo{
323+
input: metricInfo.ValueKey,
324+
tags: labels,
325+
metricType: api.PromEncodeOperationName("Histogram"),
326+
promHist: hist,
327+
}
214328
case "default":
215329
log.Errorf("invalid metric type = %v, skipping", metricInfo.Type)
216330
}
@@ -222,7 +336,11 @@ func NewEncodeProm() (Encoder, error) {
222336
counters: counters,
223337
gauges: gauges,
224338
histograms: histograms,
339+
expiryTime: expiryTime,
340+
mList: list.New(),
341+
mCache: make(metricCache),
225342
}
226343
go startPrometheusInterface(w)
344+
go w.cleanupExpiredEntriesLoop()
227345
return w, nil
228346
}

0 commit comments

Comments
 (0)