
Commit ec135a6

Author: Mario Macias
NETOBSERV-589: send kafka flows as protobuf lines (#55)
* NETOBSERV-589: send kafka flows as protobuf batches
* Moved to 1 flow == 1 kafka message
* fixed e2e tests manifests
* fixed config doc
* remove linger comment
1 parent c696bb0 commit ec135a6

10 files changed: +219 -197 lines


docs/config.md

Lines changed: 23 additions & 15 deletions
@@ -23,27 +23,15 @@ The following environment variables are available to configure the NetObserv eBPF Agent:
   that flows are kept in the accounting cache before being flushed to the collector.
 * `LOG_LEVEL` (default: `info`). From more to less verbose: `trace`, `debug`, `info`, `warn`,
   `error`, `fatal`, `panic`.
-* `BUFFERS_LENGTH` (default: `50`). Length of the internal communication channels between the different
-  processing stages. Most probably you won't need to change this value.
-* `LISTEN_INTERFACES` (default: `watch`). Mechanism used by the agent to listen for added or removed
-  network interfaces. Accepted values are:
-  - `watch`: interfaces are traced immediately after they are created. This is
-    the recommended setting for most configurations.
-  - `poll`: recommended mostly as a fallback mechanism if `watch` misbehaves. It periodically
-    queries the current network interfaces. The poll frequency is specified by the
-    `LISTEN_POLL_PERIOD` variable.
-* `LISTEN_POLL_PERIOD` (default: `10s`). When `LISTEN_INTERFACES` value is `poll`, this duration
-  string specifies the frequency in which the current network interfaces are polled.
 * `KAFKA_BROKERS` (required if `EXPORT` is `kafka`). Comma-separated list of tha addresses of the
   brokers of the Kafka cluster that this agent is configured to send messages to.
 * `KAFKA_TOPIC`(default: `network-flows`). Name of the topic where the flows' processor will receive
   the flows from.
-* `KAFKA_BATCH_SIZE` (default: `100`). Limit on how many messages will be buffered before being sent
+* `KAFKA_BATCH_MESSAGES` (default: `1000`). Limit on how many messages will be buffered before being sent
   to a Kafka partition.
-* `KAFKA_BATCH_BYTES` (default: `1048576`). Limit of the maximum size of a request in bytes before
+  you actually need to set the `CACHE_MAX_FLOWS` and/or `MESSAGE_MAX_FLOW_ENTRIES`
+* `KAFKA_BATCH_SIZE` (default: `1048576`). Limit of the maximum size of a request in bytes before
   being sent to a Kafka partition.
-* `KAFKA_ASYNC` (default: `true`). If `true`, the message writing process will never block. It also
-  means that errors are ignored since the caller will not receive the returned value.
 * `KAFKA_COMPRESSION` (default: `none`). Compression codec to be used to compress messages. Accepted
   values: `none`, `gzip`, `snappy`, `lz4`, `zstd`.
 * `KAFKA_ENABLE_TLS` (default: false). If `true`, enable TLS encryption for Kafka messages. The following settings are used only when TLS is enabled:
@@ -53,3 +41,23 @@ The following environment variables are available to configure the NetObserv eBPF Agent:
 * `KAFKA_TLS_USER_KEY_PATH` (default: unset). Path to the user (client) private key for mutual TLS connections.
 * `PROFILE_PORT` (default: unset). Sets the listening port for [Go's Pprof tool](https://pkg.go.dev/net/http/pprof).
   If it is not set, profile is disabled.
+
+## Development-only variables
+
+The following configuration variables are mostly used for development and fine-grained debugging,
+so no user should need to change them.
+
+* `BUFFERS_LENGTH` (default: `50`). Length of the internal communication channels between the different
+  processing stages.
+* `KAFKA_ASYNC` (default: `true`). If `true`, the message writing process will never block. It also
+  means that errors are ignored since the caller will not receive the returned value.
+* `LISTEN_INTERFACES` (default: `watch`). Mechanism used by the agent to listen for added or removed
+  network interfaces. Accepted values are:
+  - `watch`: interfaces are traced immediately after they are created. This is
+    the recommended setting for most configurations.
+  - `poll`: recommended mostly as a fallback mechanism if `watch` misbehaves. It periodically
+    queries the current network interfaces. The poll frequency is specified by the
+    `LISTEN_POLL_PERIOD` variable.
+* `LISTEN_POLL_PERIOD` (default: `10s`). When `LISTEN_INTERFACES` value is `poll`, this duration
+  string specifies the frequency in which the current network interfaces are polled.
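To make the count-versus-bytes distinction concrete, here is a minimal sketch of how the two batching variables map onto the segmentio/kafka-go writer used by the agent. The broker address is a placeholder, the values shown are the documented defaults, and the field names come from kafka-go:

```go
package example

import (
	"time"

	kafkago "github.com/segmentio/kafka-go"
)

// newWriter shows which kafka-go field each documented variable feeds.
func newWriter() *kafkago.Writer {
	return &kafkago.Writer{
		Addr:  kafkago.TCP("my-kafka:9092"), // placeholder for KAFKA_BROKERS
		Topic: "network-flows",              // KAFKA_TOPIC
		// KAFKA_BATCH_MESSAGES: a count of buffered messages, despite the field name.
		BatchSize: 1000,
		// KAFKA_BATCH_SIZE: a byte limit, matching Kafka's standard "batch.size" meaning.
		BatchBytes: 1048576,
		// Flush almost immediately; the agent already paces writes with its cache timeout.
		BatchTimeout: time.Nanosecond,
	}
}
```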

e2e/cluster/base/03-flp.yml

Lines changed: 1 addition & 2 deletions
@@ -16,7 +16,7 @@ spec:
       serviceAccountName: ebpf-agent-test
       containers:
         - name: flp
-          image: quay.io/netobserv/flowlogs-pipeline:v0.1.3-rc3
+          image: quay.io/netobserv/flowlogs-pipeline:latest
           ports:
             - containerPort: 9999
               hostPort: 9999
@@ -64,7 +64,6 @@ data:
       write:
         type: loki
         loki:
-          type: loki
           staticLabels:
             app: netobserv-flowcollector
           labels:

e2e/kafka/manifests/20-flp-transformer.yml

Lines changed: 2 additions & 3 deletions
@@ -16,7 +16,7 @@ spec:
       serviceAccountName: ebpf-agent-test
       containers:
         - name: flp
-          image: quay.io/netobserv/flowlogs-pipeline:v0.1.3-rc3
+          image: quay.io/netobserv/flowlogs-pipeline:latest
          args:
            - --config=/etc/flp/config.yaml
          volumeMounts:
@@ -49,7 +49,7 @@ data:
          topic: network-flows
          groupId: kafka-transformer
          decoder:
-            type: json
+            type: protobuf
      - name: enrich
        transform:
          type: network
@@ -65,7 +65,6 @@ data:
      write:
        type: loki
        loki:
-          type: loki
          staticLabels:
            app: netobserv-flowcollector
          labels:

pkg/agent/agent.go

Lines changed: 9 additions & 3 deletions
@@ -95,21 +95,27 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
            }
            transport.TLS = tlsConfig
        }
-        exportFunc = (&exporter.KafkaJSON{
+        exportFunc = (&exporter.KafkaProto{
            Writer: &kafkago.Writer{
                Addr:  kafkago.TCP(cfg.KafkaBrokers...),
                Topic: cfg.KafkaTopic,
-                BatchSize: cfg.KafkaBatchSize,
+                BatchSize: cfg.KafkaBatchMessages,
+                // Assigning KafkaBatchSize to BatchBytes instead of BatchSize might be confusing here.
+                // The reason is that the "standard" Kafka name for this variable is "batch.size",
+                // which specifies the size of messages in terms of bytes, and not in terms of entries.
+                // We have decided to hide this library implementation detail and expose to the
+                // customer the common, standard name and meaning for batch.size
+                BatchBytes: int64(cfg.KafkaBatchSize),
                // Segmentio's Kafka-go does not behave as standard Kafka library, and would
                // throttle any Write invocation until reaching the timeout.
                // Since we invoke write once each CacheActiveTimeout, we can safely disable this
                // timeout throttling
                // https://github.com/netobserv/flowlogs-pipeline/pull/233#discussion_r897830057
                BatchTimeout: time.Nanosecond,
-                BatchBytes: int64(cfg.KafkaBatchBytes),
                Async:       cfg.KafkaAsync,
                Compression: compression,
                Transport:   &transport,
+                Balancer:    &kafkago.RoundRobin{},
            },
        }).ExportFlows
    default:
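Outside of `FlowsAgent`, the same exporter can be wired up by hand. A rough sketch, assuming something else produces the batches of records; the channel, broker address, and buffer capacity below are illustrative, not the agent's actual variables:

```go
package main

import (
	kafkago "github.com/segmentio/kafka-go"

	"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
)

func main() {
	// One slice of records is sent per eviction of the flow cache.
	flowsCh := make(chan []*flow.Record, 50)

	kafkaExporter := &exporter.KafkaProto{
		Writer: &kafkago.Writer{
			Addr:     kafkago.TCP("my-kafka:9092"), // placeholder broker
			Topic:    "network-flows",
			Balancer: &kafkago.RoundRobin{}, // spread the one-flow-per-message stream across partitions
		},
	}
	go kafkaExporter.ExportFlows(flowsCh) // consumes batches until flowsCh is closed

	// ... the agent's capture and aggregation stages would feed flowsCh here ...
	close(flowsCh)
}
```

Because each flow is now its own Kafka message, the round-robin balancer is what actually spreads the load across partitions.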

pkg/agent/config.go

Lines changed: 4 additions & 4 deletions
@@ -55,12 +55,12 @@ type Config struct {
    KafkaBrokers []string `env:"KAFKA_BROKERS" envSeparator:","`
    // KafkaTopic is the name of the topic where the flows' processor will receive the flows from.
    KafkaTopic string `env:"KAFKA_TOPIC" envDefault:"network-flows"`
-    // KafkaBatchSize sets the limit on how many messages will be buffered before being sent to a
+    // KafkaBatchMessages sets the limit on how many messages will be buffered before being sent to a
    // partition.
-    KafkaBatchSize int `env:"KAFKA_BATCH_SIZE" envDefault:"100"`
-    // KafkaBatchBytes sets the limit of the maximum size of a request in bytes before being sent
+    KafkaBatchMessages int `env:"KAFKA_BATCH_MESSAGES" envDefault:"1000"`
+    // KafkaBatchSize sets the limit, in bytes, of the maximum size of a request before being sent
    // to a partition.
-    KafkaBatchBytes int `env:"KAFKA_BATCH_BYTES" envDefault:"1048576"`
+    KafkaBatchSize int `env:"KAFKA_BATCH_SIZE" envDefault:"1048576"`
    // KafkaAsync. If it's true, the message writing process will never block. It also means that
    // errors are ignored since the caller will not receive the returned value.
    KafkaAsync bool `env:"KAFKA_ASYNC" envDefault:"true"`
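The `env:`/`envDefault:`/`envSeparator:` tags above follow the convention of the caarlos0/env loader. Assuming that is what the agent uses (an inference from the struct tags; the import path version below is a guess), picking up the renamed variables is just a struct parse:

```go
package main

import (
	"log"

	"github.com/caarlos0/env/v6" // assumed loader, inferred from the struct tags

	"github.com/netobserv/netobserv-ebpf-agent/pkg/agent"
)

func main() {
	cfg := agent.Config{}
	// Populates KafkaBatchMessages from KAFKA_BATCH_MESSAGES (default 1000 messages)
	// and KafkaBatchSize from KAFKA_BATCH_SIZE (default 1048576 bytes).
	if err := env.Parse(&cfg); err != nil {
		log.Fatalf("reading configuration: %v", err)
	}
	log.Printf("kafka batching: %d messages, %d bytes", cfg.KafkaBatchMessages, cfg.KafkaBatchSize)
}
```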

pkg/exporter/grpc_proto.go

Lines changed: 4 additions & 93 deletions
@@ -3,12 +3,9 @@ package exporter
 import (
     "context"
 
-    "github.com/sirupsen/logrus"
-    "google.golang.org/protobuf/types/known/timestamppb"
-
     "github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
     "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc"
-    "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
+    "github.com/sirupsen/logrus"
 )
 
 var glog = logrus.WithField("component", "exporter/GRPCProto")
@@ -37,99 +34,13 @@ func StartGRPCProto(hostPort string) (*GRPCProto, error) {
 func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
     log := glog.WithField("collector", g.hostPort)
     for inputRecords := range input {
-        entries := make([]*pbflow.Record, 0, len(inputRecords))
-        for _, record := range inputRecords {
-            entries = append(entries, flowToPB(record))
-        }
-        log.Debugf("sending %d records", len(entries))
-        if _, err := g.clientConn.Client().Send(context.TODO(), &pbflow.Records{
-            Entries: entries,
-        }); err != nil {
+        pbRecords := flowsToPB(inputRecords)
+        log.Debugf("sending %d records", len(pbRecords.Entries))
+        if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
             log.WithError(err).Error("couldn't send flow records to collector")
         }
-
     }
     if err := g.clientConn.Close(); err != nil {
         log.WithError(err).Warn("couldn't close flow export client")
     }
 }
-
-func v4FlowToPB(fr *flow.Record) *pbflow.Record {
-    return &pbflow.Record{
-        EthProtocol: uint32(fr.EthProtocol),
-        Direction:   pbflow.Direction(fr.Direction),
-        DataLink: &pbflow.DataLink{
-            SrcMac: macToUint64(&fr.DataLink.SrcMac),
-            DstMac: macToUint64(&fr.DataLink.DstMac),
-        },
-        Network: &pbflow.Network{
-            SrcAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: fr.Network.SrcAddr.IntEncodeV4()}},
-            DstAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: fr.Network.DstAddr.IntEncodeV4()}},
-        },
-        Transport: &pbflow.Transport{
-            Protocol: uint32(fr.Transport.Protocol),
-            SrcPort:  uint32(fr.Transport.SrcPort),
-            DstPort:  uint32(fr.Transport.DstPort),
-        },
-        Bytes: fr.Bytes,
-        TimeFlowStart: &timestamppb.Timestamp{
-            Seconds: fr.TimeFlowStart.Unix(),
-            Nanos:   int32(fr.TimeFlowStart.Nanosecond()),
-        },
-        TimeFlowEnd: &timestamppb.Timestamp{
-            Seconds: fr.TimeFlowEnd.Unix(),
-            Nanos:   int32(fr.TimeFlowEnd.Nanosecond()),
-        },
-        Packets:   uint64(fr.Packets),
-        Interface: fr.Interface,
-    }
-}
-
-func v6FlowToPB(fr *flow.Record) *pbflow.Record {
-    return &pbflow.Record{
-        EthProtocol: uint32(fr.EthProtocol),
-        Direction:   pbflow.Direction(fr.Direction),
-        DataLink: &pbflow.DataLink{
-            SrcMac: macToUint64(&fr.DataLink.SrcMac),
-            DstMac: macToUint64(&fr.DataLink.DstMac),
-        },
-        Network: &pbflow.Network{
-            SrcAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv6{Ipv6: fr.Network.SrcAddr[:]}},
-            DstAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv6{Ipv6: fr.Network.DstAddr[:]}},
-        },
-        Transport: &pbflow.Transport{
-            Protocol: uint32(fr.Transport.Protocol),
-            SrcPort:  uint32(fr.Transport.SrcPort),
-            DstPort:  uint32(fr.Transport.DstPort),
-        },
-        Bytes: fr.Bytes,
-        TimeFlowStart: &timestamppb.Timestamp{
-            Seconds: fr.TimeFlowStart.Unix(),
-            Nanos:   int32(fr.TimeFlowStart.Nanosecond()),
-        },
-        TimeFlowEnd: &timestamppb.Timestamp{
-            Seconds: fr.TimeFlowEnd.Unix(),
-            Nanos:   int32(fr.TimeFlowEnd.Nanosecond()),
-        },
-        Packets:   uint64(fr.Packets),
-        Interface: fr.Interface,
-    }
-}
-
-func flowToPB(fr *flow.Record) *pbflow.Record {
-    if fr.EthProtocol == flow.IPv6Type {
-        return v6FlowToPB(fr)
-    }
-    return v4FlowToPB(fr)
-}
-
-// Mac bytes are encoded in the same order as in the array. This is, a Mac
-// like 11:22:33:44:55:66 will be encoded as 0x112233445566
-func macToUint64(m *flow.MacAddr) uint64 {
-    return uint64(m[5]) |
-        (uint64(m[4]) << 8) |
-        (uint64(m[3]) << 16) |
-        (uint64(m[2]) << 24) |
-        (uint64(m[1]) << 32) |
-        (uint64(m[0]) << 40)
-}
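The per-record conversion helpers deleted above now back both the gRPC and the Kafka exporters. The shared `flowsToPB` wrapper called in the new code is not part of this commit view; reconstructed from the removed loop, it presumably looks roughly like this (a sketch, not the committed implementation):

```go
package exporter

import (
	"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
)

// flowsToPB: hypothetical sketch of the shared helper, mirroring the removed loop.
// flowToPB is the existing per-record converter, now kept alongside it.
func flowsToPB(inputRecords []*flow.Record) *pbflow.Records {
	entries := make([]*pbflow.Record, 0, len(inputRecords))
	for _, record := range inputRecords {
		entries = append(entries, flowToPB(record)) // picks the IPv4 or IPv6 encoding per record
	}
	return &pbflow.Records{Entries: entries}
}
```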

pkg/exporter/kafka_json.go

Lines changed: 0 additions & 55 deletions
This file was deleted.

pkg/exporter/kafka_proto.go

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+package exporter
+
+import (
+    "context"
+
+    "github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
+    kafkago "github.com/segmentio/kafka-go"
+    "github.com/sirupsen/logrus"
+    "google.golang.org/protobuf/proto"
+)
+
+var klog = logrus.WithField("component", "exporter/KafkaProto")
+
+type kafkaWriter interface {
+    WriteMessages(ctx context.Context, msgs ...kafkago.Message) error
+}
+
+// KafkaProto exports flows over Kafka, encoded as a protobuf that is understandable by the
+// Flowlogs-Pipeline collector
+type KafkaProto struct {
+    Writer kafkaWriter
+}
+
+func (kp *KafkaProto) ExportFlows(input <-chan []*flow.Record) {
+    klog.Info("starting Kafka exporter")
+    for records := range input {
+        kp.batchAndSubmit(records)
+    }
+}
+
+func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) {
+    klog.Debugf("sending %d records", len(records))
+    msgs := make([]kafkago.Message, 0, len(records))
+    for _, record := range records {
+        pbBytes, err := proto.Marshal(flowToPB(record))
+        if err != nil {
+            klog.WithError(err).Debug("can't encode protobuf message. Ignoring")
+            continue
+        }
+        msgs = append(msgs, kafkago.Message{Value: pbBytes})
+    }
+
+    if err := kp.Writer.WriteMessages(context.TODO(), msgs...); err != nil {
+        klog.WithError(err).Error("can't write messages into Kafka")
+    }
+}
+
+type JSONRecord struct {
+    *flow.Record
+    TimeFlowStart   int64
+    TimeFlowEnd     int64
+    TimeFlowStartMs int64
+    TimeFlowEndMs   int64
+}
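With one flow per Kafka message, a consumer — flowlogs-pipeline does it via `decoder: type: protobuf` — just unmarshals each message value into a `pbflow.Record`. A minimal consumer sketch, with placeholder broker, topic, and group names:

```go
package main

import (
	"context"
	"log"

	kafkago "github.com/segmentio/kafka-go"
	"google.golang.org/protobuf/proto"

	"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
)

func main() {
	reader := kafkago.NewReader(kafkago.ReaderConfig{
		Brokers: []string{"my-kafka:9092"}, // placeholder broker
		Topic:   "network-flows",
		GroupID: "example-consumer", // placeholder group
	})
	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Fatalf("reading message: %v", err)
		}
		record := &pbflow.Record{}
		if err := proto.Unmarshal(msg.Value, record); err != nil {
			log.Printf("skipping undecodable message: %v", err)
			continue
		}
		log.Printf("flow: %d bytes, %d packets", record.Bytes, record.Packets)
	}
}
```

The unexported `kafkaWriter` interface in the exporter also makes it straightforward to substitute a fake writer in unit tests instead of a real `kafkago.Writer`.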
