diff --git a/components/src/dynamo/planner/utils/planner_core.py b/components/src/dynamo/planner/utils/planner_core.py
index a87251ba83..393da60deb 100644
--- a/components/src/dynamo/planner/utils/planner_core.py
+++ b/components/src/dynamo/planner/utils/planner_core.py
@@ -89,9 +89,16 @@ def __init__(
                 else:
                     raise ValueError(f"Invalid environment: {args.environment}")
 
+            # Use backend metrics for vLLM (queries vllm:* metrics directly from workers)
+            # Use frontend metrics for other backends (queries dynamo_frontend_* metrics)
+            metric_source = "backend" if args.backend.lower() == "vllm" else "frontend"
+            logger.info(
+                f"Initializing Prometheus client with metric_source='{metric_source}' for backend '{args.backend}'"
+            )
             self.prometheus_api_client = PrometheusAPIClient(
                 SLAPlannerDefaults.prometheus_endpoint,
                 args.namespace,
+                metric_source=metric_source,
             )
 
         self.num_req_predictor = LOAD_PREDICTORS[args.load_predictor](
diff --git a/components/src/dynamo/planner/utils/prometheus.py b/components/src/dynamo/planner/utils/prometheus.py
index 99a314832d..3a07a09536 100644
--- a/components/src/dynamo/planner/utils/prometheus.py
+++ b/components/src/dynamo/planner/utils/prometheus.py
@@ -32,9 +32,11 @@ class FrontendMetric(BaseModel):
     endpoint: typing.Optional[str] = None
     instance: typing.Optional[str] = None
     job: typing.Optional[str] = None
-    model: typing.Optional[str] = None
-    namespace: typing.Optional[str] = None
-    pod: typing.Optional[str] = None
+    model: typing.Optional[str] = None  # Frontend uses this label
+    model_name: typing.Optional[str] = None  # Backend (vLLM) uses this label
+    namespace: typing.Optional[str] = None  # Kubernetes namespace
+    pod: typing.Optional[str] = None  # Pod name (used for backend filtering)
+    engine: typing.Optional[str] = None  # vLLM engine index
 
 
 class FrontendMetricContainer(BaseModel):
@@ -43,9 +45,58 @@ class FrontendMetricContainer(BaseModel):
 
 
 class PrometheusAPIClient:
-    def __init__(self, url: str, dynamo_namespace: str):
+    """
+    Client for querying Dynamo metrics from Prometheus.
+
+    Supports querying both frontend and backend metrics:
+    - Frontend metrics: dynamo_frontend_* (from Dynamo HTTP frontend)
+    - Backend metrics: vllm:* (from vLLM engine workers)
+
+    Usage:
+        # Query frontend metrics (default)
+        frontend_client = PrometheusAPIClient(url="http://prometheus:9090",
+                                              dynamo_namespace="my-deployment")
+        ttft = frontend_client.get_avg_time_to_first_token("60s", "llama-3-8b")
+
+        # Query backend worker metrics
+        backend_client = PrometheusAPIClient(url="http://prometheus:9090",
+                                             dynamo_namespace="my-deployment",
+                                             metric_source="backend")
+        ttft = backend_client.get_avg_time_to_first_token("60s", "llama-3-8b")
+    """
+
+    # Metric name mapping for backend (vLLM) metrics
+    # Maps from frontend metric concept to actual vLLM metric name
+    BACKEND_METRIC_MAP = {
+        "time_to_first_token_seconds": "time_to_first_token_seconds",  # histogram
+        "inter_token_latency_seconds": "inter_token_latency_seconds",  # histogram
+        "request_duration_seconds": "e2e_request_latency_seconds",  # histogram - vLLM's e2e latency
+        "input_sequence_tokens": "prompt_tokens_total",  # counter - total prompt tokens
+        "output_sequence_tokens": "generation_tokens_total",  # counter - total generation tokens
+        "requests_total": "request_success_total",  # counter
+    }
+
+    def __init__(
+        self, url: str, dynamo_namespace: str, metric_source: str = "frontend"
+    ):
+        """
+        Initialize Prometheus API client.
+
+        Args:
+            url: Prometheus server URL
+            dynamo_namespace: Dynamo namespace to filter metrics
+            metric_source: Either "frontend" or "backend".
+                "frontend" queries dynamo_frontend_* metrics
+                "backend" queries vllm:* metrics from workers
+        """
+        if metric_source not in ["frontend", "backend"]:
+            raise ValueError(
+                f"metric_source must be 'frontend' or 'backend', got: {metric_source}"
+            )
+
         self.prom = PrometheusConnect(url=url, disable_ssl=True)
         self.dynamo_namespace = dynamo_namespace
+        self.metric_source = metric_source
 
     def _get_average_metric(
         self, full_metric_name: str, interval: str, operation_name: str, model_name: str
@@ -55,19 +106,30 @@ def _get_average_metric(
         increase(metric_sum[interval])/increase(metric_count[interval])
 
         Args:
-            full_metric_name: Full metric name (e.g., 'dynamo_frontend_inter_token_latency_seconds')
+            full_metric_name: Full metric name (e.g., 'dynamo_frontend_inter_token_latency_seconds' or 'time_to_first_token_seconds')
             interval: Time interval for the query (e.g., '60s')
             operation_name: Human-readable name for error logging
+            model_name: Model name to filter by
 
         Returns:
             Average metric value or 0 if no data/error
         """
         try:
-            # Prepend the frontend metric prefix if not already present
-            if not full_metric_name.startswith(prometheus_names.name_prefix.FRONTEND):
-                full_metric_name = (
-                    f"{prometheus_names.name_prefix.FRONTEND}_{full_metric_name}"
-                )
+            # Apply metric prefix based on source
+            if self.metric_source == "frontend":
+                # Prepend the frontend metric prefix if not already present
+                if not full_metric_name.startswith(
+                    prometheus_names.name_prefix.FRONTEND
+                ):
+                    full_metric_name = (
+                        f"{prometheus_names.name_prefix.FRONTEND}_{full_metric_name}"
+                    )
+            else:  # backend
+                # Backend uses vllm: prefix
+                # Check if it's already a full vllm metric name (from our mapping)
+                if not full_metric_name.startswith("vllm:"):
+                    full_metric_name = f"vllm:{full_metric_name}"
+
             query = f"increase({full_metric_name}_sum[{interval}])/increase({full_metric_name}_count[{interval}])"
             result = self.prom.custom_query(query=query)
             if not result:
@@ -80,12 +142,32 @@ def _get_average_metric(
 
             values = []
             for container in metrics_containers:
-                # Frontend lowercases model names for Prometheus labels so we need to do case-insensitive comparison
-                if (
-                    container.metric.model
-                    and container.metric.model.lower() == model_name.lower()
-                    and container.metric.dynamo_namespace == self.dynamo_namespace
-                ):
+                # Determine which label to use for model filtering
+                if self.metric_source == "frontend":
+                    # Frontend uses 'model' label and lowercases model names
+                    model_match = (
+                        container.metric.model
+                        and container.metric.model.lower() == model_name.lower()
+                    )
+                    namespace_match = (
+                        container.metric.dynamo_namespace == self.dynamo_namespace
+                    )
+                else:  # backend
+                    # Backend (vLLM) uses 'model_name' label - check both for compatibility
+                    backend_model = (
+                        container.metric.model_name or container.metric.model
+                    )
+                    model_match = (
+                        backend_model and backend_model.lower() == model_name.lower()
+                    )
+                    # Backend metrics don't have dynamo_namespace, filter by pod name containing dynamo namespace
+                    pod_name = container.metric.pod
+                    namespace_match = (
+                        not pod_name or self.dynamo_namespace in pod_name
+                    )
+
+                # Filter by model and namespace
+                if model_match and namespace_match:
                     values.append(container.value[1])
 
             if not values:
@@ -99,6 +181,79 @@ def _get_average_metric(
             logger.error(f"Error getting {operation_name}: {e}")
             return 0
 
+    def _get_counter_average(
+        self, counter_metric: str, interval: str, model_name: str, operation_name: str
+    ) -> float:
+        """
+        Get average value from a counter metric by dividing total increase by request count increase.
+        Used for vLLM token counters (prompt_tokens_total, generation_tokens_total).
+
+        Formula: increase(counter_total[interval]) / increase(request_success_total[interval])
+
+        Args:
+            counter_metric: vLLM counter name without the 'vllm:' prefix
+            interval: Time interval for the query (e.g., '60s')
+            model_name: Model name to filter by
+            operation_name: Human-readable name for error logging
+
+        Returns:
+            Average counter increase per request, or 0 if no data/error
+        """
+        try:
+            full_metric_name = f"vllm:{counter_metric}"
+            requests_metric = f"vllm:{self.BACKEND_METRIC_MAP['requests_total']}"
+
+            # Query both the counter and request count
+            counter_query = f"increase({full_metric_name}[{interval}])"
+            requests_query = f"increase({requests_metric}[{interval}])"
+
+            counter_result = self.prom.custom_query(query=counter_query)
+            requests_result = self.prom.custom_query(query=requests_query)
+
+            if not counter_result or not requests_result:
+                logger.warning(
+                    f"No prometheus metric data available for {full_metric_name} or {requests_metric}, use 0 instead"
+                )
+                return 0
+
+            counter_containers = parse_frontend_metric_containers(counter_result)
+            requests_containers = parse_frontend_metric_containers(requests_result)
+
+            # Sum up values for matching pods
+            total_counter = 0.0
+            total_requests = 0.0
+
+            for container in counter_containers:
+                backend_model = (
+                    container.metric.model_name or container.metric.model
+                )
+                if backend_model and backend_model.lower() == model_name.lower():
+                    pod_name = container.metric.pod
+                    if not pod_name or self.dynamo_namespace in pod_name:
+                        total_counter += container.value[1]
+
+            for container in requests_containers:
+                backend_model = (
+                    container.metric.model_name or container.metric.model
+                )
+                if backend_model and backend_model.lower() == model_name.lower():
+                    pod_name = container.metric.pod
+                    if not pod_name or self.dynamo_namespace in pod_name:
+                        total_requests += container.value[1]
+
+            if total_requests == 0:
+                logger.warning(
+                    f"No requests for {operation_name} calculation, use 0 instead"
+                )
+                return 0
+
+            average = total_counter / total_requests
+            return average
+
+        except Exception as e:
+            logger.error(f"Error getting {operation_name}: {e}")
+            return 0
+
     def get_avg_inter_token_latency(self, interval: str, model_name: str):
         return self._get_average_metric(
             prometheus_names.frontend_service.INTER_TOKEN_LATENCY_SECONDS,
@@ -116,6 +271,15 @@ def get_avg_time_to_first_token(self, interval: str, model_name: str):
         )
 
     def get_avg_request_duration(self, interval: str, model_name: str):
+        if self.metric_source == "backend":
+            # Backend uses e2e_request_latency_seconds instead of request_duration_seconds
+            metric_name = self.BACKEND_METRIC_MAP["request_duration_seconds"]
+            return self._get_average_metric(
+                metric_name,
+                interval,
+                "avg e2e request latency",
+                model_name,
+            )
         return self._get_average_metric(
             prometheus_names.frontend_service.REQUEST_DURATION_SECONDS,
             interval,
@@ -124,28 +288,56 @@ def get_avg_request_duration(self, interval: str, model_name: str):
         )
 
     def get_avg_request_count(self, interval: str, model_name: str):
-        # This function follows a different query pattern than the other metrics
+        """
+        Get request count over the specified interval.
+
+        For frontend: queries dynamo_frontend_requests_total
+        For backend: queries vllm:request_success_total
+        """
         try:
-            requests_total_metric = prometheus_names.frontend_service.REQUESTS_TOTAL
-            # Prepend the frontend metric prefix if not already present
-            if not requests_total_metric.startswith(
-                prometheus_names.name_prefix.FRONTEND
-            ):
-                requests_total_metric = (
-                    f"{prometheus_names.name_prefix.FRONTEND}_{requests_total_metric}"
-                )
+            if self.metric_source == "frontend":
+                requests_total_metric = prometheus_names.frontend_service.REQUESTS_TOTAL
+                # Prepend the frontend metric prefix if not already present
+                if not requests_total_metric.startswith(
+                    prometheus_names.name_prefix.FRONTEND
+                ):
+                    requests_total_metric = f"{prometheus_names.name_prefix.FRONTEND}_{requests_total_metric}"
+            else:  # backend
+                # Backend uses vllm:request_success_total
+                requests_total_metric = f"vllm:{self.BACKEND_METRIC_MAP['requests_total']}"
+
             raw_res = self.prom.custom_query(
                 query=f"increase({requests_total_metric}[{interval}])"
             )
             metrics_containers = parse_frontend_metric_containers(raw_res)
             total_count = 0.0
             for container in metrics_containers:
-                # Frontend lowercases model names for Prometheus labels so we need to do case-insensitive comparison
-                if (
-                    container.metric.model
-                    and container.metric.model.lower() == model_name.lower()
-                    and container.metric.dynamo_namespace == self.dynamo_namespace
-                ):
+                # Determine which label to use for model filtering
+                if self.metric_source == "frontend":
+                    # Frontend uses 'model' label and lowercases model names
+                    model_match = (
+                        container.metric.model
+                        and container.metric.model.lower() == model_name.lower()
+                    )
+                    namespace_match = (
+                        container.metric.dynamo_namespace == self.dynamo_namespace
+                    )
+                else:  # backend
+                    # Backend (vLLM) uses 'model_name' label - check both for compatibility
+                    backend_model = (
+                        container.metric.model_name or container.metric.model
+                    )
+                    model_match = (
+                        backend_model and backend_model.lower() == model_name.lower()
+                    )
+                    # Backend metrics don't have dynamo_namespace, filter by pod name containing dynamo namespace
+                    pod_name = container.metric.pod
+                    namespace_match = (
+                        not pod_name or self.dynamo_namespace in pod_name
+                    )
+
+                # Filter by model and namespace
+                if model_match and namespace_match:
                     total_count += container.value[1]
             return total_count
         except Exception as e:
@@ -153,6 +345,12 @@ def get_avg_request_count(self, interval: str, model_name: str):
             return 0
 
     def get_avg_input_sequence_tokens(self, interval: str, model_name: str):
+        if self.metric_source == "backend":
+            # Backend uses prompt_tokens counter (not histogram)
+            metric_name = self.BACKEND_METRIC_MAP["input_sequence_tokens"]
+            return self._get_counter_average(
+                metric_name, interval, model_name, "input_sequence_tokens"
+            )
         return self._get_average_metric(
             prometheus_names.frontend_service.INPUT_SEQUENCE_TOKENS,
             interval,
@@ -161,6 +359,12 @@ def get_avg_input_sequence_tokens(self, interval: str, model_name: str):
         )
 
     def get_avg_output_sequence_tokens(self, interval: str, model_name: str):
+        if self.metric_source == "backend":
+            # Backend uses generation_tokens counter (not histogram)
+            metric_name = self.BACKEND_METRIC_MAP["output_sequence_tokens"]
+            return self._get_counter_average(
+                metric_name, interval, model_name, "output_sequence_tokens"
+            )
         return self._get_average_metric(
             prometheus_names.frontend_service.OUTPUT_SEQUENCE_TOKENS,
             interval,