Skip to content

Commit 5302974

Browse files
authored
Enable IPFIX export (#64)
* IPFIX exporter and collector with tcp/udp * Enable ipfix export at the agent using config * Add vmware go-ipfix modules * Add the vendor directories * Add e2e tests for IPFIX * Caching of ie values for performance * Fix inconsistent vendor after merge * Fix lint error
1 parent 8418b97 commit 5302974

File tree

165 files changed

+17695
-6
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

165 files changed

+17695
-6
lines changed

e2e/cluster/kind.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// Package cluster cointains the base setup for the test environment. This is:
2-
// - Deployment manifests for a base cluster: Loki, permissions, flowlogs-processor and the
3-
// local version of the agent. As well as the cluster configuration for ports exposure.
4-
// - Utility classes to programmatically manage the Kind cluster and some of its components
5-
// (e.g. Loki)
2+
// - Deployment manifests for a base cluster: Loki, permissions, flowlogs-processor and the
3+
// local version of the agent. As well as the cluster configuration for ports exposure.
4+
// - Utility classes to programmatically manage the Kind cluster and some of its components
5+
// (e.g. Loki)
66
package cluster
77

88
import (

e2e/ipfix/ipfix_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
//go:build e2e
2+
3+
package basic
4+
5+
import (
6+
"path"
7+
"testing"
8+
"time"
9+
10+
"github.com/netobserv/netobserv-ebpf-agent/e2e/basic"
11+
"github.com/netobserv/netobserv-ebpf-agent/e2e/cluster"
12+
"github.com/sirupsen/logrus"
13+
)
14+
15+
const (
16+
clusterNamePrefix = "ipfix-test-cluster"
17+
testTimeout = 20 * time.Minute
18+
namespace = "default"
19+
)
20+
21+
var (
22+
testCluster *cluster.Kind
23+
)
24+
25+
func TestMain(m *testing.M) {
26+
logrus.StandardLogger().SetLevel(logrus.DebugLevel)
27+
28+
testCluster = cluster.NewKind(
29+
clusterNamePrefix+time.Now().Format("20060102-150405"),
30+
path.Join("..", ".."),
31+
cluster.Timeout(testTimeout),
32+
cluster.Override(cluster.FlowLogsPipeline, cluster.Deployment{
33+
Order: cluster.NetObservServices, ManifestFile: path.Join("manifests", "20-flp-transformer.yml"),
34+
}),
35+
cluster.Override(cluster.Agent, cluster.Deployment{
36+
Order: cluster.WithAgent, ManifestFile: path.Join("manifests", "30-agent.yml"),
37+
}),
38+
cluster.Deploy(cluster.Deployment{
39+
Order: cluster.AfterAgent,
40+
ManifestFile: path.Join("..", "basic", "manifests", "pods.yml"),
41+
}),
42+
)
43+
testCluster.Run(m)
44+
}
45+
46+
// TestBasicFlowCapture checks that the agent is correctly capturing the request/response flows
47+
// between the pods/service deployed from the manifests/pods.yml file
48+
func TestBasicFlowCapture(t *testing.T) {
49+
bt := basic.FlowCaptureTester{
50+
Cluster: testCluster,
51+
Namespace: namespace,
52+
Timeout: testTimeout,
53+
}
54+
bt.DoTest(t)
55+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
apiVersion: apps/v1
2+
kind: DaemonSet
3+
metadata:
4+
name: flp
5+
labels:
6+
k8s-app: flp
7+
spec:
8+
selector:
9+
matchLabels:
10+
k8s-app: flp
11+
template:
12+
metadata:
13+
labels:
14+
k8s-app: flp
15+
spec:
16+
serviceAccountName: ebpf-agent-test
17+
containers:
18+
- name: flp
19+
image: quay.io/netobserv/flowlogs-pipeline:main
20+
ports:
21+
- containerPort: 9999
22+
hostPort: 9999
23+
protocol: UDP
24+
args:
25+
- --config=/etc/flp/config.yaml
26+
volumeMounts:
27+
- mountPath: /etc/flp
28+
name: config-volume
29+
volumes:
30+
- name: config-volume
31+
configMap:
32+
name: flp-config
33+
---
34+
apiVersion: v1
35+
kind: ConfigMap
36+
metadata:
37+
name: flp-config
38+
data:
39+
config.yaml: |
40+
log-level: debug
41+
pipeline:
42+
- name: ingest
43+
- name: interface
44+
follows: ingest
45+
- name: enrich
46+
follows: interface
47+
- name: loki
48+
follows: enrich
49+
parameters:
50+
- name: ingest
51+
ingest:
52+
type: collector
53+
collector:
54+
hostName: 0.0.0.0
55+
port: 9999
56+
- name: interface
57+
transform:
58+
type: generic
59+
generic:
60+
policy: preserve_original_keys
61+
rules:
62+
- input: TimeReceived
63+
output: Interface
64+
- name: enrich
65+
transform:
66+
type: network
67+
network:
68+
rules:
69+
- input: SrcAddr
70+
output: SrcK8S
71+
type: "add_kubernetes"
72+
- input: DstAddr
73+
output: DstK8S
74+
type: "add_kubernetes"
75+
- name: loki
76+
write:
77+
type: loki
78+
loki:
79+
staticLabels:
80+
app: netobserv-flowcollector
81+
labels:
82+
- "SrcK8S_Namespace"
83+
- "SrcK8S_OwnerName"
84+
- "DstK8S_Namespace"
85+
- "DstK8S_OwnerName"
86+
- "FlowDirection"
87+
url: http://loki:3100
88+
timestampLabel: TimeFlowEndMs
89+
timestampScale: 1ms

e2e/ipfix/manifests/30-agent.yml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
apiVersion: apps/v1
2+
kind: DaemonSet
3+
metadata:
4+
name: netobserv-ebpf-agent
5+
labels:
6+
k8s-app: netobserv-ebpf-agent
7+
spec:
8+
selector:
9+
matchLabels:
10+
k8s-app: netobserv-ebpf-agent
11+
template:
12+
metadata:
13+
labels:
14+
k8s-app: netobserv-ebpf-agent
15+
spec:
16+
hostNetwork: true
17+
dnsPolicy: ClusterFirstWithHostNet
18+
containers:
19+
- name: netobserv-ebpf-agent
20+
image: localhost/ebpf-agent:test
21+
securityContext:
22+
privileged: true
23+
runAsUser: 0
24+
env:
25+
- name: EXPORT
26+
value: ipfix+udp
27+
- name: CACHE_ACTIVE_TIMEOUT
28+
value: 200ms
29+
- name: LOG_LEVEL
30+
value: debug
31+
- name: FLOWS_TARGET_HOST
32+
valueFrom:
33+
fieldRef:
34+
fieldPath: status.hostIP
35+
- name: FLOWS_TARGET_PORT
36+
value: "9999"
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"flag"
6+
"fmt"
7+
"log"
8+
"os"
9+
"os/signal"
10+
"syscall"
11+
"time"
12+
13+
ipfixCollector "github.com/vmware/go-ipfix/pkg/collector"
14+
"github.com/vmware/go-ipfix/pkg/entities"
15+
"github.com/vmware/go-ipfix/pkg/registry"
16+
)
17+
18+
const (
19+
hostPortIPv4 = "127.0.0.1:9999"
20+
hostPortIPv6 = "[::1]:0"
21+
)
22+
23+
var (
24+
transportType = flag.String("transport", "tcp", "transport type :tcp/udp")
25+
)
26+
27+
func printIPFIXMessage(msg *entities.Message) {
28+
var buf bytes.Buffer
29+
fmt.Fprint(&buf, "\nIPFIX-HDR:\n")
30+
fmt.Fprintf(&buf, " version: %v, Message Length: %v\n", msg.GetVersion(), msg.GetMessageLen())
31+
fmt.Fprintf(&buf, " Exported Time: %v (%v)\n", msg.GetExportTime(), time.Unix(int64(msg.GetExportTime()), 0))
32+
fmt.Fprintf(&buf, " Sequence No.: %v, Observation Domain ID: %v\n", msg.GetSequenceNum(), msg.GetObsDomainID())
33+
34+
set := msg.GetSet()
35+
if set.GetSetType() == entities.Template {
36+
fmt.Fprint(&buf, "TEMPLATE SET:\n")
37+
for i, record := range set.GetRecords() {
38+
fmt.Fprintf(&buf, " TEMPLATE RECORD-%d:\n", i)
39+
for _, ie := range record.GetOrderedElementList() {
40+
elem := ie.GetInfoElement()
41+
fmt.Fprintf(&buf, " %s: len=%d (enterprise ID = %d) \n", elem.Name, elem.Len, elem.EnterpriseId)
42+
}
43+
}
44+
} else {
45+
fmt.Fprint(&buf, "DATA SET:\n")
46+
for i, record := range set.GetRecords() {
47+
fmt.Fprintf(&buf, " DATA RECORD-%d:\n", i)
48+
for _, ie := range record.GetOrderedElementList() {
49+
elem := ie.GetInfoElement()
50+
switch elem.DataType {
51+
case entities.Unsigned8:
52+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned8Value())
53+
case entities.Unsigned16:
54+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned16Value())
55+
case entities.Unsigned32:
56+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned32Value())
57+
case entities.Unsigned64:
58+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned64Value())
59+
case entities.Signed8:
60+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetSigned8Value())
61+
case entities.Signed16:
62+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetSigned16Value())
63+
case entities.Signed32:
64+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetSigned32Value())
65+
case entities.Signed64:
66+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetSigned64Value())
67+
case entities.Float32:
68+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetFloat32Value())
69+
case entities.Float64:
70+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetFloat64Value())
71+
case entities.Boolean:
72+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetBooleanValue())
73+
case entities.DateTimeSeconds:
74+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned32Value())
75+
case entities.DateTimeMilliseconds:
76+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetUnsigned64Value())
77+
case entities.DateTimeMicroseconds, entities.DateTimeNanoseconds:
78+
err := fmt.Errorf("API does not support micro and nano seconds types yet")
79+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, err)
80+
case entities.MacAddress:
81+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetMacAddressValue())
82+
case entities.Ipv4Address, entities.Ipv6Address:
83+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetIPAddressValue())
84+
case entities.String:
85+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, ie.GetStringValue())
86+
default:
87+
err := fmt.Errorf("API supports only valid information elements with datatypes given in RFC7011")
88+
fmt.Fprintf(&buf, " %s: %v \n", elem.Name, err)
89+
}
90+
}
91+
}
92+
}
93+
log.Printf(buf.String())
94+
}
95+
96+
func signalHandler(stopCh chan struct{}, messageReceived chan *entities.Message) {
97+
signalCh := make(chan os.Signal, 1)
98+
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
99+
100+
for {
101+
select {
102+
case msg := <-messageReceived:
103+
printIPFIXMessage(msg)
104+
case <-signalCh:
105+
close(stopCh)
106+
return
107+
}
108+
}
109+
}
110+
111+
func main() {
112+
log.SetFlags(0)
113+
flag.Parse()
114+
115+
// Create exporter using local server info
116+
var input = ipfixCollector.CollectorInput{
117+
Address: hostPortIPv4,
118+
Protocol: *transportType,
119+
MaxBufferSize: 1024,
120+
}
121+
registry.LoadRegistry()
122+
cp, err := ipfixCollector.InitCollectingProcess(input)
123+
if err != nil {
124+
log.Fatalf("UDP Collecting Process does not start correctly: %v", err)
125+
}
126+
// Start listening to connections and receiving messages.
127+
messageReceived := make(chan *entities.Message)
128+
go func() {
129+
go cp.Start()
130+
msgChan := cp.GetMsgChan()
131+
for message := range msgChan {
132+
messageReceived <- message
133+
}
134+
}()
135+
136+
stopCh := make(chan struct{})
137+
go signalHandler(stopCh, messageReceived)
138+
139+
<-stopCh
140+
// Stop the collector process
141+
cp.Stop()
142+
log.Printf("Stopping IPFIX collector")
143+
}

go.mod

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/stretchr/testify v1.8.0
1515
github.com/vishvananda/netlink v1.1.0
1616
github.com/vladimirvivien/gexe v0.1.1
17+
github.com/vmware/go-ipfix v0.5.12
1718
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a
1819
google.golang.org/grpc v1.45.0
1920
google.golang.org/protobuf v1.28.0
@@ -48,11 +49,16 @@ require (
4849
github.com/modern-go/reflect2 v1.0.2 // indirect
4950
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
5051
github.com/pierrec/lz4/v4 v4.1.15 // indirect
52+
github.com/pion/dtls/v2 v2.0.3 // indirect
53+
github.com/pion/logging v0.2.2 // indirect
54+
github.com/pion/transport v0.10.1 // indirect
55+
github.com/pion/udp v0.1.0 // indirect
5156
github.com/pkg/errors v0.9.1 // indirect
5257
github.com/pmezard/go-difflib v1.0.0 // indirect
5358
github.com/prometheus/client_golang v1.12.1 // indirect
5459
github.com/spf13/pflag v1.0.5 // indirect
5560
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect
61+
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
5662
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60 // indirect
5763
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
5864
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect

0 commit comments

Comments
 (0)