Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions components/src/dynamo/planner/utils/planner_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,16 @@ def __init__(
else:
raise ValueError(f"Invalid environment: {args.environment}")

# Use backend metrics for vLLM (queries vllm:* metrics directly from workers)
# Use frontend metrics for other backends (queries dynamo_frontend_* metrics)
metric_source = "backend" if args.backend.lower() == "vllm" else "frontend"
logger.info(
f"Initializing Prometheus client with metric_source='{metric_source}' for backend '{args.backend}'"
)
self.prometheus_api_client = PrometheusAPIClient(
SLAPlannerDefaults.prometheus_endpoint,
args.namespace,
metric_source=metric_source,
)

self.num_req_predictor = LOAD_PREDICTORS[args.load_predictor](
Expand Down
257 changes: 226 additions & 31 deletions components/src/dynamo/planner/utils/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ class FrontendMetric(BaseModel):
endpoint: typing.Optional[str] = None
instance: typing.Optional[str] = None
job: typing.Optional[str] = None
model: typing.Optional[str] = None
namespace: typing.Optional[str] = None
pod: typing.Optional[str] = None
model: typing.Optional[str] = None # Frontend uses this label
model_name: typing.Optional[str] = None # Backend (vLLM) uses this label
namespace: typing.Optional[str] = None # Kubernetes namespace
pod: typing.Optional[str] = None # Pod name (used for backend filtering)
engine: typing.Optional[str] = None # vLLM engine index


class FrontendMetricContainer(BaseModel):
Expand All @@ -43,9 +45,58 @@ class FrontendMetricContainer(BaseModel):


class PrometheusAPIClient:
def __init__(self, url: str, dynamo_namespace: str):
"""
Client for querying Dynamo metrics from Prometheus.

Supports querying both frontend and backend metrics:
- Frontend metrics: dynamo_frontend_* (from Dynamo HTTP frontend)
- Backend metrics: vllm:* (from vLLM engine workers)

Usage:
# Query frontend metrics (default)
frontend_client = PrometheusAPIClient(url="http://prometheus:9090",
dynamo_namespace="my-deployment")
ttft = frontend_client.get_avg_time_to_first_token("60s", "llama-3-8b")

# Query backend worker metrics
backend_client = PrometheusAPIClient(url="http://prometheus:9090",
dynamo_namespace="my-deployment",
metric_source="backend")
ttft = backend_client.get_avg_time_to_first_token("60s", "llama-3-8b")
"""

# Metric name mapping for backend (vLLM) metrics
# Maps from frontend metric concept to actual vLLM metric name
BACKEND_METRIC_MAP = {
"time_to_first_token_seconds": "time_to_first_token_seconds", # histogram
"inter_token_latency_seconds": "inter_token_latency_seconds", # histogram
"request_duration_seconds": "e2e_request_latency_seconds", # histogram - vLLM's e2e latency
"input_sequence_tokens": "prompt_tokens_total", # counter - total prompt tokens
"output_sequence_tokens": "generation_tokens_total", # counter - total generation tokens
"requests_total": "request_success_total", # counter
}

def __init__(
self, url: str, dynamo_namespace: str, metric_source: str = "frontend"
):
"""
Initialize Prometheus API client.

Args:
url: Prometheus server URL
dynamo_namespace: Dynamo namespace to filter metrics
metric_source: Either "frontend" or "backend".
"frontend" queries dynamo_frontend_* metrics
"backend" queries vllm:* metrics from workers
"""
if metric_source not in ["frontend", "backend"]:
raise ValueError(
f"metric_source must be 'frontend' or 'backend', got: {metric_source}"
)

self.prom = PrometheusConnect(url=url, disable_ssl=True)
self.dynamo_namespace = dynamo_namespace
self.metric_source = metric_source

def _get_average_metric(
self, full_metric_name: str, interval: str, operation_name: str, model_name: str
Expand All @@ -55,19 +106,30 @@ def _get_average_metric(
increase(metric_sum[interval])/increase(metric_count[interval])

Args:
full_metric_name: Full metric name (e.g., 'dynamo_frontend_inter_token_latency_seconds')
full_metric_name: Full metric name (e.g., 'dynamo_frontend_inter_token_latency_seconds' or 'time_to_first_token_seconds')
interval: Time interval for the query (e.g., '60s')
operation_name: Human-readable name for error logging
model_name: Model name to filter by

Returns:
Average metric value or 0 if no data/error
"""
try:
# Prepend the frontend metric prefix if not already present
if not full_metric_name.startswith(prometheus_names.name_prefix.FRONTEND):
full_metric_name = (
f"{prometheus_names.name_prefix.FRONTEND}_{full_metric_name}"
)
# Apply metric prefix based on source
if self.metric_source == "frontend":
# Prepend the frontend metric prefix if not already present
if not full_metric_name.startswith(
prometheus_names.name_prefix.FRONTEND
):
full_metric_name = (
f"{prometheus_names.name_prefix.FRONTEND}_{full_metric_name}"
)
else: # backend
# Backend uses vllm: prefix
# Check if it's already a full vllm metric name (from our mapping)
if not full_metric_name.startswith("vllm:"):
full_metric_name = f"vllm:{full_metric_name}"

query = f"increase({full_metric_name}_sum[{interval}])/increase({full_metric_name}_count[{interval}])"
result = self.prom.custom_query(query=query)
if not result:
Expand All @@ -80,12 +142,32 @@ def _get_average_metric(

values = []
for container in metrics_containers:
# Frontend lowercases model names for Prometheus labels so we need to do case-insensitive comparison
if (
container.metric.model
and container.metric.model.lower() == model_name.lower()
and container.metric.dynamo_namespace == self.dynamo_namespace
):
# Determine which label to use for model filtering
if self.metric_source == "frontend":
# Frontend uses 'model' label and lowercases model names
model_match = (
container.metric.model
and container.metric.model.lower() == model_name.lower()
)
namespace_match = (
container.metric.dynamo_namespace == self.dynamo_namespace
)
else: # backend
# Backend (vLLM) uses 'model_name' label - check both for compatibility
backend_model = getattr(
container.metric, "model_name", container.metric.model
)
model_match = (
backend_model and backend_model.lower() == model_name.lower()
)
# Backend metrics don't have dynamo_namespace, filter by pod name containing dynamo namespace
pod_name = getattr(container.metric, "pod", "")
namespace_match = (
self.dynamo_namespace in pod_name if pod_name else True
)

# Filter by model and namespace
if model_match and namespace_match:
values.append(container.value[1])

if not values:
Expand All @@ -99,6 +181,70 @@ def _get_average_metric(
logger.error(f"Error getting {operation_name}: {e}")
return 0

def _get_counter_average(
self, counter_metric: str, interval: str, model_name: str, operation_name: str
) -> float:
"""
Get average value from a counter metric by dividing total increase by request count increase.
Used for vLLM token counters (prompt_tokens_total, generation_tokens_total).

Formula: increase(counter_total[interval]) / increase(request_success_total[interval])
"""
try:
full_metric_name = f"vllm:{counter_metric}"
requests_metric = "vllm:request_success_total"

# Query both the counter and request count
counter_query = f"increase({full_metric_name}[{interval}])"
requests_query = f"increase({requests_metric}[{interval}])"

counter_result = self.prom.custom_query(query=counter_query)
requests_result = self.prom.custom_query(query=requests_query)

if not counter_result or not requests_result:
logger.warning(
f"No prometheus metric data available for {full_metric_name}, use 0 instead"
)
return 0

counter_containers = parse_frontend_metric_containers(counter_result)
requests_containers = parse_frontend_metric_containers(requests_result)

# Sum up values for matching pods
total_counter = 0.0
total_requests = 0.0

for container in counter_containers:
backend_model = getattr(
container.metric, "model_name", container.metric.model
)
if backend_model and backend_model.lower() == model_name.lower():
pod_name = getattr(container.metric, "pod", "")
if self.dynamo_namespace in pod_name if pod_name else True:
total_counter += container.value[1]

for container in requests_containers:
backend_model = getattr(
container.metric, "model_name", container.metric.model
)
if backend_model and backend_model.lower() == model_name.lower():
pod_name = getattr(container.metric, "pod", "")
if self.dynamo_namespace in pod_name if pod_name else True:
total_requests += container.value[1]

if total_requests == 0:
logger.warning(
f"No requests for {operation_name} calculation, use 0 instead"
)
return 0

average = total_counter / total_requests
return average

except Exception as e:
logger.error(f"Error getting {operation_name}: {e}")
return 0

def get_avg_inter_token_latency(self, interval: str, model_name: str):
return self._get_average_metric(
prometheus_names.frontend_service.INTER_TOKEN_LATENCY_SECONDS,
Expand All @@ -116,6 +262,15 @@ def get_avg_time_to_first_token(self, interval: str, model_name: str):
)

def get_avg_request_duration(self, interval: str, model_name: str):
if self.metric_source == "backend":
# Backend uses e2e_request_latency_seconds instead of request_duration_seconds
metric_name = self.BACKEND_METRIC_MAP["request_duration_seconds"]
return self._get_average_metric(
metric_name,
interval,
"avg e2e request latency",
model_name,
)
return self._get_average_metric(
prometheus_names.frontend_service.REQUEST_DURATION_SECONDS,
interval,
Expand All @@ -124,35 +279,69 @@ def get_avg_request_duration(self, interval: str, model_name: str):
)

def get_avg_request_count(self, interval: str, model_name: str):
# This function follows a different query pattern than the other metrics
"""
Get request count over the specified interval.

For frontend: queries dynamo_frontend_requests_total
For backend: queries vllm:request_success_total
"""
try:
requests_total_metric = prometheus_names.frontend_service.REQUESTS_TOTAL
# Prepend the frontend metric prefix if not already present
if not requests_total_metric.startswith(
prometheus_names.name_prefix.FRONTEND
):
requests_total_metric = (
f"{prometheus_names.name_prefix.FRONTEND}_{requests_total_metric}"
)
if self.metric_source == "frontend":
requests_total_metric = prometheus_names.frontend_service.REQUESTS_TOTAL
# Prepend the frontend metric prefix if not already present
if not requests_total_metric.startswith(
prometheus_names.name_prefix.FRONTEND
):
requests_total_metric = f"{prometheus_names.name_prefix.FRONTEND}_{requests_total_metric}"
else: # backend
# Backend uses vllm:request_success_total
requests_total_metric = "vllm:request_success_total"

raw_res = self.prom.custom_query(
query=f"increase({requests_total_metric}[{interval}])"
)
metrics_containers = parse_frontend_metric_containers(raw_res)
total_count = 0.0
for container in metrics_containers:
# Frontend lowercases model names for Prometheus labels so we need to do case-insensitive comparison
if (
container.metric.model
and container.metric.model.lower() == model_name.lower()
and container.metric.dynamo_namespace == self.dynamo_namespace
):
# Determine which label to use for model filtering
if self.metric_source == "frontend":
# Frontend uses 'model' label and lowercases model names
model_match = (
container.metric.model
and container.metric.model.lower() == model_name.lower()
)
namespace_match = (
container.metric.dynamo_namespace == self.dynamo_namespace
)
else: # backend
# Backend (vLLM) uses 'model_name' label - check both for compatibility
backend_model = getattr(
container.metric, "model_name", container.metric.model
)
model_match = (
backend_model and backend_model.lower() == model_name.lower()
)
# Backend metrics don't have dynamo_namespace, filter by pod name containing dynamo namespace
pod_name = getattr(container.metric, "pod", "")
namespace_match = (
self.dynamo_namespace in pod_name if pod_name else True
)

# Filter by model and namespace
if model_match and namespace_match:
total_count += container.value[1]
return total_count
except Exception as e:
logger.error(f"Error getting avg request count: {e}")
return 0

def get_avg_input_sequence_tokens(self, interval: str, model_name: str):
if self.metric_source == "backend":
# Backend uses prompt_tokens counter (not histogram)
metric_name = self.BACKEND_METRIC_MAP["input_sequence_tokens"]
return self._get_counter_average(
metric_name, interval, model_name, "input_sequence_tokens"
)
return self._get_average_metric(
prometheus_names.frontend_service.INPUT_SEQUENCE_TOKENS,
interval,
Expand All @@ -161,6 +350,12 @@ def get_avg_input_sequence_tokens(self, interval: str, model_name: str):
)

def get_avg_output_sequence_tokens(self, interval: str, model_name: str):
if self.metric_source == "backend":
# Backend uses generation_tokens counter (not histogram)
metric_name = self.BACKEND_METRIC_MAP["output_sequence_tokens"]
return self._get_counter_average(
metric_name, interval, model_name, "output_sequence_tokens"
)
return self._get_average_metric(
prometheus_names.frontend_service.OUTPUT_SEQUENCE_TOKENS,
interval,
Expand Down
Loading