Skip to content

Commit d56febe

Browse files
committed
fix sending decoded tcpflags on ipfix
1 parent b044453 commit d56febe

File tree

5 files changed

+104
-78
lines changed

5 files changed

+104
-78
lines changed

docs/api.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ Following is the supported API format for writing to an IPFIX collector:
340340
targetPort: IPFIX Collector host target port
341341
transport: Transport protocol (tcp/udp) to be used for the IPFIX connection
342342
enterpriseId: Enterprise ID for exporting transformations
343+
tplSendInterval: Interval for resending templates to the collector (default: 1m)
343344
</pre>
344345
## Aggregate metrics API
345346
Following is the supported API format for specifying metrics aggregations:

pkg/pipeline/write/testnorace/write_ipfix_test.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/netobserv/flowlogs-pipeline/pkg/api"
1212
"github.com/netobserv/flowlogs-pipeline/pkg/config"
1313
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write"
14+
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
1415
"github.com/netobserv/netobserv-ebpf-agent/pkg/decode"
1516
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
1617
"github.com/stretchr/testify/assert"
@@ -45,7 +46,7 @@ var (
4546
EthProtocol: 2048,
4647
Packets: 3,
4748
Transport: &pbflow.Transport{
48-
Protocol: 17,
49+
Protocol: 6,
4950
SrcPort: 23000,
5051
DstPort: 443,
5152
},
@@ -128,6 +129,9 @@ func TestEnrichedIPFIXFlow(t *testing.T) {
128129

129130
flow := decode.PBFlowToMap(&fullPBFlow)
130131

132+
// Convert TCP flags
133+
flow["Flags"] = utils.DecodeTCPFlags(uint(fullPBFlow.Flags))
134+
131135
// Add enrichment
132136
flow["SrcK8S_Name"] = "pod A"
133137
flow["SrcK8S_Namespace"] = "ns1"
@@ -160,8 +164,12 @@ func TestEnrichedIPFIXFlow(t *testing.T) {
160164
cp.Stop()
161165

162166
expectedFields := write.IPv4IANAFields
163-
expectedFields = append(expectedFields, write.KubeFields...)
164-
expectedFields = append(expectedFields, write.CustomNetworkFields...)
167+
for _, f := range write.KubeFields {
168+
expectedFields = append(expectedFields, f.Name)
169+
}
170+
for _, f := range write.CustomNetworkFields {
171+
expectedFields = append(expectedFields, f.Name)
172+
}
165173

166174
// Check template
167175
assert.Equal(t, uint16(10), tplv4Msg.GetVersion())
@@ -221,8 +229,12 @@ func TestEnrichedIPFIXPartialFlow(t *testing.T) {
221229
cp.Stop()
222230

223231
expectedFields := write.IPv4IANAFields
224-
expectedFields = append(expectedFields, write.KubeFields...)
225-
expectedFields = append(expectedFields, write.CustomNetworkFields...)
232+
for _, f := range write.KubeFields {
233+
expectedFields = append(expectedFields, f.Name)
234+
}
235+
for _, f := range write.CustomNetworkFields {
236+
expectedFields = append(expectedFields, f.Name)
237+
}
226238

227239
// Check template
228240
assert.Equal(t, uint16(10), tplv4Msg.GetVersion())
@@ -298,9 +310,9 @@ func TestBasicIPFIXFlow(t *testing.T) {
298310
}
299311

300312
// Make sure enriched fields are absent
301-
for _, name := range write.KubeFields {
302-
element, _, exist := record.GetInfoElementWithValue(name)
303-
assert.Falsef(t, exist, "element with name %s should NOT exist in the record", name)
313+
for _, f := range write.KubeFields {
314+
element, _, exist := record.GetInfoElementWithValue(f.Name)
315+
assert.Falsef(t, exist, "element with name %s should NOT exist in the record", f.Name)
304316
assert.Nil(t, element)
305317
}
306318
}
@@ -354,9 +366,9 @@ func TestICMPIPFIXFlow(t *testing.T) {
354366
}
355367

356368
// Make sure enriched fields are absent
357-
for _, name := range write.KubeFields {
358-
element, _, exist := record.GetInfoElementWithValue(name)
359-
assert.Falsef(t, exist, "element with name %s should NOT exist in the record", name)
369+
for _, f := range write.KubeFields {
370+
element, _, exist := record.GetInfoElementWithValue(f.Name)
371+
assert.Falsef(t, exist, "element with name %s should NOT exist in the record", f.Name)
360372
assert.Nil(t, element)
361373
}
362374
}

pkg/pipeline/write/write_ipfix.go

Lines changed: 59 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ import (
2626

2727
"github.com/netobserv/flowlogs-pipeline/pkg/api"
2828
"github.com/netobserv/flowlogs-pipeline/pkg/config"
29-
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
29+
putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
30+
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
3031
"github.com/sirupsen/logrus"
3132
"github.com/vmware/go-ipfix/pkg/entities"
3233
ipfixExporter "github.com/vmware/go-ipfix/pkg/exporter"
@@ -83,18 +84,18 @@ var (
8384
"icmpTypeIPv6",
8485
"icmpCodeIPv6",
8586
}, IANAFields...)
86-
KubeFields = []string{
87-
"sourcePodNamespace",
88-
"sourcePodName",
89-
"destinationPodNamespace",
90-
"destinationPodName",
91-
"sourceNodeName",
92-
"destinationNodeName",
87+
KubeFields = []entities.InfoElement{
88+
{Name: "sourcePodNamespace", ElementId: 7733, DataType: entities.String, Len: 65535},
89+
{Name: "sourcePodName", ElementId: 7734, DataType: entities.String, Len: 65535},
90+
{Name: "destinationPodNamespace", ElementId: 7735, DataType: entities.String, Len: 65535},
91+
{Name: "destinationPodName", ElementId: 7736, DataType: entities.String, Len: 65535},
92+
{Name: "sourceNodeName", ElementId: 7737, DataType: entities.String, Len: 65535},
93+
{Name: "destinationNodeName", ElementId: 7738, DataType: entities.String, Len: 65535},
9394
}
94-
CustomNetworkFields = []string{
95-
"timeFlowRttNs",
96-
"interfaces",
97-
"directions",
95+
CustomNetworkFields = []entities.InfoElement{
96+
{Name: "timeFlowRttNs", ElementId: 7740, DataType: entities.Unsigned64, Len: 8},
97+
{Name: "interfaces", ElementId: 7741, DataType: entities.String, Len: 65535},
98+
{Name: "directions", ElementId: 7742, DataType: entities.String, Len: 65535},
9899
}
99100

100101
MapIPFIXKeys = map[string]FieldMap{
@@ -224,9 +225,38 @@ var (
224225
},
225226
},
226227
"tcpControlBits": {
227-
Key: "Flags",
228-
Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned16Value() },
229-
Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned16Value(rec.(uint16)) },
228+
Key: "Flags",
229+
Getter: func(elt entities.InfoElementWithValue) any {
230+
return elt.GetUnsigned16Value()
231+
},
232+
Setter: func(elt entities.InfoElementWithValue, rec any) {
233+
if decoded, isDecoded := rec.([]string); isDecoded {
234+
// reencode for ipfix
235+
reencoded := utils.EncodeTCPFlags(decoded)
236+
elt.SetUnsigned16Value(uint16(reencoded))
237+
} else if raw, isRaw := rec.(uint16); isRaw {
238+
elt.SetUnsigned16Value(raw)
239+
}
240+
},
241+
Matcher: func(elt entities.InfoElementWithValue, expected any) bool {
242+
received := elt.GetUnsigned16Value()
243+
if expSlice, isSlice := expected.([]string); isSlice {
244+
decoded := utils.DecodeTCPFlags(uint(received))
245+
if len(expSlice) != len(decoded) {
246+
return false
247+
}
248+
for i := 0; i < len(expSlice); i++ {
249+
if expSlice[i] != decoded[i] {
250+
return false
251+
}
252+
}
253+
return true
254+
}
255+
if expected == nil {
256+
return received == 0
257+
}
258+
return received == expected
259+
},
230260
},
231261
"icmpTypeIPv4": {
232262
Key: "IcmpType",
@@ -312,7 +342,7 @@ func addElementToTemplate(elementName string, value []byte, elements *[]entities
312342

313343
func addNetworkEnrichmentToTemplate(elements *[]entities.InfoElementWithValue, registryID uint32) error {
314344
for _, field := range CustomNetworkFields {
315-
if err := addElementToTemplate(field, nil, elements, registryID); err != nil {
345+
if err := addElementToTemplate(field.Name, nil, elements, registryID); err != nil {
316346
return err
317347
}
318348
}
@@ -321,7 +351,7 @@ func addNetworkEnrichmentToTemplate(elements *[]entities.InfoElementWithValue, r
321351

322352
func addKubeContextToTemplate(elements *[]entities.InfoElementWithValue, registryID uint32) error {
323353
for _, field := range KubeFields {
324-
if err := addElementToTemplate(field, nil, elements, registryID); err != nil {
354+
if err := addElementToTemplate(field.Name, nil, elements, registryID); err != nil {
325355
return err
326356
}
327357
}
@@ -334,50 +364,16 @@ func loadCustomRegistry(enterpriseID uint32) error {
334364
ilog.WithError(err).Errorf("Failed to initialize registry")
335365
return err
336366
}
337-
err = registry.PutInfoElement((*entities.NewInfoElement("sourcePodNamespace", 7733, entities.String, enterpriseID, 65535)), enterpriseID)
338-
if err != nil {
339-
ilog.WithError(err).Errorf("Failed to register element")
340-
return err
341-
}
342-
err = registry.PutInfoElement((*entities.NewInfoElement("sourcePodName", 7734, entities.String, enterpriseID, 65535)), enterpriseID)
343-
if err != nil {
344-
ilog.WithError(err).Errorf("Failed to register element")
345-
return err
346-
}
347-
err = registry.PutInfoElement((*entities.NewInfoElement("destinationPodNamespace", 7735, entities.String, enterpriseID, 65535)), enterpriseID)
348-
if err != nil {
349-
ilog.WithError(err).Errorf("Failed to register element")
350-
return err
351-
}
352-
err = registry.PutInfoElement((*entities.NewInfoElement("destinationPodName", 7736, entities.String, enterpriseID, 65535)), enterpriseID)
353-
if err != nil {
354-
ilog.WithError(err).Errorf("Failed to register element")
355-
return err
356-
}
357-
err = registry.PutInfoElement((*entities.NewInfoElement("sourceNodeName", 7737, entities.String, enterpriseID, 65535)), enterpriseID)
358-
if err != nil {
359-
ilog.WithError(err).Errorf("Failed to register element")
360-
return err
361-
}
362-
err = registry.PutInfoElement((*entities.NewInfoElement("destinationNodeName", 7738, entities.String, enterpriseID, 65535)), enterpriseID)
363-
if err != nil {
364-
ilog.WithError(err).Errorf("Failed to register element")
365-
return err
366-
}
367-
err = registry.PutInfoElement((*entities.NewInfoElement("timeFlowRttNs", 7740, entities.Unsigned64, enterpriseID, 8)), enterpriseID)
368-
if err != nil {
369-
ilog.WithError(err).Errorf("Failed to register element")
370-
return err
371-
}
372-
err = registry.PutInfoElement((*entities.NewInfoElement("interfaces", 7741, entities.String, enterpriseID, 65535)), enterpriseID)
373-
if err != nil {
374-
ilog.WithError(err).Errorf("Failed to register element")
375-
return err
376-
}
377-
err = registry.PutInfoElement((*entities.NewInfoElement("directions", 7742, entities.String, enterpriseID, 65535)), enterpriseID)
378-
if err != nil {
379-
ilog.WithError(err).Errorf("Failed to register element")
380-
return err
367+
allCustom := []entities.InfoElement{}
368+
allCustom = append(allCustom, KubeFields...)
369+
allCustom = append(allCustom, CustomNetworkFields...)
370+
for _, f := range allCustom {
371+
f.EnterpriseId = enterpriseID
372+
err = registry.PutInfoElement(f, enterpriseID)
373+
if err != nil {
374+
ilog.WithError(err).Errorf("Failed to register element: %s", f.Name)
375+
return err
376+
}
381377
}
382378
return nil
383379
}
@@ -386,7 +382,6 @@ func prepareTemplate(templateID uint16, enrichEnterpriseID uint32, fields []stri
386382
templateSet := entities.NewSet(false)
387383
err := templateSet.PrepareSet(entities.Template, templateID)
388384
if err != nil {
389-
ilog.WithError(err).Error("prepareTemplate: failed to prepare set")
390385
return nil, nil, err
391386
}
392387
elements := make([]entities.InfoElementWithValue, 0)
@@ -409,7 +404,6 @@ func prepareTemplate(templateID uint16, enrichEnterpriseID uint32, fields []stri
409404
}
410405
err = templateSet.AddRecord(elements, templateID)
411406
if err != nil {
412-
ilog.WithError(err).Error("prepareTemplate: failed to add record")
413407
return nil, nil, err
414408
}
415409

@@ -507,7 +501,7 @@ func (t *writeIpfix) startTemplateSenderLoop(interval time.Duration, exitChan <-
507501
for {
508502
select {
509503
case <-exitChan:
510-
log.Debugf("exiting sendTemplates because of signal")
504+
log.Infof("Exit signal received, stopping templates sending loop")
511505
return
512506
case <-ticker.C:
513507
if _, err := t.exporter.SendSet(t.tplV4); err != nil {
@@ -581,7 +575,7 @@ func NewWriteIpfix(params config.StageParam) (Writer, error) {
581575
entitiesV6: entitiesV6,
582576
}
583577

584-
writeIpfix.startTemplateSenderLoop(ipfixConfigIn.TplSendInterval.Duration, utils.ExitChannel())
578+
writeIpfix.startTemplateSenderLoop(ipfixConfigIn.TplSendInterval.Duration, putils.ExitChannel())
585579

586580
return writeIpfix, nil
587581
}

pkg/pipeline/write/write_stdout.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,16 @@ func (t *writeStdout) Write(v config.GenericMap) {
4040
}
4141

4242
func formatter(format string, reorder bool) func(config.GenericMap) string {
43-
if format == "json" {
43+
switch format {
44+
case "json":
4445
jconf := jsonIter.Config{
4546
SortMapKeys: reorder,
4647
}.Froze()
4748
return func(v config.GenericMap) string {
4849
b, _ := jconf.Marshal(v)
4950
return string(b)
5051
}
51-
} else if format == "fields" {
52+
case "fields":
5253
return func(v config.GenericMap) string {
5354
var sb strings.Builder
5455
var order sort.StringSlice

pkg/utils/tcp_flags.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,24 @@ var tcpFlags = []tcpFlag{
1818
{value: 512, name: "FIN_ACK"},
1919
{value: 1024, name: "RST_ACK"},
2020
}
21+
var flagsMap map[string]uint
22+
23+
func init() {
24+
flagsMap = make(map[string]uint, len(tcpFlags))
25+
for _, flag := range tcpFlags {
26+
flagsMap[flag.name] = flag.value
27+
}
28+
}
29+
30+
func EncodeTCPFlags(flags []string) uint {
31+
var bf uint
32+
for _, flag := range flags {
33+
if v, ok := flagsMap[flag]; ok {
34+
bf |= v
35+
}
36+
}
37+
return bf
38+
}
2139

2240
func DecodeTCPFlags(bitfield uint) []string {
2341
var values []string

0 commit comments

Comments
 (0)