Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions src/robusta/integrations/prometheus/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import os
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple

from cachetools import TTLCache
from prometrix import (
Expand All @@ -15,7 +15,7 @@
from robusta.core.exceptions import NoPrometheusUrlFound
from robusta.core.model.base_params import PrometheusParams
from robusta.core.model.env_vars import PROMETHEUS_SSL_ENABLED, SERVICE_CACHE_TTL_SEC
from robusta.utils.service_discovery import find_service_url
from robusta.utils.service_discovery import DiscoveredServiceUrl, find_service_url

AZURE_RESOURCE = os.environ.get("AZURE_RESOURCE", "https://prometheus.monitor.azure.com")
AZURE_METADATA_ENDPOINT = os.environ.get(
Expand All @@ -33,15 +33,37 @@
VICTORIA_METRICS_CONFIGURED = os.environ.get("VICTORIA_METRICS_CONFIGURED", "false").lower() == "true"


def _unpack_discovery_result(
    discovery_result: Optional[DiscoveredServiceUrl],
) -> Tuple[Optional[str], Optional[Dict[str, str]]]:
    """
    Split a discovery result into a ``(url, headers)`` pair.

    A missing (falsy) result yields ``(None, None)`` so callers can unpack the
    return value unconditionally.
    """
    if discovery_result:
        return discovery_result.url, discovery_result.headers
    return None, None


def _merge_discovered_headers(
    discovered_headers: Optional[Dict[str, str]], additional_headers: Optional[Dict[str, str]]
) -> Optional[Dict[str, str]]:
    """
    Combine discovered headers with additional headers into a new dict.

    Entries from ``additional_headers`` win when both mappings define the same key.
    Returns None when the combined result is empty.
    """
    # later unpacking overrides earlier keys, so additional headers take precedence
    combined: Dict[str, str] = {**(discovered_headers or {}), **(additional_headers or {})}
    return combined if combined else None


def generate_prometheus_config(prometheus_params: PrometheusParams) -> PrometheusConfig:
is_victoria_metrics = VICTORIA_METRICS_CONFIGURED
url: Optional[str] = (
prometheus_params.prometheus_url
if prometheus_params.prometheus_url
else PrometheusDiscovery.find_prometheus_url()
)
discovered_headers: Optional[Dict[str, str]] = None
url: Optional[str] = None

if prometheus_params.prometheus_url:
url = prometheus_params.prometheus_url
else:
url, discovered_headers = _unpack_discovery_result(PrometheusDiscovery.find_prometheus_url())

if not url:
url = PrometheusDiscovery.find_vm_url()
url, discovered_headers = _unpack_discovery_result(PrometheusDiscovery.find_vm_url())
is_victoria_metrics = is_victoria_metrics is not None
if not url:
raise NoPrometheusUrlFound("Prometheus url could not be found. Add 'prometheus_url' under global_config")
Expand All @@ -54,8 +76,9 @@ def generate_prometheus_config(prometheus_params: PrometheusParams) -> Prometheu
if prometheus_params.prometheus_auth:
baseconfig["prometheus_auth"] = prometheus_params.prometheus_auth.get_secret_value()

if prometheus_params.prometheus_additional_headers:
baseconfig["headers"] = prometheus_params.prometheus_additional_headers
merged_headers = _merge_discovered_headers(discovered_headers, prometheus_params.prometheus_additional_headers)
if merged_headers:
baseconfig["headers"] = merged_headers

# aws config
if AWS_REGION:
Expand Down Expand Up @@ -113,7 +136,7 @@ class ServiceDiscovery:
cache: TTLCache = TTLCache(maxsize=5, ttl=SERVICE_CACHE_TTL_SEC)

@classmethod
def find_url(cls, selectors: List[str], error_msg: str) -> Optional[str]:
def find_url(cls, selectors: List[str], error_msg: str) -> Optional[DiscoveredServiceUrl]:
"""
Try to autodiscover the url of an in-cluster service
"""
Expand All @@ -134,7 +157,7 @@ def find_url(cls, selectors: List[str], error_msg: str) -> Optional[str]:

class PrometheusDiscovery(ServiceDiscovery):
@classmethod
def find_prometheus_url(cls) -> Optional[str]:
def find_prometheus_url(cls) -> Optional[DiscoveredServiceUrl]:
return super().find_url(
selectors=[
"app=kube-prometheus-stack-prometheus",
Expand Down Expand Up @@ -167,7 +190,7 @@ def find_vm_url(cls) -> Optional[str]:

class AlertManagerDiscovery(ServiceDiscovery):
@classmethod
def find_alert_manager_url(cls) -> Optional[str]:
def find_alert_manager_url(cls) -> Optional[DiscoveredServiceUrl]:
return super().find_url(
selectors=[
"app=kube-prometheus-stack-alertmanager",
Expand All @@ -188,9 +211,9 @@ class HolmesDiscovery(ServiceDiscovery):
MODEL_NAME_URL = "/api/model"

@classmethod
def find_holmes_url(cls, holmes_url: Optional[str] = None) -> Optional[str]:
def find_holmes_url(cls, holmes_url: Optional[str] = None) -> Optional[DiscoveredServiceUrl]:
if holmes_url:
return holmes_url
return DiscoveredServiceUrl(url=holmes_url)

return super().find_url(
selectors=["app=holmes"],
Expand Down
77 changes: 72 additions & 5 deletions src/robusta/utils/service_discovery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import logging
import os
from dataclasses import dataclass
from typing import Dict, Optional

from kubernetes import client
from kubernetes.client import V1ServiceList
Expand All @@ -7,9 +10,63 @@
from robusta.core.model.env_vars import CLUSTER_DOMAIN


@dataclass
class DiscoveredServiceUrl:
    """
    A discovered service endpoint together with any headers needed to call it.
    """

    # address at which the service can be reached
    url: str
    # extra HTTP headers to send with requests to `url`; None when there are none
    headers: Optional[Dict[str, str]] = None

    def __str__(self) -> str:
        # render as the bare url so the object formats like a plain url string
        return self.url


def _should_use_proxy() -> bool:
    """
    Return True when the KUBERNETES_SERVICE_HOST environment variable is unset or empty.
    """
    service_host = os.environ.get("KUBERNETES_SERVICE_HOST")
    if service_host:
        return False
    return True


def _derive_service_scheme(port) -> str:
    """
    Guess the url scheme ("http" or "https") for a service port object.

    The port is treated as https when its name or its ``app_protocol`` contains
    "https" (case-insensitive), or when its port number is 443; otherwise "http".
    ``app_protocol`` is optional on the port object; ``name`` and ``port`` are read directly.
    """
    name_lc = (port.name or "").lower()
    protocol_lc = (getattr(port, "app_protocol", None) or "").lower()

    if "https" in name_lc:
        return "https"
    if "https" in protocol_lc:
        return "https"
    # the numeric port is only consulted when neither textual hint matched
    return "https" if port.port == 443 else "http"


def _get_kube_proxy_headers() -> Optional[Dict[str, str]]:
    """
    Build the HTTP headers used when calling a service through the Kubernetes API proxy.

    The result contains the Kubernetes client's default headers plus an
    ``Authorization`` header taken from the client's configuration, when one is set.

    Returns None when no headers are available or when they cannot be built
    (best-effort: failures are logged at debug level and never raised).
    """
    try:
        api_client = client.ApiClient()
        headers: Dict[str, str] = {}

        # default_headers is an attribute of ApiClient itself, not of its Configuration;
        # reading it from the configuration raised AttributeError, which the blanket
        # except below turned into "no headers at all" (dropping the auth header too)
        default_headers = getattr(api_client, "default_headers", None)
        if default_headers:
            headers.update(default_headers)

        # set Authorization last so a default header can never clobber the credentials
        auth_header = api_client.configuration.get_api_key_with_prefix("authorization")
        if auth_header:
            headers["Authorization"] = auth_header

        return headers or None
    except Exception as e:
        logging.debug(f"Unable to build Kubernetes proxy headers: {e}")
        return None


def _build_proxy_url(name: str, namespace: str, port: int, scheme: str) -> Optional[str]:
    """
    Build the Kubernetes API-server proxy url for a service port.

    The url is the configured API host (trailing slashes removed) followed by
    ``/api/v1/namespaces/{namespace}/services/{scheme}:{name}:{port}/proxy``.

    Returns None when the Kubernetes client configuration cannot be loaded
    (logged at debug level) or when it has no host set.
    """
    try:
        kube_config = client.Configuration.get_default_copy()
    except Exception as e:
        logging.debug(f"Unable to load Kubernetes configuration for proxy url: {e}")
        return None

    api_host = (kube_config.host or "").rstrip("/")
    if api_host:
        path = f"/api/v1/namespaces/{namespace}/services/{scheme}:{name}:{port}/proxy"
        return api_host + path
    return None
Comment on lines +51 to +63
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

Kubernetes API proxy URL format services /api/v1/namespaces services proxy

💡 Result:

Kubernetes API server proxy URL patterns (service-level)

  • Base form:
    /api/v1/namespaces/{namespace}/services/{service-ident}/proxy{/path...}.[1][2]

  • {service-ident} supported forms:

    • {service-name} → proxy to service’s default/unnamed port over http.
    • {service-name}:{port} → proxy to the given port (port number or port name) over http.
    • https:{service-name}: → proxy to service’s default/unnamed port over HTTPS (note the leading "https:" and trailing colon).
    • https:{service-name}:{port} → proxy to the specified port over HTTPS.
      Examples:
    • /api/v1/namespaces/default/services/web-frontend/proxy/
    • /api/v1/namespaces/default/services/web-frontend:80/proxy/
    • /api/v1/namespaces/kube-system/services/https:kube-ui:/proxy/ (HTTPS to default port). [2][4]
  • You can append additional path/query after /proxy, e.g. /proxy/_search?q=foo or /proxy/api/v1/items. [1][3]

Notes:

  • The API also exposes verb-specific endpoints (GET/POST/HEAD/PUT) at the same path. [1]
  • When using kubectl proxy, call the same path against the local proxy (e.g. http://localhost:8001/api/v1/...). [3]

Sources: Kubernetes API reference and docs; examples/discussion. [1] Kubernetes API reference — Proxy operations. [2] kubectl cluster-info / proxy examples and formats. [3] kubectl proxy docs. [4] community examples/Q&A showing https: prefix.


Fix proxy path format for HTTP services.

The proxy URL format is incorrect for HTTP services. Per Kubernetes API documentation, the service identifier should be:

  • HTTP with port: {name}:{port} (no scheme prefix)
  • HTTPS with port: https:{name}:{port} (with scheme prefix)

The current code uses {scheme}:{name}:{port} for all schemes, which generates invalid URLs for HTTP (e.g., http:my-service:8080 instead of my-service:8080). Conditional logic is needed: omit the scheme prefix for HTTP, include it only for HTTPS.

🧰 Tools
🪛 Ruff (0.14.10)

54-54: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In src/robusta/utils/service_discovery.py around lines 51 to 63, the proxy path
uses "{scheme}:{name}:{port}" for all services which produces invalid proxy
paths for HTTP; change the path construction so that for scheme == "https" the
service identifier is "https:{name}:{port}" but for "http" (and empty/default)
it is just "{name}:{port}". Keep the rest of the function intact (load
configuration, validate host) and return f"{host}{path}" as before.



def find_service_url(label_selector):
"""
Get the url of an in-cluster service with a specific label
Get the url of a service with a specific label. When running outside the cluster,
prefer the Kubernetes API proxy using the current user's credentials.
"""
# we do it this way because there is a weird issue with hikaru's ServiceList.listServiceForAllNamespaces()
v1 = client.CoreV1Api()
Expand All @@ -19,7 +76,17 @@ def find_service_url(label_selector):
svc: V1Service = svc_list.items[0]
name = svc.metadata.name
namespace = svc.metadata.namespace
port = svc.spec.ports[0].port
url = f"http://{name}.{namespace}.svc.{CLUSTER_DOMAIN}:{port}"
logging.info(f"discovered service with label-selector: `{label_selector}` at url: `{url}`")
return url
port_obj = svc.spec.ports[0]
port = port_obj.port
Comment on lines +79 to +80
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential IndexError if service has no ports.

If svc.spec.ports is empty, accessing svc.spec.ports[0] will raise an IndexError; if it is None, it will raise a TypeError. Consider adding a guard.

🔎 Proposed fix
     name = svc.metadata.name
     namespace = svc.metadata.namespace
+    if not svc.spec.ports:
+        logging.warning(f"Service {name} in {namespace} has no ports defined")
+        return None
     port_obj = svc.spec.ports[0]
     port = port_obj.port

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/robusta/utils/service_discovery.py around lines 79-80, the code
unconditionally accesses svc.spec.ports[0] which will raise an IndexError if
svc.spec.ports is None or empty; add a guard that checks if svc.spec and
svc.spec.ports are truthy and that len(svc.spec.ports) > 0 before indexing, and
handle the empty case by logging/raising a clear error or skipping this service
(returning None or continuing) so the function fails gracefully instead of
throwing an IndexError.

scheme = _derive_service_scheme(port_obj)

cluster_local_url = f"{scheme}://{name}.{namespace}.svc.{CLUSTER_DOMAIN}:{port}"
proxy_url = _build_proxy_url(name, namespace, port, scheme) if _should_use_proxy() else None
headers = _get_kube_proxy_headers() if proxy_url else None
final_url = proxy_url or cluster_local_url

logging.info(
f"discovered service with label-selector: `{label_selector}` at url: `{final_url}` "
f"(proxy_used={proxy_url is not None})"
)
return DiscoveredServiceUrl(url=final_url, headers=headers)
Loading