Skip to content

Commit dfe7ac4

Browse files
authored
NETOBSERV-1407 console deduper merge mode (#435)
* manage deduper merge * manage array filtering for deduper * manage when FlowDirection is number * addressed feedback
1 parent 6a3207b commit dfe7ac4

28 files changed

+370
-98
lines changed

cmd/plugin-backend.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ func main() {
140140
config.Loki.StatusUserCertPath,
141141
config.Loki.StatusUserKeyPath,
142142
config.Loki.UseMocks,
143-
config.Loki.Labels),
143+
config.Loki.Labels,
144+
config.Frontend.Deduper.Mark,
145+
config.Frontend.Deduper.Merge,
146+
),
144147
}, checker)
145148
}

config/sample-config.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,12 @@ frontend:
390390
filter: interface
391391
default: false
392392
width: 10
393+
- id: FlowDirInts
394+
name: Interfaces and Directions
395+
tooltip: Pairs of network interface and direction of the Flow observed at the Node observation point.
396+
field: FlowDirection
397+
default: false
398+
width: 15
393399
- id: Bytes
394400
name: Bytes
395401
tooltip: The total aggregated number of bytes.
@@ -812,3 +818,6 @@ frontend:
812818
alertNamespaces:
813819
- netobserv
814820
sampling: 50
821+
deduper:
822+
mark: true
823+
merge: false

