Skip to content
20 changes: 15 additions & 5 deletions pkg/handler/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ type clients struct {
}

func newClients(cfg *config.Config, requestHeader http.Header, useLokiStatus bool, namespace string) (clients, error) {
var lokiClient httpclient.Caller
lokiClients := newLokiClients(cfg, requestHeader, useLokiStatus)
promClients, err := newPromClients(cfg, requestHeader, namespace)
return clients{loki: lokiClients.loki, promAdmin: promClients.promAdmin, promDev: promClients.promDev}, err
}

func newPromClients(cfg *config.Config, requestHeader http.Header, namespace string) (clients, error) {
var promAdminClient api.Client
var promDevClient api.Client
var err error

if cfg.IsLokiEnabled() {
lokiClient = newLokiClient(&cfg.Loki, requestHeader, useLokiStatus)
}
if cfg.IsPromEnabled() {
promAdminClient, err = prometheus.NewAdminClient(&cfg.Prometheus, requestHeader)
if err != nil {
Expand All @@ -41,7 +43,15 @@ func newClients(cfg *config.Config, requestHeader http.Header, useLokiStatus boo
return clients{}, err
}
}
return clients{loki: lokiClient, promAdmin: promAdminClient, promDev: promDevClient}, err
return clients{promAdmin: promAdminClient, promDev: promDevClient}, err
}

func newLokiClients(cfg *config.Config, requestHeader http.Header, useLokiStatus bool) clients {
var lokiClient httpclient.Caller
if cfg.IsLokiEnabled() {
lokiClient = newLokiClient(&cfg.Loki, requestHeader, useLokiStatus)
}
return clients{loki: lokiClient}
}

type datasourceError struct {
Expand Down
14 changes: 8 additions & 6 deletions pkg/handler/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func executeLokiQuery(flowsURL string, lokiClient httpclient.Caller) ([]byte, in
}
if code != http.StatusOK {
newCode, msg := getLokiError(resp, code)
return nil, newCode, fmt.Errorf("[%d] %s", code, msg)
return nil, newCode, fmt.Errorf("Error from Loki query: [%d] %s", code, msg)
}
return resp, http.StatusOK, nil
}
Expand Down Expand Up @@ -191,21 +191,23 @@ func getLokiNamesForPrefix(cfg *config.Loki, lokiClient httpclient.Caller, filts
return values, http.StatusOK, nil
}

func (h *Handlers) getLokiStatus(r *http.Request) ([]byte, int, error) {
lokiClient := newLokiClient(&h.Cfg.Loki, r.Header, true)
baseURL := strings.TrimRight(h.Cfg.Loki.GetStatusURL(), "/")
return executeLokiQuery(fmt.Sprintf("%s/%s", baseURL, "ready"), lokiClient)
}

func (h *Handlers) LokiReady() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if !h.Cfg.IsLokiEnabled() {
writeError(w, http.StatusBadRequest, "Loki is disabled")
return
}
lokiClient := newLokiClient(&h.Cfg.Loki, r.Header, true)
baseURL := strings.TrimRight(h.Cfg.Loki.GetStatusURL(), "/")

resp, code, err := executeLokiQuery(fmt.Sprintf("%s/%s", baseURL, "ready"), lokiClient)
resp, code, err := h.getLokiStatus(r)
if err != nil {
writeError(w, code, err.Error())
return
}

status := string(resp)
if strings.Contains(status, "ready") {
code = http.StatusOK
Expand Down
83 changes: 56 additions & 27 deletions pkg/handler/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func (h *Handlers) GetClusters(ctx context.Context) func(w http.ResponseWriter,
return func(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
namespace := params.Get(namespaceKey)
isDev := namespace != ""

clients, err := newClients(h.Cfg, r.Header, false, namespace)
if err != nil {
Expand All @@ -32,9 +33,9 @@ func (h *Handlers) GetClusters(ctx context.Context) func(w http.ResponseWriter,
}()

// Fetch and merge values for K8S_ClusterName
values, code, err := h.getLabelValues(ctx, clients, fields.Cluster)
values, code, err := h.getLabelValues(ctx, clients, fields.Cluster, isDev)
if err != nil {
writeError(w, code, "Error while fetching label cluster values from Loki: "+err.Error())
writeError(w, code, err.Error())
return
}

Expand All @@ -47,6 +48,7 @@ func (h *Handlers) GetZones(ctx context.Context) func(w http.ResponseWriter, r *
return func(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
namespace := params.Get(namespaceKey)
isDev := namespace != ""

clients, err := newClients(h.Cfg, r.Header, false, namespace)
if err != nil {
Expand All @@ -63,16 +65,16 @@ func (h *Handlers) GetZones(ctx context.Context) func(w http.ResponseWriter, r *
values := []string{}

// Fetch and merge values for SrcK8S_Zone and DstK8S_Zone
values1, code, err := h.getLabelValues(ctx, clients, fields.SrcZone)
values1, code, err := h.getLabelValues(ctx, clients, fields.SrcZone, isDev)
if err != nil {
writeError(w, code, "Error while fetching label source zone values from Loki: "+err.Error())
writeError(w, code, err.Error())
return
}
values = append(values, values1...)

values2, code, err := h.getLabelValues(ctx, clients, fields.DstZone)
values2, code, err := h.getLabelValues(ctx, clients, fields.DstZone, isDev)
if err != nil {
writeError(w, code, "Error while fetching label destination zone values from Loki: "+err.Error())
writeError(w, code, err.Error())
return
}
values = append(values, values2...)
Expand All @@ -82,10 +84,31 @@ func (h *Handlers) GetZones(ctx context.Context) func(w http.ResponseWriter, r *
}
}

func (h *Handlers) getNamespacesValues(ctx context.Context, clients clients, isDev bool) ([]string, int, error) {
// Initialize values explicitly to avoid null json when empty
values := []string{}

// Fetch and merge values for SrcK8S_Namespace and DstK8S_Namespace
values1, code, err := h.getLabelValues(ctx, clients, fields.SrcNamespace, isDev)
if err != nil {
return []string{}, code, err
}
values = append(values, values1...)

values2, code, err := h.getLabelValues(ctx, clients, fields.DstNamespace, isDev)
if err != nil {
return []string{}, code, err
}
values = append(values, values2...)

return values, http.StatusOK, nil
}

func (h *Handlers) GetNamespaces(ctx context.Context) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
namespace := params.Get(namespaceKey)
isDev := namespace != ""

clients, err := newClients(h.Cfg, r.Header, false, namespace)
if err != nil {
Expand All @@ -98,35 +121,41 @@ func (h *Handlers) GetNamespaces(ctx context.Context) func(w http.ResponseWriter
metrics.ObserveHTTPCall("GetNamespaces", code, startTime)
}()

// Initialize values explicitly to avoid null json when empty
values := []string{}

// Fetch and merge values for SrcK8S_Namespace and DstK8S_Namespace
values1, code, err := h.getLabelValues(ctx, clients, fields.SrcNamespace)
values, code, err := h.getNamespacesValues(ctx, clients, isDev)
if err != nil {
writeError(w, code, "Error while fetching label source namespace values from Loki: "+err.Error())
return
}
values = append(values, values1...)

values2, code, err := h.getLabelValues(ctx, clients, fields.DstNamespace)
if err != nil {
writeError(w, code, "Error while fetching label destination namespace values from Loki: "+err.Error())
writeError(w, code, err.Error())
return
}
values = append(values, values2...)

code = http.StatusOK
writeJSON(w, code, utils.NonEmpty(utils.Dedup(values)))
}
}

func (h *Handlers) getLabelValues(ctx context.Context, cl clients, label string) ([]string, int, error) {
func (h *Handlers) getLabelValues(ctx context.Context, cl clients, label string, isDev bool) ([]string, int, error) {
if h.PromInventory != nil && h.PromInventory.LabelExists(label) {
return prometheus.GetLabelValues(ctx, cl.promAdmin, label, nil)
client := cl.getPromClient(isDev)
if client != nil {
resp, code, err := prometheus.GetLabelValues(ctx, client, label, nil)
if err != nil {
if code == http.StatusUnauthorized || code == http.StatusForbidden {
// In case this was a prometheus 401 / 403 error, the query is repeated with Loki
// This is because multi-tenancy is currently not managed for prom datasource, hence such queries have to go with Loki
// Unfortunately we don't know a safe and generic way to pre-flight check if the user will be authorized
hlog.Info("Retrying with Loki...")
// continuing with loki below
} else {
return nil, code, fmt.Errorf("error while fetching label %s values from Prometheus: %w", label, err)
}
} else {
return resp, code, nil
}
}
}
if h.Cfg.IsLokiEnabled() {
return getLokiLabelValues(h.Cfg.Loki.URL, cl.loki, label)
if cl.loki != nil {
resp, code, err := getLokiLabelValues(h.Cfg.Loki.URL, cl.loki, label)
if err != nil {
return nil, code, fmt.Errorf("error while fetching label %s values from Loki: %w", label, err)
}
return resp, code, nil
}
// Loki disabled AND label not managed in metrics => send an error
return nil, http.StatusBadRequest, fmt.Errorf("label %s not found in Prometheus metrics", label)
Expand Down Expand Up @@ -186,7 +215,7 @@ func (h *Handlers) getNamesForPrefix(ctx context.Context, cl clients, prefix, ki
searchField = prefix + fields.Name
}

if h.Cfg.IsPromEnabled() {
if h.Cfg.IsPromEnabled() && h.PromInventory.LabelExists(searchField) {
// Label match query (any metric)
q := prometheus.QueryFilters("", filts)
return prometheus.GetLabelValues(ctx, cl.promAdmin, searchField, []string{q})
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestGetLabelValues(t *testing.T) {
)
})
cl := clients{loki: lokiClientMock}
_, _, _ = h.getLabelValues(context.Background(), cl, "DstK8S_Namespace")
_, _, _ = h.getLabelValues(context.Background(), cl, "DstK8S_Namespace", false)

lokiClientMock.AssertNumberOfCalls(t, "Get", 1)
}
71 changes: 65 additions & 6 deletions pkg/handler/status.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,73 @@
package handler

import (
"context"
"net/http"

"github.com/sirupsen/logrus"
"strings"
)

func Status(w http.ResponseWriter, _ *http.Request) {
_, err := w.Write([]byte("OK"))
if err != nil {
logrus.Errorf("could not write response: %v", err)
type Status struct {
Loki DatasourceStatus `yaml:"loki" json:"loki"`
Prometheus DatasourceStatus `yaml:"prometheus" json:"prometheus"`
}

type DatasourceStatus struct {
IsEnabled bool `yaml:"isEnabled" json:"isEnabled"`
NamespacesCount int `yaml:"namespacesCount" json:"namespacesCount"`
IsReady bool `yaml:"isReady" json:"isReady"`
Error string `yaml:"error" json:"error"`
ErrorCode int `yaml:"errorCode" json:"errorCode"`
}

func (h *Handlers) Status(ctx context.Context) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
namespace := params.Get(namespaceKey)
isDev := namespace != ""

status := Status{
Prometheus: DatasourceStatus{IsEnabled: h.Cfg.IsPromEnabled()},
Loki: DatasourceStatus{IsEnabled: h.Cfg.IsLokiEnabled()},
}

if status.Prometheus.IsEnabled {
promClients, err := newPromClients(h.Cfg, r.Header, namespace)
if err != nil {
status.Prometheus.Error = err.Error()
status.Prometheus.ErrorCode = http.StatusInternalServerError
} else {
// Get namespaces using Prom
promNamespaces, code, err := h.getNamespacesValues(ctx, promClients, isDev)
if err != nil {
status.Prometheus.Error = "Error while fetching label namespace values from Prometheus: " + err.Error()
status.Prometheus.ErrorCode = code
} else {
status.Prometheus.IsReady = true
status.Prometheus.NamespacesCount = len(promNamespaces)
}
}
}

if status.Loki.IsEnabled {
resp, code, err := h.getLokiStatus(r)
if err != nil {
status.Loki.Error = err.Error()
status.Loki.ErrorCode = code
} else {
lokiStatus := string(resp)
status.Loki.IsReady = strings.Contains(lokiStatus, "ready")

lokiClients := newLokiClients(h.Cfg, r.Header, false)
// get namespaces using Loki
lokiNamespaces, code, err := h.getNamespacesValues(ctx, lokiClients, isDev)
if err != nil {
status.Loki.Error = "Error while fetching label namespace values from Loki: " + err.Error()
status.Loki.ErrorCode = code
} else {
status.Loki.NamespacesCount = len(lokiNamespaces)
}
}
}
writeJSON(w, http.StatusOK, status)
}
}
9 changes: 8 additions & 1 deletion pkg/handler/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"net/url"
"slices"
"time"

"github.com/netobserv/network-observability-console-plugin/pkg/config"
Expand Down Expand Up @@ -338,7 +339,13 @@ func getEligiblePromMetric(promInventory *prometheus.Inventory, filters filters.
}
labelsNeeded = append(labelsNeeded, fromFilters...)
if isDev {
labelsNeeded = append(labelsNeeded, fields.SrcNamespace)
if !slices.Contains(labelsNeeded, fields.SrcNamespace) {
labelsNeeded = append(labelsNeeded, fields.SrcNamespace)
}

if !slices.Contains(labelsNeeded, fields.DstNamespace) {
labelsNeeded = append(labelsNeeded, fields.DstNamespace)
}
}

// Search for such metric
Expand Down
29 changes: 17 additions & 12 deletions pkg/prometheus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,7 @@ func executeQueryRange(ctx context.Context, cl api.Client, q *Query) (pmod.Value
}
if err != nil {
log.Tracef("Error:\n%v", err)
code = http.StatusServiceUnavailable
var promError *v1.Error
if errors.As(err, &promError) {
if promError.Type == v1.ErrClient && strings.Contains(promError.Msg, "401") {
code = http.StatusUnauthorized
} else if promError.Type == v1.ErrClient && strings.Contains(promError.Msg, "403") {
code = http.StatusForbidden
}
}
code = translateErrorCode(err)
return nil, code, fmt.Errorf("error from Prometheus query: %w", err)
}

Expand Down Expand Up @@ -125,16 +117,29 @@ func GetLabelValues(ctx context.Context, cl api.Client, label string, match []st
log.Debugf("GetLabelValues: %s", label)
v1api := v1.NewAPI(cl)
result, warnings, err := v1api.LabelValues(ctx, label, match, time.Now().Add(-3*time.Hour), time.Now())
if err != nil {
return nil, http.StatusServiceUnavailable, err
}
if len(warnings) > 0 {
log.Infof("GetLabelValues warnings: %v", warnings)
}
if err != nil {
code := translateErrorCode(err)
return nil, code, fmt.Errorf("could not get label values: %w", err)
}
log.Tracef("Result:\n%v", result)
var asStrings []string
for _, s := range result {
asStrings = append(asStrings, string(s))
}
return asStrings, http.StatusOK, nil
}

func translateErrorCode(err error) int {
var promError *v1.Error
if errors.As(err, &promError) {
if promError.Type == v1.ErrClient && strings.Contains(promError.Msg, "401") {
return http.StatusUnauthorized
} else if promError.Type == v1.ErrClient && strings.Contains(promError.Msg, "403") {
return http.StatusForbidden
}
}
return http.StatusServiceUnavailable
}
Loading
Loading