Skip to content

Commit 91e0862

Browse files
committed
tmp promclient fork
1 parent 7dd0b17 commit 91e0862

27 files changed

+299
-221
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,5 @@ require (
170170
sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect
171171
sigs.k8s.io/yaml v1.4.0 // indirect
172172
)
173+
174+
replace github.com/prometheus/client_golang => github.com/jotak/promclient-expiry v0.0.0-20250611102448-7d77f9cd87ab

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF
180180
github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
181181
github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
182182
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
183+
github.com/jotak/promclient-expiry v0.0.0-20250611102448-7d77f9cd87ab h1:p/N80bbVJkc0AwCebR4KNmy9RGzwRy3qHCI8I1GtfG0=
184+
github.com/jotak/promclient-expiry v0.0.0-20250611102448-7d77f9cd87ab/go.mod h1:XsV7jfXxA+RgWQpPiFv7n4PjB/Kffuo/zpizw22O03c=
183185
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
184186
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
185187
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -315,8 +317,6 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
315317
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
316318
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
317319
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
318-
github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
319-
github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
320320
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
321321
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
322322
github.com/prometheus/common v0.64.0 h1:pdZeA+g617P7oGv1CzdTzyeShxAGrTBsolKNOLQPGO4=

pkg/pipeline/encode/encode_prom.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package encode
1919

2020
import (
21+
"context"
2122
"reflect"
2223
"strings"
2324
"time"
@@ -43,6 +44,7 @@ type EncodeProm struct {
4344
updateChan chan config.StageParam
4445
server *promserver.PromServer
4546
regName string
47+
exp *prometheus.Expiry
4648
}
4749

4850
func (e *EncodeProm) Gatherer() prometheus.Gatherer {
@@ -110,31 +112,26 @@ func (e *EncodeProm) GetChacheEntry(entryLabels map[string]string, m interface{}
110112
return nil
111113
}
112114

113-
// callback function from lru cleanup
114-
func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
115-
cleanupFunc.(func())()
116-
}
117-
118115
func (e *EncodeProm) addCounter(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
119-
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
116+
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Expiry: e.exp, Help: ""}, mInfo.TargetLabels())
120117
e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
121118
return counter
122119
}
123120

124121
func (e *EncodeProm) addGauge(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
125-
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
122+
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Expiry: e.exp, Help: ""}, mInfo.TargetLabels())
126123
e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
127124
return gauge
128125
}
129126

130127
func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
131-
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
128+
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Expiry: e.exp, Help: ""}, mInfo.TargetLabels())
132129
e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
133130
return histogram
134131
}
135132

136133
func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
137-
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
134+
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Expiry: e.exp, Help: ""}, mInfo.TargetLabels())
138135
e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
139136
return agghistogram
140137
}
@@ -297,21 +294,24 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
297294
plog.Debugf("expiryTime = %v", expiryTime)
298295

299296
registry := prometheus.NewRegistry()
297+
exp := prometheus.NewExpiry(&expiryTime.Duration)
298+
exp.RunEvery(context.TODO(), expiryTime.Duration)
300299

301300
w := &EncodeProm{
302301
cfg: &cfg,
303302
registerer: registry,
304303
updateChan: make(chan config.StageParam),
305304
server: promserver.SharedServer,
306305
regName: params.Name,
306+
exp: &exp,
307307
}
308308

309309
if cfg.PromConnectionInfo != nil {
310310
// Start new server
311311
w.server = promserver.StartServerAsync(cfg.PromConnectionInfo, params.Name, registry)
312312
}
313313

314-
metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup)
314+
metricCommon := NewMetricsCommonStruct(opMetrics, params.Name)
315315
w.metricCommon = metricCommon
316316

317317
// Init metrics