mocks/loki/flow_records.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@
308308
"values": [
309309
[
310310
"1689619351079000064",
311-
"{\"SrcMac\":\"42:01:0A:00:00:01\",\"DstPort\":6443,\"SrcPort\":54082,\"Etype\":2048,\"SrcK8S_Type\":\"Node\",\"AgentIP\":\"10.0.0.5\",\"Bytes\":66,\"Packets\":1,\"SrcK8S_HostIP\":\"10.0.0.4\",\"SrcK8S_HostName\":\"ci-ln-hnd9rjk-72292-hnd5v-master-1\",\"SrcK8S_OwnerType\":\"Node\",\"Proto\":6,\"Flags\":16,\"SrcAddr\":\"10.0.0.4\",\"SrcK8S_Name\":\"ci-ln-hnd9rjk-72292-hnd5v-master-1\",\"DstMac\":\"42:01:0A:00:00:05\",\"TimeFlowStartMs\":1689619351079,\"Duplicate\":false,\"IfDirection\":0,\"TimeFlowEndMs\":1689619351079,\"TimeFlowRttNs\":1234,\"TimeReceived\":1689619351,\"DstAddr\":\"10.0.0.2\",\"Interface\":\"br-ex\",\"Dscp\":0,\"DnsLatencyMs\":10,\"DnsErrno\":0}"
311+
"{\"SrcMac\":\"42:01:0A:00:00:01\",\"DstPort\":6443,\"SrcPort\":54082,\"Etype\":2048,\"SrcK8S_Type\":\"Node\",\"AgentIP\":\"10.0.0.5\",\"Bytes\":66,\"Packets\":1,\"SrcK8S_HostIP\":\"10.0.0.4\",\"SrcK8S_HostName\":\"ci-ln-hnd9rjk-72292-hnd5v-master-1\",\"SrcK8S_OwnerType\":\"Node\",\"Proto\":6,\"Flags\":16,\"SrcAddr\":\"10.0.0.4\",\"SrcK8S_Name\":\"ci-ln-hnd9rjk-72292-hnd5v-master-1\",\"DstMac\":\"42:01:0A:00:00:05\",\"TimeFlowStartMs\":1689619351079,\"Duplicate\":false,\"IfDirection\":0,\"TimeFlowEndMs\":1689619351079,\"TimeFlowRttNs\":1234,\"TimeReceived\":1689619351,\"DstAddr\":\"10.0.0.2\",\"Interface\":\"br-ex\",\"Dscp\":0,\"DnsLatencyMs\":10,\"DnsErrno\":0,\"Interfaces\":[\"br-ex\",\"ens4\"],\"FlowDirections\":[0,0]}"
312312
],
313313
[
314314
"1689619350793999872",

pkg/handler/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ type QuickFilter struct {
7777
Default bool `yaml:"default,omitempty" json:"default,omitempty"`
7878
}
7979

80+
type Deduper struct {
81+
Mark bool `yaml:"mark" json:"mark"`
82+
Merge bool `yaml:"merge" json:"merge"`
83+
}
84+
8085
type Frontend struct {
8186
BuildVersion string `yaml:"buildVersion" json:"buildVersion"`
8287
BuildDate string `yaml:"buildDate" json:"buildDate"`
@@ -88,6 +93,7 @@ type Frontend struct {
8893
AlertNamespaces []string `yaml:"alertNamespaces" json:"alertNamespaces"`
8994
Sampling int `yaml:"sampling" json:"sampling"`
9095
Features []string `yaml:"features" json:"features"`
96+
Deduper Deduper `yaml:"deduper" json:"deduper"`
9197
}
9298

9399
type Config struct {
@@ -122,6 +128,11 @@ func ReadConfigFile(version, date, filename string) (*Config, error) {
122128
Filters: []Filter{},
123129
QuickFilters: []QuickFilter{},
124130
Features: []string{},
131+
// TODO: update these defaults when operator will move to merge mode
132+
Deduper: Deduper{
133+
Mark: true,
134+
Merge: false,
135+
},
125136
},
126137
}
127138
if len(filename) == 0 {

pkg/handler/flows.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func getFlows(cfg *loki.Config, client httpclient.Caller, params url.Values) (*m
6767
return nil, http.StatusBadRequest, err
6868
}
6969
dedup := params.Get(dedupKey) == "true"
70-
if utils.Contains(constants.AnyConnectionType, string(recordType)) {
70+
if !cfg.Deduper.Mark || utils.Contains(constants.AnyConnectionType, string(recordType)) {
7171
dedup = false
7272
}
7373
packetLoss, err := getPacketLoss(params)

pkg/handler/topology.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
9393
if err != nil {
9494
return nil, http.StatusBadRequest, err
9595
}
96-
if shouldMergeReporters(metricType) {
96+
if shouldMergeReporters(metricType, cfg.Deduper) {
9797
filterGroups = expandReportersMergeQueries(filterGroups)
9898
}
9999

@@ -135,9 +135,9 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
135135
return qr, http.StatusOK, nil
136136
}
137137

138-
func shouldMergeReporters(metricType constants.MetricType) bool {
139-
return metricType == constants.MetricTypeBytes ||
140-
metricType == constants.MetricTypePackets
138+
func shouldMergeReporters(metricType constants.MetricType, deduper loki.Deduper) bool {
139+
return !deduper.Merge && deduper.Mark && (metricType == constants.MetricTypeBytes ||
140+
metricType == constants.MetricTypePackets)
141141
}
142142

143143
func expandReportersMergeQueries(queries filters.MultiQueries) filters.MultiQueries {

pkg/loki/config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import (
77
"github.com/netobserv/network-observability-console-plugin/pkg/utils"
88
)
99

10+
type Deduper struct {
11+
Mark bool
12+
Merge bool
13+
}
14+
1015
type Config struct {
1116
URL *url.URL
1217
StatusURL *url.URL
@@ -23,9 +28,10 @@ type Config struct {
2328
UseMocks bool
2429
ForwardUserToken bool
2530
Labels map[string]struct{}
31+
Deduper Deduper
2632
}
2733

28-
func NewConfig(url *url.URL, statusURL *url.URL, timeout time.Duration, tenantID string, tokenPath string, forwardUserToken bool, skipTLS bool, capath string, statusSkipTLS bool, statusCapath string, statusUserCertPath string, statusUserKeyPath string, useMocks bool, labels []string) Config {
34+
func NewConfig(url *url.URL, statusURL *url.URL, timeout time.Duration, tenantID string, tokenPath string, forwardUserToken bool, skipTLS bool, capath string, statusSkipTLS bool, statusCapath string, statusUserCertPath string, statusUserKeyPath string, useMocks bool, labels []string, deduperMark bool, deduperMerge bool) Config {
2935
return Config{
3036
URL: url,
3137
StatusURL: statusURL,
@@ -41,6 +47,10 @@ func NewConfig(url *url.URL, statusURL *url.URL, timeout time.Duration, tenantID
4147
UseMocks: useMocks,
4248
ForwardUserToken: forwardUserToken,
4349
Labels: utils.GetMapInterface(labels),
50+
Deduper: Deduper{
51+
Mark: deduperMark,
52+
Merge: deduperMerge,
53+
},
4454
}
4555
}
4656

pkg/loki/filter.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
typeString
2929
typeRegex
3030
typeRegexContains
31+
typeRegexArrayContains
3132
typeIP
3233
)
3334

@@ -54,6 +55,10 @@ type lineMatch struct {
5455
valueType valueType
5556
}
5657

58+
func isRegex(v valueType) bool {
59+
return v == typeRegex || v == typeRegexContains || v == typeRegexArrayContains
60+
}
61+
5762
func stringEqualLabelFilter(labelKey string, value string) labelFilter {
5863
return labelFilter{
5964
key: labelKey,
@@ -122,7 +127,7 @@ func (f *labelFilter) writeInto(sb *strings.Builder) {
122127
sb.WriteString(`ip("`)
123128
sb.WriteString(f.value)
124129
sb.WriteString(`")`)
125-
case typeRegexContains:
130+
case typeRegexContains, typeRegexArrayContains:
126131
sb.WriteString("`(?i).*")
127132
sb.WriteString(f.value)
128133
sb.WriteString(".*`")
@@ -141,19 +146,17 @@ func (f *lineFilter) asLabelFilters() []labelFilter {
141146
valueType: v.valueType,
142147
value: v.value,
143148
}
144-
if v.valueType == typeRegex || v.valueType == typeRegexContains {
149+
if isRegex(v.valueType) {
150+
lf.matcher = labelMatches
145151
if f.not {
146152
lf.matcher = labelNoMatches
147-
} else {
148-
lf.matcher = labelMatches
149153
}
150154
} else {
155+
lf.matcher = labelEqual
151156
if f.not {
152157
lf.matcher = labelNotEqual
153158
} else if f.moreThan {
154159
lf.matcher = labelMoreThanOrEqual
155-
} else {
156-
lf.matcher = labelEqual
157160
}
158161
}
159162
lfs = append(lfs, lf)
@@ -307,6 +310,11 @@ func (f *lineFilter) writeInto(sb *strings.Builder) {
307310
sb.WriteString(`"(?i)[^"]*`)
308311
sb.WriteString(valueReplacer.Replace(v.value))
309312
sb.WriteString(`.*"`)
313+
// for array, we ensure it starts by [ and ends by ]
314+
case typeRegexArrayContains:
315+
sb.WriteString(`\[(?i).*`)
316+
sb.WriteString(valueReplacer.Replace(v.value))
317+
sb.WriteString(`.*]`)
310318
}
311319
}
312320
}

pkg/loki/flow_query.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,18 @@ func (q *FlowQueryBuilder) addLineFilters(key string, values []string, not bool,
180180
if len(values) == 0 {
181181
return
182182
}
183+
184+
var isArray bool
185+
if q.config.Deduper.Merge {
186+
switch key {
187+
case "FlowDirection", "Interface":
188+
key = fmt.Sprintf("%ss", key)
189+
isArray = true
190+
default:
191+
isArray = false
192+
}
193+
}
194+
183195
lf := lineFilter{
184196
key: key,
185197
not: not,
@@ -190,6 +202,8 @@ func (q *FlowQueryBuilder) addLineFilters(key string, values []string, not bool,
190202
for _, value := range values {
191203
lm := lineMatch{}
192204
switch {
205+
case isArray:
206+
lm = lineMatch{valueType: typeRegexArrayContains, value: value}
193207
case isExactMatch(value):
194208
lm = lineMatch{valueType: typeString, value: trimExactMatch(value)}
195209
emptyMatches = emptyMatches || len(lm.value) == 0

pkg/loki/query_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
func TestFlowQuery_AddLabelFilters(t *testing.T) {
1414
lokiURL, err := url.Parse("/")
1515
require.NoError(t, err)
16-
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"foo", "flis"})
16+
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"foo", "flis"}, true, false)
1717
query := NewFlowQueryBuilderWithDefaults(&cfg)
1818
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
1919
require.NoError(t, err)
@@ -26,15 +26,15 @@ func TestFlowQuery_AddLabelFilters(t *testing.T) {
2626
func TestQuery_BackQuote_Error(t *testing.T) {
2727
lokiURL, err := url.Parse("/")
2828
require.NoError(t, err)
29-
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"lab1", "lab2"})
29+
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"lab1", "lab2"}, true, false)
3030
query := NewFlowQueryBuilderWithDefaults(&cfg)
3131
assert.Error(t, query.addFilter(filters.NewMatch("key", "backquoted`val")))
3232
}
3333

3434
func TestFlowQuery_AddNotLabelFilters(t *testing.T) {
3535
lokiURL, err := url.Parse("/")
3636
require.NoError(t, err)
37-
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"foo", "flis"})
37+
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"foo", "flis"}, true, false)
3838
query := NewFlowQueryBuilderWithDefaults(&cfg)
3939
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
4040
require.NoError(t, err)
@@ -51,7 +51,7 @@ func backtick(str string) string {
5151
func TestFlowQuery_AddLineFilterMultipleValues(t *testing.T) {
5252
lokiURL, err := url.Parse("/")
5353
require.NoError(t, err)
54-
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{})
54+
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{}, true, false)
5555
query := NewFlowQueryBuilderWithDefaults(&cfg)
5656
err = query.addFilter(filters.NewMatch("foo", `bar,baz`))
5757
require.NoError(t, err)
@@ -62,7 +62,7 @@ func TestFlowQuery_AddLineFilterMultipleValues(t *testing.T) {
6262
func TestFlowQuery_AddNotLineFilters(t *testing.T) {
6363
lokiURL, err := url.Parse("/")
6464
require.NoError(t, err)
65-
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{})
65+
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{}, true, false)
6666
query := NewFlowQueryBuilderWithDefaults(&cfg)
6767
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
6868
require.NoError(t, err)
@@ -75,7 +75,7 @@ func TestFlowQuery_AddNotLineFilters(t *testing.T) {
7575
func TestFlowQuery_AddLineFiltersWithEmpty(t *testing.T) {
7676
lokiURL, err := url.Parse("/")
7777
require.NoError(t, err)
78-
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{})
78+
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{}, true, false)
7979
query := NewFlowQueryBuilderWithDefaults(&cfg)
8080
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
8181
require.NoError(t, err)
@@ -88,7 +88,7 @@ func TestFlowQuery_AddLineFiltersWithEmpty(t *testing.T) {
8888
func TestFlowQuery_AddRecordTypeLabelFilter(t *testing.T) {
8989
lokiURL, err := url.Parse("/")
9090
require.NoError(t, err)
91-
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"foo", "flis", "_RecordType"})
91+
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"foo", "flis", "_RecordType"}, true, false)
9292
query := NewFlowQueryBuilderWithDefaults(&cfg)
9393
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
9494
require.NoError(t, err)

0 commit comments

Comments
 (0)