Skip to content

Commit be3303e

Browse files
authored
Merge pull request #571 from jpinsonneau/dev_console_followup
NETOBSERV-1689 Allow multi-tenant Prometheus queries
2 parents 6dc3119 + 1ec4183 commit be3303e

22 files changed

+542
-197
lines changed

pkg/config/config.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type Server struct {
3333

3434
type Prometheus struct {
3535
URL string `yaml:"url" json:"url"`
36+
DevURL string `yaml:"devUrl,omitempty" json:"devUrl,omitempty"`
3637
Timeout Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"`
3738
TokenPath string `yaml:"tokenPath,omitempty" json:"tokenPath,omitempty"`
3839
SkipTLS bool `yaml:"skipTls,omitempty" json:"skipTls,omitempty"`
@@ -220,7 +221,7 @@ func (c *Config) IsLokiEnabled() bool {
220221
}
221222

222223
func (c *Config) IsPromEnabled() bool {
223-
return c.Prometheus.URL != ""
224+
return c.Prometheus.URL != "" || c.Prometheus.DevURL != ""
224225
}
225226

226227
func (c *Config) Validate() error {
@@ -253,12 +254,19 @@ func (c *Config) Validate() error {
253254
}
254255

255256
if c.IsPromEnabled() {
256-
log.Infof("Prometheus is enabled (%s)", c.Prometheus.URL)
257+
log.Infof("Prometheus is enabled:\n - admin: %s\n - dev: %s\n", c.Prometheus.URL, c.Prometheus.DevURL)
257258
// parse config urls
258259
_, err := url.Parse(c.Prometheus.URL)
259260
if err != nil {
260261
configErrors = append(configErrors, "wrong Prometheus URL")
261262
}
263+
264+
if c.Prometheus.DevURL != "" {
265+
_, err := url.Parse(c.Prometheus.DevURL)
266+
if err != nil {
267+
configErrors = append(configErrors, "wrong Prometheus dev URL")
268+
}
269+
}
262270
} else {
263271
log.Info("Prometheus is disabled")
264272
}

pkg/handler/clients.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,31 @@ import (
1717
)
1818

1919
type clients struct {
20-
loki httpclient.Caller
21-
prom api.Client
20+
loki httpclient.Caller
21+
promAdmin api.Client
22+
promDev api.Client
2223
}
2324

24-
func newClients(cfg *config.Config, requestHeader http.Header, useLokiStatus bool) (clients, error) {
25+
func newClients(cfg *config.Config, requestHeader http.Header, useLokiStatus bool, namespace string) (clients, error) {
2526
var lokiClient httpclient.Caller
26-
var promClient api.Client
27+
var promAdminClient api.Client
28+
var promDevClient api.Client
2729
var err error
30+
2831
if cfg.IsLokiEnabled() {
2932
lokiClient = newLokiClient(&cfg.Loki, requestHeader, useLokiStatus)
3033
}
3134
if cfg.IsPromEnabled() {
32-
promClient, err = prometheus.NewClient(&cfg.Prometheus, requestHeader)
35+
promAdminClient, err = prometheus.NewAdminClient(&cfg.Prometheus, requestHeader)
36+
if err != nil {
37+
return clients{}, err
38+
}
39+
promDevClient, err = prometheus.NewDevClient(&cfg.Prometheus, requestHeader, namespace)
40+
if err != nil {
41+
return clients{}, err
42+
}
3343
}
34-
return clients{loki: lokiClient, prom: promClient}, err
44+
return clients{loki: lokiClient, promAdmin: promAdminClient, promDev: promDevClient}, err
3545
}
3646

3747
type datasourceError struct {
@@ -54,8 +64,15 @@ func (c *clients) fetchLokiSingle(logQL string, merger loki.Merger) (int, error)
5464
return code, nil
5565
}
5666

57-
func (c *clients) fetchPrometheusSingle(ctx context.Context, promQL *prometheus.Query, merger loki.Merger) (int, error) {
58-
qr, code, err := prometheus.QueryMatrix(ctx, c.prom, promQL)
67+
func (c *clients) getPromClient(isDev bool) api.Client {
68+
if isDev {
69+
return c.promDev
70+
}
71+
return c.promAdmin
72+
}
73+
74+
func (c *clients) fetchPrometheusSingle(ctx context.Context, promQL *prometheus.Query, merger loki.Merger, client api.Client) (int, error) {
75+
qr, code, err := prometheus.QueryMatrix(ctx, client, promQL)
5976
if err != nil {
6077
return code, &datasourceError{datasource: constants.DataSourceProm, nested: err}
6178
}
@@ -65,26 +82,28 @@ func (c *clients) fetchPrometheusSingle(ctx context.Context, promQL *prometheus.
6582
return code, nil
6683
}
6784

68-
func (c *clients) fetchSingle(ctx context.Context, logQL string, promQL *prometheus.Query, merger loki.Merger) (int, error) {
85+
func (c *clients) fetchSingle(ctx context.Context, logQL string, promQL *prometheus.Query, merger loki.Merger, isDev bool) (int, error) {
6986
if promQL != nil {
70-
if c.prom == nil {
87+
client := c.getPromClient(isDev)
88+
if client == nil {
7189
return http.StatusBadRequest, fmt.Errorf("cannot execute the following Prometheus query: Prometheus is disabled: %v", promQL.PromQL)
7290
}
73-
return c.fetchPrometheusSingle(ctx, promQL, merger)
91+
return c.fetchPrometheusSingle(ctx, promQL, merger, client)
7492
}
7593
if c.loki == nil {
7694
return http.StatusBadRequest, fmt.Errorf("cannot execute the following Loki query: Loki is disabled: %v", logQL)
7795
}
7896
return c.fetchLokiSingle(logQL, merger)
7997
}
8098

81-
func (c *clients) fetchParallel(ctx context.Context, logQL []string, promQL []*prometheus.Query, merger loki.Merger) (int, error) {
99+
func (c *clients) fetchParallel(ctx context.Context, logQL []string, promQL []*prometheus.Query, merger loki.Merger, isDev bool) (int, error) {
82100
if c.loki == nil && len(logQL) > 0 {
83101
hlog.Errorf("Cannot execute the following Loki queries: Loki is disabled: %v", logQL)
84102
logQL = nil
85103
}
86104

87-
if c.prom == nil && len(promQL) > 0 {
105+
promClient := c.getPromClient(isDev)
106+
if promClient == nil && len(promQL) > 0 {
88107
hlog.Errorf("Cannot execute the following Prometheus queries: Prometheus is disabled: %v", promQL[0].PromQL)
89108
promQL = nil
90109
}
@@ -115,7 +134,7 @@ func (c *clients) fetchParallel(ctx context.Context, logQL []string, promQL []*p
115134
for _, q := range promQL {
116135
go func(query *prometheus.Query) {
117136
defer wg.Done()
118-
qr, code, err := prometheus.QueryMatrix(ctx, c.prom, query)
137+
qr, code, err := prometheus.QueryMatrix(ctx, promClient, query)
119138
if err != nil {
120139
errChan <- errorWithCode{err: &datasourceError{datasource: constants.DataSourceProm, nested: err}, code: code}
121140
} else {

pkg/handler/flows.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/netobserv/network-observability-console-plugin/pkg/loki"
1313
"github.com/netobserv/network-observability-console-plugin/pkg/metrics"
1414
"github.com/netobserv/network-observability-console-plugin/pkg/model"
15+
"github.com/netobserv/network-observability-console-plugin/pkg/model/fields"
1516
"github.com/netobserv/network-observability-console-plugin/pkg/model/filters"
1617
"github.com/netobserv/network-observability-console-plugin/pkg/utils/constants"
1718
)
@@ -26,6 +27,7 @@ const (
2627
dataSourceKey = "dataSource"
2728
filtersKey = "filters"
2829
packetLossKey = "packetLoss"
30+
namespaceKey = "namespace"
2931
)
3032

3133
func (h *Handlers) GetFlows(ctx context.Context) func(w http.ResponseWriter, r *http.Request) {
@@ -81,11 +83,23 @@ func (h *Handlers) getFlows(ctx context.Context, lokiClient httpclient.Caller, p
8183
if err != nil {
8284
return nil, http.StatusBadRequest, err
8385
}
86+
namespace := params.Get(namespaceKey)
87+
isDev := namespace != ""
8488
rawFilters := params.Get(filtersKey)
8589
filterGroups, err := filters.Parse(rawFilters)
8690
if err != nil {
8791
return nil, http.StatusBadRequest, err
8892
}
93+
if namespace != "" {
94+
// TODO: this should actually be managed from the loki gateway, with "namespace" query param
95+
filterGroups = filterGroups.Distribute(
96+
[]filters.SingleQuery{
97+
{filters.NewMatch(fields.SrcNamespace, `"`+namespace+`"`)},
98+
{filters.NewMatch(fields.DstNamespace, `"`+namespace+`"`)},
99+
},
100+
func(sq filters.SingleQuery) bool { return false },
101+
)
102+
}
89103

90104
cl := clients{loki: lokiClient}
91105
merger := loki.NewStreamMerger(reqLimit)
@@ -100,7 +114,7 @@ func (h *Handlers) getFlows(ctx context.Context, lokiClient httpclient.Caller, p
100114
}
101115
queries = append(queries, qb.Build())
102116
}
103-
code, err := cl.fetchParallel(ctx, queries, nil, merger)
117+
code, err := cl.fetchParallel(ctx, queries, nil, merger, isDev)
104118
if err != nil {
105119
return nil, code, err
106120
}
@@ -114,7 +128,7 @@ func (h *Handlers) getFlows(ctx context.Context, lokiClient httpclient.Caller, p
114128
}
115129
}
116130
query := qb.Build()
117-
code, err := cl.fetchSingle(ctx, query, nil, merger)
131+
code, err := cl.fetchSingle(ctx, query, nil, merger, isDev)
118132
if err != nil {
119133
return nil, code, err
120134
}

pkg/handler/resources.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"net/http"
88
"time"
99

10-
"github.com/gorilla/mux"
11-
1210
"github.com/netobserv/network-observability-console-plugin/pkg/metrics"
1311
"github.com/netobserv/network-observability-console-plugin/pkg/model"
1412
"github.com/netobserv/network-observability-console-plugin/pkg/model/fields"
@@ -19,7 +17,10 @@ import (
1917

2018
func (h *Handlers) GetClusters(ctx context.Context) func(w http.ResponseWriter, r *http.Request) {
2119
return func(w http.ResponseWriter, r *http.Request) {
22-
clients, err := newClients(h.Cfg, r.Header, false)
20+
params := r.URL.Query()
21+
namespace := params.Get(namespaceKey)
22+
23+
clients, err := newClients(h.Cfg, r.Header, false, namespace)
2324
if err != nil {
2425
writeError(w, http.StatusInternalServerError, err.Error())
2526
return
@@ -44,7 +45,10 @@ func (h *Handlers) GetClusters(ctx context.Context) func(w http.ResponseWriter,
4445

4546
func (h *Handlers) GetZones(ctx context.Context) func(w http.ResponseWriter, r *http.Request) {
4647
return func(w http.ResponseWriter, r *http.Request) {
47-
clients, err := newClients(h.Cfg, r.Header, false)
48+
params := r.URL.Query()
49+
namespace := params.Get(namespaceKey)
50+
51+
clients, err := newClients(h.Cfg, r.Header, false, namespace)
4852
if err != nil {
4953
writeError(w, http.StatusInternalServerError, err.Error())
5054
return
@@ -80,7 +84,10 @@ func (h *Handlers) GetZones(ctx context.Context) func(w http.ResponseWriter, r *
8084

8185
func (h *Handlers) GetNamespaces(ctx context.Context) func(w http.ResponseWriter, r *http.Request) {
8286
return func(w http.ResponseWriter, r *http.Request) {
83-
clients, err := newClients(h.Cfg, r.Header, false)
87+
params := r.URL.Query()
88+
namespace := params.Get(namespaceKey)
89+
90+
clients, err := newClients(h.Cfg, r.Header, false, namespace)
8491
if err != nil {
8592
writeError(w, http.StatusInternalServerError, err.Error())
8693
return
@@ -116,7 +123,7 @@ func (h *Handlers) GetNamespaces(ctx context.Context) func(w http.ResponseWriter
116123

117124
func (h *Handlers) getLabelValues(ctx context.Context, cl clients, label string) ([]string, int, error) {
118125
if h.PromInventory != nil && h.PromInventory.LabelExists(label) {
119-
return prometheus.GetLabelValues(ctx, cl.prom, label, nil)
126+
return prometheus.GetLabelValues(ctx, cl.promAdmin, label, nil)
120127
}
121128
if h.Cfg.IsLokiEnabled() {
122129
return getLokiLabelValues(h.Cfg.Loki.URL, cl.loki, label)
@@ -127,7 +134,11 @@ func (h *Handlers) getLabelValues(ctx context.Context, cl clients, label string)
127134

128135
func (h *Handlers) GetNames(ctx context.Context) func(w http.ResponseWriter, r *http.Request) {
129136
return func(w http.ResponseWriter, r *http.Request) {
130-
clients, err := newClients(h.Cfg, r.Header, false)
137+
params := r.URL.Query()
138+
namespace := params.Get(namespaceKey)
139+
kind := params.Get("kind")
140+
141+
clients, err := newClients(h.Cfg, r.Header, false, namespace)
131142
if err != nil {
132143
writeError(w, http.StatusInternalServerError, err.Error())
133144
return
@@ -137,9 +148,6 @@ func (h *Handlers) GetNames(ctx context.Context) func(w http.ResponseWriter, r *
137148
defer func() {
138149
metrics.ObserveHTTPCall("GetNames", code, startTime)
139150
}()
140-
params := mux.Vars(r)
141-
namespace := params["namespace"]
142-
kind := params["kind"]
143151

144152
// Initialize names explicitly to avoid null json when empty
145153
names := []string{}
@@ -181,7 +189,7 @@ func (h *Handlers) getNamesForPrefix(ctx context.Context, cl clients, prefix, ki
181189
if h.Cfg.IsPromEnabled() {
182190
// Label match query (any metric)
183191
q := prometheus.QueryFilters("", filts)
184-
return prometheus.GetLabelValues(ctx, cl.prom, searchField, []string{q})
192+
return prometheus.GetLabelValues(ctx, cl.promAdmin, searchField, []string{q})
185193
}
186194
return getLokiNamesForPrefix(&h.Cfg.Loki, cl.loki, filts, searchField)
187195
}

0 commit comments

Comments
 (0)