Skip to content

Commit b74bf07

Browse files
authored
Conntrack: Add RecordType and HashId fields (#230)
* Add hash id to records * Add type field * Add a common internal prefix to field names * Change prefix of internal fields to underscore "_"
1 parent 2011d0a commit b74bf07

File tree

4 files changed

+149
-60
lines changed

4 files changed

+149
-60
lines changed

pkg/api/conn_track.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ package api
1919

2020
import "time"
2121

22+
const (
23+
HashIdFieldName = "_HashId"
24+
RecordTypeFieldName = "_RecordType"
25+
)
26+
2227
type ConnTrack struct {
2328
// TODO: should by a pointer instead?
2429
KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"`

pkg/pipeline/conntrack/conntrack.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"hash"
2424
"hash/fnv"
25+
"strconv"
2526
"time"
2627

2728
"github.com/benbjohnson/clock"
@@ -141,14 +142,20 @@ func (ct *conntrackImpl) Track(flowLogs []config.GenericMap) []config.GenericMap
141142
ct.connStore.addConnection(computedHash.hashTotal, conn)
142143
ct.updateConnection(conn, fl, computedHash)
143144
if ct.shouldOutputNewConnection {
144-
outputRecords = append(outputRecords, conn.toGenericMap())
145+
record := conn.toGenericMap()
146+
addHashField(record, computedHash.hashTotal)
147+
addTypeField(record, api.ConnTrackOutputRecordTypeName("NewConnection"))
148+
outputRecords = append(outputRecords, record)
145149
}
146150
} else {
147151
ct.updateConnection(conn, fl, computedHash)
148152
}
149153

150154
if ct.shouldOutputFlowLogs {
151-
outputRecords = append(outputRecords, fl)
155+
record := fl.Copy()
156+
addHashField(record, computedHash.hashTotal)
157+
addTypeField(record, api.ConnTrackOutputRecordTypeName("FlowLog"))
158+
outputRecords = append(outputRecords, record)
152159
}
153160
}
154161

@@ -167,7 +174,10 @@ func (ct *conntrackImpl) popEndConnections() []config.GenericMap {
167174
lastUpdate := conn.getLastUpdate()
168175
if lastUpdate.Before(expireTime) {
169176
// The last update time of this connection is too old. We want to pop it.
170-
outputRecords = append(outputRecords, conn.toGenericMap())
177+
record := conn.toGenericMap()
178+
addHashField(record, conn.getHash().hashTotal)
179+
addTypeField(record, api.ConnTrackOutputRecordTypeName("EndConnection"))
180+
outputRecords = append(outputRecords, record)
171181
shouldDelete, shouldStop = true, false
172182
} else {
173183
// No more expired connections
@@ -238,3 +248,11 @@ func NewConnectionTrack(config api.ConnTrack, clock clock.Clock) (ConnectionTrac
238248
}
239249
return conntrack, nil
240250
}
251+
252+
func addHashField(record config.GenericMap, hashId uint64) {
253+
record[api.HashIdFieldName] = strconv.FormatUint(hashId, 16)
254+
}
255+
256+
func addTypeField(record config.GenericMap, recordType string) {
257+
record[api.RecordTypeFieldName] = recordType
258+
}

pkg/pipeline/conntrack/conntrack_test.go

Lines changed: 34 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -77,40 +77,15 @@ func buildMockConnTrackConfig(isBidirectional bool, outputRecordType []string) *
7777
}
7878
}
7979

