Skip to content

Commit 743583a

Browse files
committed
Use patched go-ipfix, add write+read test...
The go-ipfix patch regularly recheck the udp connection, e.g. to account for restarted host with a different resolved IP ... also: - allow to provide data mapping for ingesting with goflow2 - rename API collector => ipfix - simplify ingest API with more default values - write ipfix: do not resend templates every second! Instead, use configurable periodicity
1 parent d56febe commit 743583a

File tree

24 files changed

+328
-219
lines changed

24 files changed

+328
-219
lines changed
Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,45 @@
1-
apiVersion: apps/v1
2-
kind: Deployment
1+
apiVersion: v1
2+
kind: Pod
33
metadata:
4-
name: flowlogs-pipeline
4+
name: flp-ipfix-stdout
55
labels:
6-
app: flowlogs-pipeline
6+
app: flp-ipfix-stdout
77
spec:
8-
replicas: 1
9-
selector:
10-
matchLabels:
11-
app: flowlogs-pipeline
12-
template:
13-
metadata:
14-
labels:
15-
app: flowlogs-pipeline
16-
spec:
17-
containers:
18-
- name: flowlogs-pipeline
19-
image: quay.io/netobserv/flowlogs-pipeline:main
20-
args:
21-
- "--config=/etc/flowlogs-pipeline/config.yaml"
22-
ports:
23-
- containerPort: 2055
24-
imagePullPolicy: IfNotPresent
25-
volumeMounts:
26-
- name: configuration
27-
mountPath: "/etc/flowlogs-pipeline/"
28-
volumes:
8+
containers:
9+
- name: flp-ipfix-stdout
10+
image: quay.io/jotak/flowlogs-pipeline:main
11+
imagePullPolicy: Always
12+
args:
13+
- "--config=/etc/flowlogs-pipeline/config.yaml"
14+
ports:
15+
- containerPort: 2055
16+
volumeMounts:
2917
- name: configuration
30-
configMap:
31-
name: flp-config
18+
mountPath: "/etc/flowlogs-pipeline/"
19+
volumes:
20+
- name: configuration
21+
configMap:
22+
name: flp-ipfix-stdout-config
3223
---
3324
apiVersion: v1
3425
kind: Service
3526
metadata:
36-
name: flowlogs-pipeline
27+
name: flp-ipfix-stdout
3728
labels:
38-
app: flowlogs-pipeline
29+
app: flp-ipfix-stdout
3930
spec:
4031
ports:
4132
- port: 2055
4233
targetPort: 2055
4334
protocol: UDP
4435
name: ipfix
4536
selector:
46-
app: flowlogs-pipeline
37+
app: flp-ipfix-stdout
4738
---
4839
apiVersion: v1
4940
kind: ConfigMap
5041
metadata:
51-
name: flp-config
42+
name: flp-ipfix-stdout-config
5243
data:
5344
config.yaml: |
5445
log-level: info
@@ -59,10 +50,26 @@ data:
5950
parameters:
6051
- name: ingest
6152
ingest:
62-
type: collector
63-
collector:
64-
hostName: 0.0.0.0
53+
type: ipfix
54+
ipfix:
6555
port: 2055
56+
mapping:
57+
- penprovided: true
58+
pen: 2
59+
field: 7733
60+
destination: CustomBytes_1
61+
- penprovided: true
62+
pen: 2
63+
field: 7734
64+
destination: CustomBytes_2
65+
- penprovided: true
66+
pen: 2
67+
field: 7735
68+
destination: CustomBytes_3
69+
- penprovided: true
70+
pen: 2
71+
field: 7736
72+
destination: CustomBytes_4
6673
- name: write
6774
write:
6875
type: stdout

docs/api.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,17 @@ Following is the supported API format for S3 encode:
8181
secure: true for https, false for http (default: false)
8282
objectHeaderParameters: parameters to include in object header (key/value pairs)
8383
</pre>
84-
## Ingest collector API
84+
## Ingest NetFlow/IPFIX API
8585
Following is the supported API format for the NetFlow / IPFIX collector:
8686

