Skip to content

Commit 381890d

Browse files
authored
conntrack copy agg + fix duplicates (#413)
1 parent cfadce6 commit 381890d

File tree

16 files changed

+468
-198
lines changed

16 files changed

+468
-198
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,9 +631,11 @@ parameters:
631631
Proto: 17
632632
endConnectionTimeout: 5s
633633
heartbeatInterval: 40s
634+
terminatingTimeout: 5s
634635
- selector: {} # Default group
635636
endConnectionTimeout: 10s
636637
heartbeatInterval: 30s
638+
terminatingTimeout: 5s
637639
tcpFlags:
638640
fieldName: Flags
639641
detectEndConnection: true

contrib/kubernetes/flowlogs-pipeline.conf.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ parameters:
107107
scheduling:
108108
- endConnectionTimeout: 10s
109109
heartbeatInterval: 30s
110+
terminatingTimeout: 5s
110111
tcpFlags:
111112
fieldName: TCPFlags
112113
detectEndConnection: true

docs/api.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,16 +225,19 @@ Following is the supported API format for specifying connection tracking:
225225
count: count
226226
min: min
227227
max: max
228+
first: first
229+
last: last
228230
splitAB: When true, 2 output fields will be created. One for A->B and one for B->A flows.
229231
input: The input field to base the operation on. When omitted, 'name' is used
230232
scheduling: list of timeouts and intervals to apply per selector
231233
selector: key-value map to match against connection fields to apply this scheduling
232234
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
235+
terminatingTimeout: duration of time to wait from detected FIN flag to end a connection
233236
heartbeatInterval: duration of time to wait between heartbeat reports of a connection
234237
maxConnectionsTracked: maximum number of connections we keep in our cache (0 means no limit)
235238
tcpFlags: settings for handling TCP flags
236239
fieldName: name of the field containing TCP flags
237-
detectEndConnection: detect end connections by FIN_ACK flag
240+
detectEndConnection: detect end connections by FIN flag
238241
swapAB: swap source and destination when the first flowlog contains the SYN_ACK flag
239242
</pre>
240243
## Time-based Filters API

docs/operational-metrics.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
1818
### conntrack_memory_connections
1919
| **Name** | conntrack_memory_connections |
2020
|:---|:---|
21-
| **Description** | The total number of tracked connections in memory. |
21+
| **Description** | The total number of tracked connections in memory per group and phase. |
2222
| **Type** | gauge |
23-
| **Labels** | group |
23+
| **Labels** | group, phase |
2424

2525

2626
### conntrack_output_records

network_definitions/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ extract:
4141
- selector: {}
4242
heartbeatInterval: 30s
4343
endConnectionTimeout: 10s
44+
terminatingTimeout: 5s
4445
outputRecordTypes:
4546
- newConnection
4647
- flowLog

pkg/api/conntrack.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,14 @@ type ConnTrackOperationEnum struct {
8181
Count string `yaml:"count" json:"count" doc:"count"`
8282
Min string `yaml:"min" json:"min" doc:"min"`
8383
Max string `yaml:"max" json:"max" doc:"max"`
84+
First string `yaml:"first" json:"first" doc:"first"`
85+
Last string `yaml:"last" json:"last" doc:"last"`
8486
}
8587

8688
type ConnTrackSchedulingGroup struct {
8789
Selector map[string]interface{} `yaml:"selector,omitempty" json:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"`
8890
EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" json:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
91+
TerminatingTimeout Duration `yaml:"terminatingTimeout,omitempty" json:"terminatingTimeout,omitempty" doc:"duration of time to wait from detected FIN flag to end a connection"`
8992
HeartbeatInterval Duration `yaml:"heartbeatInterval,omitempty" json:"heartbeatInterval,omitempty" doc:"duration of time to wait between heartbeat reports of a connection"`
9093
}
9194

@@ -95,7 +98,7 @@ func ConnTrackOperationName(operation string) string {
9598

9699
type ConnTrackTCPFlags struct {
97100
FieldName string `yaml:"fieldName,omitempty" json:"fieldName,omitempty" doc:"name of the field containing TCP flags"`
98-
DetectEndConnection bool `yaml:"detectEndConnection,omitempty" json:"detectEndConnection,omitempty" doc:"detect end connections by FIN_ACK flag"`
101+
DetectEndConnection bool `yaml:"detectEndConnection,omitempty" json:"detectEndConnection,omitempty" doc:"detect end connections by FIN flag"`
99102
SwapAB bool `yaml:"swapAB,omitempty" json:"swapAB,omitempty" doc:"swap source and destination when the first flowlog contains the SYN_ACK flag"`
100103
}
101104

@@ -113,7 +116,7 @@ func (ct *ConnTrack) Validate() error {
113116
return conntrackInvalidError{splitABWithNoBidi: true,
114117
msg: fmt.Errorf("output field %q has splitAB=true although bidirection is not enabled (fieldGroupARef is empty)", of.Name)}
115118
}
116-
if !isOperationValid(of.Operation) {
119+
if !isOperationValid(of.Operation, of.SplitAB) {
117120
return conntrackInvalidError{unknownOperation: true,
118121
msg: fmt.Errorf("unknown operation %q in output field %q", of.Operation, of.Name)}
119122
}
@@ -250,13 +253,15 @@ func addToSet(set map[string]struct{}, item string) bool {
250253
return true
251254
}
252255

253-
func isOperationValid(value string) bool {
256+
func isOperationValid(value string, splitAB bool) bool {
254257
valid := true
255258
switch value {
256259
case ConnTrackOperationName("Sum"):
257260
case ConnTrackOperationName("Count"):
258261
case ConnTrackOperationName("Min"):
259262
case ConnTrackOperationName("Max"):
263+
case ConnTrackOperationName("First"), ConnTrackOperationName("Last"):
264+
valid = !splitAB
260265
default:
261266
valid = false
262267
}

pkg/pipeline/conntrack_integ_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ parameters:
5151
scheduling:
5252
- selector: {}
5353
endConnectionTimeout: 1s
54+
heartbeatInterval: 10s
55+
terminatingTimeout: 5s
5456
outputRecordTypes:
5557
- newConnection
5658
- flowLog
@@ -120,7 +122,7 @@ func TestConnTrack(t *testing.T) {
120122
}, test2.Interval(10*time.Millisecond))
121123

122124
// Wait a moment to make the connections expired
123-
time.Sleep(2 * time.Second)
125+
time.Sleep(5 * time.Second)
124126

125127
// Send something to the pipeline to allow the connection tracking output end connection records
126128
in <- config.GenericMap{"DstAddr": "1.2.3.4"}

pkg/pipeline/extract/conntrack/aggregator.go

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,22 @@ type aggregator interface {
3333
// addField adds an aggregate field to the connection
3434
addField(conn connection)
3535
// update updates the aggregate field in the connection based on the flow log.
36-
update(conn connection, flowLog config.GenericMap, d direction)
36+
update(conn connection, flowLog config.GenericMap, d direction, isFirst bool)
3737
}
3838

3939
type aggregateBase struct {
4040
inputField string
4141
outputField string
4242
splitAB bool
43-
initVal float64
43+
initVal interface{}
4444
}
4545

4646
type aSum struct{ aggregateBase }
4747
type aCount struct{ aggregateBase }
4848
type aMin struct{ aggregateBase }
4949
type aMax struct{ aggregateBase }
50+
type aFirst struct{ aggregateBase }
51+
type aLast struct{ aggregateBase }
5052

5153
// TODO: think of adding a more complex operation such as Average Packet Size which involves 2 input fields: Bytes/Packets
5254

@@ -65,17 +67,23 @@ func newAggregator(of api.OutputField) (aggregator, error) {
6567
var agg aggregator
6668
switch of.Operation {
6769
case api.ConnTrackOperationName("Sum"):
68-
aggBase.initVal = 0
70+
aggBase.initVal = float64(0)
6971
agg = &aSum{aggBase}
7072
case api.ConnTrackOperationName("Count"):
71-
aggBase.initVal = 0
73+
aggBase.initVal = float64(0)
7274
agg = &aCount{aggBase}
7375
case api.ConnTrackOperationName("Min"):
7476
aggBase.initVal = math.MaxFloat64
7577
agg = &aMin{aggBase}
7678
case api.ConnTrackOperationName("Max"):
7779
aggBase.initVal = -math.MaxFloat64
7880
agg = &aMax{aggBase}
81+
case api.ConnTrackOperationName("First"):
82+
aggBase.initVal = nil
83+
agg = &aFirst{aggBase}
84+
case api.ConnTrackOperationName("Last"):
85+
aggBase.initVal = nil
86+
agg = &aLast{aggBase}
7987
default:
8088
return nil, fmt.Errorf("unknown operation: %q", of.Operation)
8189
}
@@ -118,47 +126,57 @@ func (agg *aggregateBase) addField(conn connection) {
118126
}
119127
}
120128

121-
func (agg *aSum) update(conn connection, flowLog config.GenericMap, d direction) {
129+
func (agg *aSum) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
122130
outputField := agg.getOutputField(d)
123131
v, err := agg.getInputFieldValue(flowLog)
124132
if err != nil {
125133
log.Errorf("error updating connection %x: %v", conn.getHash().hashTotal, err)
126134
return
127135
}
128-
conn.updateAggValue(outputField, func(curr float64) float64 {
136+
conn.updateAggFnValue(outputField, func(curr float64) float64 {
129137
return curr + v
130138
})
131139
}
132140

133-
func (agg *aCount) update(conn connection, flowLog config.GenericMap, d direction) {
141+
func (agg *aCount) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
134142
outputField := agg.getOutputField(d)
135-
conn.updateAggValue(outputField, func(curr float64) float64 {
143+
conn.updateAggFnValue(outputField, func(curr float64) float64 {
136144
return curr + 1
137145
})
138146
}
139147

140-
func (agg *aMin) update(conn connection, flowLog config.GenericMap, d direction) {
148+
func (agg *aMin) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
141149
outputField := agg.getOutputField(d)
142150
v, err := agg.getInputFieldValue(flowLog)
143151
if err != nil {
144152
log.Errorf("error updating connection %x: %v", conn.getHash().hashTotal, err)
145153
return
146154
}
147155

148-
conn.updateAggValue(outputField, func(curr float64) float64 {
156+
conn.updateAggFnValue(outputField, func(curr float64) float64 {
149157
return math.Min(curr, v)
150158
})
151159
}
152160

153-
func (agg *aMax) update(conn connection, flowLog config.GenericMap, d direction) {
161+
func (agg *aMax) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
154162
outputField := agg.getOutputField(d)
155163
v, err := agg.getInputFieldValue(flowLog)
156164
if err != nil {
157165
log.Errorf("error updating connection %x: %v", conn.getHash().hashTotal, err)
158166
return
159167
}
160168

161-
conn.updateAggValue(outputField, func(curr float64) float64 {
169+
conn.updateAggFnValue(outputField, func(curr float64) float64 {
162170
return math.Max(curr, v)
163171
})
164172
}
173+
174+
func (cp *aFirst) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
175+
if isNew {
176+
conn.updateAggValue(cp.outputField, flowLog[cp.inputField])
177+
}
178+
}
179+
180+
func (cp *aLast) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
181+
conn.updateAggValue(cp.outputField, flowLog[cp.inputField])
182+
}

pkg/pipeline/extract/conntrack/aggregator_test.go

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,14 @@ func TestNewAggregator_Invalid(t *testing.T) {
4545
Input: "Input",
4646
})
4747
require.NotNil(t, err)
48+
49+
// invalid first agg
50+
_, err = newAggregator(api.OutputField{
51+
Operation: "first",
52+
SplitAB: true,
53+
Input: "Input",
54+
})
55+
require.NotNil(t, err)
4856
}
4957

5058
func TestNewAggregator_Valid(t *testing.T) {
@@ -56,27 +64,27 @@ func TestNewAggregator_Valid(t *testing.T) {
5664
{
5765
name: "Default SplitAB",
5866
outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
59-
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, 0}},
67+
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
6068
},
6169
{
6270
name: "Default input",
6371
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", SplitAB: true},
64-
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, 0}},
72+
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0)}},
6573
},
6674
{
6775
name: "Custom input",
6876
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", Input: "MyInput"},
69-
expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, 0}},
77+
expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0)}},
7078
},
7179
{
7280
name: "OperationType sum",
7381
outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
74-
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, 0}},
82+
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
7583
},
7684
{
7785
name: "OperationType count",
7886
outputField: api.OutputField{Name: "MyAgg", Operation: "count"},
79-
expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, 0}},
87+
expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
8088
},
8189
{
8290
name: "OperationType max",
@@ -88,6 +96,21 @@ func TestNewAggregator_Valid(t *testing.T) {
8896
outputField: api.OutputField{Name: "MyAgg", Operation: "min"},
8997
expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64}},
9098
},
99+
{
100+
name: "Default first",
101+
outputField: api.OutputField{Name: "MyCp", Operation: "first"},
102+
expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil}},
103+
},
104+
{
105+
name: "Custom input first",
106+
outputField: api.OutputField{Name: "MyCp", Operation: "first", Input: "MyInput"},
107+
expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil}},
108+
},
109+
{
110+
name: "Default last",
111+
outputField: api.OutputField{Name: "MyCp", Operation: "last"},
112+
expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil}},
113+
},
91114
}
92115

