Skip to content

Commit 6a351f2

Browse files
author
Mario Macias
authored
NETOBSERV-617: split big payloads in GRPC exporter (#81)
1 parent 82d7b1d commit 6a351f2

File tree

7 files changed

+99
-25
lines changed

7 files changed

+99
-25
lines changed

docs/config.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ The following environment variables are available to configure the NetObserv eBF
55
* `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc` or `kafka` or `ipfix+tcp` or `ipfix+udp`.
66
* `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector.
77
* `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector.
8+
* `GRPC_MESSAGE_MAX_FLOWS` (default: `10000`). Specifies the limit, in number of flows, of each GRPC
9+
message. Messages larger than that number will be split and submitted sequentially.
810
* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow.
911
* `AGENT_IP_IFACE` (default: `external`). Specifies which interface should the agent pick the IP
1012
address from in order to report it in the AgentIP field on each flow. Accepted values are:

pkg/agent/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
193193
cfg.TargetHost, cfg.TargetPort)
194194
}
195195
target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort)
196-
grpcExporter, err := exporter.StartGRPCProto(target)
196+
grpcExporter, err := exporter.StartGRPCProto(target, cfg.GRPCMessageMaxFlows)
197197
if err != nil {
198198
return nil, err
199199
}

pkg/agent/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ type Config struct {
4242
TargetHost string `env:"FLOWS_TARGET_HOST"`
4343
// TargetPort is the port the target Flow collector, when the EXPORT variable is set to "grpc"
4444
TargetPort int `env:"FLOWS_TARGET_PORT"`
45+
// GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages
46+
// larger than that number will be split and submitted sequentially.
47+
GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"`
4548
// Interfaces contains the interface names from where flows will be collected. If empty, the agent
4649
// will fetch all the interfaces in the system, excepting the ones listed in ExcludeInterfaces.
4750
// If an entry is enclosed by slashes (e.g. `/br-/`), it will match as regular expression,

pkg/exporter/grpc_proto.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,21 @@ var glog = logrus.WithField("component", "exporter/GRPCProto")
1616
type GRPCProto struct {
1717
hostPort string
1818
clientConn *grpc.ClientConnection
19+
// maxFlowsPerMessage limits the maximum number of flows per GRPC message.
20+
// If a message contains more flows than this number, the GRPC message will be split into
21+
// multiple messages.
22+
maxFlowsPerMessage int
1923
}
2024

21-
func StartGRPCProto(hostPort string) (*GRPCProto, error) {
25+
func StartGRPCProto(hostPort string, maxFlowsPerMessage int) (*GRPCProto, error) {
2226
clientConn, err := grpc.ConnectClient(hostPort)
2327
if err != nil {
2428
return nil, err
2529
}
2630
return &GRPCProto{
27-
hostPort: hostPort,
28-
clientConn: clientConn,
31+
hostPort: hostPort,
32+
clientConn: clientConn,
33+
maxFlowsPerMessage: maxFlowsPerMessage,
2934
}, nil
3035
}
3136

@@ -34,10 +39,11 @@ func StartGRPCProto(hostPort string) (*GRPCProto, error) {
3439
func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
3540
log := glog.WithField("collector", g.hostPort)
3641
for inputRecords := range input {
37-
pbRecords := flowsToPB(inputRecords)
38-
log.Debugf("sending %d records", len(pbRecords.Entries))
39-
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
40-
log.WithError(err).Error("couldn't send flow records to collector")
42+
for _, pbRecords := range flowsToPB(inputRecords, g.maxFlowsPerMessage) {
43+
log.Debugf("sending %d records", len(pbRecords.Entries))
44+
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
45+
log.WithError(err).Error("couldn't send flow records to collector")
46+
}
4147
}
4248
}
4349
if err := g.clientConn.Close(); err != nil {

pkg/exporter/grpc_proto_test.go

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"testing"
77
"time"
88

9+
test2 "github.com/netobserv/netobserv-ebpf-agent/pkg/test"
10+
911
"github.com/mariomac/guara/pkg/test"
1012
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
1113
"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc"
@@ -21,11 +23,12 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
2123
port, err := test.FreeTCPPort()
2224
require.NoError(t, err)
2325
serverOut := make(chan *pbflow.Records)
24-
_, err = grpc.StartCollector(port, serverOut)
26+
coll, err := grpc.StartCollector(port, serverOut)
2527
require.NoError(t, err)
28+
defer coll.Close()
2629

2730
// Start GRPCProto exporter stage
28-
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port))
31+
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), 1000)
2932
require.NoError(t, err)
3033

