Skip to content

Commit 145cc54

Browse files
committed
Add ingest count and latency metrics for ipfix collector
Also, do not expose unused metrics, such as stage duration with ipfix collector Fixes #1032
1 parent 975f771 commit 145cc54

File tree

4 files changed

+85
-28
lines changed

4 files changed

+85
-28
lines changed

pkg/pipeline/ingest/ingest_grpc.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,15 @@ func NewGRPCProtobuf(opMetrics *operational.Metrics, params config.StageParam) (
4545
bufLen = defaultBufferLen
4646
}
4747
flowPackets := make(chan *pbflow.Records, bufLen)
48-
metrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(flowPackets) })
48+
metrics := newMetrics(
49+
opMetrics,
50+
params.Name,
51+
params.Ingest.Type,
52+
func() int { return len(flowPackets) },
53+
withLatency(),
54+
withBatchSizeBytes(),
55+
withStageDuration(),
56+
)
4957
collector, err := grpc.StartCollector(netObserv.Port, flowPackets,
5058
grpc.WithGRPCServerOptions(grpc2.UnaryInterceptor(instrumentGRPC(metrics))))
5159
if err != nil {

pkg/pipeline/ingest/ingest_ipfix.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ func (c *ingestIPFIX) processLogLines(out chan<- config.GenericMap) {
176176
ilog.Infof("Exit signal received, stop processing input")
177177
return
178178
case record := <-c.in:
179+
c.metrics.flowsProcessed.Inc()
180+
c.metrics.observeLatency(record)
179181
out <- record
180182
}
181183
}
@@ -195,7 +197,7 @@ func NewIngestIPFIX(opMetrics *operational.Metrics, params config.StageParam) (I
195197
ilog.Infof("Ingest IPFIX config: [%s]", cfg.String())
196198

197199
in := make(chan map[string]interface{}, channelSize)
198-
metrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(in) })
200+
metrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(in) }, withLatency())
199201

200202
return &ingestIPFIX{
201203
IngestIpfix: &cfg,

pkg/pipeline/ingest/ingest_kafka.go

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -107,33 +107,14 @@ func (k *ingestKafka) isStopped() bool {
107107
}
108108
}
109109