93116
for _, test := range table {
@@ -106,6 +129,8 @@ func TestAddField_and_Update(t *testing.T) {
106129
{Name: "numFlowLogs", Operation: "count"},
107130
{Name: "minFlowLogBytes", Operation: "min", Input: "Bytes"},
108131
{Name: "maxFlowLogBytes", Operation: "max", Input: "Bytes"},
132+
{Name: "FirstFlowDirection", Operation: "first", Input: "FlowDirection"},
133+
{Name: "LastFlowDirection", Operation: "last", Input: "FlowDirection"},
109134
}
110135
var aggs []aggregator
111136
for _, of := range ofs {
@@ -119,38 +144,40 @@ func TestAddField_and_Update(t *testing.T) {
119144
portA := 1
120145
portB := 9002
121146
protocolA := 6
147+
flowDirA := 0
148+
flowDirB := 1
122149

123150
table := []struct {
124151
name string
125152
flowLog config.GenericMap
126153
direction direction
127-
expected map[string]float64
154+
expected map[string]interface{}
128155
}{
129156
{
130157
name: "flowLog 1",
131-
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, 100, 10, false),
158+
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirA, 100, 10, false),
132159
direction: dirAB,
133-
expected: map[string]float64{"Bytes_AB": 100, "Bytes_BA": 0, "Packets": 10, "maxFlowLogBytes": 100, "minFlowLogBytes": 100, "numFlowLogs": 1},
160+
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(0), "Packets": float64(10), "maxFlowLogBytes": float64(100), "minFlowLogBytes": float64(100), "numFlowLogs": float64(1), "FirstFlowDirection": 0, "LastFlowDirection": 0},
134161
},
135162
{
136163
name: "flowLog 2",
137-
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, 200, 20, false),
164+
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 200, 20, false),
138165
direction: dirBA,
139-
expected: map[string]float64{"Bytes_AB": 100, "Bytes_BA": 200, "Packets": 30, "maxFlowLogBytes": 200, "minFlowLogBytes": 100, "numFlowLogs": 2},
166+
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(200), "Packets": float64(30), "maxFlowLogBytes": float64(200), "minFlowLogBytes": float64(100), "numFlowLogs": float64(2), "FirstFlowDirection": 0, "LastFlowDirection": 1},
140167
},
141168
}
142169