pkg/pipeline/encode/encode_prom_test.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ func Test_NewEncodeProm(t *testing.T) {
9797
require.Equal(t, 1, len(encodeProm.metricCommon.gauges))
9898
require.Equal(t, 1, len(encodeProm.metricCommon.histos))
9999
require.Equal(t, 1, len(encodeProm.metricCommon.aggHistos))
100-
require.Equal(t, time.Second, encodeProm.metricCommon.expiryTime)
101100

102101
require.Equal(t, encodeProm.metricCommon.gauges["test_Bytes"].info.Name, "Bytes")
103102
expectedList := []string{"srcAddr", "dstAddr", "srcPort"}
@@ -108,16 +107,6 @@ func Test_NewEncodeProm(t *testing.T) {
108107
require.Equal(t, encodeProm.metricCommon.counters["test_Packets"].info.Labels, expectedList)
109108
entry := test.GetExtractMockEntry()
110109
encodeProm.Encode(entry)
111-
112-
// verify entries are in cache; one for the gauge and one for the counter
113-
entriesMapLen := encodeProm.metricCommon.mCache.GetCacheLen()
114-
require.Equal(t, 2, entriesMapLen)
115-
116-
// wait a couple seconds so that the entry will expire
117-
time.Sleep(2 * time.Second)
118-
encodeProm.metricCommon.mCache.CleanupExpiredEntries(encodeProm.metricCommon.expiryTime, encodeProm.Cleanup)
119-
entriesMapLen = encodeProm.metricCommon.mCache.GetCacheLen()
120-
require.Equal(t, 0, entriesMapLen)
121110
}
122111

123112
func Test_CustomMetric(t *testing.T) {

pkg/pipeline/encode/metrics_common.go

Lines changed: 18 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,6 @@
1818
package encode
1919

2020
import (
21-
"strings"
22-
"time"
23-
24-
"github.com/netobserv/flowlogs-pipeline/pkg/api"
2521
"github.com/netobserv/flowlogs-pipeline/pkg/config"
2622
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
2723
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode/metrics"
@@ -41,12 +37,9 @@ type MetricsCommonStruct struct {
4137
counters map[string]mInfoStruct
4238
histos map[string]mInfoStruct
4339
aggHistos map[string]mInfoStruct
44-
mCache *putils.TimedCache
45-
mChacheLenMetric prometheus.Gauge
4640
metricsProcessed prometheus.Counter
4741
metricsDropped prometheus.Counter
4842
errorsCounter *prometheus.CounterVec
49-
expiryTime time.Duration
5043
exitChan <-chan struct{}
5144
}
5245

@@ -77,12 +70,6 @@ var (
7770
operational.TypeCounter,
7871
"error", "metric", "key",
7972
)
80-
mChacheLen = operational.DefineMetric(
81-
"encode_prom_metrics_reported",
82-
"Total number of prometheus metrics reported by this stage",
83-
operational.TypeGauge,
84-
"stage",
85-
)
8673
)
8774

8875
func (m *MetricsCommonStruct) AddCounter(name string, g interface{}, info *metrics.Preprocessed) {
@@ -110,12 +97,12 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
11097

11198
// Process counters
11299
for _, mInfo := range m.counters {
113-
labelSets, value := m.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric)
100+
labelSets, value := m.prepareMetric(metricRecord, mInfo.info)
114101
if labelSets == nil {
115102
continue
116103
}
117104
for _, labels := range labelSets {
118-
err := mci.ProcessCounter(mInfo.genericMetric, labels.lMap, value)
105+
err := mci.ProcessCounter(mInfo.genericMetric, labels, value)
119106
if err != nil {
120107
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
121108
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
@@ -127,12 +114,12 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
127114

128115
// Process gauges
129116
for _, mInfo := range m.gauges {
130-
labelSets, value := m.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric)
117+
labelSets, value := m.prepareMetric(metricRecord, mInfo.info)
131118
if labelSets == nil {
132119
continue
133120
}
134121
for _, labels := range labelSets {
135-
err := mci.ProcessGauge(mInfo.genericMetric, labels.lMap, value, labels.key)
122+
err := mci.ProcessGauge(mInfo.genericMetric, labels, value, "")
136123
if err != nil {
137124
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
138125
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
@@ -144,12 +131,12 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
144131

145132
// Process histograms
146133
for _, mInfo := range m.histos {
147-
labelSets, value := m.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric)
134+
labelSets, value := m.prepareMetric(metricRecord, mInfo.info)
148135
if labelSets == nil {
149136
continue
150137
}
151138
for _, labels := range labelSets {
152-
err := mci.ProcessHist(mInfo.genericMetric, labels.lMap, value)
139+
err := mci.ProcessHist(mInfo.genericMetric, labels, value)
153140
if err != nil {
154141
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
155142
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
@@ -161,12 +148,12 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
161148

162149
// Process pre-aggregated histograms
163150
for _, mInfo := range m.aggHistos {
164-
labelSets, values := m.prepareAggHisto(mci, metricRecord, mInfo.info, mInfo.genericMetric)
151+
labelSets, values := m.prepareAggHisto(metricRecord, mInfo.info)
165152
if labelSets == nil {
166153
continue
167154
}
168155
for _, labels := range labelSets {
169-
err := mci.ProcessAggHist(mInfo.genericMetric, labels.lMap, values)
156+
err := mci.ProcessAggHist(mInfo.genericMetric, labels, values)
170157
if err != nil {
171158
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
172159
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
@@ -177,7 +164,7 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met
177164
}
178165
}
179166

180-
func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow config.GenericMap, info *metrics.Preprocessed, mv interface{}) ([]labelsKeyAndMap, float64) {
167+
func (m *MetricsCommonStruct) prepareMetric(flow config.GenericMap, info *metrics.Preprocessed) ([]map[string]string, float64) {
181168
flatParts := info.GenerateFlatParts(flow)
182169
ok, flatParts := info.ApplyFilters(flow, flatParts)
183170
if !ok {
@@ -198,22 +185,15 @@ func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow con
198185
}
199186

200187
labelSets := extractLabels(flow, flatParts, info)
201-
var lkms []labelsKeyAndMap
188+
var lkms []map[string]string
202189
for _, ls := range labelSets {
203190
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
204-
lkm := ls.toKeyAndMap(info)
205-
lkms = append(lkms, lkm)
206-
cacheEntry := mci.GetChacheEntry(lkm.lMap, mv)
207-
ok := m.mCache.UpdateCacheEntry(lkm.key, cacheEntry)
208-
if !ok {
209-
m.metricsDropped.Inc()
210-
return nil, 0
211-
}
191+
lkms = append(lkms, ls.toMap())
212192
}
213193
return lkms, floatVal
214194
}
215195

216-
func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow config.GenericMap, info *metrics.Preprocessed, mc interface{}) ([]labelsKeyAndMap, []float64) {
196+
func (m *MetricsCommonStruct) prepareAggHisto(flow config.GenericMap, info *metrics.Preprocessed) ([]map[string]string, []float64) {
217197
flatParts := info.GenerateFlatParts(flow)
218198
ok, flatParts := info.ApplyFilters(flow, flatParts)
219199
if !ok {
@@ -231,17 +211,10 @@ func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow c
231211
}
232212

233213
labelSets := extractLabels(flow, flatParts, info)
234-
var lkms []labelsKeyAndMap
214+
var lkms []map[string]string
235215
for _, ls := range labelSets {
236216
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
237-
lkm := ls.toKeyAndMap(info)
238-
lkms = append(lkms, lkm)
239-
cacheEntry := mci.GetChacheEntry(lkm.lMap, mc)
240-
ok := m.mCache.UpdateCacheEntry(lkm.key, cacheEntry)
241-
if !ok {
242-
m.metricsDropped.Inc()
243-
return nil, nil
244-
}
217+
lkms = append(lkms, ls.toMap())
245218
}
246219
return lkms, values
247220
}
@@ -266,22 +239,12 @@ type label struct {
266239

267240
type labelSet []label
268241

269-
type labelsKeyAndMap struct {
270-
key string
271-
lMap map[string]string
272-
}
273-
274-
func (l labelSet) toKeyAndMap(info *metrics.Preprocessed) labelsKeyAndMap {
275-
key := strings.Builder{}
276-
key.WriteString(info.Name)
277-
key.WriteRune('|')
278-
m := map[string]string{}
242+
func (l labelSet) toMap() map[string]string {
243+
m := make(map[string]string, len(l))
279244
for _, kv := range l {
280-
key.WriteString(kv.value)
281-
key.WriteRune('|')
282245
m[kv.key] = kv.value
283246
}
284-
return labelsKeyAndMap{key: key.String(), lMap: m}
247+
return m
285248
}
286249

287250
// extractLabels takes the flow and a single metric definition as input.
@@ -313,41 +276,23 @@ func newLabelSet(part config.GenericMap, labels []metrics.MappedLabel) labelSet
313276
return ls
314277
}
315278

316-
func (m *MetricsCommonStruct) cleanupExpiredEntriesLoop(callback putils.CacheCallback) {
317-
ticker := time.NewTicker(m.expiryTime)
318-
for {
319-
select {
320-
case <-m.exitChan:
321-
log.Debugf("exiting cleanupExpiredEntriesLoop because of signal")
322-
return
323-
case <-ticker.C:
324-
m.mCache.CleanupExpiredEntries(m.expiryTime, callback)
325-
}
326-
}
327-
}
328-
329279
func (m *MetricsCommonStruct) cleanupInfoStructs() {
330280
m.gauges = map[string]mInfoStruct{}
331281
m.counters = map[string]mInfoStruct{}
332282
m.histos = map[string]mInfoStruct{}
333283
m.aggHistos = map[string]mInfoStruct{}
334284
}
335285

336-
func NewMetricsCommonStruct(opMetrics *operational.Metrics, maxCacheEntries int, name string, expiryTime api.Duration, callback putils.CacheCallback) *MetricsCommonStruct {
337-
mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, name)
286+
func NewMetricsCommonStruct(opMetrics *operational.Metrics, name string) *MetricsCommonStruct {
338287
m := &MetricsCommonStruct{
339-
mCache: putils.NewTimedCache(maxCacheEntries, mChacheLenMetric),
340-
mChacheLenMetric: mChacheLenMetric,
341288
metricsProcessed: opMetrics.NewCounter(&metricsProcessed, name),
342289
metricsDropped: opMetrics.NewCounter(&metricsDropped, name),
343290
errorsCounter: opMetrics.NewCounterVec(&encodePromErrors),
344-
expiryTime: expiryTime.Duration,
345291
exitChan: putils.ExitChannel(),
346292
gauges: map[string]mInfoStruct{},
347293
counters: map[string]mInfoStruct{},
348294
histos: map[string]mInfoStruct{},
349295
aggHistos: map[string]mInfoStruct{},
350296
}
351-
go m.cleanupExpiredEntriesLoop(callback)
352297
return m
353298
}

pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
128128
meter: meterFactory,
129129
}
130130

131-
metricCommon := encode.NewMetricsCommonStruct(opMetrics, 0, params.Name, expiryTime, nil)
131+
metricCommon := encode.NewMetricsCommonStruct(opMetrics, params.Name)
132132
w.metricCommon = metricCommon
133133

134134
for i := range cfg.Metrics {

0 commit comments

Comments
 (0)