Skip to content

Commit 2505d56

Browse files
authored
Merge pull request #176 from jpinsonneau/330
NETOBSERV-330 Provide a way to filter Kubernetes-related flows
2 parents 7c4897f + a53958c commit 2505d56

25 files changed

+239
-93
lines changed

cmd/plugin-backend.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ var (
3333
lokiCAPath = flag.String("loki-ca-path", "", "Path to loki CA certificate")
3434
lokiSkipTLS = flag.Bool("loki-skip-tls", false, "Skip TLS checks for loki HTTPS connection")
3535
lokiMock = flag.Bool("loki-mock", false, "Fake loki results using saved mocks")
36+
ingressMatcher = flag.String("ingress-matcher", ".*-ingress$", "Regex matching ingress namespace")
3637
logLevel = flag.String("loglevel", "info", "log level (default: info)")
3738
frontendConfig = flag.String("frontend-config", "", "path to the console plugin config file")
3839
versionFlag = flag.Bool("v", false, "print version")
@@ -75,7 +76,7 @@ func main() {
7576
CORSAllowMethods: *corsMethods,
7677
CORSAllowHeaders: *corsHeaders,
7778
CORSMaxAge: *corsMaxAge,
78-
Loki: loki.NewConfig(lURL, *lokiTimeout, *lokiTenantID, *lokiSkipTLS, *lokiCAPath, *lokiMock, strings.Split(lLabels, ",")),
79+
Loki: loki.NewConfig(lURL, *lokiTimeout, *lokiTenantID, *lokiSkipTLS, *lokiCAPath, *lokiMock, *ingressMatcher, strings.Split(lLabels, ",")),
7980
FrontendConfig: *frontendConfig,
8081
})
8182
}

pkg/handler/export.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ const (
1616
exportcolumnsKey = "columns"
1717
)
1818