8787
<pre>
88-
collector:
89-
hostName: the hostname to listen on
90-
port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion
88+
ipfix:
89+
hostName: the hostname to listen on; defaults to 0.0.0.0
90+
port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion. If both port and portLegacy are omitted, defaults to 2055
9191
portLegacy: the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion
92-
batchMaxLen: the number of accumulated flows before being forwarded for processing
9392
workers: the number of netflow/ipfix decoding workers
9493
sockets: the number of listening sockets
94+
mapping: custom field mapping
9595
</pre>
9696
## Ingest Kafka API
9797
Following is the supported API format for the kafka ingest:

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,5 @@ require (
170170
sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect
171171
sigs.k8s.io/yaml v1.4.0 // indirect
172172
)
173+
174+
replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250704140557-a3d746019169

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF
180180
github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
181181
github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
182182
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
183+
github.com/jotak/go-ipfix v0.0.0-20250704140557-a3d746019169 h1:S3VDdAZlSH3xZ2ORXrIkWZ5US+RrfNKFU978woykoD0=
184+
github.com/jotak/go-ipfix v0.0.0-20250704140557-a3d746019169/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig=
183185
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
184186
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
185187
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -380,8 +382,6 @@ github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zd
380382
github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
381383
github.com/vladimirvivien/gexe v0.5.0 h1:AWBVaYnrTsGYBktXvcO0DfWPeSiZxn6mnQ5nvL+A1/A=
382384
github.com/vladimirvivien/gexe v0.5.0/go.mod h1:3gjgTqE2c0VyHnU5UOIwk7gyNzZDGulPb/DJPgcw64E=
383-
github.com/vmware/go-ipfix v0.15.0 h1:F/3BjFoODvCHEHYk36jy3TGmQzJ7rF2bC7ZG+c/lng8=
384-
github.com/vmware/go-ipfix v0.15.0/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig=
385385
github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA=
386386
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
387387
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=

