Skip to content

Commit 2168588

Browse files
authored
Merge pull request #3 from jotak/dev-console
Inject namespace label in dev console queries for Loki
2 parents 8ff2a47 + 5f5d4fb commit 2168588

File tree

8 files changed

+249
-108
lines changed

8 files changed

+249
-108
lines changed

pkg/handler/flows.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/netobserv/network-observability-console-plugin/pkg/loki"
1313
"github.com/netobserv/network-observability-console-plugin/pkg/metrics"
1414
"github.com/netobserv/network-observability-console-plugin/pkg/model"
15+
"github.com/netobserv/network-observability-console-plugin/pkg/model/fields"
1516
"github.com/netobserv/network-observability-console-plugin/pkg/model/filters"
1617
"github.com/netobserv/network-observability-console-plugin/pkg/utils/constants"
1718
)
@@ -85,10 +86,20 @@ func (h *Handlers) getFlows(ctx context.Context, lokiClient httpclient.Caller, p
8586
namespace := params.Get(namespaceKey)
8687
isDev := namespace != ""
8788
rawFilters := params.Get(filtersKey)
88-
filterGroups, err := filters.Parse(rawFilters, namespace)
89+
filterGroups, err := filters.Parse(rawFilters)
8990
if err != nil {
9091
return nil, http.StatusBadRequest, err
9192
}
93+
if namespace != "" {
94+
// TODO: this should actually be managed from the loki gateway, with "namespace" query param
95+
filterGroups = filterGroups.Distribute(
96+
[]filters.SingleQuery{
97+
{filters.NewMatch(fields.SrcNamespace, `"`+namespace+`"`)},
98+
{filters.NewMatch(fields.DstNamespace, `"`+namespace+`"`)},
99+
},
100+
func(sq filters.SingleQuery) bool { return false },
101+
)
102+
}
92103

93104
cl := clients{loki: lokiClient}
94105
merger := loki.NewStreamMerger(reqLimit)

pkg/handler/topology.go

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/netobserv/network-observability-console-plugin/pkg/loki"
1313
"github.com/netobserv/network-observability-console-plugin/pkg/metrics"
1414
"github.com/netobserv/network-observability-console-plugin/pkg/model"
15+
"github.com/netobserv/network-observability-console-plugin/pkg/model/fields"
1516
"github.com/netobserv/network-observability-console-plugin/pkg/model/filters"
1617
"github.com/netobserv/network-observability-console-plugin/pkg/prometheus"
1718
"github.com/netobserv/network-observability-console-plugin/pkg/utils/constants"
@@ -126,17 +127,18 @@ func (h *Handlers) extractTopologyQueryParams(params url.Values, ds constants.Da
126127
in.Groups = params.Get(groupsKey)
127128
namespace := params.Get(namespaceKey)
128129
rawFilters := params.Get(filtersKey)
129-
filterGroups, err := filters.Parse(rawFilters, namespace)
130+
filterGroups, err := filters.Parse(rawFilters)
130131
if err != nil {
131132
return nil, nil, qr, reqLimit, err
132133
}
133134

134135
if shouldMergeReporters(in.DataField) {
135-
filterGroups = expandReportersMergeQueries(
136+
filterGroups = expandQueries(
136137
filterGroups,
138+
namespace,
137139
func(filters filters.SingleQuery) bool {
138140
// Do not expand if this is managed from prometheus
139-
sr, _ := getEligiblePromMetric(h.PromInventory, filters, &in)
141+
sr, _ := getEligiblePromMetric(h.PromInventory, filters, &in, namespace != "")
140142
return sr != nil && len(sr.Found) > 0
141143
},
142144
)
@@ -164,7 +166,7 @@ func (h *Handlers) getTopologyFlows(ctx context.Context, cl clients, params url.
164166
var lokiQ []string
165167
var promQ []*prometheus.Query
166168
for _, filters := range filterGroups {
167-
lq, pq, code, err := buildTopologyQuery(h.Cfg, h.PromInventory, filters, in, &qr)
169+
lq, pq, code, err := buildTopologyQuery(h.Cfg, h.PromInventory, filters, in, &qr, isDev)
168170
if err != nil {
169171
return nil, code, errors.New("Can't build query: " + err.Error())
170172
}
@@ -186,7 +188,7 @@ func (h *Handlers) getTopologyFlows(ctx context.Context, cl clients, params url.
186188
if len(filterGroups) > 0 {
187189
filters = filterGroups[0]
188190
}
189-
lokiQ, promQ, code, err := buildTopologyQuery(h.Cfg, h.PromInventory, filters, in, &qr)
191+
lokiQ, promQ, code, err := buildTopologyQuery(h.Cfg, h.PromInventory, filters, in, &qr, isDev)
190192
if err != nil {
191193
return nil, code, err
192194
}
@@ -218,23 +220,51 @@ func shouldMergeReporters(metricType string) bool {
218220
return metricType == constants.MetricTypeBytes || metricType == constants.MetricTypePackets
219221
}
220222

221-
func expandReportersMergeQueries(queries filters.MultiQueries, isForProm func(filters filters.SingleQuery) bool) filters.MultiQueries {
222-
var out filters.MultiQueries
223-
for _, q := range queries {
224-
// Do not expand if this is managed from prometheus
223+
func expandQueries(queries filters.MultiQueries, namespace string, isForProm func(filters filters.SingleQuery) bool) filters.MultiQueries {
224+
// First, expand for reporter merge:
225+
226+
// The rationale here is that most traffic is duplicated from ingress and egress PoV, except cluster-external traffic.
227+
// Ingress traffic will also contains pktDrop and DNS responses.
228+
// Merging is done by running a first query with FlowDirection=INGRESS and another with FlowDirection=EGRESS AND DstOwnerName is empty,
229+
// which stands for cluster-external.
230+
// (Note that we use DstOwnerName both as an optimization as it's a Loki index,
231+
// and as convenience because looking for empty fields won't work if they aren't indexed)
232+
q1 := filters.SingleQuery{
233+
filters.NewMatch(fields.FlowDirection, `"`+string(constants.Ingress)+`","`+string(constants.Inner)+`"`),
234+
}
235+
q2 := filters.SingleQuery{
236+
filters.NewMatch(fields.FlowDirection, `"`+string(constants.Egress)+`"`),
237+
filters.NewMatch(fields.DstOwnerName, `""`),
238+
}
239+
240+
shouldSkip := func(q filters.SingleQuery) bool {
225241
if isForProm(q) {
226-
out = append(out, q)
227-
continue
242+
return true
228243
}
229-
q1, q2 := filters.SplitForReportersMerge(q)
230-
if q1 != nil {
231-
out = append(out, q1)
232-
}
233-
if q2 != nil {
234-
out = append(out, q2)
244+
// If FlowDirection is enforced, skip merging both reporters
245+
for _, m := range q {
246+
if m.Key == fields.FlowDirection {
247+
return true
248+
}
235249
}
250+
return false
236251
}
237-
return out
252+
253+
expanded := queries.Distribute([]filters.SingleQuery{q1, q2}, shouldSkip)
254+
255+
// Then, expand for namespace
256+
if namespace != "" {
257+
// TODO: this should actually be managed from the loki gateway, with "namespace" query param
258+
expanded = expanded.Distribute(
259+
[]filters.SingleQuery{
260+
{filters.NewMatch(fields.SrcNamespace, `"`+namespace+`"`)},
261+
{filters.NewMatch(fields.DstNamespace, `"`+namespace+`"`)},
262+
},
263+
isForProm,
264+
)
265+
}
266+
267+
return expanded
238268
}
239269

240270
func buildTopologyQuery(
@@ -243,8 +273,9 @@ func buildTopologyQuery(
243273
filters filters.SingleQuery,
244274
in *loki.TopologyInput,
245275
qr *v1.Range,
276+
isDev bool,
246277
) (string, *prometheus.Query, int, error) {
247-
search, unsupportedReason := getEligiblePromMetric(promInventory, filters, in)
278+
search, unsupportedReason := getEligiblePromMetric(promInventory, filters, in, isDev)
248279
if unsupportedReason != "" {
249280
hlog.Debugf("Unsupported Prometheus query; reason: %s.", unsupportedReason)
250281
} else if search != nil && len(search.Found) > 0 {
@@ -289,7 +320,7 @@ func buildTopologyQuery(
289320
return EncodeQuery(qb.Build()), nil, http.StatusOK, nil
290321
}
291322

292-
func getEligiblePromMetric(promInventory *prometheus.Inventory, filters filters.SingleQuery, in *loki.TopologyInput) (*prometheus.SearchResult, string) {
323+
func getEligiblePromMetric(promInventory *prometheus.Inventory, filters filters.SingleQuery, in *loki.TopologyInput, isDev bool) (*prometheus.SearchResult, string) {
293324
if in.DataSource != constants.DataSourceAuto && in.DataSource != constants.DataSourceProm {
294325
return nil, ""
295326
}
@@ -306,6 +337,9 @@ func getEligiblePromMetric(promInventory *prometheus.Inventory, filters filters.
306337
return nil, unsupportedReason
307338
}
308339
labelsNeeded = append(labelsNeeded, fromFilters...)
340+
if isDev {
341+
labelsNeeded = append(labelsNeeded, fields.SrcNamespace)
342+
}
309343

310344
// Search for such metric
311345
r := promInventory.Search(labelsNeeded, in.DataField)

pkg/handler/topology_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package handler
2+
3+
import (
4+
"testing"
5+
6+
"github.com/netobserv/network-observability-console-plugin/pkg/model/filters"
7+
"github.com/netobserv/network-observability-console-plugin/pkg/utils/constants"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestSplitForReportersMerge_NoSplit(t *testing.T) {
12+
mq := filters.MultiQueries{filters.SingleQuery{filters.NewMatch("srcns", "a"), filters.NewMatch("FlowDirection", string(constants.Ingress))}}
13+
res := expandQueries(mq, "", func(filters.SingleQuery) bool { return false })
14+
assert.Len(t, res, 1)
15+
assert.Equal(t, filters.SingleQuery{
16+
filters.NewMatch("srcns", "a"),
17+
filters.NewMatch("FlowDirection", string(constants.Ingress)),
18+
}, res[0])
19+
}
20+
21+
func TestSplitForReportersMerge(t *testing.T) {
22+
mq := filters.MultiQueries{filters.SingleQuery{filters.NewMatch("srcns", "a"), filters.NewMatch("dstns", "b")}}
23+
res := expandQueries(mq, "", func(filters.SingleQuery) bool { return false })
24+
25+
assert.Len(t, res, 2)
26+
assert.Equal(t, filters.SingleQuery{
27+
filters.NewMatch("FlowDirection", `"`+string(constants.Ingress)+`","`+string(constants.Inner)+`"`),
28+
filters.NewMatch("srcns", "a"),
29+
filters.NewMatch("dstns", "b"),
30+
}, res[0])
31+
assert.Equal(t, filters.SingleQuery{
32+
filters.NewMatch("FlowDirection", `"`+string(constants.Egress)+`"`),
33+
filters.NewMatch("DstK8S_OwnerName", `""`),
34+
filters.NewMatch("srcns", "a"),
35+
filters.NewMatch("dstns", "b"),
36+
}, res[1])
37+
}
38+
39+
func TestExpand_ComplexQuery(t *testing.T) {
40+
mq := filters.MultiQueries{
41+
filters.SingleQuery{filters.NewMatch("key1", "a"), filters.NewMatch("FlowDirection", string(constants.Ingress))},
42+
filters.SingleQuery{filters.NewMatch("key1", "a"), filters.NewMatch("key2", "b")},
43+
filters.SingleQuery{filters.NewMatch("prom-handled", "a")},
44+
filters.SingleQuery{filters.NewMatch("key1", "c"), filters.NewMatch("key2", "d")},
45+
}
46+
res := expandQueries(mq, "my-namespace", func(q filters.SingleQuery) bool { return q[0].Key == "prom-handled" })
47+
48+
assert.Len(t, res, 11)
49+
// First is unchanged for reporters, because FlowDirection is forced, but namespaces are injected
50+
assert.Equal(t, filters.SingleQuery{
51+
filters.NewMatch("SrcK8S_Namespace", `"my-namespace"`),
52+
filters.NewMatch("key1", "a"),
53+
filters.NewMatch("FlowDirection", string(constants.Ingress)),
54+
}, res[0])
55+
assert.Equal(t, filters.SingleQuery{
56+
filters.NewMatch("DstK8S_Namespace", `"my-namespace"`),
57+
filters.NewMatch("key1", "a"),
58+
filters.NewMatch("FlowDirection", string(constants.Ingress)),
59+
}, res[1])
60+
// Second is expanded into 3rd+4th+5th+6th
61+
assert.Equal(t, filters.SingleQuery{
62+
filters.NewMatch("SrcK8S_Namespace", `"my-namespace"`),
63+
filters.NewMatch("FlowDirection", `"`+string(constants.Ingress)+`","`+string(constants.Inner)+`"`),
64+
filters.NewMatch("key1", "a"),
65+
filters.NewMatch("key2", "b"),
66+
}, res[2])
67+
assert.Equal(t, filters.SingleQuery{
68+
filters.NewMatch("DstK8S_Namespace", `"my-namespace"`),
69+
filters.NewMatch("FlowDirection", `"`+string(constants.Ingress)+`","`+string(constants.Inner)+`"`),
70+
filters.NewMatch("key1", "a"),
71+
filters.NewMatch("key2", "b"),
72+
}, res[3])
73+
assert.Equal(t, filters.SingleQuery{
74+
filters.NewMatch("SrcK8S_Namespace", `"my-namespace"`),
75+
filters.NewMatch("FlowDirection", `"`+string(constants.Egress)+`"`),
76+
filters.NewMatch("DstK8S_OwnerName", `""`),
77+
filters.NewMatch("key1", "a"),
78+
filters.NewMatch("key2", "b"),
79+
}, res[4])
80+
assert.Equal(t, filters.SingleQuery{
81+
filters.NewMatch("DstK8S_Namespace", `"my-namespace"`),
82+
filters.NewMatch("FlowDirection", `"`+string(constants.Egress)+`"`),
83+
filters.NewMatch("DstK8S_OwnerName", `""`),
84+
filters.NewMatch("key1", "a"),
85+
filters.NewMatch("key2", "b"),
86+
}, res[5])
87+
// Third is unchanged, because it's prom-handled
88+
assert.Equal(t, filters.SingleQuery{
89+
filters.NewMatch("prom-handled", "a"),
90+
}, res[6])
91+
// Fourth is expanded into 8th+9th+10th+11th
92+
assert.Equal(t, filters.SingleQuery{
93+
filters.NewMatch("SrcK8S_Namespace", `"my-namespace"`),
94+
filters.NewMatch("FlowDirection", `"`+string(constants.Ingress)+`","`+string(constants.Inner)+`"`),
95+
filters.NewMatch("key1", "c"),
96+
filters.NewMatch("key2", "d"),
97+
}, res[7])
98+
assert.Equal(t, filters.SingleQuery{
99+
filters.NewMatch("DstK8S_Namespace", `"my-namespace"`),
100+
filters.NewMatch("FlowDirection", `"`+string(constants.Ingress)+`","`+string(constants.Inner)+`"`),
101+
filters.NewMatch("key1", "c"),
102+
filters.NewMatch("key2", "d"),
103+
}, res[8])
104+
assert.Equal(t, filters.SingleQuery{
105+
filters.NewMatch("SrcK8S_Namespace", `"my-namespace"`),
106+
filters.NewMatch("FlowDirection", `"`+string(constants.Egress)+`"`),
107+
filters.NewMatch("DstK8S_OwnerName", `""`),
108+
filters.NewMatch("key1", "c"),
109+
filters.NewMatch("key2", "d"),
110+
}, res[9])
111+
assert.Equal(t, filters.SingleQuery{
112+
filters.NewMatch("DstK8S_Namespace", `"my-namespace"`),
113+
filters.NewMatch("FlowDirection", `"`+string(constants.Egress)+`"`),
114+
filters.NewMatch("DstK8S_OwnerName", `""`),
115+
filters.NewMatch("key1", "c"),
116+
filters.NewMatch("key2", "d"),
117+
}, res[10])
118+
}

pkg/loki/flow_query.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,7 @@ func (q *FlowQueryBuilder) Filters(queryFilters filters.SingleQuery) error {
108108
}
109109

110110
func (q *FlowQueryBuilder) addFilter(filter filters.Match) error {
111-
// namespace filtering is managed by loki gateway so we can simply skip it
112-
if filter.Key == "namespace" {
113-
return nil
114-
} else if !filterRegexpValidation.MatchString(filter.Values) {
111+
if !filterRegexpValidation.MatchString(filter.Values) {
115112
return fmt.Errorf("unauthorized sign in flows request: %s", filter.Values)
116113
}
117114

pkg/model/filters/filters.go

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
package filters
22

33
import (
4-
"fmt"
54
"net/url"
65
"strings"
7-
8-
"github.com/netobserv/network-observability-console-plugin/pkg/model/fields"
9-
"github.com/netobserv/network-observability-console-plugin/pkg/utils/constants"
106
)
117

128
// MultiQueries is an union group of singleQueries (OR'ed)
13-
type MultiQueries = []SingleQuery
9+
type MultiQueries []SingleQuery
1410

1511
// singleQuery is an intersect group of matches (AND'ed)
1612
type SingleQuery = []Match
@@ -38,7 +34,7 @@ func NewMoreThanOrEqualMatch(key, values string) Match {
3834
// | | '--- Per-label OR: "foo" must have value "a" OR "b"
3935
// | '----- In-group AND: "foo" must be "a" or "b" AND "bar" must be "c"
4036
// '------- All groups OR: "foo" must be "a" or "b" AND "bar" must be "c", OR "baz" must be "d"
41-
func Parse(raw string, namespace string) (MultiQueries, error) {
37+
func Parse(raw string) (MultiQueries, error) {
4238
var parsed []SingleQuery
4339
decoded, err := url.QueryUnescape(raw)
4440
if err != nil {
@@ -60,49 +56,36 @@ func Parse(raw string, namespace string) (MultiQueries, error) {
6056
}
6157
}
6258
}
63-
// append namespace filter for developer view to retreive compatible metrics
64-
if namespace != "" {
65-
andFilters = append(andFilters, NewMatch("namespace", fmt.Sprintf(`"%s"`, namespace)))
66-
}
6759
parsed = append(parsed, andFilters)
6860
}
6961
return parsed, nil
7062
}
7163

72-
func SplitForReportersMerge(q SingleQuery) (SingleQuery, SingleQuery) {
73-
// If FlowDirection is enforced, skip merging both reporters
74-
for _, m := range q {
75-
if m.Key == fields.FlowDirection {
76-
return q, nil
64+
// Distribute allows to inject and "expand" queries with new filters.
65+
// For example, say we have an initial query `q` with just "{{src-name=foo}}" and we want to enforce source OR dest namespace being "my-namespace". We'd write:
66+
// `q.Distribute({{src-namespace="my-namespace"}, {dst-namespace="my-namespace"}})`
67+
// Which results in: "{{src-namespace="my-namespace", src-name=foo}, {dst-namespace="my-namespace", src-name=foo}}"
68+
func (m MultiQueries) Distribute(toDistribute []SingleQuery, ignorePred func(SingleQuery) bool) MultiQueries {
69+
result := MultiQueries{}
70+
for _, qOrig := range m {
71+
if ignorePred(qOrig) {
72+
result = append(result, qOrig)
73+
continue
74+
}
75+
for _, qToDistribute := range toDistribute {
76+
qDistributed := qToDistribute
77+
qDistributed = append(qDistributed, qOrig...)
78+
result = append(result, qDistributed)
7779
}
7880
}
79-
// The rationale here is that most traffic is duplicated from ingress and egress PoV, except cluster-external traffic.
80-
// Ingress traffic will also contains pktDrop and DNS responses.
81-
// Merging is done by running a first query with FlowDirection=INGRESS and another with FlowDirection=EGRESS AND DstOwnerName is empty,
82-
// which stands for cluster-external.
83-
// (Note that we use DstOwnerName both as an optimization as it's a Loki index,
84-
// and as convenience because looking for empty fields won't work if they aren't indexed)
85-
q1 := SingleQuery{
86-
NewMatch(fields.FlowDirection, `"`+string(constants.Ingress)+`","`+string(constants.Inner)+`"`),
87-
}
88-
q2 := SingleQuery{
89-
NewMatch(fields.FlowDirection, `"`+string(constants.Egress)+`"`),
90-
NewMatch(fields.DstOwnerName, `""`),
91-
}
92-
for _, m := range q {
93-
q1 = append(q1, m)
94-
q2 = append(q2, m)
95-
}
96-
return q1, q2
81+
return result
9782
}
9883

9984
func (m *Match) ToLabelFilter() (LabelFilter, bool) {
10085
values := strings.Split(m.Values, ",")
10186
if len(values) == 1 && isExactMatch(values[0]) {
10287
// namespace must be exact match
103-
if m.Key == "namespace" {
104-
return StringMatchLabelFilter(m.Key, trimExactMatch(values[0])), true
105-
} else if m.Not {
88+
if m.Not {
10689
return NotStringLabelFilter(m.Key, trimExactMatch(values[0])), true
10790
} else if m.MoreThanOrEqual {
10891
return MoreThanNumberLabelFilter(m.Key, trimExactMatch(values[0])), true

0 commit comments

Comments
 (0)