80-
func newMockRecordConnAB(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytesAB, bytesBA, packetsAB, packetsBA, numFlowLogs float64) config.GenericMap {
81-
return config.GenericMap{
82-
"SrcAddr": srcIP,
83-
"SrcPort": srcPort,
84-
"DstAddr": dstIP,
85-
"DstPort": dstPort,
86-
"Proto": protocol,
87-
"Bytes_AB": bytesAB,
88-
"Bytes_BA": bytesBA,
89-
"Packets_AB": packetsAB,
90-
"Packets_BA": packetsBA,
91-
"numFlowLogs": numFlowLogs,
92-
}
93-
}
94-
95-
func newMockRecordConn(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytes, packets, numFlowLogs float64) config.GenericMap {
96-
return config.GenericMap{
97-
"SrcAddr": srcIP,
98-
"SrcPort": srcPort,
99-
"DstAddr": dstIP,
100-
"DstPort": dstPort,
101-
"Proto": protocol,
102-
"Bytes": bytes,
103-
"Packets": packets,
104-
"numFlowLogs": numFlowLogs,
105-
}
106-
}
107-
10880
func TestTrack(t *testing.T) {
10981
ipA := "10.0.0.1"
11082
ipB := "10.0.0.2"
11183
portA := 9001
11284
portB := 9002
11385
protocol := 6
86+
hashId := "705baa5149302fa1"
87+
hashIdAB := "705baa5149302fa1"
88+
hashIdBA := "cc40f571f40f3111"
11489

11590
flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 111, 11)
11691
flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 222, 22)
@@ -127,41 +102,41 @@ func TestTrack(t *testing.T) {
127102
buildMockConnTrackConfig(true, []string{"newConnection"}),
128103
[]config.GenericMap{flAB1, flAB2, flBA3, flBA4},
129104
[]config.GenericMap{
130-
newMockRecordConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1),
105+
newMockRecordNewConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1).withHash(hashId).get(),
131106
},
132107
},
133108
{
134109
"bidirectional, output new connection and flow log",
135110
buildMockConnTrackConfig(true, []string{"newConnection", "flowLog"}),
136111
[]config.GenericMap{flAB1, flAB2, flBA3, flBA4},
137112
[]config.GenericMap{
138-
newMockRecordConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1),
139-
flAB1,
140-
flAB2,
141-
flBA3,
142-
flBA4,
113+
newMockRecordNewConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1).withHash(hashId).get(),
114+
newMockRecordFromFlowLog(flAB1).withHash(hashId).get(),
115+
newMockRecordFromFlowLog(flAB2).withHash(hashId).get(),
116+
newMockRecordFromFlowLog(flBA3).withHash(hashId).get(),
117+
newMockRecordFromFlowLog(flBA4).withHash(hashId).get(),
143118
},
144119
},
145120
{
146121
"unidirectional, output new connection",
147122
buildMockConnTrackConfig(false, []string{"newConnection"}),
148123
[]config.GenericMap{flAB1, flAB2, flBA3, flBA4},
149124
[]config.GenericMap{
150-
newMockRecordConn(ipA, portA, ipB, portB, protocol, 111, 11, 1),
151-
newMockRecordConn(ipB, portB, ipA, portA, protocol, 333, 33, 1),
125+
newMockRecordNewConn(ipA, portA, ipB, portB, protocol, 111, 11, 1).withHash(hashIdAB).get(),
126+
newMockRecordNewConn(ipB, portB, ipA, portA, protocol, 333, 33, 1).withHash(hashIdBA).get(),
152127
},
153128
},
154129
{
155130
"unidirectional, output new connection and flow log",
156131
buildMockConnTrackConfig(false, []string{"newConnection", "flowLog"}),
157132
[]config.GenericMap{flAB1, flAB2, flBA3, flBA4},
158133
[]config.GenericMap{
159-
newMockRecordConn(ipA, portA, ipB, portB, protocol, 111, 11, 1),
160-
flAB1,
161-
flAB2,
162-
newMockRecordConn(ipB, portB, ipA, portA, protocol, 333, 33, 1),
163-
flBA3,
164-
flBA4,
134+
newMockRecordNewConn(ipA, portA, ipB, portB, protocol, 111, 11, 1).withHash(hashIdAB).get(),
135+
newMockRecordFromFlowLog(flAB1).withHash(hashIdAB).get(),
136+
newMockRecordFromFlowLog(flAB2).withHash(hashIdAB).get(),
137+
newMockRecordNewConn(ipB, portB, ipA, portA, protocol, 333, 33, 1).withHash(hashIdBA).get(),
138+
newMockRecordFromFlowLog(flBA3).withHash(hashIdBA).get(),
139+
newMockRecordFromFlowLog(flBA4).withHash(hashIdBA).get(),
165140
},
166141
},
167142
}
@@ -191,6 +166,7 @@ func TestEndConn_Bidirectional(t *testing.T) {
191166
portA := 9001
192167
portB := 9002
193168
protocol := 6
169+
hashId := "705baa5149302fa1"
194170

195171
flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 111, 11)
196172
flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 222, 22)
@@ -208,25 +184,25 @@ func TestEndConn_Bidirectional(t *testing.T) {
208184
startTime.Add(0 * time.Second),
209185
[]config.GenericMap{flAB1},
210186
[]config.GenericMap{
211-
newMockRecordConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1),
212-
flAB1,
187+
newMockRecordNewConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1).withHash(hashId).get(),
188+
newMockRecordFromFlowLog(flAB1).withHash(hashId).get(),
213189
},
214190
},
215191
{
216192
"10s: flow AB and BA",
217193
startTime.Add(10 * time.Second),
218194
[]config.GenericMap{flAB2, flBA3},
219195
[]config.GenericMap{
220-
flAB2,
221-
flBA3,
196+
newMockRecordFromFlowLog(flAB2).withHash(hashId).get(),
197+
newMockRecordFromFlowLog(flBA3).withHash(hashId).get(),
222198
},
223199
},
224200
{
225201
"20s: flow BA",
226202
startTime.Add(20 * time.Second),
227203
[]config.GenericMap{flBA4},
228204
[]config.GenericMap{
229-
flBA4,
205+
newMockRecordFromFlowLog(flBA4).withHash(hashId).get(),
230206
},
231207
},
232208
{
@@ -240,7 +216,7 @@ func TestEndConn_Bidirectional(t *testing.T) {
240216
startTime.Add(51 * time.Second),
241217
nil,
242218
[]config.GenericMap{
243-
newMockRecordConnAB(ipA, portA, ipB, portB, protocol, 333, 777, 33, 77, 4),
219+
newMockRecordEndConnAB(ipA, portA, ipB, portB, protocol, 333, 777, 33, 77, 4).withHash(hashId).get(),
244220
},
245221
},
246222
}
@@ -272,6 +248,8 @@ func TestEndConn_Unidirectional(t *testing.T) {
272248
portA := 9001
273249
portB := 9002
274250
protocol := 6
251+
hashIdAB := "705baa5149302fa1"
252+
hashIdBA := "cc40f571f40f3111"
275253

276254
flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 111, 11)
277255
flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 222, 22)
@@ -289,26 +267,26 @@ func TestEndConn_Unidirectional(t *testing.T) {
289267
startTime.Add(0 * time.Second),
290268
[]config.GenericMap{flAB1},
291269
[]config.GenericMap{
292-
newMockRecordConn(ipA, portA, ipB, portB, protocol, 111, 11, 1),
293-
flAB1,
270+
newMockRecordNewConn(ipA, portA, ipB, portB, protocol, 111, 11, 1).withHash(hashIdAB).get(),
271+
newMockRecordFromFlowLog(flAB1).withHash(hashIdAB).get(),
294272
},
295273
},
296274
{
297275
"10s: flow AB and BA",
298276
startTime.Add(10 * time.Second),
299277
[]config.GenericMap{flAB2, flBA3},
300278
[]config.GenericMap{
301-
flAB2,
302-
newMockRecordConn(ipB, portB, ipA, portA, protocol, 333, 33, 1),
303-
flBA3,
279+
newMockRecordFromFlowLog(flAB2).withHash(hashIdAB).get(),
280+
newMockRecordNewConn(ipB, portB, ipA, portA, protocol, 333, 33, 1).withHash(hashIdBA).get(),
281+
newMockRecordFromFlowLog(flBA3).withHash(hashIdBA).get(),
304282
},
305283
},
306284
{
307285
"20s: flow BA",
308286
startTime.Add(20 * time.Second),
309287
[]config.GenericMap{flBA4},
310288
[]config.GenericMap{
311-
flBA4,
289+
newMockRecordFromFlowLog(flBA4).withHash(hashIdBA).get(),
312290
},
313291
},
314292
{
@@ -322,7 +300,7 @@ func TestEndConn_Unidirectional(t *testing.T) {
322300
startTime.Add(41 * time.Second),
323301
nil,
324302
[]config.GenericMap{
325-
newMockRecordConn(ipA, portA, ipB, portB, protocol, 333, 33, 2),
303+
newMockRecordEndConn(ipA, portA, ipB, portB, protocol, 333, 33, 2).withHash(hashIdAB).get(),
326304
},
327305
},
328306
{
@@ -336,7 +314,7 @@ func TestEndConn_Unidirectional(t *testing.T) {
336314
startTime.Add(51 * time.Second),
337315
nil,
338316
[]config.GenericMap{
339-
newMockRecordConn(ipB, portB, ipA, portA, protocol, 777, 77, 2),
317+
newMockRecordEndConn(ipB, portB, ipA, portA, protocol, 777, 77, 2).withHash(hashIdBA).get(),
340318
},
341319
},
342320
}

pkg/pipeline/conntrack/utils_test.go

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package conntrack
22

3-
import "github.com/netobserv/flowlogs-pipeline/pkg/config"
3+
import (
4+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
5+
"github.com/netobserv/flowlogs-pipeline/pkg/config"
6+
)
47

58
func newMockFlowLog(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytes, packets int) config.GenericMap {
69
return config.GenericMap{
@@ -13,3 +16,88 @@ func newMockFlowLog(srcIP string, srcPort int, dstIP string, dstPort int, protoc
1316
"Packets": packets,
1417
}
1518
}
19+
20+
type mockRecord struct {
21+
record config.GenericMap
22+
}
23+
24+
func newMockRecordFromFlowLog(fl config.GenericMap) *mockRecord {
25+
mock := &mockRecord{
26+
record: config.GenericMap{},
27+
}
28+
for k, v := range fl {
29+
mock.record[k] = v
30+
}
31+
mock.withType("flowLog")
32+
return mock
33+
}
34+
35+
func newMockRecordConnAB(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytesAB, bytesBA, packetsAB, packetsBA, numFlowLogs float64) *mockRecord {
36+
mock := &mockRecord{
37+
record: config.GenericMap{
38+
"SrcAddr": srcIP,
39+
"SrcPort": srcPort,
40+
"DstAddr": dstIP,
41+
"DstPort": dstPort,
42+
"Proto": protocol,
43+
"Bytes_AB": bytesAB,
44+
"Bytes_BA": bytesBA,
45+
"Packets_AB": packetsAB,
46+
"Packets_BA": packetsBA,
47+
"numFlowLogs": numFlowLogs,
48+
},
49+
}
50+
return mock
51+
}
52+
53+
func newMockRecordNewConnAB(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytesAB, bytesBA, packetsAB, packetsBA, numFlowLogs float64) *mockRecord {
54+
return newMockRecordConnAB(srcIP, srcPort, dstIP, dstPort, protocol, bytesAB, bytesBA, packetsAB, packetsBA, numFlowLogs).
55+
withType("newConnection")
56+
57+
}
58+
59+
func newMockRecordEndConnAB(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytesAB, bytesBA, packetsAB, packetsBA, numFlowLogs float64) *mockRecord {
60+
return newMockRecordConnAB(srcIP, srcPort, dstIP, dstPort, protocol, bytesAB, bytesBA, packetsAB, packetsBA, numFlowLogs).
61+
withType("endConnection")
62+
63+
}
64+
65+
func newMockRecordConn(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytes, packets, numFlowLogs float64) *mockRecord {
66+
mock := &mockRecord{
67+
record: config.GenericMap{
68+
"SrcAddr": srcIP,
69+
"SrcPort": srcPort,
70+
"DstAddr": dstIP,
71+
"DstPort": dstPort,
72+
"Proto": protocol,
73+
"Bytes": bytes,
74+
"Packets": packets,
75+
"numFlowLogs": numFlowLogs,
76+
},
77+
}
78+
return mock
79+
}
80+
81+
func newMockRecordNewConn(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytes, packets, numFlowLogs float64) *mockRecord {
82+
return newMockRecordConn(srcIP, srcPort, dstIP, dstPort, protocol, bytes, packets, numFlowLogs).
83+
withType("newConnection")
84+
}
85+
86+
func newMockRecordEndConn(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytes, packets, numFlowLogs float64) *mockRecord {
87+
return newMockRecordConn(srcIP, srcPort, dstIP, dstPort, protocol, bytes, packets, numFlowLogs).
88+
withType("endConnection")
89+
}
90+
91+
func (m *mockRecord) withHash(hashStr string) *mockRecord {
92+
m.record[api.HashIdFieldName] = hashStr
93+
return m
94+
}
95+
96+
func (m *mockRecord) withType(recordType string) *mockRecord {
97+
m.record[api.RecordTypeFieldName] = recordType
98+
return m
99+
}
100+
101+
func (m *mockRecord) get() config.GenericMap {
102+
return m.record
103+
}

0 commit comments

Comments
 (0)