diff --git a/jenkins/runPerfSanityTriage.groovy b/jenkins/runPerfSanityTriage.groovy new file mode 100644 index 00000000000..c8f3a7f3d8f --- /dev/null +++ b/jenkins/runPerfSanityTriage.groovy @@ -0,0 +1,111 @@ +@Library(['bloom-jenkins-shared-lib@main', 'trtllm-jenkins-shared-lib@main']) _ + +import java.lang.InterruptedException + +DOCKER_IMAGE = "urm.nvidia.com/sw-tensorrt-docker/tensorrt-llm:pytorch-25.10-py3-x86_64-ubuntu24.04-trt10.13.3.9-skip-tritondevel-202510291120-8621" + +// LLM repository configuration +withCredentials([string(credentialsId: 'default-llm-repo', variable: 'DEFAULT_LLM_REPO')]) { + LLM_REPO = env.gitlabSourceRepoHttpUrl ? env.gitlabSourceRepoHttpUrl : "${DEFAULT_LLM_REPO}" +} +LLM_ROOT = "llm" + +def createKubernetesPodConfig(image, arch = "amd64") +{ + def archSuffix = arch == "arm64" ? "arm" : "amd" + def jnlpImage = "urm.nvidia.com/sw-ipp-blossom-sre-docker-local/lambda/custom_jnlp_images_${archSuffix}_linux:jdk17" + + def podConfig = [ + cloud: "kubernetes-cpu", + namespace: "sw-tensorrt", + yaml: """ + apiVersion: v1 + kind: Pod + spec: + nodeSelector: + nvidia.com/node_type: builder + kubernetes.io/os: linux + containers: + - name: trt-llm + image: ${image} + command: ['cat'] + volumeMounts: + - name: sw-tensorrt-pvc + mountPath: "/mnt/sw-tensorrt-pvc" + readOnly: false + tty: true + resources: + requests: + cpu: 2 + memory: 5Gi + ephemeral-storage: 25Gi + limits: + cpu: 2 + memory: 5Gi + ephemeral-storage: 25Gi + imagePullPolicy: Always + - name: jnlp + image: ${jnlpImage} + args: ['\$(JENKINS_SECRET)', '\$(JENKINS_NAME)'] + resources: + requests: + cpu: '2' + memory: 5Gi + ephemeral-storage: 25Gi + limits: + cpu: '2' + memory: 5Gi + ephemeral-storage: 25Gi + qosClass: Guaranteed + volumes: + - name: sw-tensorrt-pvc + persistentVolumeClaim: + claimName: sw-tensorrt-pvc + """.stripIndent(), + ] + + return podConfig +} + +pipeline { + agent { + kubernetes createKubernetesPodConfig(DOCKER_IMAGE) + } + options { + timestamps() + } + environment { + OPEN_SEARCH_DB_BASE_URL=credentials("open_search_db_base_url") + OPEN_SEARCH_DB_CREDENTIALS=credentials("open_search_db_credentials") + } + parameters { + string(name: "BRANCH", defaultValue: "main", description: "Branch to checkout.") + string(name: "OPEN_SEARCH_PROJECT_NAME", defaultValue: "swdl-trtllm-infra-ci-prod-perf_sanity_info", description: "OpenSearch project name.") + string(name: "OPERATION", defaultValue: "SLACK BOT SENDS MESSAGE", description: "Operation to perform.") + string(name: "QUERY_JOB_NUMBER", defaultValue: "1", description: "Number of latest jobs to query.") + string(name: "SLACK_CHANNEL_ID", defaultValue: "C0A7D0LCA1F", description: "Slack channel IDs to send messages to.") + string(name: "SLACK_BOT_TOKEN", defaultValue: "", description: "Slack bot token for authentication.") + } + stages { + stage("Run Perf Sanity Script") { + steps { + container("trt-llm") { + script { + sh "pwd && ls -alh" + sh "env | sort" + trtllm_utils.checkoutSource(LLM_REPO, params.BRANCH, LLM_ROOT, false, false) + sh "pip install slack_sdk" + sh """ + cd ${LLM_ROOT}/jenkins/scripts/perf && ls -alh && python3 perf_sanity_triage.py \ + --project_name "${params.OPEN_SEARCH_PROJECT_NAME}" \ + --operation "${params.OPERATION}" \ + --channel_id "${params.SLACK_CHANNEL_ID}" \ + --bot_token "${params.SLACK_BOT_TOKEN}" \ + --query_job_number "${params.QUERY_JOB_NUMBER}" + """ + } + } + } + } // stage Run Perf Sanity Script + } // stages +} // pipeline diff --git a/jenkins/scripts/perf/perf_sanity_triage.py 
b/jenkins/scripts/perf/perf_sanity_triage.py new file mode 100644 index 00000000000..defe07106f5 --- /dev/null +++ b/jenkins/scripts/perf/perf_sanity_triage.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python3 + +import argparse +import json +import sys +import time + +from slack_sdk import WebClient +from slack_sdk.errors import SlackApiError + +sys.path.insert(0, sys.path[0] + "/..") +from open_search_db import OpenSearchDB + +QUERY_LOOKBACK_DAYS = 90 +MAX_QUERY_SIZE = 3000 +MAX_TEST_CASES_PER_MSG = 5 +POST_SLACK_MSG_RETRY_TIMES = 5 + + +def query_regression_data(project_name): + """Query regression data from OpenSearch database.""" + last_days = QUERY_LOOKBACK_DAYS + + must_clauses = [ + {"term": {"b_is_valid": True}}, + {"term": {"b_is_post_merge": True}}, + {"term": {"b_is_regression": True}}, + {"term": {"b_is_baseline": False}}, + { + "range": { + "ts_created": { + "gte": int(time.time() - 24 * 3600 * last_days) + // (24 * 3600) + * 24 + * 3600 + * 1000, + } + } + }, + ] + + json_data = { + "query": { + "bool": {"must": must_clauses}, + }, + "size": MAX_QUERY_SIZE, + } + json_data = json.dumps(json_data) + + data_list = [] + try: + res = OpenSearchDB.queryFromOpenSearchDB(json_data, project_name) + if res is None: + print(f"Failed to query from {project_name}, returned no response") + return None + payload = res.json().get("hits", {}).get("hits", []) + if len(payload) == 0: + print(f"No regression data found in {project_name}, returned empty list") + return [] + for hit in payload: + data_dict = hit.get("_source", {}) + data_dict["_id"] = hit.get("_id", "") + if data_dict["_id"] == "": + print(f"Failed to query from {project_name}, returned data with no _id") + return None + data_list.append(data_dict) + print(f"Successfully queried from {project_name}, queried {len(data_list)} entries") + return data_list + except Exception as e: + print(f"Failed to query from {project_name}, returned error: {e}") + return None + + +def get_regression_data_by_job_id(data_list, query_job_number): + """Returns a dict with job_id as key and list of regression data as value. + + Only returns the latest query_job_number jobs. + """ + if data_list is None or len(data_list) == 0: + return {} + + # Group data by job_id + job_data_dict = {} + for data in data_list: + job_id = data.get("s_job_id", "") + if job_id == "": + continue + if job_id not in job_data_dict: + job_data_dict[job_id] = [] + job_data_dict[job_id].append(data) + + # Sort job_ids by the latest ts_created in each group (descending) + def get_latest_timestamp(job_id): + timestamps = [d.get("ts_created", 0) for d in job_data_dict[job_id]] + return max(timestamps) if timestamps else 0 + + sorted_job_ids = sorted(job_data_dict.keys(), key=get_latest_timestamp, reverse=True) + + # Only keep the latest query_job_number jobs + latest_job_ids = sorted_job_ids[:query_job_number] + + result = {} + for job_id in latest_job_ids: + result[job_id] = job_data_dict[job_id] + + return result + + +def process_regression_message(regression_dict): + """Process regression data into message chunks. + + Returns a list of messages, each containing at most MAX_TEST_CASES_PER_MSG test cases. 
+ """ + if not regression_dict: + return [] + + # Flatten all test cases into a list with (job_id, idx, data) tuples + all_test_cases = [] + for job_id, data_list in regression_dict.items(): + sorted_data_list = sorted(data_list, key=lambda x: x.get("s_test_case_name", "")) + for idx, data in enumerate(sorted_data_list, start=1): + all_test_cases.append((job_id, idx, data)) + + # Split into chunks of MAX_TEST_CASES_PER_MSG + chunks = [] + for i in range(0, len(all_test_cases), MAX_TEST_CASES_PER_MSG): + chunks.append(all_test_cases[i : i + MAX_TEST_CASES_PER_MSG]) + + # Build messages for each chunk + messages = [] + for chunk in chunks: + msg_parts = [] + current_job_id = None + for job_id, idx, data in chunk: + # Add job header when switching to a new job_id + if job_id != current_job_id: + if msg_parts: + msg_parts.append("\n") + job_header = f"*LLM/main/L0_PostMerge/{job_id}:*\n" + msg_parts.append(job_header) + current_job_id = job_id + + test_case_name = data.get("s_test_case_name", "N/A") + regression_info = data.get("s_regression_info", "N/A") + msg_parts.append(f"*REGRESSION TEST CASE {idx}: {test_case_name}*\n") + for part in regression_info.split(","): + part = part.strip() + if part and "baseline_id" not in part: + msg_parts.append(f" {part}\n") + + msg = "".join(msg_parts).strip() + messages.append(msg) + + return messages + + +def send_regression_message(messages, channel_id, bot_token): + """Send regression messages to Slack channel(s). + + channel_id can be a single ID or multiple IDs separated by commas. + """ + if not messages: + print("No regression data to send") + return + + if channel_id and bot_token: + channel_ids = [cid.strip() for cid in channel_id.split(",") if cid.strip()] + for cid in channel_ids: + for msg in messages: + send_message(msg, cid, bot_token) + else: + print("Slack channel_id or bot_token not provided, printing message:") + for i, msg in enumerate(messages, start=1): + print(f"--- Message {i} ---") + print(msg) + + +def send_message(msg, channel_id, bot_token): + """Send message to Slack channel using slack_sdk.""" + client = WebClient(token=bot_token) + + attachments = [ + { + "title": "Perf Sanity Regression Report", + "color": "#ff0000", + "text": msg, + } + ] + + for attempt in range(1, POST_SLACK_MSG_RETRY_TIMES + 1): + try: + result = client.chat_postMessage( + channel=channel_id, + attachments=attachments, + ) + assert result["ok"] is True, json.dumps(result.data) + print(f"Message sent successfully to channel {channel_id}") + return + except SlackApiError as e: + print( + f"Attempt {attempt}/{POST_SLACK_MSG_RETRY_TIMES}: Error sending message to Slack: {e}" + ) + except Exception as e: + print(f"Attempt {attempt}/{POST_SLACK_MSG_RETRY_TIMES}: Unexpected error: {e}") + + if attempt < POST_SLACK_MSG_RETRY_TIMES: + time.sleep(1) + + print( + f"Failed to send message to channel {channel_id} after {POST_SLACK_MSG_RETRY_TIMES} attempts" + ) + + +def main(): + parser = argparse.ArgumentParser(description="Perf Sanity Triage Script") + parser.add_argument("--project_name", type=str, required=True, help="OpenSearch project name") + parser.add_argument("--operation", type=str, required=True, help="Operation to perform") + parser.add_argument( + "--channel_id", + type=str, + default="", + help="Slack channel ID(s), comma-separated for multiple channels", + ) + parser.add_argument("--bot_token", type=str, default="", help="Slack bot token") + parser.add_argument( + "--query_job_number", type=int, default=1, help="Number of latest jobs to query" + ) + + 
args = parser.parse_args() + + print(f"Project Name: {args.project_name}") + print(f"Operation: {args.operation}") + print(f"Channel ID: {args.channel_id}") + print(f"Bot Token: {'***' if args.bot_token else 'Not provided'}") + print(f"Query Job Number: {args.query_job_number}") + + if args.operation == "SLACK BOT SENDS MESSAGE": + data_list = query_regression_data(args.project_name) + if data_list is None: + print("Failed to query regression data") + return + + regression_dict = get_regression_data_by_job_id(data_list, args.query_job_number) + messages = process_regression_message(regression_dict) + send_regression_message(messages, args.channel_id, args.bot_token) + else: + print(f"Unknown operation: {args.operation}") + + +if __name__ == "__main__": + main() diff --git a/tests/integration/defs/perf/disagg/test_configs/disagg/perf/deepseek-r1-fp4_1k1k_ctx1_gen1_dep8_bs768_eplb0_mtp0_ccb-UCX.yaml b/tests/integration/defs/perf/disagg/test_configs/disagg/perf/deepseek-r1-fp4_1k1k_ctx1_gen1_dep8_bs768_eplb0_mtp0_ccb-UCX.yaml index 387704da4a1..97ce35a0a7f 100644 --- a/tests/integration/defs/perf/disagg/test_configs/disagg/perf/deepseek-r1-fp4_1k1k_ctx1_gen1_dep8_bs768_eplb0_mtp0_ccb-UCX.yaml +++ b/tests/integration/defs/perf/disagg/test_configs/disagg/perf/deepseek-r1-fp4_1k1k_ctx1_gen1_dep8_bs768_eplb0_mtp0_ccb-UCX.yaml @@ -22,7 +22,7 @@ benchmark: multi_round: 8 benchmark_ratio: 0.8 streaming: true - concurrency_list: '6144' + concurrency_list: '1024' input_length: 1024 output_length: 1024 dataset_file: diff --git a/tests/integration/defs/perf/open_search_db_utils.py b/tests/integration/defs/perf/open_search_db_utils.py index 5fe40eecb24..fbb2ed78095 100644 --- a/tests/integration/defs/perf/open_search_db_utils.py +++ b/tests/integration/defs/perf/open_search_db_utils.py @@ -22,7 +22,7 @@ import time from datetime import datetime -from defs.trt_test_alternative import print_error, print_info, print_warning +from defs.trt_test_alternative import print_info, print_warning _project_root = os.path.abspath( os.path.join(os.path.dirname(__file__), '../../../..')) @@ -282,28 +282,27 @@ def query_history_data(common_values_dict): f"Failed to query from {TEST_INFO_PROJECT_NAME}, returned no response" ) return None - else: - payload = res.json().get("hits", {}).get("hits", []) - if len(payload) == 0: - # No history data found in database, return empty list - print_info( - f"No history data found in {TEST_INFO_PROJECT_NAME}, returned empty list" - ) - return [] - for hit in payload: - data_dict = hit.get("_source", {}) - data_dict["_id"] = hit.get("_id", "") - if data_dict["_id"] == "": - print_info( - f"Failed to query from {TEST_INFO_PROJECT_NAME}, returned data with no _id" - ) - # Invalid data, return None - return None - data_list.append(data_dict) + payload = res.json().get("hits", {}).get("hits", []) + if len(payload) == 0: + # No history data found in database, return empty list print_info( - f"Successfully queried from {TEST_INFO_PROJECT_NAME}, queried {len(data_list)} entries" + f"No history data found in {TEST_INFO_PROJECT_NAME}, returned empty list" ) - return data_list + return [] + for hit in payload: + data_dict = hit.get("_source", {}) + data_dict["_id"] = hit.get("_id", "") + if data_dict["_id"] == "": + print_info( + f"Failed to query from {TEST_INFO_PROJECT_NAME}, returned data with no _id" + ) + # Invalid data, return None + return None + data_list.append(data_dict) + print_info( + f"Successfully queried from {TEST_INFO_PROJECT_NAME}, queried {len(data_list)} entries" + ) + return 
data_list except Exception as e: print_info( f"Failed to query from {TEST_INFO_PROJECT_NAME}, returned error: {e}" @@ -522,7 +521,7 @@ def prepare_regressive_test_cases(history_baseline_dict, new_data_dict): # Add metric info to s_regression_info metric_info = (f"{metric}'s value: {new_value} " f"baseline value: {baseline_value} " - f"threshold: {threshold} " + f"threshold: {threshold * 100:.2f}% " f"diff: {diff:+.2f}%") info_parts.append(metric_info) @@ -643,65 +642,19 @@ def _get_metric_keys(): return metric_keys -def _print_perf_data(data): - """Print performance metrics and config for a single data entry.""" - print_info("=== Metrics ===") - for metric in MAXIMIZE_METRICS + MINIMIZE_METRICS: - if metric in data: - value = data.get(metric, "N/A") - print_info(f'"{metric}": {value}') - - metric_keys = _get_metric_keys() - print_info("\n=== Config ===") - config_keys = sorted([key for key in data.keys() if key not in metric_keys]) - for key in config_keys: - value = data[key] - print_info(f'"{key}": {value}') - - def _print_regression_data(data, print_func=None): """ - Print regression info, metrics with baselines/thresholds, and config. + Print regression info and config. """ if print_func is None: print_func = print_info if "s_regression_info" in data: print_func("=== Regression Info ===") - print_func(f"{data['s_regression_info']}") + for item in data["s_regression_info"].split(","): + print_func(item.strip()) metric_keys = _get_metric_keys() - is_post_merge = data.get("b_is_post_merge", False) - - print_func("=== Metrics ===") - for metric in MAXIMIZE_METRICS + MINIMIZE_METRICS: - metric_suffix = metric[2:] # Strip "d_" prefix - baseline_key = f"d_baseline_{metric_suffix}" - if is_post_merge: - threshold_key = f"d_threshold_post_merge_{metric_suffix}" - else: - threshold_key = f"d_threshold_pre_merge_{metric_suffix}" - # Only print if at least one of the keys exists - if metric in data or baseline_key in data or threshold_key in data: - value = data.get(metric, "N/A") - baseline = data.get(baseline_key, "N/A") - threshold = data.get(threshold_key, "N/A") - # Calculate percentage difference between value and baseline - # Positive percentage means better perf, negative means regression - if (isinstance(value, (int, float)) - and isinstance(baseline, (int, float)) and baseline != 0): - if metric in MAXIMIZE_METRICS: - # Larger is better: value > baseline is positive (better) - percentage = (value - baseline) / baseline * 100 - else: - # Smaller is better: value < baseline is positive (better) - percentage = (baseline - value) / baseline * 100 - percentage_str = f"{percentage:+.2f}%" - else: - percentage_str = "N/A" - print_func( - f'"{metric}": {value}, "{baseline_key}": {baseline}, ' - f'"{threshold_key}": {threshold}, "diff": {percentage_str}') print_func("\n=== Config ===") config_keys = sorted([key for key in data.keys() if key not in metric_keys]) @@ -712,16 +665,17 @@ def _print_regression_data(data, print_func=None): print_func(f'"{key}": {value}') -def check_perf_regression(new_data_dict): +def check_perf_regression(new_data_dict, fail_on_regression=False): """ Check performance regression by printing regression data from new_data_dict. + If fail_on_regression is True, raises RuntimeError when regressions are found. + (This is a temporary feature to fail regression tests. We are observing the stability and will fail them by default soon.) 
""" # Filter regression data from new_data_dict regressive_data_list = [ data for data in new_data_dict.values() if data.get("b_is_regression", False) ] - # Split regression data into post-merge and pre-merge post_merge_regressions = [ data for data in regressive_data_list @@ -735,24 +689,34 @@ def check_perf_regression(new_data_dict): # Print pre-merge regression data with print_warning if len(pre_merge_regressions) > 0: print_warning( - f"Found {len(pre_merge_regressions)} pre-merge regression data") + f"Found {len(pre_merge_regressions)} pre-merge perf regression data" + ) for i, data in enumerate(pre_merge_regressions): print_warning(f"\n{'=' * 60}") print_warning(f"Pre-merge Regression Data #{i + 1}") print_warning("=" * 60) _print_regression_data(data, print_func=print_warning) - # Print post-merge regression data with print_error + if fail_on_regression: + raise RuntimeError( + f"Found {len(pre_merge_regressions)} pre-merge perf regression data" + ) + + # Print post-merge regression data with print_warning if len(post_merge_regressions) > 0: + print_warning( + f"Found {len(post_merge_regressions)} post-merge perf regression data" + ) for i, data in enumerate(post_merge_regressions): - print_error(f"\n{'=' * 60}") - print_error(f"Post-merge Regression Data #{i + 1}") - print_error("=" * 60) - _print_regression_data(data, print_func=print_error) - print_error( - f"Found {len(post_merge_regressions)} post-merge regression data") - raise RuntimeError( - f"Found {len(post_merge_regressions)} post-merge regression data") + print_warning(f"\n{'=' * 60}") + print_warning(f"Post-merge Regression Data #{i + 1}") + print_warning("=" * 60) + _print_regression_data(data, print_func=print_warning) + + if fail_on_regression: + raise RuntimeError( + f"Found {len(post_merge_regressions)} post-merge perf regression data" + ) # Print summary if no regressions if len(regressive_data_list) == 0: diff --git a/tests/integration/defs/perf/test_perf_sanity.py b/tests/integration/defs/perf/test_perf_sanity.py index 7bad9cf7f40..3cbc5f16c7f 100644 --- a/tests/integration/defs/perf/test_perf_sanity.py +++ b/tests/integration/defs/perf/test_perf_sanity.py @@ -23,7 +23,7 @@ import socket import subprocess import time -from typing import Dict, List, NamedTuple, Tuple +from typing import Dict, List, NamedTuple, Optional, Tuple import pytest import requests @@ -124,6 +124,7 @@ def __init__(self, server_config_data: dict, env_vars: str = ""): self.model_name = server_config_data["model_name"] self.model_path = "" self.env_vars = env_vars + self.disagg_run_type = server_config_data.get("disagg_run_type", "aggr") # Extract optional fields with defaults self.tp = server_config_data.get("tensor_parallel_size", 1) @@ -220,8 +221,10 @@ def __init__(self, server_config_data: dict, env_vars: str = ""): "concurrency", "name", "model_name", + "disagg_run_type", "gpus", "gpus_per_node", + "match_mode", "client_configs", ] self.extra_llm_api_config_data = { @@ -234,7 +237,7 @@ def to_cmd( """Generate server command.""" model_dir = get_model_dir(self.model_name) self.model_path = model_dir if os.path.exists(model_dir) else self.model_name - config_filename = f"extra-llm-api-config.{self.name}.yml" + config_filename = f"extra-llm-api-config.{self.disagg_run_type}.{self.name}.yml" config_path = os.path.join(output_dir, config_filename) numa_bind_cmd = [] @@ -667,10 +670,13 @@ def wait_for_benchmark_ready(self, benchmark_status_file: str): break time.sleep(10) - def wait_for_endpoint_ready(self, url: str): + def 
wait_for_endpoint_ready(self, url: str, server_files: List[str] = None): """Wait for endpoint to be ready.""" start = time.monotonic() + iteration = 0 + error_keywords = ["RuntimeError", "out of memory", "ValueError"] while True: + iteration += 1 elapsed_time = time.monotonic() - start if elapsed_time > self.timeout: print_error( @@ -678,6 +684,22 @@ def wait_for_endpoint_ready(self, url: str): ) break print_info(f"Waiting for endpoint {url} to be ready, elapsed time: {elapsed_time}s") + + if server_files and iteration % 30 == 0: + for server_file in server_files: + if os.path.exists(server_file): + try: + with open(server_file, "r") as f: + content = f.read() + for line in content.splitlines(): + for keyword in error_keywords: + if keyword in line: + print_error( + f"Found '{keyword}' in server file {server_file}: {line}" + ) + except Exception as e: + print_info(f"Failed to read server file {server_file}: {e}") + try: time.sleep(10) if requests.get(url).status_code == 200: @@ -693,7 +715,6 @@ def run_cmd(self, server_idx: int) -> List[str]: port = get_free_port() ctx_cmd, gen_cmd, disagg_cmd = self.server_cmds[server_idx] - if "CTX" in self.disagg_serving_type or "GEN" in self.disagg_serving_type: self._generate_hostname_file(server_idx, port) server_file_path = os.path.join( @@ -702,7 +723,6 @@ def run_cmd(self, server_idx: int) -> List[str]: is_ctx = "CTX" in self.disagg_serving_type server_cmd = ctx_cmd if is_ctx else gen_cmd server_cmd = add_host_port_to_cmd(server_cmd, self.hostname, port) - try: print_info( f"Starting server. disagg_serving_type: {self.disagg_serving_type} cmd is {server_cmd}" @@ -724,7 +744,6 @@ def run_cmd(self, server_idx: int) -> List[str]: disagg_server_file_path = os.path.join( self.output_dir, f"trtllm-serve.{server_idx}.{self.disagg_serving_type}.log" ) - try: self._generate_disagg_server_config(server_idx, port) print_info(f"Starting disagg server. 
cmd is {disagg_cmd}") @@ -746,8 +765,24 @@ def run_cmd(self, server_idx: int) -> List[str]: disagg_server_hostname, disagg_server_port = ( self._get_disagg_server_hostname_and_port(server_idx) ) + server_files = [ + os.path.join(self.output_dir, f"trtllm-serve.{server_idx}.DISAGG_SERVER.log"), + ] + for ctx_idx in range(self.num_ctx_servers): + server_files.append( + os.path.join( + self.output_dir, f"trtllm-serve.{server_idx}.CTX_{ctx_idx}.log" + ) + ) + for gen_idx in range(self.num_gen_servers): + server_files.append( + os.path.join( + self.output_dir, f"trtllm-serve.{server_idx}.GEN_{gen_idx}.log" + ) + ) self.wait_for_endpoint_ready( - f"http://{disagg_server_hostname}:{disagg_server_port}/health" + f"http://{disagg_server_hostname}:{disagg_server_port}/health", + server_files=server_files, ) # Run all clients for this server @@ -799,7 +834,6 @@ class PerfSanityTestConfig: def __init__(self, test_case_name: str, output_dir: str): self._output_dir = output_dir - self._test_results: Dict[int, Dict[str, float]] = {} self._perf_results: Dict[int, List[Dict[str, float]]] = {} # Parse test case name @@ -977,6 +1011,7 @@ def _parse_disagg_config_file(self, config_file_path: str, config_file: str): "name": config_file_base_name, "model_name": model_name, "gpus_per_node": gpus_per_node, + "disagg_run_type": "ctx", **worker_config.get("ctx", {}), } @@ -986,6 +1021,7 @@ def _parse_disagg_config_file(self, config_file_path: str, config_file: str): "name": config_file_base_name, "model_name": model_name, "gpus_per_node": gpus_per_node, + "disagg_run_type": "gen", **worker_config.get("gen", {}), } @@ -1047,7 +1083,7 @@ def _get_aggr_commands(self, output_dir: str): # Generate extra-llm-api-config.yml config_content = server_config.generate_extra_llm_api_config() - config_filename = f"extra-llm-api-config.{server_config.name}.yml" + config_filename = f"extra-llm-api-config.aggr.{server_config.name}.yml" config_path = os.path.join(output_dir, config_filename) with open(config_path, "w") as f: f.write(config_content) @@ -1080,7 +1116,9 @@ def _get_disagg_commands(self, output_dir: str): ctx_cmd = ctx_config.to_cmd(output_dir, numa_bind, "CTX") if "CTX" in disagg_serving_type: config_content = ctx_config.generate_extra_llm_api_config() - config_path = os.path.join(output_dir, "extra-llm-api-config.ctx.yml") + config_path = os.path.join( + output_dir, f"extra-llm-api-config.ctx.{ctx_config.name}.yml" + ) with open(config_path, "w") as f: f.write(config_content) @@ -1088,7 +1126,9 @@ def _get_disagg_commands(self, output_dir: str): gen_cmd = gen_config.to_cmd(output_dir, numa_bind, "GEN") if "GEN" in disagg_serving_type: config_content = gen_config.generate_extra_llm_api_config() - config_path = os.path.join(output_dir, "extra-llm-api-config.gen.yml") + config_path = os.path.join( + output_dir, f"extra-llm-api-config.gen.{gen_config.name}.yml" + ) with open(config_path, "w") as f: f.write(config_content) @@ -1165,44 +1205,59 @@ def _check_benchmark_output_for_errors(self, output: str) -> None: if failed_requests_match: failed_count = int(failed_requests_match.group(1)) if failed_count > 0: - print_error(f"Benchmark output contains {failed_count} failed requests.") - raise Exception(f"Benchmark has {failed_count} failed requests") + error_msg = f"Benchmark output contains {failed_count} failed requests." + raise Exception(error_msg) # Check for explicit failure markers if "!FAILED REQUESTS!" in output or "!CHECK LOG FOR ERRORS!" 
in output: - print_error("Benchmark output contains failure markers.") - raise Exception("Benchmark output contains failure markers") + error_msg = "Benchmark output contains failure markers." + raise Exception(error_msg) def get_perf_result(self, outputs: Dict[int, List[str]]): """Parse performance results from outputs.""" - self._perf_results = {} - - for server_idx, server_outputs in outputs.items(): - self._perf_results[server_idx] = [] - for output in server_outputs: - metrics = {} + def parse_metrics_from_output(output: str) -> Optional[Dict[str, float]]: + """Parse all metrics from a single output string.""" + metrics = {} + for line in output.split("\n"): for metric_type, regex in PERF_METRIC_LOG_QUERIES.items(): - regex_matches = [regex.search(line) for line in output.split("\n")] - for match in regex_matches: - if match: - value = None - for i in range(1, len(match.groups()) + 1): - if match.group(i) is not None: - value = match.group(i) - break - if value is not None: - metrics[metric_type] = float(value) - break + if metric_type in metrics: + continue + match = regex.search(line) + if match: + metrics[metric_type] = float(match.group(1)) + break + return metrics + self._perf_results = {} + for server_idx, client_configs in self.server_client_configs.items(): + self._perf_results[server_idx] = [] + server_outputs = outputs.get(server_idx, []) + for output in server_outputs: + metrics = parse_metrics_from_output(output) self._perf_results[server_idx].append(metrics) - # Also populate _test_results for upload (flattened view) - cmd_idx = 0 - for server_idx in sorted(self._perf_results.keys()): - for client_metrics in self._perf_results[server_idx]: - self._test_results[cmd_idx] = client_metrics - cmd_idx += 1 + def check_test_failure(self): + """Check if any server failed based on perf results.""" + error_msg = "" + for server_idx, client_configs in self.server_client_configs.items(): + server_perf_results = self._perf_results.get(server_idx, []) + if len(server_perf_results) != len(client_configs): + error_msg += ( + f"Server {server_idx}'s perf results number: {len(server_perf_results)} " + f"is not equal to client number: {len(client_configs)}. " + ) + for client_idx, metrics in enumerate(server_perf_results): + if len(metrics) != len(PERF_METRIC_LOG_QUERIES): + error_msg += ( + f"Some metrics in Server {server_idx} Client {client_idx} are missing. " + f"The broken metrics is {metrics}. 
" + ) + + if error_msg: + raise Exception(error_msg) + + print_info("All servers passed") def upload_test_results_to_database(self): """Upload test results and baseline to database.""" @@ -1219,25 +1274,27 @@ def add_dict_prefix(config_dict: dict, prefix_name: str) -> dict: return {add_prefix(key, prefix_name): value for key, value in config_dict.items()} match_keys = [] + is_scenario_mode = False if self.runtime == "aggr_server": job_config = get_job_info() is_post_merge = job_config["b_is_post_merge"] new_data_dict = {} cmd_idx = 0 - for server_idx, client_configs in self.server_client_configs.items(): server_config = self.server_configs[server_idx] server_config_dict = server_config.to_db_data() + server_perf_results = self._perf_results.get(server_idx, []) + # Skip if server failed + if len(server_perf_results) != len(client_configs): + cmd_idx += len(client_configs) + continue - for client_config in client_configs: + for client_idx, client_config in enumerate(client_configs): client_config_dict = client_config.to_db_data() # Skip if metrics missing - if cmd_idx not in self._test_results or not all( - metric_name in self._test_results[cmd_idx] - for metric_name in PERF_METRIC_LOG_QUERIES - ): + if server_perf_results[client_idx] is None: print_info( f"Skipped posting command {cmd_idx}'s test results since some metrics are missing." ) @@ -1257,8 +1314,7 @@ def add_dict_prefix(config_dict: dict, prefix_name: str) -> dict: new_data["s_test_case_name"] = f"{server_config.name}-{client_config.name}" for metric_name in PERF_METRIC_LOG_QUERIES: - if metric_name in self._test_results[cmd_idx]: - new_data[f"d_{metric_name}"] = self._test_results[cmd_idx][metric_name] + new_data[f"d_{metric_name}"] = server_perf_results[client_idx][metric_name] add_id(new_data) new_data_dict[cmd_idx] = new_data @@ -1268,6 +1324,7 @@ def add_dict_prefix(config_dict: dict, prefix_name: str) -> dict: match_keys.extend(["s_gpu_type", "s_runtime"]) if server_config.match_mode == "scenario": match_keys = SCENARIO_MATCH_FIELDS.copy() + is_scenario_mode = True else: match_keys.extend(server_config.to_match_keys()) match_keys.extend(client_config.to_match_keys()) @@ -1285,12 +1342,16 @@ def add_dict_prefix(config_dict: dict, prefix_name: str) -> dict: for server_idx, (ctx_config, gen_config, disagg_config) in enumerate( self.server_configs ): - for client_config in self.server_client_configs[server_idx]: + client_configs = self.server_client_configs[server_idx] + server_perf_results = self._perf_results.get(server_idx, []) + # Skip if server failed + if len(server_perf_results) != len(client_configs): + cmd_idx += len(client_configs) + continue + + for client_idx, client_config in enumerate(client_configs): # Skip if metrics missing - if cmd_idx not in self._test_results or not all( - metric_name in self._test_results[cmd_idx] - for metric_name in PERF_METRIC_LOG_QUERIES - ): + if server_perf_results[client_idx] is None: print_info( f"Skipped posting command {cmd_idx}'s test results since some metrics are missing." 
) @@ -1323,8 +1384,7 @@ def add_dict_prefix(config_dict: dict, prefix_name: str) -> dict: new_data["s_test_case_name"] = f"{disagg_config.name}-{client_config.name}" for metric_name in PERF_METRIC_LOG_QUERIES: - if metric_name in self._test_results[cmd_idx]: - new_data[f"d_{metric_name}"] = self._test_results[cmd_idx][metric_name] + new_data[f"d_{metric_name}"] = server_perf_results[client_idx][metric_name] add_id(new_data) new_data_dict[cmd_idx] = new_data @@ -1376,7 +1436,7 @@ def add_dict_prefix(config_dict: dict, prefix_name: str) -> dict: # Upload the new perf data and baseline data to database post_new_perf_data(new_baseline_data_dict, new_data_dict) - check_perf_regression(new_data_dict) + check_perf_regression(new_data_dict, fail_on_regression=is_scenario_mode) # Perf sanity test case parameters @@ -1479,5 +1539,8 @@ def test_e2e(output_dir, perf_sanity_test_case): # Parse performance results config.get_perf_result(outputs) + # Check for test failures + config.check_test_failure() + # Upload results to database config.upload_test_results_to_database() diff --git a/tests/integration/test_lists/waives.txt b/tests/integration/test_lists/waives.txt index dcef1eabea1..d1cefb2a0c6 100644 --- a/tests/integration/test_lists/waives.txt +++ b/tests/integration/test_lists/waives.txt @@ -475,7 +475,6 @@ disaggregated/test_disaggregated.py::test_disaggregated_benchmark_on_diff_backen disaggregated/test_disaggregated.py::test_disaggregated_benchmark_on_diff_backends[DeepSeek-V3-Lite-bf16] SKIP (https://nvbugs/5769890) disaggregated/test_disaggregated.py::test_disaggregated_benchmark_on_diff_backends[llama-v3-8b-hf] SKIP (https://nvbugs/5769890,https://nvbugs/5748683) accuracy/test_llm_api_pytorch.py::TestDeepSeekR1::test_nvfp4_multi_gpus[throughput_pp4_mtp] SKIP (https://nvbugs/5779536) -perf/test_perf_sanity.py::test_e2e[disagg_upload-deepseek-r1-fp4_1k1k_ctx1_gen1_dep8_bs768_eplb0_mtp0_ccb-UCX] SKIP (https://nvbugs/5778381) unittest/_torch/attention/test_flashinfer_star_attn.py::TestStarAttention::test_flashinfer_star_attention[num_layers:2-num_heads:32-num_kv_heads:8-head_dim:64-anchor_size:64-block_size:64-dtype:torch.float16] SKIP (https://nvbugs/5781389) unittest/_torch/ray_orchestrator/multi_gpu/test_ops.py::test_reducescatter_pg_op[var_len:True-seqlen:16-hidden:128] SKIP (https://nvbugs/5781383) cpp/test_e2e.py::test_model[-mamba-86] SKIP (https://nvbugs/5781665) diff --git a/tests/scripts/perf-sanity/deepseek_r1_fp4_v2_grace_blackwell.yaml b/tests/scripts/perf-sanity/deepseek_r1_fp4_v2_grace_blackwell.yaml index e17acb10bae..46c89fe768f 100644 --- a/tests/scripts/perf-sanity/deepseek_r1_fp4_v2_grace_blackwell.yaml +++ b/tests/scripts/perf-sanity/deepseek_r1_fp4_v2_grace_blackwell.yaml @@ -131,7 +131,7 @@ server_configs: iterations: 5 isl: 8192 osl: 1024 - random_range_ratio: 0.8 + random_range_ratio: 0.2 backend: "openai" - name: "r1_fp4_v2_tep4_mtp3_8k1k" @@ -161,7 +161,7 @@ server_configs: iterations: 10 isl: 8192 osl: 1024 - random_range_ratio: 0.8 + random_range_ratio: 0.2 backend: "openai" - name: "r1_fp4_v2_tp4_mtp3_8k1k" @@ -191,7 +191,7 @@ server_configs: iterations: 10 isl: 8192 osl: 1024 - random_range_ratio: 0.8 + random_range_ratio: 0.2 backend: "openai" # 1k8k configs
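
A few standalone sketches of the mechanisms this patch adds (illustrative only, not part of the diff). First, the lookback cutoff in `query_regression_data()`: the `range` clause builds its `ts_created` bound with `int(time.time() - 24 * 3600 * last_days) // (24 * 3600) * 24 * 3600 * 1000`, which floors the 90-day window to a UTC day boundary and converts it to epoch milliseconds (the unit the `* 1000` implies `ts_created` is stored in). A minimal check of that arithmetic:

```python
import time
from datetime import datetime, timezone

QUERY_LOOKBACK_DAYS = 90
DAY = 24 * 3600

# Same arithmetic as the range clause in query_regression_data().
cutoff_ms = int(time.time() - DAY * QUERY_LOOKBACK_DAYS) // DAY * DAY * 1000
cutoff = datetime.fromtimestamp(cutoff_ms / 1000, tz=timezone.utc)

print(cutoff.isoformat())  # always lands on a 00:00:00 UTC day boundary
assert cutoff.hour == cutoff.minute == cutoff.second == 0
```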
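
Next, the Slack message layout. `get_regression_data_by_job_id()` and `process_regression_message()` group regression hits by `s_job_id`, keep only the newest `--query_job_number` jobs, and split the flattened test cases into chunks of at most `MAX_TEST_CASES_PER_MSG` (5) per post. A condensed standalone sketch of that flow; the records, job IDs, and metric values below are synthetic, and field names follow the documents the script queries:

```python
MAX_TEST_CASES_PER_MSG = 5  # same constant as perf_sanity_triage.py
query_job_number = 2

# Synthetic regression hits (made-up values, real field names).
hits = [
    {"s_job_id": "1234", "ts_created": 1_700_000_000_000,
     "s_test_case_name": "r1_fp4_v2_tp4_mtp3_8k1k-con256",
     "s_regression_info": "baseline_id: xyz, d_seq_throughput's value: 90 "
                          "baseline value: 100 threshold: 5.00% diff: -10.00%"},
    {"s_job_id": "1233", "ts_created": 1_699_900_000_000,
     "s_test_case_name": "r1_fp4_v2_tep4_mtp3_8k1k-con64",
     "s_regression_info": "d_ttft's value: 120 baseline value: 100 "
                          "threshold: 10.00% diff: +20.00%"},
]

# Group by job id and keep only the newest query_job_number jobs.
by_job = {}
for hit in hits:
    by_job.setdefault(hit["s_job_id"], []).append(hit)
newest = sorted(by_job, key=lambda j: max(h["ts_created"] for h in by_job[j]),
                reverse=True)[:query_job_number]

# Flatten sorted test cases, then split into Slack-sized chunks.
flat = [(job, idx, hit)
        for job in newest
        for idx, hit in enumerate(sorted(by_job[job],
                                         key=lambda h: h["s_test_case_name"]), 1)]
chunks = [flat[i:i + MAX_TEST_CASES_PER_MSG]
          for i in range(0, len(flat), MAX_TEST_CASES_PER_MSG)]

for chunk in chunks:
    lines, current_job = [], None
    for job, idx, hit in chunk:
        if job != current_job:
            lines.append(f"*LLM/main/L0_PostMerge/{job}:*")
            current_job = job
        lines.append(f"*REGRESSION TEST CASE {idx}: {hit['s_test_case_name']}*")
        # Keep the human-readable metric parts, drop the baseline_id bookkeeping.
        lines += [p.strip() for p in hit["s_regression_info"].split(",")
                  if p.strip() and "baseline_id" not in p]
    print("\n".join(lines) + "\n---")
```

When `--channel_id` or `--bot_token` is omitted, the script prints these chunks instead of posting them, which makes a local dry run straightforward.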
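
For disaggregated runs, `wait_for_endpoint_ready()` now also receives the ctx/gen/disagg server log paths and, every 30th polling iteration, scans them for `RuntimeError`, `out of memory`, and `ValueError` so a crashed worker surfaces in the test log instead of a silent health-check timeout. A standalone sketch of that scan; the paths below are hypothetical, though the real files follow `trtllm-serve.<server_idx>.<role>.log`:

```python
import os

ERROR_KEYWORDS = ["RuntimeError", "out of memory", "ValueError"]  # same list as the patch

def scan_server_logs(server_files):
    """Report any log line containing a known fatal keyword."""
    for server_file in server_files:
        if not os.path.exists(server_file):
            continue
        try:
            with open(server_file, "r") as f:
                for line in f:
                    for keyword in ERROR_KEYWORDS:
                        if keyword in line:
                            print(f"Found '{keyword}' in {server_file}: {line.rstrip()}")
        except OSError as e:
            print(f"Failed to read {server_file}: {e}")

# The real code calls the equivalent of this inside its 10-second polling loop.
scan_server_logs([
    "output/trtllm-serve.0.CTX_0.log",
    "output/trtllm-serve.0.GEN_0.log",
    "output/trtllm-serve.0.DISAGG_SERVER.log",
])
```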
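
Finally, the metric-parsing rework. `get_perf_result()` now parses each client output once per metric (first matching line wins), and the new `check_test_failure()` fails the test when a server's result count or metric set is incomplete rather than silently uploading partial data. The sketch below mirrors that shape; the metric names and regex patterns are placeholders, since the real mapping lives in `PERF_METRIC_LOG_QUERIES`, which this patch does not change:

```python
import re
from typing import Dict

# Placeholder patterns for illustration; the real mapping is PERF_METRIC_LOG_QUERIES.
METRIC_QUERIES = {
    "seq_throughput": re.compile(r"Request throughput \(req/sec\):\s*([\d.]+)"),
    "token_throughput": re.compile(r"Output token throughput \(tok/sec\):\s*([\d.]+)"),
    "mean_ttft": re.compile(r"Mean TTFT \(ms\):\s*([\d.]+)"),
}

def parse_metrics_from_output(output: str) -> Dict[str, float]:
    """First matching line wins for each metric, as in the reworked get_perf_result()."""
    metrics: Dict[str, float] = {}
    for line in output.split("\n"):
        for metric_type, regex in METRIC_QUERIES.items():
            if metric_type in metrics:
                continue
            match = regex.search(line)
            if match:
                metrics[metric_type] = float(match.group(1))
                break
    return metrics

sample_output = "Request throughput (req/sec): 12.3\nMean TTFT (ms): 456.7\n"
metrics = parse_metrics_from_output(sample_output)
missing = set(METRIC_QUERIES) - set(metrics)
if missing:
    # check_test_failure() raises in this situation; here we just report it.
    print(f"Some metrics are missing: {sorted(missing)}")
```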