Skip to content

Commit 73aa303

Browse files
committed
combined handling of gauge and counter
1 parent bafea80 commit 73aa303

File tree

2 files changed

+65
-113
lines changed

2 files changed

+65
-113
lines changed

pkg/pipeline/encode/encode_prom.go

Lines changed: 57 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -72,84 +72,74 @@ type encodeProm struct {
7272
mu sync.Mutex
7373
port string
7474
prefix string
75-
counters map[string]metricInfo
76-
gauges map[string]metricInfo
77-
histograms map[string]metricInfo
75+
metrics map[string]metricInfo
7876
expiryTime int64
7977
mList *list.List
8078
mCache metricCache
8179
}
8280

83-
func (e *encodeProm) EncodeCounter(metric config.GenericMap) []interface{} {
81+
// Encode encodes a metric before being stored
82+
func (e *encodeProm) Encode(metrics []config.GenericMap) []interface{} {
83+
log.Debugf("entering encodeProm Encode")
84+
e.mu.Lock()
85+
defer e.mu.Unlock()
8486
out := make([]interface{}, 0)
85-
for counterName, counterInfo := range e.counters {
86-
counterValue, ok := metric[counterInfo.input]
87-
if !ok {
88-
log.Debugf("field %v is missing", counterName)
89-
continue
90-
}
91-
counterValueString := fmt.Sprintf("%v", counterValue)
92-
valueFloat, err := strconv.ParseFloat(counterValueString, 64)
93-
if err != nil {
94-
log.Debugf("field cannot be converted to float: %v, %s", counterValue, counterValueString)
95-
continue
96-
}
97-
entryLabels := make(map[string]string, len(counterInfo.labelNames))
98-
for _, t := range counterInfo.labelNames {
99-
entryLabels[t] = fmt.Sprintf("%v", metric[t])
100-
}
101-
entry := entryInfo{
102-
eInfo: entrySignature{
103-
Name: e.prefix + counterName,
104-
Labels: entryLabels,
105-
},
106-
value: valueFloat,
107-
}
108-
out = append(out, entry)
109-
// push the metric to prometheus
110-
if counterInfo.promCounter != nil {
111-
counterInfo.promCounter.With(entryLabels).Add(valueFloat)
112-
}
113-
cEntry := e.saveEntryInCache(entry, entryLabels)
114-
cEntry.PromMetric.metricType = api.PromEncodeOperationName("Counter")
115-
cEntry.PromMetric.promCounter = counterInfo.promCounter
87+
for _, metric := range metrics {
88+
// TODO: We may need different handling for histograms
89+
metricOut := e.EncodeMetric(metric)
90+
out = append(out, metricOut...)
11691
}
92+
log.Debugf("out = %v", out)
93+
log.Debugf("cache = %v", e.mCache)
94+
log.Debugf("list = %v", e.mList)
11795
return out
11896
}
119-
func (e *encodeProm) EncodeGauge(metric config.GenericMap) []interface{} {
97+
98+
func (e *encodeProm) EncodeMetric(metric config.GenericMap) []interface{} {
99+
log.Debugf("entering EncodeMetric metric = %v", metric)
100+
// TODO: We may need different handling for histograms
120101
out := make([]interface{}, 0)
121-
for gaugeName, gaugeInfo := range e.gauges {
122-
gaugeValue, ok := metric[gaugeInfo.input]
102+
for metricName, mInfo := range e.metrics {
103+
metricValue, ok := metric[mInfo.input]
123104
if !ok {
124-
log.Debugf("field %v is missing", gaugeName)
105+
log.Debugf("field %v is missing", metricName)
125106
continue
126107
}
127-
gaugeValueString := fmt.Sprintf("%v", gaugeValue)
128-
valueFloat, err := strconv.ParseFloat(gaugeValueString, 64)
108+
metricValueString := fmt.Sprintf("%v", metricValue)
109+
valueFloat, err := strconv.ParseFloat(metricValueString, 64)
129110
if err != nil {
130-
log.Debugf("field cannot be converted to float: %v, %s", gaugeValue, gaugeValueString)
111+
log.Debugf("field cannot be converted to float: %v, %s", metricValue, metricValueString)
131112
continue
132113
}
133-
entryLabels := make(map[string]string, len(gaugeInfo.labelNames))
134-
for _, t := range gaugeInfo.labelNames {
114+
log.Debugf("metricName = %v, metricValue = %v, valueFloat = %v", metricName, metricValue, valueFloat)
115+
entryLabels := make(map[string]string, len(mInfo.labelNames))
116+
for _, t := range mInfo.labelNames {
135117
entryLabels[t] = fmt.Sprintf("%v", metric[t])
136118
}
137119
entry := entryInfo{
138120
eInfo: entrySignature{
139-
Name: e.prefix + gaugeName,
121+
Name: e.prefix + metricName,
140122
Labels: entryLabels,
141123
},
142124
value: valueFloat,
143125
}
144126
out = append(out, entry)
145-
// push the metric to prometheus
146-
if gaugeInfo.promGauge != nil {
147-
gaugeInfo.promGauge.With(entryLabels).Set(valueFloat)
148-
}
149127

150128
cEntry := e.saveEntryInCache(entry, entryLabels)
151-
cEntry.PromMetric.metricType = api.PromEncodeOperationName("Gauge")
152-
cEntry.PromMetric.promGauge = gaugeInfo.promGauge
129+
cEntry.PromMetric.metricType = mInfo.PromMetric.metricType
130+
// push the metric to prometheus
131+
switch mInfo.PromMetric.metricType {
132+
case api.PromEncodeOperationName("Gauge"):
133+
mInfo.promGauge.With(entryLabels).Set(valueFloat)
134+
cEntry.PromMetric.promGauge = mInfo.promGauge
135+
case api.PromEncodeOperationName("Counter"):
136+
mInfo.promCounter.With(entryLabels).Add(valueFloat)
137+
cEntry.PromMetric.promCounter = mInfo.promCounter
138+
case api.PromEncodeOperationName("Histogram"):
139+
// TODO: Need to perform proper function for histogram
140+
//mInfo.promHist.With(entryLabels).Set(valueFloat)
141+
cEntry.PromMetric.promHist = mInfo.promHist
142+
}
153143
}
154144
return out
155145
}
@@ -187,23 +177,6 @@ func (e *encodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]st
187177
return cEntry
188178
}
189179

190-
// Encode encodes a metric before being stored
191-
func (e *encodeProm) Encode(metrics []config.GenericMap) []interface{} {
192-
log.Debugf("entering encodeProm Encode")
193-
e.mu.Lock()
194-
defer e.mu.Unlock()
195-
out := make([]interface{}, 0)
196-
for _, metric := range metrics {
197-
gaugeOut := e.EncodeGauge(metric)
198-
out = append(out, gaugeOut...)
199-
counterOut := e.EncodeCounter(metric)
200-
out = append(out, counterOut...)
201-
}
202-
log.Debugf("cache = %v", e.mCache)
203-
log.Debugf("list = %v", e.mList)
204-
return out
205-
}
206-
207180
func (e *encodeProm) cleanupExpiredEntriesLoop() {
208181
for {
209182
e.cleanupExpiredEntries()
@@ -220,7 +193,7 @@ func (e *encodeProm) cleanupExpiredEntries() {
220193
log.Debugf("list = %v", e.mList)
221194
nowInSecs := time.Now().Unix()
222195
expireTime := nowInSecs - e.expiryTime
223-
// go through the list until we reach recently used connections
196+
// go through the list until we reach recently used entries
224197
for {
225198
entry := e.mList.Front()
226199
if entry == nil {
@@ -235,7 +208,7 @@ func (e *encodeProm) cleanupExpiredEntries() {
235208
}
236209

237210
// clean up the entry
238-
log.Debugf("nowInSecs = %d, deleting %s", nowInSecs, c.labels)
211+
log.Debugf("nowInSecs = %d, deleting %v", nowInSecs, c)
239212
switch c.PromMetric.metricType {
240213
case api.PromEncodeOperationName("Gauge"):
241214
c.PromMetric.promGauge.Delete(c.labels)
@@ -282,14 +255,14 @@ func NewEncodeProm() (Encoder, error) {
282255
}
283256
log.Debugf("expiryTime = %d", expiryTime)
284257

285-
counters := make(map[string]metricInfo)
286-
gauges := make(map[string]metricInfo)
287-
histograms := make(map[string]metricInfo)
258+
metrics := make(map[string]metricInfo)
288259
for _, mInfo := range jsonEncodeProm.Metrics {
260+
var pMetric PromMetric
289261
fullMetricName := promPrefix + mInfo.Name
290262
labels := mInfo.Labels
291263
log.Debugf("fullMetricName = %v", fullMetricName)
292264
log.Debugf("Labels = %v", labels)
265+
pMetric.metricType = mInfo.Type
293266
switch mInfo.Type {
294267
case api.PromEncodeOperationName("Counter"):
295268
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels)
@@ -298,29 +271,15 @@ func NewEncodeProm() (Encoder, error) {
298271
log.Errorf("error during prometheus.Register: %v", err)
299272
return nil, err
300273
}
301-
counters[mInfo.Name] = metricInfo{
302-
input: mInfo.ValueKey,
303-
labelNames: labels,
304-
PromMetric: PromMetric{
305-
metricType: api.PromEncodeOperationName("Counter"),
306-
promCounter: counter,
307-
},
308-
}
274+
pMetric.promCounter = counter
309275
case api.PromEncodeOperationName("Gauge"):
310276
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
311277
err := prometheus.Register(gauge)
312278
if err != nil {
313279
log.Errorf("error during prometheus.Register: %v", err)
314280
return nil, err
315281
}
316-
gauges[mInfo.Name] = metricInfo{
317-
input: mInfo.ValueKey,
318-
labelNames: labels,
319-
PromMetric: PromMetric{
320-
metricType: api.PromEncodeOperationName("Gauge"),
321-
promGauge: gauge,
322-
},
323-
}
282+
pMetric.promGauge = gauge
324283
case api.PromEncodeOperationName("Histogram"):
325284
log.Debugf("buckets = %v", mInfo.Buckets)
326285
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mInfo.Buckets}, labels)
@@ -329,25 +288,23 @@ func NewEncodeProm() (Encoder, error) {
329288
log.Errorf("error during prometheus.Register: %v", err)
330289
return nil, err
331290
}
332-
histograms[mInfo.Name] = metricInfo{
333-
input: mInfo.ValueKey,
334-
labelNames: labels,
335-
PromMetric: PromMetric{
336-
metricType: api.PromEncodeOperationName("Histogram"),
337-
promHist: hist,
338-
},
339-
}
291+
pMetric.promHist = hist
340292
case "default":
341293
log.Errorf("invalid metric type = %v, skipping", mInfo.Type)
294+
continue
295+
}
296+
metrics[mInfo.Name] = metricInfo{
297+
input: mInfo.ValueKey,
298+
labelNames: labels,
299+
PromMetric: pMetric,
342300
}
343301
}
344302

303+
log.Debugf("metrics = %v", metrics)
345304
w := &encodeProm{
346305
port: fmt.Sprintf(":%v", portNum),
347306
prefix: promPrefix,
348-
counters: counters,
349-
gauges: gauges,
350-
histograms: histograms,
307+
metrics: metrics,
351308
expiryTime: expiryTime,
352309
mList: list.New(),
353310
mCache: make(metricCache),

pkg/pipeline/encode/encode_prom_test.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -77,25 +77,21 @@ func Test_NewEncodeProm(t *testing.T) {
7777
encodeProm := newEncode.(*encodeProm)
7878
require.Equal(t, ":9103", encodeProm.port)
7979
require.Equal(t, "test_", encodeProm.prefix)
80-
require.Equal(t, 1, len(encodeProm.gauges))
81-
require.Equal(t, 1, len(encodeProm.counters))
82-
require.Equal(t, 1, len(encodeProm.histograms))
80+
require.Equal(t, 3, len(encodeProm.metrics))
8381
require.Equal(t, int64(1), encodeProm.expiryTime)
8482

85-
gauges := encodeProm.gauges
86-
assert.Contains(t, gauges, "Bytes")
87-
gInfo := gauges["Bytes"]
83+
metrics := encodeProm.metrics
84+
assert.Contains(t, metrics, "Bytes")
85+
gInfo := metrics["Bytes"]
8886
require.Equal(t, gInfo.input, "bytes")
8987
expectedList := []string{"srcAddr", "dstAddr", "srcPort"}
9088
require.Equal(t, gInfo.labelNames, expectedList)
9189

92-
counters := encodeProm.counters
93-
assert.Contains(t, counters, "Packets")
94-
cInfo := counters["Packets"]
90+
assert.Contains(t, metrics, "Packets")
91+
cInfo := metrics["Packets"]
9592
require.Equal(t, cInfo.input, "packets")
9693
expectedList = []string{"srcAddr", "dstAddr", "dstPort"}
9794
require.Equal(t, cInfo.labelNames, expectedList)
98-
9995
entry := test.GetExtractMockEntry()
10096
input := []config.GenericMap{entry}
10197
output := encodeProm.Encode(input)
@@ -123,8 +119,7 @@ func Test_NewEncodeProm(t *testing.T) {
123119
value: float64(34),
124120
}
125121
expectedOutput := []interface{}{gEntryInfo1, gEntryInfo2}
126-
require.Equal(t, output, expectedOutput)
127-
122+
require.Equal(t, expectedOutput, output)
128123
gaugeA, err := gInfo.promGauge.GetMetricWith(entryLabels1)
129124
require.Equal(t, nil, err)
130125
bytesA := testutil.ToFloat64(gaugeA)
@@ -164,7 +159,7 @@ func Test_EncodeAggregate(t *testing.T) {
164159
newEncode := &encodeProm{
165160
port: ":0000",
166161
prefix: "test_",
167-
gauges: map[string]metricInfo{
162+
metrics: map[string]metricInfo{
168163
"gauge": {
169164
input: "test_aggregate_value",
170165
labelNames: []string{"by", "aggregate"},

0 commit comments

Comments
 (0)