Skip to content

Commit 2874b60

Browse files
authored
Add debug logs to ingest (#246)
* added debug output of decoded records in ingest * fixed import
1 parent b74bf07 commit 2874b60

File tree

5 files changed

+10
-1
lines changed

5 files changed

+10
-1
lines changed

docs/api.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ Following is the supported API format for the kafka ingest:
6363
type: (enum) one of the following:
6464
json: JSON decoder
6565
protobuf: Protobuf decoder
66+
batchMaxLen: the number of accumulated flows before being forwarded for processing
67+
commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously.
6668
</pre>
6769
## Ingest GRPC from Network Observability eBPF Agent
6870
Following is the supported API format for the Network Observability eBPF ingest:

pkg/pipeline/ingest/ingest_collector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []config.GenericMap)
174174
log.Debugf("ingestCollector sending %d entries, %d entries waiting", len(records), len(ingestC.in))
175175
linesProcessed.Add(float64(len(records)))
176176
queueLength.Set(float64(len(out)))
177+
log.Debugf("ingestCollector records = %v", records)
177178
out <- records
178179
records = []config.GenericMap{}
179180
}
@@ -189,6 +190,7 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []config.GenericMap)
189190
log.Debugf("ingestCollector sending %d entries, %d entries waiting", len(records), len(ingestC.in))
190191
linesProcessed.Add(float64(len(records)))
191192
queueLength.Set(float64(len(out)))
193+
log.Debugf("ingestCollector records = %v", records)
192194
out <- records
193195
records = []config.GenericMap{}
194196
}

pkg/pipeline/ingest/ingest_file.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (ingestF *IngestFile) Ingest(out chan<- []config.GenericMap) {
6464
lines = append(lines, text)
6565
}
6666
decoded := ingestF.decoder.Decode(lines)
67+
log.Debugf("IngestFile decoded = %v", decoded)
6768

6869
log.Debugf("Ingesting %d log lines from %s", len(lines), filename)
6970
switch ingestF.params.Type {

pkg/pipeline/ingest/ingest_grpc.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
1616
flow "github.com/netsampler/goflow2/utils"
1717
"github.com/prometheus/client_golang/prometheus"
18+
log "github.com/sirupsen/logrus"
1819
grpc2 "google.golang.org/grpc"
1920
"google.golang.org/grpc/metadata"
2021
"google.golang.org/protobuf/proto"
@@ -79,7 +80,9 @@ func (no *GRPCProtobuf) Ingest(out chan<- []config.GenericMap) {
7980
no.collector.Close()
8081
}()
8182
for fp := range no.flowPackets {
82-
out <- no.decoder.Decode([]interface{}{fp})
83+
records := no.decoder.Decode([]interface{}{fp})
84+
log.Debugf("GRPCProtobuf records = %v", records)
85+
out <- records
8386
}
8487
}
8588

pkg/pipeline/transform/transform_generic.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func (g *Generic) Transform(input []config.GenericMap) []config.GenericMap {
3333
log.Debugf("entering Generic Transform g = %v", g)
3434
output := make([]config.GenericMap, 0)
3535
for _, entry := range input {
36+
log.Debugf("entry.GenericMap = %v", entry)
3637
outputEntry := make(config.GenericMap)
3738
if g.policy != "replace_keys" {
3839
// copy old map to new map

0 commit comments

Comments
 (0)