3134
// Send some flows to the input of the exporter stage
@@ -37,23 +40,14 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
3740
{RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{EthProtocol: flow.IPv6Type}},
3841
AgentIP: net.ParseIP("8888::1111")},
3942
}
40-
close(flows)
4143
go exporter.ExportFlows(flows)
4244

43-
var rs *pbflow.Records
44-
select {
45-
case rs = <-serverOut:
46-
case <-time.After(timeout):
47-
require.Fail(t, "timeout waiting for flows")
48-
}
45+
rs := test2.ReceiveTimeout(t, serverOut, timeout)
4946
assert.Len(t, rs.Entries, 1)
5047
r := rs.Entries[0]
5148
assert.EqualValues(t, 0x0a090807, r.GetAgentIp().GetIpv4())
52-
select {
53-
case rs = <-serverOut:
54-
case <-time.After(timeout):
55-
require.Fail(t, "timeout waiting for flows")
56-
}
49+
50+
rs = test2.ReceiveTimeout(t, serverOut, timeout)
5751
assert.Len(t, rs.Entries, 1)
5852
r = rs.Entries[0]
5953
assert.EqualValues(t, net.ParseIP("8888::1111"), r.GetAgentIp().GetIpv6())
@@ -65,3 +59,45 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
6559
//ok!
6660
}
6761
}
62+
63+
func TestGRPCProto_SplitLargeMessages(t *testing.T) {
64+
// start remote ingestor
65+
port, err := test.FreeTCPPort()
66+
require.NoError(t, err)
67+
serverOut := make(chan *pbflow.Records)
68+
coll, err := grpc.StartCollector(port, serverOut)
69+
require.NoError(t, err)
70+
defer coll.Close()
71+
72+
const msgMaxLen = 10000
73+
// Start GRPCProto exporter stage
74+
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), msgMaxLen)
75+
require.NoError(t, err)
76+
77+
// Send a message much longer than the limit length
78+
flows := make(chan []*flow.Record, 10)
79+
var input []*flow.Record
80+
for i := 0; i < 25000; i++ {
81+
input = append(input, &flow.Record{RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{
82+
EthProtocol: flow.IPv6Type,
83+
}}, AgentIP: net.ParseIP("1111::1111"), Interface: "12345678"})
84+
}
85+
flows <- input
86+
go exporter.ExportFlows(flows)
87+
88+
// expect that the submitted message is split into chunks no longer than msgMaxLen
89+
rs := test2.ReceiveTimeout(t, serverOut, timeout)
90+
assert.Len(t, rs.Entries, msgMaxLen)
91+
rs = test2.ReceiveTimeout(t, serverOut, timeout)
92+
assert.Len(t, rs.Entries, msgMaxLen)
93+
rs = test2.ReceiveTimeout(t, serverOut, timeout)
94+
assert.Len(t, rs.Entries, 5000)
95+
96+
// after all the operation, no more flows are sent
97+
select {
98+
case rs = <-serverOut:
99+
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
100+
default:
101+
//ok!
102+
}
103+
}

pkg/exporter/proto.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,21 @@ import (
1111

1212
// flowsToPB is an auxiliary function to convert flow records, as returned by the eBPF agent,
1313
// into protobuf-encoded messages ready to be sent to the collector via GRPC
14-
func flowsToPB(inputRecords []*flow.Record) *pbflow.Records {
14+
func flowsToPB(inputRecords []*flow.Record, maxLen int) []*pbflow.Records {
1515
entries := make([]*pbflow.Record, 0, len(inputRecords))
1616
for _, record := range inputRecords {
1717
entries = append(entries, flowToPB(record))
1818
}
19-
return &pbflow.Records{
20-
Entries: entries,
19+
var records []*pbflow.Records
20+
for len(entries) > 0 {
21+
end := len(entries)
22+
if end > maxLen {
23+
end = maxLen
24+
}
25+
records = append(records, &pbflow.Records{Entries: entries[:end]})
26+
entries = entries[end:]
2127
}
28+
return records
2229
}
2330

2431
// flowToPB is an auxiliary function to convert a single flow record, as returned by the eBPF agent,

pkg/test/channels.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package test
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
// ReceiveTimeout returns the first received element or fails the test if nothing is received
9+
// before the given timeout
10+
func ReceiveTimeout[T any](t *testing.T, ch <-chan T, timeout time.Duration) T {
11+
t.Helper()
12+
select {
13+
case e := <-ch:
14+
return e
15+
case <-time.After(timeout):
16+
var z T
17+
t.Fatalf("timeout while waiting %s for a %T element in channel", timeout, z)
18+
return z
19+
}
20+
}

0 commit comments

Comments
 (0)