Skip to content

Commit 467e409

Browse files
Merge pull request #250 from OlivierCazade/kafka-metrics
Added metrics to kafka ingest
2 parents b350404 + e277183 commit 467e409

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

pkg/pipeline/ingest/ingest_kafka.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,12 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
9797
if len(records) >= ingestK.batchMaxLength {
9898
log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
9999
decoded := ingestK.decoder.Decode(records)
100-
out <- decoded
100+
linesProcessed.Add(float64(len(records)))
101+
queueLength.Set(float64(len(out)))
101102
ingestK.prevRecords = decoded
102103
log.Debugf("prevRecords = %v", ingestK.prevRecords)
103104
records = []interface{}{}
105+
out <- decoded
104106
}
105107
case <-flushRecords.C: // Maximum batch time for each batch
106108
// Process batch of records (if not empty)
@@ -113,6 +115,8 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
113115
}
114116
log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
115117
decoded := ingestK.decoder.Decode(records)
118+
linesProcessed.Add(float64(len(records)))
119+
queueLength.Set(float64(len(out)))
116120
ingestK.prevRecords = decoded
117121
log.Debugf("prevRecords = %v", ingestK.prevRecords)
118122
out <- decoded

0 commit comments

Comments
 (0)