143170
conn := NewConnBuilder(nil).Build()
144171
for _, agg := range aggs {
145172
agg.addField(conn)
146173
}
147-
expectedInits := map[string]float64{"Bytes_AB": 0, "Bytes_BA": 0, "Packets": 0, "maxFlowLogBytes": -math.MaxFloat64, "minFlowLogBytes": math.MaxFloat64, "numFlowLogs": 0}
174+
expectedInits := map[string]interface{}{"Bytes_AB": float64(0), "Bytes_BA": float64(0), "Packets": float64(0), "maxFlowLogBytes": float64(-math.MaxFloat64), "minFlowLogBytes": float64(math.MaxFloat64), "numFlowLogs": float64(0), "FirstFlowDirection": nil, "LastFlowDirection": nil}
148175
require.Equal(t, expectedInits, conn.(*connType).aggFields)
149176

150-
for _, test := range table {
177+
for i, test := range table {
151178
t.Run(test.name, func(t *testing.T) {
152179
for _, agg := range aggs {
153-
agg.update(conn, test.flowLog, test.direction)
180+
agg.update(conn, test.flowLog, test.direction, i == 0)
154181
}
155182
require.Equal(t, test.expected, conn.(*connType).aggFields)
156183
})

0 commit comments

Comments
 (0)