19-
func ExportFlows(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
20-
lokiClient := newLokiClient(&cfg)
19+
func ExportFlows(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
20+
lokiClient := newLokiClient(cfg)
2121

2222
return func(w http.ResponseWriter, r *http.Request) {
2323
var code int

pkg/handler/flows.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const (
2020
timeRangeKey = "timeRange"
2121
limitKey = "limit"
2222
reporterKey = "reporter"
23+
layerKey = "layer"
2324
filtersKey = "filters"
2425
)
2526

@@ -101,8 +102,8 @@ func getLimit(params url.Values) (string, int, error) {
101102
return limit, reqLimit, nil
102103
}
103104

104-
func GetFlows(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
105-
lokiClient := newLokiClient(&cfg)
105+
func GetFlows(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
106+
lokiClient := newLokiClient(cfg)
106107

107108
return func(w http.ResponseWriter, r *http.Request) {
108109
var code int
@@ -125,7 +126,7 @@ func GetFlows(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
125126
}
126127
}
127128

128-
func getFlows(cfg loki.Config, client httpclient.Caller, params url.Values) (*model.AggregatedQueryResponse, int, error) {
129+
func getFlows(cfg *loki.Config, client httpclient.Caller, params url.Values) (*model.AggregatedQueryResponse, int, error) {
129130
start, err := getStartTime(params)
130131
if err != nil {
131132
return nil, http.StatusBadRequest, err
@@ -139,6 +140,7 @@ func getFlows(cfg loki.Config, client httpclient.Caller, params url.Values) (*mo
139140
return nil, http.StatusBadRequest, err
140141
}
141142
reporter := params.Get(reporterKey)
143+
layer := params.Get(layerKey)
142144
rawFilters := params.Get(filtersKey)
143145
filterGroups, err := parseFilters(rawFilters)
144146
if err != nil {
@@ -150,7 +152,7 @@ func getFlows(cfg loki.Config, client httpclient.Caller, params url.Values) (*mo
150152
// match any, and multiple filters => run in parallel then aggregate
151153
var queries []string
152154
for _, group := range filterGroups {
153-
qb := loki.NewFlowQueryBuilder(&cfg, start, end, limit, reporter)
155+
qb := loki.NewFlowQueryBuilder(cfg, start, end, limit, reporter, layer)
154156
err := qb.Filters(group)
155157
if err != nil {
156158
return nil, http.StatusBadRequest, errors.New("Can't build query: " + err.Error())
@@ -163,7 +165,7 @@ func getFlows(cfg loki.Config, client httpclient.Caller, params url.Values) (*mo
163165
}
164166
} else {
165167
// else, run all at once
166-
qb := loki.NewFlowQueryBuilder(&cfg, start, end, limit, reporter)
168+
qb := loki.NewFlowQueryBuilder(cfg, start, end, limit, reporter, layer)
167169
if len(filterGroups) > 0 {
168170
err := qb.Filters(filterGroups[0])
169171
if err != nil {

pkg/handler/loki.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ func fetchParallel(lokiClient httpclient.Caller, queries []string, merger loki.M
164164
return codeOut, nil
165165
}
166166

167-
func LokiReady(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
168-
lokiClient := newLokiClient(&cfg)
167+
func LokiReady(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
168+
lokiClient := newLokiClient(cfg)
169169

170170
return func(w http.ResponseWriter, r *http.Request) {
171171
baseURL := strings.TrimRight(cfg.URL.String(), "/")
@@ -187,8 +187,8 @@ func LokiReady(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
187187
}
188188
}
189189

190-
func LokiMetrics(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
191-
lokiClient := newLokiClient(&cfg)
190+
func LokiMetrics(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
191+
lokiClient := newLokiClient(cfg)
192192

193193
return func(w http.ResponseWriter, r *http.Request) {
194194
baseURL := strings.TrimRight(cfg.URL.String(), "/")
@@ -203,8 +203,8 @@ func LokiMetrics(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
203203
}
204204
}
205205

206-
func LokiBuildInfos(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
207-
lokiClient := newLokiClient(&cfg)
206+
func LokiBuildInfos(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
207+
lokiClient := newLokiClient(cfg)
208208

209209
return func(w http.ResponseWriter, r *http.Request) {
210210
baseURL := strings.TrimRight(cfg.URL.String(), "/")
@@ -219,8 +219,8 @@ func LokiBuildInfos(cfg loki.Config) func(w http.ResponseWriter, r *http.Request
219219
}
220220
}
221221

222-
func LokiConfig(cfg loki.Config, param string) func(w http.ResponseWriter, r *http.Request) {
223-
lokiClient := newLokiClient(&cfg)
222+
func LokiConfig(cfg *loki.Config, param string) func(w http.ResponseWriter, r *http.Request) {
223+
lokiClient := newLokiClient(cfg)
224224

225225
return func(w http.ResponseWriter, r *http.Request) {
226226
baseURL := strings.TrimRight(cfg.URL.String(), "/")

pkg/handler/resources.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ import (
1818
"github.com/netobserv/network-observability-console-plugin/pkg/utils"
1919
)
2020

21-
func GetNamespaces(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
22-
lokiClient := newLokiClient(&cfg)
21+
func GetNamespaces(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
22+
lokiClient := newLokiClient(cfg)
2323
return func(w http.ResponseWriter, r *http.Request) {
2424
var code int
2525
startTime := time.Now()
@@ -31,14 +31,14 @@ func GetNamespaces(cfg loki.Config) func(w http.ResponseWriter, r *http.Request)
3131
values := []string{}
3232

3333
// Fetch and merge values for SrcK8S_Namespace and DstK8S_Namespace
34-
values1, code, err := getLabelValues(&cfg, lokiClient, fields.SrcNamespace)
34+
values1, code, err := getLabelValues(cfg, lokiClient, fields.SrcNamespace)
3535
if err != nil {
3636
writeError(w, code, "Error while fetching label source namespace values from Loki: "+err.Error())
3737
return
3838
}
3939
values = append(values, values1...)
4040

41-
values2, code, err := getLabelValues(&cfg, lokiClient, fields.DstNamespace)
41+
values2, code, err := getLabelValues(cfg, lokiClient, fields.DstNamespace)
4242
if err != nil {
4343
writeError(w, code, "Error while fetching label destination namespace values from Loki: "+err.Error())
4444
return
@@ -72,8 +72,8 @@ func getLabelValues(cfg *loki.Config, lokiClient httpclient.Caller, label string
7272
return lvr.Data, http.StatusOK, nil
7373
}
7474

75-
func GetNames(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
76-
lokiClient := newLokiClient(&cfg)
75+
func GetNames(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
76+
lokiClient := newLokiClient(cfg)
7777
return func(w http.ResponseWriter, r *http.Request) {
7878
var code int
7979
startTime := time.Now()
@@ -107,7 +107,7 @@ func GetNames(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
107107
}
108108
}
109109

110-
func getNamesForPrefix(cfg loki.Config, lokiClient httpclient.Caller, prefix, kind, namespace string) ([]string, int, error) {
110+
func getNamesForPrefix(cfg *loki.Config, lokiClient httpclient.Caller, prefix, kind, namespace string) ([]string, int, error) {
111111
lokiParams := [][]string{}
112112
if namespace != "" {
113113
lokiParams = append(lokiParams, []string{prefix + fields.Namespace, exact(namespace)})
@@ -121,7 +121,7 @@ func getNamesForPrefix(cfg loki.Config, lokiClient httpclient.Caller, prefix, ki
121121
fieldToExtract = prefix + fields.Name
122122
}
123123

124-
queryBuilder := loki.NewFlowQueryBuilderWithDefaults(&cfg)
124+
queryBuilder := loki.NewFlowQueryBuilderWithDefaults(cfg)
125125
if err := queryBuilder.Filters(lokiParams); err != nil {
126126
return nil, http.StatusBadRequest, err
127127
}

pkg/handler/resources_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestGetSourceOwnerNames(t *testing.T) {
3131
url,
3232
)
3333
})
34-
_, _, _ = getNamesForPrefix(testLokiConfig, lokiClientMock, "Src", "Deployment", "default")
34+
_, _, _ = getNamesForPrefix(&testLokiConfig, lokiClientMock, "Src", "Deployment", "default")
3535

3636
lokiClientMock.AssertNumberOfCalls(t, "Get", 1)
3737
}
@@ -45,7 +45,7 @@ func TestGetDestPodNames(t *testing.T) {
4545
url,
4646
)
4747
})
48-
_, _, _ = getNamesForPrefix(testLokiConfig, lokiClientMock, "Dst", "Pod", "default")
48+
_, _, _ = getNamesForPrefix(&testLokiConfig, lokiClientMock, "Dst", "Pod", "default")
4949

5050
lokiClientMock.AssertNumberOfCalls(t, "Get", 1)
5151
}
@@ -59,7 +59,7 @@ func TestGetSourceNodeNames(t *testing.T) {
5959
url,
6060
)
6161
})
62-
_, _, _ = getNamesForPrefix(testLokiConfig, lokiClientMock, "Src", "Node", "")
62+
_, _, _ = getNamesForPrefix(&testLokiConfig, lokiClientMock, "Src", "Node", "")
6363

6464
lokiClientMock.AssertNumberOfCalls(t, "Get", 1)
6565
}

pkg/handler/topology.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ const (
1919
groupsKey = "groups"
2020
)
2121

22-
func GetTopology(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
23-
lokiClient := newLokiClient(&cfg)
22+
func GetTopology(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
23+
lokiClient := newLokiClient(cfg)
2424

2525
return func(w http.ResponseWriter, r *http.Request) {
2626
var code int
@@ -40,7 +40,7 @@ func GetTopology(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) {
4040
}
4141
}
4242

43-
func getTopologyFlows(cfg loki.Config, client httpclient.Caller, params url.Values) (*model.AggregatedQueryResponse, int, error) {
43+
func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Values) (*model.AggregatedQueryResponse, int, error) {
4444
hlog.Debugf("GetTopology query params: %s", params)
4545

4646
start, err := getStartTime(params)
@@ -58,6 +58,7 @@ func getTopologyFlows(cfg loki.Config, client httpclient.Caller, params url.Valu
5858
metricFunction := params.Get(metricFunctionKey)
5959
metricType := params.Get(metricTypeKey)
6060
reporter := params.Get(reporterKey)
61+
layer := params.Get(layerKey)
6162
scope := params.Get(scopeKey)
6263
groups := params.Get(groupsKey)
6364
rawFilters := params.Get(filtersKey)
@@ -71,7 +72,7 @@ func getTopologyFlows(cfg loki.Config, client httpclient.Caller, params url.Valu
7172
// match any, and multiple filters => run in parallel then aggregate
7273
var queries []string
7374
for _, group := range filterGroups {
74-
query, code, err := buildTopologyQuery(&cfg, group, start, end, limit, metricFunction, metricType, reporter, scope, groups)
75+
query, code, err := buildTopologyQuery(cfg, group, start, end, limit, metricFunction, metricType, reporter, layer, scope, groups)
7576
if err != nil {
7677
return nil, code, errors.New("Can't build query: " + err.Error())
7778
}
@@ -87,7 +88,7 @@ func getTopologyFlows(cfg loki.Config, client httpclient.Caller, params url.Valu
8788
if len(filterGroups) > 0 {
8889
filters = filterGroups[0]
8990
}
90-
query, code, err := buildTopologyQuery(&cfg, filters, start, end, limit, metricFunction, metricType, reporter, scope, groups)
91+
query, code, err := buildTopologyQuery(cfg, filters, start, end, limit, metricFunction, metricType, reporter, layer, scope, groups)
9192
if err != nil {
9293
return nil, code, err
9394
}
@@ -102,8 +103,8 @@ func getTopologyFlows(cfg loki.Config, client httpclient.Caller, params url.Valu
102103
return qr, http.StatusOK, nil
103104
}
104105

105-
func buildTopologyQuery(cfg *loki.Config, filters [][]string, start, end, limit, metricFunction, metricType, reporter, scope, groups string) (string, int, error) {
106-
qb, err := loki.NewTopologyQuery(cfg, start, end, limit, metricFunction, metricType, reporter, scope, groups)
106+
func buildTopologyQuery(cfg *loki.Config, filters [][]string, start, end, limit, metricFunction, metricType, reporter, layer, scope, groups string) (string, int, error) {
107+
qb, err := loki.NewTopologyQuery(cfg, start, end, limit, metricFunction, metricType, reporter, layer, scope, groups)
107108
if err != nil {
108109
return "", http.StatusBadRequest, err
109110
}

pkg/loki/config.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,26 @@ import (
88
)
99

1010
type Config struct {
11-
URL *url.URL
12-
Timeout time.Duration
13-
TenantID string
14-
SkipTLS bool
15-
CAPath string
16-
UseMocks bool
17-
Labels map[string]struct{}
11+
URL *url.URL
12+
Timeout time.Duration
13+
TenantID string
14+
SkipTLS bool
15+
CAPath string
16+
UseMocks bool
17+
IngressMatcher string
18+
Labels map[string]struct{}
1819
}
1920

20-
func NewConfig(url *url.URL, timeout time.Duration, tenantID string, skipTLS bool, capath string, useMocks bool, labels []string) Config {
21+
func NewConfig(url *url.URL, timeout time.Duration, tenantID string, skipTLS bool, capath string, useMocks bool, ingressMatcher string, labels []string) Config {
2122
return Config{
22-
URL: url,
23-
Timeout: timeout,
24-
TenantID: tenantID,
25-
SkipTLS: skipTLS,
26-
CAPath: capath,
27-
UseMocks: useMocks,
28-
Labels: utils.GetMapInterface(labels),
23+
URL: url,
24+
Timeout: timeout,
25+
TenantID: tenantID,
26+
SkipTLS: skipTLS,
27+
CAPath: capath,
28+
UseMocks: useMocks,
29+
IngressMatcher: ingressMatcher,
30+
Labels: utils.GetMapInterface(labels),
2931
}
3032
}
3133

pkg/loki/filter.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package loki
33
import (
44
"fmt"
55
"strings"
6+
7+
"github.com/netobserv/network-observability-console-plugin/pkg/model/fields"
68
)
79

810
// remove quotes and replace * by regex any
@@ -13,6 +15,8 @@ type labelMatcher string
1315
const (
1416
labelEqual = labelMatcher("=")
1517
labelMatches = labelMatcher("=~")
18+
//infrastructure regex match any empty namespace or starting with kube- or openshift-
19+
infrastructureRegex = `^$|(kube-|openshift-).*`
1620
)
1721

1822
type valueType int
@@ -71,6 +75,25 @@ func ipLabelFilter(labelKey, cidr string) labelFilter {
7175
}
7276
}
7377

78+
//layerLineFilter match or exclude infrastructure namespaces
79+
//keeping related kubernetes service flows
80+
func layerLineFilter(match bool, ingressMatcher string) string {
81+
var matcher string
82+
if match {
83+
matcher = `=~`
84+
} else {
85+
matcher = `!~`
86+
}
87+
88+
return `(` +
89+
`(` + fields.SrcNamespace + matcher + `"` + infrastructureRegex + `"` +
90+
`+or+` + fields.SrcNamespace + `=~"` + ingressMatcher + `")` +
91+
`+and+` +
92+
`(` + fields.DstNamespace + matcher + `"` + infrastructureRegex + `"` +
93+
`+or+` + fields.DstNamespace + `=~"` + ingressMatcher + `")` +
94+
`)`
95+
}
96+
7497
func (f *labelFilter) writeInto(sb *strings.Builder) {
7598
sb.WriteString(f.key)
7699
sb.WriteString(string(f.matcher))

0 commit comments

Comments
 (0)