pkg/api/api.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ const (
2222
FileLoopType = "file_loop"
2323
FileChunksType = "file_chunks"
2424
SyntheticType = "synthetic"
25-
CollectorType = "collector"
25+
CollectorType = "collector" // deprecated: use 'ipfix' instead
2626
StdinType = "stdin"
2727
GRPCType = "grpc"
2828
FakeType = "fake"
@@ -53,7 +53,7 @@ type API struct {
5353
PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"`
5454
KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"`
5555
S3Encode EncodeS3 `yaml:"s3" doc:"## S3 encode API\nFollowing is the supported API format for S3 encode:\n"`
56-
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
56+
IngestIpfix IngestIpfix `yaml:"ipfix" doc:"## Ingest NetFlow/IPFIX API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
5757
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
5858
IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
5959
IngestStdin IngestStdin `yaml:"stdin" doc:"## Ingest Standard Input\nFollowing is the supported API format for the standard input ingest:\n"`

pkg/api/ingest_collector.go

Lines changed: 0 additions & 27 deletions
This file was deleted.

pkg/api/ingest_ipfix.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package api
19+
20+
import (
21+
"fmt"
22+
23+
"github.com/netsampler/goflow2/producer"
24+
)
25+
26+
type IngestIpfix struct {
27+
HostName string `yaml:"hostName,omitempty" json:"hostName,omitempty" doc:"the hostname to listen on; defaults to 0.0.0.0"`
28+
Port uint `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion. If both port and portLegacy are omitted, defaults to 2055"`
29+
PortLegacy uint `yaml:"portLegacy,omitempty" json:"portLegacy,omitempty" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"`
30+
Workers uint `yaml:"workers,omitempty" json:"workers,omitempty" doc:"the number of netflow/ipfix decoding workers"`
31+
Sockets uint `yaml:"sockets,omitempty" json:"sockets,omitempty" doc:"the number of listening sockets"`
32+
Mapping []producer.NetFlowMapField `yaml:"mapping,omitempty" json:"mapping,omitempty" doc:"custom field mapping"`
33+
}
34+
35+
func (i *IngestIpfix) SetDefaults() {
36+
if i.HostName == "" {
37+
i.HostName = "0.0.0.0"
38+
}
39+
if i.Port == 0 && i.PortLegacy == 0 {
40+
i.Port = 2055
41+
}
42+
if i.Workers == 0 {
43+
i.Workers = 1
44+
}
45+
if i.Sockets == 0 {
46+
i.Sockets = 1
47+
}
48+
}
49+
50+
func (i *IngestIpfix) String() string {
51+
hasMapping := "no"
52+
if len(i.Mapping) > 0 {
53+
hasMapping = "yes"
54+
}
55+
return fmt.Sprintf(
56+
"hostname=%s, port=%d, portLegacy=%d, workers=%d, sockets=%d, mapping=%s",
57+
i.HostName,
58+
i.Port,
59+
i.PortLegacy,
60+
i.Workers,
61+
i.Sockets,
62+
hasMapping,
63+
)
64+
}

pkg/api/write_ipfix.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ type WriteIpfix struct {
99
TargetHost string `yaml:"targetHost,omitempty" json:"targetHost,omitempty" doc:"IPFIX Collector host target IP"`
1010
TargetPort int `yaml:"targetPort,omitempty" json:"targetPort,omitempty" doc:"IPFIX Collector host target port"`
1111
Transport string `yaml:"transport,omitempty" json:"transport,omitempty" doc:"Transport protocol (tcp/udp) to be used for the IPFIX connection"`
12-
EnterpriseID int `yaml:"enterpriseId,omitempty" json:"EnterpriseId,omitempty" doc:"Enterprise ID for exporting transformations"`
12+
EnterpriseID int `yaml:"enterpriseId,omitempty" json:"enterpriseId,omitempty" doc:"Enterprise ID for exporting transformations"`
1313
TplSendInterval Duration `yaml:"tplSendInterval,omitempty" json:"tplSendInterval,omitempty" doc:"Interval for resending templates to the collector (default: 1m)"`
1414
}
1515

pkg/confgen/confgen_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,11 @@ func Test_RunShortConfGen(t *testing.T) {
115115
)
116116

117117
// Expects ingest
118-
require.Equal(t, &api.IngestCollector{
118+
require.Equal(t, &api.IngestIpfix{
119119
HostName: "0.0.0.0",
120120
Port: 2155,
121121
PortLegacy: 2156,
122-
}, out.Parameters[0].Ingest.Collector)
122+
}, out.Parameters[0].Ingest.Ipfix)
123123

124124
// Expects transform network
125125
require.Len(t, out.Parameters[1].Transform.Network.Rules, 1)
@@ -207,11 +207,11 @@ func Test_RunConfGenNoAgg(t *testing.T) {
207207
)
208208

209209
// Expects ingest
210-
require.Equal(t, &api.IngestCollector{
210+
require.Equal(t, &api.IngestIpfix{
211211
HostName: "0.0.0.0",
212212
Port: 2155,
213213
PortLegacy: 2156,
214-
}, out.Parameters[0].Ingest.Collector)
214+
}, out.Parameters[0].Ingest.Ipfix)
215215

216216
// Expects transform network
217217
require.Len(t, out.Parameters[1].Transform.Network.Rules, 1)
@@ -295,11 +295,11 @@ func Test_RunLongConfGen(t *testing.T) {
295295
)
296296

297297
// Expects ingest
298-
require.Equal(t, &api.IngestCollector{
298+
require.Equal(t, &api.IngestIpfix{
299299
HostName: "0.0.0.0",
300300
Port: 2155,
301301
PortLegacy: 2156,
302-
}, out.Parameters[0].Ingest.Collector)
302+
}, out.Parameters[0].Ingest.Ipfix)
303303

304304
// Expects transform generic
305305
require.Equal(t, api.ReplaceKeys, out.Parameters[1].Transform.Generic.Policy)

pkg/confgen/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func expectedConfig() *Config {
3535
},
3636
},
3737
Ingest: config.Ingest{
38-
Collector: &api.IngestCollector{
38+
Collector: &api.IngestIpfix{
3939
Port: 8888,
4040
},
4141
},

0 commit comments

Comments
 (0)