110-
func (k *ingestKafka) processRecordDelay(record config.GenericMap) {
111-
timeFlowEndInterface, ok := record["TimeFlowEndMs"]
112-
if !ok {
113-
// "trace" level used to minimize performance impact
114-
klog.Tracef("TimeFlowEndMs missing in record %v", record)
115-
k.metrics.error("TimeFlowEndMs missing")
116-
return
117-
}
118-
timeFlowEnd, ok := timeFlowEndInterface.(int64)
119-
if !ok {
120-
// "trace" level used to minimize performance impact
121-
klog.Tracef("Cannot parse TimeFlowEndMs of record %v", record)
122-
k.metrics.error("Cannot parse TimeFlowEndMs")
123-
return
124-
}
125-
delay := time.Since(time.UnixMilli(timeFlowEnd)).Seconds()
126-
k.metrics.latency.Observe(delay)
127-
}
128-
129110
func (k *ingestKafka) processRecord(record []byte, out chan<- config.GenericMap) {
130111
// Decode batch
131112
decoded, err := k.decoder.Decode(record)
132113
if err != nil {
133114
klog.WithError(err).Warnf("ignoring flow")
134115
return
135116
}
136-
k.processRecordDelay(decoded)
117+
k.metrics.observeLatency(decoded)
137118

138119
// Send batch
139120
out <- decoded
@@ -189,7 +170,14 @@ func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (I
189170
}
190171

191172
in := make(chan []byte, 2*bml)
192-
metrics := newMetrics(opMetrics, params.Name, ingestType, func() int { return len(in) })
173+
metrics := newMetrics(
174+
opMetrics,
175+
params.Name,
176+
ingestType,
177+
func() int { return len(in) },
178+
withLatency(),
179+
withBatchSizeBytes(),
180+
)
193181

194182
return &ingestKafka{
195183
kafkaReader: kafkaReader,

pkg/pipeline/ingest/metrics.go

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package ingest
22

33
import (
4+
"time"
5+
46
"github.com/netobserv/flowlogs-pipeline/pkg/config"
57
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
68
"github.com/prometheus/client_golang/prometheus"
@@ -44,18 +46,48 @@ type metrics struct {
4446
errors *prometheus.CounterVec
4547
}
4648

47-
func newMetrics(opMetrics *operational.Metrics, stage, stageType string, inGaugeFunc func() int) *metrics {
49+
func newMetrics(opMetrics *operational.Metrics, stage, stageType string, inGaugeFunc func() int, opts ...metricsOption) *metrics {
4850
opMetrics.CreateInQueueSizeGauge(stage, inGaugeFunc)
49-
return &metrics{
51+
ret := &metrics{
5052
Metrics: opMetrics,
5153
stage: stage,
5254
stageType: stageType,
53-
latency: opMetrics.NewHistogram(&latencyHistogram, []float64{.001, .01, .1, 1, 10, 100, 1000, 10000}, stage),
54-
stageDuration: opMetrics.GetOrCreateStageDurationHisto().WithLabelValues(stage),
5555
flowsProcessed: opMetrics.NewCounter(&flowsProcessedCounter, stage),
56-
batchSizeBytes: opMetrics.NewSummary(&batchSizeBytesSummary, stage),
5756
errors: opMetrics.NewCounterVec(&errorsCounter),
5857
}
58+
for _, opt := range opts {
59+
ret = opt(ret)
60+
}
61+
return ret
62+
}
63+
64+
type metricsOption func(*metrics) *metrics
65+
66+
func withStageDuration() metricsOption {
67+
return func(m *metrics) *metrics {
68+
if m.stageDuration == nil {
69+
m.stageDuration = m.GetOrCreateStageDurationHisto().WithLabelValues(m.stage)
70+
}
71+
return m
72+
}
73+
}
74+
75+
func withLatency() metricsOption {
76+
return func(m *metrics) *metrics {
77+
if m.latency == nil {
78+
m.latency = m.NewHistogram(&latencyHistogram, []float64{.001, .01, .1, 1, 10, 100, 1000, 10000}, m.stage)
79+
}
80+
return m
81+
}
82+
}
83+
84+
func withBatchSizeBytes() metricsOption {
85+
return func(m *metrics) *metrics {
86+
if m.batchSizeBytes == nil {
87+
m.batchSizeBytes = m.NewSummary(&batchSizeBytesSummary, m.stage)
88+
}
89+
return m
90+
}
5991
}
6092

6193
func (m *metrics) createOutQueueLen(out chan<- config.GenericMap) {
@@ -72,3 +104,30 @@ func (m *metrics) error(code string) {
72104
func (m *metrics) stageDurationTimer() *operational.Timer {
73105
return operational.NewTimer(m.stageDuration)
74106
}
107+
108+
func (m *metrics) observeLatency(record config.GenericMap) {
109+
if m.latency == nil {
110+
return
111+
}
112+
tfeUnknown, ok := record["TimeFlowEndMs"]
113+
if !ok {
114+
m.error("TimeFlowEndMs missing")
115+
return
116+
}
117+
var tfe int64
118+
switch i := tfeUnknown.(type) {
119+
case int64:
120+
tfe = i
121+
case int:
122+
tfe = int64(i)
123+
case uint64:
124+
tfe = int64(i)
125+
case uint:
126+
tfe = int64(i)
127+
default:
128+
m.error("Cannot parse TimeFlowEndMs")
129+
return
130+
}
131+
delay := time.Since(time.UnixMilli(tfe)).Seconds()
132+
m.latency.Observe(delay)
133+
}

0 commit comments

Comments
 (0)