Skip to content

Commit 3f56172

Browse files
committed
Add dedupe
1 parent ada5811 commit 3f56172

File tree

5 files changed

+140
-22
lines changed

5 files changed

+140
-22
lines changed

contrib/kubernetes/flowlogs2metrics.conf.yaml

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ pipeline:
136136
by:
137137
- dstSubnet
138138
operation: sum
139-
recordkey: ""
139+
recordkey: isNewFlow
140140
- name: src_connection_count
141141
by:
142142
- srcSubnet
@@ -248,22 +248,6 @@ pipeline:
248248
output: dstSubnet
249249
type: add_subnet
250250
parameters: /16
251-
- input: srcIP
252-
output: srcSubnet
253-
type: add_subnet
254-
parameters: /16
255-
- input: dstIP
256-
output: dstSubnet24
257-
type: add_subnet
258-
parameters: /24
259-
- input: srcIP
260-
output: srcSubnet24
261-
type: add_subnet
262-
parameters: /24
263-
- input: dstIP
264-
output: dstSubnet
265-
type: add_subnet
266-
parameters: /16
267251
- input: srcIP
268252
output: srcK8S
269253
type: add_kubernetes
@@ -280,10 +264,6 @@ pipeline:
280264
output: elephant
281265
type: add_if
282266
parameters: '>=512'
283-
- input: dstPort
284-
output: service
285-
type: add_service
286-
parameters: proto
287267
type: network
288268
write:
289269
type: none

docs/metrics.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ and the transformation to generate the exported metric.
5454
| **Details** | Counts the number of connections per subnet with network prefix length /16 (using conn_tracking sum isNewFlow field) |
5555
| **Usage** | Evaluate network connections per subnet |
5656
| **Labels** | rate, subnet |
57-
| **Operation** | aggregate by `dstSubnet` and `sum` |
57+
| **Operation** | aggregate by `dstSubnet` and `sum` field `isNewFlow` |
5858
| **Exposed as** | `fl2m_connections_per_destination_subnet` of type `gauge` |
5959
| **Visualized as** | "Connections rate per destinationIP /16 subnets" on dashboard `details` |
6060
|||

pkg/confgen/confgen.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ func (cg *ConfGen) Run() error {
8585
}
8686
}
8787

88+
cg.dedupe()
89+
8890
err = cg.generateFlowlogs2MetricsConfig(Opt.DestConfFile)
8991
if err != nil {
9092
log.Debugf("cg.generateFlowlogs2MetricsConfig err: %v ", err)

pkg/confgen/dedup.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (C) 2021 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package confgen
19+
20+
import (
21+
"github.com/netobserv/flowlogs2metrics/pkg/api"
22+
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/extract/aggregate"
23+
log "github.com/sirupsen/logrus"
24+
"reflect"
25+
)
26+
27+
func (cg *ConfGen) dedupe() {
28+
cg.transformRules = dedupeNetworkTransformRules(cg.transformRules)
29+
cg.aggregateDefinitions = dedupeAggregateDefinitions(cg.aggregateDefinitions)
30+
}
31+
32+
type void struct{}
33+
var voidMember void
34+
35+
func dedupeNetworkTransformRules(rules api.NetworkTransformRules) api.NetworkTransformRules {
36+
// There are no built-in sets in go
37+
//https://stackoverflow.com/a/34020023/2749989
38+
uniqueSet := make(map[api.NetworkTransformRule]void)
39+
var dedpueSlice []api.NetworkTransformRule
40+
for i, rule := range rules {
41+
if _, exists := uniqueSet[rule]; exists {
42+
// duplicate rule
43+
log.Debugf("Remove duplicate NetworkTransformRule %v at index %v", rule, i)
44+
continue
45+
}
46+
uniqueSet[rule] = voidMember
47+
dedpueSlice = append(dedpueSlice, rule)
48+
}
49+
return dedpueSlice
50+
}
51+
52+
// dedupeAggregateDefinitions is inefficient because we can't use a map to look for duplicates.
53+
// The reason is that aggregate.Definition is not hashable due to its By field which is a slice.
54+
func dedupeAggregateDefinitions(aggregateDefinitions aggregate.Definitions) aggregate.Definitions {
55+
var dedpueSlice []aggregate.Definition
56+
for i, aggregateDefinition := range aggregateDefinitions {
57+
if containsAggregateDefinitions(dedpueSlice, aggregateDefinition) {
58+
// duplicate aggregateDefinition
59+
log.Debugf("Remove duplicate AggregateDefinitions %v at index %v", aggregateDefinition, i)
60+
continue
61+
}
62+
dedpueSlice = append(dedpueSlice, aggregateDefinition)
63+
}
64+
return dedpueSlice
65+
}
66+
67+
func containsAggregateDefinitions(slice []aggregate.Definition, searchItem aggregate.Definition) bool {
68+
for _, item := range slice {
69+
if reflect.DeepEqual(item, searchItem) {
70+
return true
71+
}
72+
}
73+
return false
74+
}

pkg/confgen/dedup_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (C) 2021 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
19+
package confgen
20+
21+
import (
22+
"github.com/netobserv/flowlogs2metrics/pkg/api"
23+
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/extract/aggregate"
24+
"github.com/stretchr/testify/require"
25+
"testing"
26+
)
27+
28+
func Test_dedupeNetworkTransformRules(t *testing.T) {
29+
slice := api.NetworkTransformRules{
30+
api.NetworkTransformRule{Input: "i1", Output: "o1"},
31+
api.NetworkTransformRule{Input: "i2", Output: "o2"},
32+
api.NetworkTransformRule{Input: "i3", Output: "o3"},
33+
api.NetworkTransformRule{Input: "i2", Output: "o2"},
34+
}
35+
expected := api.NetworkTransformRules{
36+
api.NetworkTransformRule{Input: "i1", Output: "o1"},
37+
api.NetworkTransformRule{Input: "i2", Output: "o2"},
38+
api.NetworkTransformRule{Input: "i3", Output: "o3"},
39+
}
40+
actual := dedupeNetworkTransformRules(slice)
41+
42+
require.ElementsMatch(t, actual, expected)
43+
}
44+
45+
func Test_dedupeAggregateDefinitions(t *testing.T) {
46+
slice := aggregate.Definitions{
47+
aggregate.Definition{Name: "n1", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o1")},
48+
aggregate.Definition{Name: "n1", By: aggregate.By{"a"}, Operation: aggregate.Operation("o1")},
49+
aggregate.Definition{Name: "n2", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o2")},
50+
aggregate.Definition{Name: "n3", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o3")},
51+
aggregate.Definition{Name: "n2", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o2")},
52+
}
53+
expected := aggregate.Definitions{
54+
aggregate.Definition{Name: "n1", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o1")},
55+
aggregate.Definition{Name: "n1", By: aggregate.By{"a"}, Operation: aggregate.Operation("o1")},
56+
aggregate.Definition{Name: "n2", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o2")},
57+
aggregate.Definition{Name: "n3", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o3")},
58+
}
59+
actual := dedupeAggregateDefinitions(slice)
60+
61+
require.ElementsMatch(t, actual, expected)
62+
}

0 commit comments

Comments
 (0)