Skip to content

Commit aa7838d

Browse files
author
Mario Macias
authored
NETOBSERV-759: decorate flows with agent IP (#78)
* NETOBSERV-759: decorate flows with agent IP only for Protocol Buffers encoding. IPFIX is still missing. * Fix GRPC export + test
1 parent cb99105 commit aa7838d

File tree

20 files changed

+540
-75
lines changed

20 files changed

+540
-75
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ IMG_SHA = $(IMAGE_TAG_BASE):$(BUILD_SHA)
2222
LOCAL_GENERATOR_IMAGE ?= ebpf-generator:latest
2323

2424
CILIUM_EBPF_VERSION := v0.8.1
25-
GOLANGCI_LINT_VERSION = v1.46.2
25+
GOLANGCI_LINT_VERSION = v1.50.1
2626

2727
CLANG ?= clang
2828
CFLAGS := -O2 -g -Wall -Werror $(CFLAGS)

docs/architecture.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,7 @@ flowchart TD
2323
2424
DD --> |"chan []*flow.Record"| CL(flow.CapacityLimiter)
2525
26-
CL --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
26+
CL --> |"chan []*flow.Record"| DC(flow.Decorator)
27+
28+
DC --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
2729
```

docs/config.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ The following environment variables are available to configure the NetObserv eBF
55
* `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc` or `kafka` or `ipfix+tcp` or `ipfix+udp`.
66
* `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector.
77
* `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector.
8+
* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow.
9+
* `AGENT_IP_IFACE` (default: `external`). Specifies which interface should the agent pick the IP
10+
address from in order to report it in the AgentIP field on each flow. Accepted values are:
11+
`external` (default), `local`, or `name:<interface name>` (e.g. `name:eth0`). If the `AGENT_IP`
12+
configuration property is set, this property has no effect.
13+
* `AGENT_IP_TYPE` (default: `any`). Specifies which type of IP address (IPv4 or IPv6 or any) should
14+
the agent report in the AgentID field of each flow. Accepted values are: `any` (default), `ipv4`,
15+
`ipv6`. If the `AGENT_IP` configuration property is set, this property has no effect.
816
* `INTERFACES` (optional). Comma-separated list of the interface names from where flows will be collected. If
917
empty, the agent will use all the interfaces in the system, excepting the ones listed in
1018
the `EXCLUDE_INTERFACES` variable.

pkg/agent/agent.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"net"
89
"time"
910

1011
"github.com/cilium/ebpf/ringbuf"
@@ -65,6 +66,10 @@ type Flows struct {
6566
accounter *flow.Accounter
6667
exporter flowExporter
6768

69+
// elements used to decorate flows with extra information
70+
interfaceNamer flow.InterfaceNamer
71+
agentIP net.IP
72+
6873
status Status
6974
}
7075

@@ -101,6 +106,13 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
101106
informer = ifaces.NewWatcher(cfg.BuffersLength)
102107
}
103108

109+
alog.Debug("acquiring Agent IP")
110+
agentIP, err := fetchAgentIP(cfg)
111+
if err != nil {
112+
return nil, fmt.Errorf("acquiring Agent IP: %w", err)
113+
}
114+
alog.Debug("agent IP: " + agentIP.String())
115+
104116
// configure selected exporter
105117
exportFunc, err := buildFlowExporter(cfg)
106118
if err != nil {
@@ -119,14 +131,15 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
119131
return nil, err
120132
}
121133

122-
return flowsAgent(cfg, informer, fetcher, exportFunc)
134+
return flowsAgent(cfg, informer, fetcher, exportFunc, agentIP)
123135
}
124136

125137
// flowsAgent is a private constructor with injectable dependencies, usable for tests
126138
func flowsAgent(cfg *Config,
127139
informer ifaces.Informer,
128140
fetcher ebpfFlowFetcher,
129141
exporter flowExporter,
142+
agentIP net.IP,
130143
) (*Flows, error) {
131144
// configure allow/deny interfaces filter
132145
filter, err := initInterfaceFilter(cfg.Interfaces, cfg.ExcludeInterfaces)
@@ -144,19 +157,21 @@ func flowsAgent(cfg *Config,
144157
return iface
145158
}
146159

147-
mapTracer := flow.NewMapTracer(fetcher, interfaceNamer, cfg.CacheActiveTimeout)
160+
mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout)
148161
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout)
149162
accounter := flow.NewAccounter(
150-
cfg.CacheMaxFlows, cfg.CacheActiveTimeout, interfaceNamer, time.Now, monotime.Now)
163+
cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now)
151164
return &Flows{
152-
ebpf: fetcher,
153-
exporter: exporter,
154-
interfaces: registerer,
155-
filter: filter,
156-
cfg: cfg,
157-
mapTracer: mapTracer,
158-
rbTracer: rbTracer,
159-
accounter: accounter,
165+
ebpf: fetcher,
166+
exporter: exporter,
167+
interfaces: registerer,
168+
filter: filter,
169+
cfg: cfg,
170+
mapTracer: mapTracer,
171+
rbTracer: rbTracer,
172+
accounter: accounter,
173+
agentIP: agentIP,
174+
interfaceNamer: interfaceNamer,
160175
}, nil
161176
}
162177

@@ -345,6 +360,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
345360
limiter := node.AsMiddle((&flow.CapacityLimiter{}).Limit,
346361
node.ChannelBufferLen(f.cfg.BuffersLength))
347362

363+
decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer),
364+
node.ChannelBufferLen(f.cfg.BuffersLength))
365+
348366
ebl := f.cfg.ExporterBufferLength
349367
if ebl == 0 {
350368
ebl = f.cfg.BuffersLength
@@ -365,7 +383,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
365383
mapTracer.SendsTo(limiter)
366384
accounter.SendsTo(limiter)
367385
}
368-
limiter.SendsTo(export)
386+
limiter.SendsTo(decorator)
387+
decorator.SendsTo(export)
388+
369389
alog.Debug("starting graph")
370390
mapTracer.Start()
371391
rbTracer.Start()

pkg/agent/agent_test.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package agent
22

33
import (
44
"context"
5+
"net"
56
"testing"
67
"time"
78

@@ -13,6 +14,8 @@ import (
1314
"github.com/stretchr/testify/require"
1415
)
1516

17+
var agentIP = "192.168.1.13"
18+
1619
const timeout = 2 * time.Second
1720

1821
func TestFlowsAgent_InvalidConfigs(t *testing.T) {
@@ -171,14 +174,37 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
171174
assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows)
172175
}
173176

177+
func TestFlowsAgent_Decoration(t *testing.T) {
178+
export := testAgent(t, &Config{
179+
CacheActiveTimeout: 10 * time.Millisecond,
180+
CacheMaxFlows: 100,
181+
})
182+
183+
exported := export.Get(t, timeout)
184+
assert.Len(t, exported, 3)
185+
186+
// Tests that the decoration stage has been properly executed. It should
187+
// add the interface name and the agent IP
188+
for _, f := range exported {
189+
assert.Equal(t, agentIP, f.AgentIP.String())
190+
switch f.RecordKey {
191+
case key1, key2:
192+
assert.Equal(t, "foo", f.Interface)
193+
default:
194+
assert.Equal(t, "bar", f.Interface)
195+
}
196+
}
197+
}
198+
174199
func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
175200
ebpf := test.NewTracerFake()
176201
export := test.NewExporterFake()
177202
agent, err := flowsAgent(cfg,
178203
test.SliceInformerFake{
179204
{Name: "foo", Index: 3},
180205
{Name: "bar", Index: 4},
181-
}, ebpf, export.Export)
206+
}, ebpf, export.Export,
207+
net.ParseIP(agentIP))
182208
require.NoError(t, err)
183209

184210
go func() {

pkg/agent/config.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,28 @@ const (
1212
DirectionIngress = "ingress"
1313
DirectionEgress = "egress"
1414
DirectionBoth = "both"
15+
16+
IPTypeAny = "any"
17+
IPTypeIPV4 = "ipv4"
18+
IPTypeIPV6 = "ipv6"
19+
20+
IPIfaceExternal = "external"
21+
IPIfaceLocal = "local"
22+
IPIfaceNamedPrefix = "name:"
1523
)
1624

1725
type Config struct {
26+
// AgentIP allows overriding the reported Agent IP address on each flow.
27+
AgentIP string `env:"AGENT_IP"`
28+
// AgentIPIface specifies which interface should the agent pick the IP address from in order to
29+
// report it in the AgentIP field on each flow. Accepted values are: external (default), local,
30+
// or name:<interface name> (e.g. name:eth0).
31+
// If the AgentIP configuration property is set, this property has no effect.
32+
AgentIPIface string `env:"AGENT_IP_IFACE" envDefault:"external"`
33+
// AgentIPType specifies which type of IP address (IPv4 or IPv6 or any) should the agent report
34+
// in the AgentID field of each flow. Accepted values are: any (default), ipv4, ipv6.
35+
// If the AgentIP configuration property is set, this property has no effect.
36+
AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"`
1837
// Export selects the flows' exporter protocol. Accepted values are: grpc (default) or kafka
1938
// or ipfix+udp or ipfix+tcp.
2039
Export string `env:"EXPORT" envDefault:"grpc"`

pkg/agent/ip.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package agent
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"strings"
7+
)
8+
9+
// dependencies that can be injected from testing
10+
var (
11+
interfaceByName = net.InterfaceByName
12+
interfaceAddrs = net.InterfaceAddrs
13+
dial = net.Dial
14+
ifaceAddrs = func(iface *net.Interface) ([]net.Addr, error) {
15+
return iface.Addrs()
16+
}
17+
)
18+
19+
// fetchAgentIP guesses the non-loopback IP address of the Agent host, according to the
20+
// user-provided configuration:
21+
// - If AgentIP is provided, this value is used whatever is the real IP of the Agent.
22+
// - AgentIPIface specifies which interface this function should look into in order to pickup an address.
23+
// - AgentIPType specifies which type of IP address should the agent pickup ("any" to pickup whichever
24+
// ipv4 or ipv6 address is found first)
25+
func fetchAgentIP(cfg *Config) (net.IP, error) {
26+
if cfg.AgentIP != "" {
27+
if ip := net.ParseIP(cfg.AgentIP); ip != nil {
28+
return ip, nil
29+
}
30+
return nil, fmt.Errorf("can't parse provided IP %v", cfg.AgentIP)
31+
}
32+
33+
if cfg.AgentIPType != IPTypeAny &&
34+
cfg.AgentIPType != IPTypeIPV6 &&
35+
cfg.AgentIPType != IPTypeIPV4 {
36+
return nil, fmt.Errorf("invalid IP type %q. Valid values are: %s, %s or %s",
37+
cfg.AgentIPType, IPTypeIPV4, IPTypeIPV6, IPTypeAny)
38+
}
39+
40+
switch cfg.AgentIPIface {
41+
case IPIfaceLocal:
42+
return fromLocal(cfg.AgentIPType)
43+
case IPIfaceExternal:
44+
return fromExternal(cfg.AgentIPType)
45+
default:
46+
if !strings.HasPrefix(cfg.AgentIPIface, IPIfaceNamedPrefix) {
47+
return nil, fmt.Errorf(
48+
"invalid IP interface %q. Valid values are: %s, %s or %s<iface_name>",
49+
cfg.AgentIPIface, IPIfaceLocal, IPIfaceExternal, IPIfaceNamedPrefix)
50+
}
51+
return fromInterface(cfg.AgentIPIface[len(IPIfaceNamedPrefix):], cfg.AgentIPType)
52+
}
53+
}
54+
55+
func fromInterface(ifName, ipType string) (net.IP, error) {
56+
iface, err := interfaceByName(ifName)
57+
if err != nil {
58+
return nil, err
59+
}
60+
addrs, err := ifaceAddrs(iface)
61+
if err != nil {
62+
return nil, err
63+
}
64+
if ip, ok := findAddress(addrs, ipType); ok {
65+
return ip, nil
66+
}
67+
return nil, fmt.Errorf("no matching %q addresses found at interface %v", ipType, ifName)
68+
}
69+
70+
func fromLocal(ipType string) (net.IP, error) {
71+
addrs, err := interfaceAddrs()
72+
if err != nil {
73+
return nil, err
74+
}
75+
if ip, ok := findAddress(addrs, ipType); ok {
76+
return ip, nil
77+
}
78+
return nil, fmt.Errorf("no matching local %q addresses found", ipType)
79+
}
80+
81+
func fromExternal(ipType string) (net.IP, error) {
82+
// We don't really care about the existence or nonexistence of the addresses.
83+
// This will just establish an external dialer where we can pickup the external
84+
// host address
85+
addrStr := "8.8.8.8:80"
86+
if ipType == IPTypeIPV6 {
87+
addrStr = "[2001:4860:4860::8888]:80"
88+
}
89+
conn, err := dial("udp", addrStr)
90+
if err != nil {
91+
return nil, fmt.Errorf("can't establish an external connection %w", err)
92+
}
93+
if addr, ok := conn.LocalAddr().(*net.UDPAddr); !ok {
94+
return nil, fmt.Errorf("unexpected local address type %T for external connection",
95+
conn.LocalAddr())
96+
} else if ip, ok := getIP(addr.IP, ipType); ok {
97+
return ip, nil
98+
}
99+
return nil, fmt.Errorf("no matching %q external addresses found", ipType)
100+
}
101+
102+
func findAddress(addrs []net.Addr, ipType string) (net.IP, bool) {
103+
for _, addr := range addrs {
104+
if ipnet, ok := addr.(*net.IPNet); ok && ipnet != nil {
105+
if ip, ok := getIP(ipnet.IP, ipType); ok {
106+
return ip, true
107+
}
108+
}
109+
}
110+
return nil, false
111+
}
112+
113+
func getIP(pip net.IP, ipType string) (net.IP, bool) {
114+
if pip == nil || pip.IsLoopback() {
115+
return nil, false
116+
}
117+
switch ipType {
118+
case IPTypeIPV4:
119+
if ip := pip.To4(); ip != nil {
120+
return ip, true
121+
}
122+
case IPTypeIPV6:
123+
// as any IP4 address can be converted to IP6, we only return any
124+
// address that can be converted to IP6 but not to IP4
125+
if ip := pip.To16(); ip != nil && pip.To4() == nil {
126+
return ip, true
127+
}
128+
default: // Any
129+
if ip := pip.To4(); ip != nil {
130+
return ip, true
131+
}
132+
if ip := pip.To16(); ip != nil {
133+
return ip, true
134+
}
135+
}
136+
return nil, false
137+
}

0 commit comments

Comments
 (0)