diff --git a/src/robusta/integrations/prometheus/utils.py b/src/robusta/integrations/prometheus/utils.py index 602d48ed7..3947c2468 100644 --- a/src/robusta/integrations/prometheus/utils.py +++ b/src/robusta/integrations/prometheus/utils.py @@ -1,6 +1,6 @@ import logging import os -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple from cachetools import TTLCache from prometrix import ( @@ -15,7 +15,7 @@ from robusta.core.exceptions import NoPrometheusUrlFound from robusta.core.model.base_params import PrometheusParams from robusta.core.model.env_vars import PROMETHEUS_SSL_ENABLED, SERVICE_CACHE_TTL_SEC -from robusta.utils.service_discovery import find_service_url +from robusta.utils.service_discovery import DiscoveredServiceUrl, find_service_url AZURE_RESOURCE = os.environ.get("AZURE_RESOURCE", "https://prometheus.monitor.azure.com") AZURE_METADATA_ENDPOINT = os.environ.get( @@ -33,15 +33,37 @@ VICTORIA_METRICS_CONFIGURED = os.environ.get("VICTORIA_METRICS_CONFIGURED", "false").lower() == "true" +def _unpack_discovery_result( + discovery_result: Optional[DiscoveredServiceUrl], +) -> Tuple[Optional[str], Optional[Dict[str, str]]]: + if not discovery_result: + return None, None + return discovery_result.url, discovery_result.headers + + +def _merge_discovered_headers( + discovered_headers: Optional[Dict[str, str]], additional_headers: Optional[Dict[str, str]] +) -> Optional[Dict[str, str]]: + headers: Dict[str, str] = {} + if discovered_headers: + headers.update(discovered_headers) + if additional_headers: + headers.update(additional_headers) + return headers or None + + def generate_prometheus_config(prometheus_params: PrometheusParams) -> PrometheusConfig: is_victoria_metrics = VICTORIA_METRICS_CONFIGURED - url: Optional[str] = ( - prometheus_params.prometheus_url - if prometheus_params.prometheus_url - else PrometheusDiscovery.find_prometheus_url() - ) + discovered_headers: Optional[Dict[str, str]] = None + url: Optional[str] = None + + if prometheus_params.prometheus_url: + url = prometheus_params.prometheus_url + else: + url, discovered_headers = _unpack_discovery_result(PrometheusDiscovery.find_prometheus_url()) + if not url: - url = PrometheusDiscovery.find_vm_url() + url, discovered_headers = _unpack_discovery_result(PrometheusDiscovery.find_vm_url()) is_victoria_metrics = is_victoria_metrics is not None if not url: raise NoPrometheusUrlFound("Prometheus url could not be found. Add 'prometheus_url' under global_config") @@ -54,8 +76,9 @@ def generate_prometheus_config(prometheus_params: PrometheusParams) -> Prometheu if prometheus_params.prometheus_auth: baseconfig["prometheus_auth"] = prometheus_params.prometheus_auth.get_secret_value() - if prometheus_params.prometheus_additional_headers: - baseconfig["headers"] = prometheus_params.prometheus_additional_headers + merged_headers = _merge_discovered_headers(discovered_headers, prometheus_params.prometheus_additional_headers) + if merged_headers: + baseconfig["headers"] = merged_headers # aws config if AWS_REGION: @@ -113,7 +136,7 @@ class ServiceDiscovery: cache: TTLCache = TTLCache(maxsize=5, ttl=SERVICE_CACHE_TTL_SEC) @classmethod - def find_url(cls, selectors: List[str], error_msg: str) -> Optional[str]: + def find_url(cls, selectors: List[str], error_msg: str) -> Optional[DiscoveredServiceUrl]: """ Try to autodiscover the url of an in-cluster service """ @@ -134,7 +157,7 @@ def find_url(cls, selectors: List[str], error_msg: str) -> Optional[str]: class PrometheusDiscovery(ServiceDiscovery): @classmethod - def find_prometheus_url(cls) -> Optional[str]: + def find_prometheus_url(cls) -> Optional[DiscoveredServiceUrl]: return super().find_url( selectors=[ "app=kube-prometheus-stack-prometheus", @@ -167,7 +190,7 @@ def find_vm_url(cls) -> Optional[str]: class AlertManagerDiscovery(ServiceDiscovery): @classmethod - def find_alert_manager_url(cls) -> Optional[str]: + def find_alert_manager_url(cls) -> Optional[DiscoveredServiceUrl]: return super().find_url( selectors=[ "app=kube-prometheus-stack-alertmanager", @@ -188,9 +211,9 @@ class HolmesDiscovery(ServiceDiscovery): MODEL_NAME_URL = "/api/model" @classmethod - def find_holmes_url(cls, holmes_url: Optional[str] = None) -> Optional[str]: + def find_holmes_url(cls, holmes_url: Optional[str] = None) -> Optional[DiscoveredServiceUrl]: if holmes_url: - return holmes_url + return DiscoveredServiceUrl(url=holmes_url) return super().find_url( selectors=["app=holmes"], diff --git a/src/robusta/utils/service_discovery.py b/src/robusta/utils/service_discovery.py index f8a8b8231..dcef578dc 100644 --- a/src/robusta/utils/service_discovery.py +++ b/src/robusta/utils/service_discovery.py @@ -1,4 +1,7 @@ import logging +import os +from dataclasses import dataclass +from typing import Dict, Optional from kubernetes import client from kubernetes.client import V1ServiceList @@ -7,9 +10,63 @@ from robusta.core.model.env_vars import CLUSTER_DOMAIN +@dataclass +class DiscoveredServiceUrl: + url: str + headers: Optional[Dict[str, str]] = None + + def __str__(self) -> str: + return self.url + + +def _should_use_proxy() -> bool: + return not os.getenv("KUBERNETES_SERVICE_HOST") + + +def _derive_service_scheme(port) -> str: + port_name = (port.name or "").lower() + app_protocol = getattr(port, "app_protocol", None) or "" + app_protocol = app_protocol.lower() + + if "https" in port_name or "https" in app_protocol or port.port == 443: + return "https" + return "http" + + +def _get_kube_proxy_headers() -> Optional[Dict[str, str]]: + try: + api_client = client.ApiClient() + auth_header = api_client.configuration.get_api_key_with_prefix("authorization") + headers: Dict[str, str] = {} + if auth_header: + headers["Authorization"] = auth_header + if api_client.configuration.default_headers: + headers.update(api_client.configuration.default_headers) + return headers or None + except Exception as e: + logging.debug(f"Unable to build Kubernetes proxy headers: {e}") + return None + + +def _build_proxy_url(name: str, namespace: str, port: int, scheme: str) -> Optional[str]: + try: + configuration = client.Configuration.get_default_copy() + except Exception as e: + logging.debug(f"Unable to load Kubernetes configuration for proxy url: {e}") + return None + + host = (configuration.host or "").rstrip("/") + if not host: + return None + + path = f"/api/v1/namespaces/{namespace}/services/{scheme}:{name}:{port}/proxy" + return f"{host}{path}" + + def find_service_url(label_selector): """ - Get the url of an in-cluster service with a specific label + Get the url of a service with a specific label. When running outside the cluster, + prefer the Kubernetes API proxy using the current user's credentials. """ # we do it this way because there is a weird issue with hikaru's ServiceList.listServiceForAllNamespaces() v1 = client.CoreV1Api() @@ -19,7 +76,17 @@ def find_service_url(label_selector): svc: V1Service = svc_list.items[0] name = svc.metadata.name namespace = svc.metadata.namespace - port = svc.spec.ports[0].port - url = f"http://{name}.{namespace}.svc.{CLUSTER_DOMAIN}:{port}" - logging.info(f"discovered service with label-selector: `{label_selector}` at url: `{url}`") - return url + port_obj = svc.spec.ports[0] + port = port_obj.port + scheme = _derive_service_scheme(port_obj) + + cluster_local_url = f"{scheme}://{name}.{namespace}.svc.{CLUSTER_DOMAIN}:{port}" + proxy_url = _build_proxy_url(name, namespace, port, scheme) if _should_use_proxy() else None + headers = _get_kube_proxy_headers() if proxy_url else None + final_url = proxy_url or cluster_local_url + + logging.info( + f"discovered service with label-selector: `{label_selector}` at url: `{final_url}` " + f"(proxy_used={proxy_url is not None})" + ) + return DiscoveredServiceUrl(url=final_url, headers=headers)