diff --git a/jenkins/L0_Test.groovy b/jenkins/L0_Test.groovy index ce5842d7c21..00ddbc23cdd 100644 --- a/jenkins/L0_Test.groovy +++ b/jenkins/L0_Test.groovy @@ -887,7 +887,8 @@ def runLLMTestlistWithSbatch(pipeline, platform, testList, config=VANILLA_CONFIG // Create a unique suffix for the job name String customSuffix = "${env.BUILD_TAG}-${UUID.randomUUID().toString().replaceAll("-", "").substring(0, 6)}".toLowerCase() def jobUID = "${cluster.host}-multi_node_test-${customSuffix}" - def disaggMode = stageName.contains("Perf-Sanity-Disagg") + def perfSanityMode = stageName.contains("PerfSanity") + def disaggMode = stageName.contains("PerfSanity-Disagg") def setSegment = disaggMode Utils.exec(pipeline, script: "env | sort && pwd && ls -alh") @@ -930,6 +931,8 @@ def runLLMTestlistWithSbatch(pipeline, platform, testList, config=VANILLA_CONFIG def scriptExecPathLocal = Utils.createTempLocation(pipeline, "./slurm_exec.sh") def scriptExecPathNode = "${jobWorkspace}/${jobUID}-slurm_exec.sh" def coverageConfigFile = "${jobWorkspace}/.coveragerc" + def perfCheckScriptLocal = "${llmSrcLocal}/tests/integration/defs/perf/perf_regression_check.py" + def perfCheckScriptNode = "${jobWorkspace}/${jobUID}-perf_regression_check.py" stage("[${stageName}] Initializing Test") { // Create Job Workspace folder in Frontend Node @@ -1004,6 +1007,16 @@ def runLLMTestlistWithSbatch(pipeline, platform, testList, config=VANILLA_CONFIG coverageConfigFile ) + if (perfSanityMode) { + Utils.copyFileToRemoteHost( + pipeline, + remote, + perfCheckScriptLocal, + perfCheckScriptNode, + true + ) + } + // Generate Pytest command String pytestUtil = "" if (nodeCount > 1) { @@ -1078,7 +1091,8 @@ def runLLMTestlistWithSbatch(pipeline, platform, testList, config=VANILLA_CONFIG // Define environment variables to export def envVarNames = [ 'OPEN_SEARCH_DB_BASE_URL', - 'OPEN_SEARCH_DB_CREDENTIALS', + 'OPEN_SEARCH_DB_CREDENTIALS_USR', + 'OPEN_SEARCH_DB_CREDENTIALS_PSW', 'BUILD_ID', 'BUILD_URL', 'JOB_NAME', @@ -1245,6 +1259,22 @@ def runLLMTestlistWithSbatch(pipeline, platform, testList, config=VANILLA_CONFIG ), numRetries: 3 ) + + if (perfSanityMode) { + stage("[${stageName}] Check perf result") { + def perfCheckResult = Utils.exec( + pipeline, + script: Utils.sshUserCmd( + remote, + "python3 ${perfCheckScriptNode} ${jobWorkspace}/${stageName}" + ), + returnStatus: true + ) + if (perfCheckResult != 0) { + error "Performance regression detected and failing the build (exit code: ${perfCheckResult})" + } + } + } } echo "Finished test stage execution." @@ -2698,7 +2728,7 @@ def runLLMTestlistOnPlatformImpl(pipeline, platform, testList, config=VANILLA_CO error "Some tests still failed after rerun attempts, please check the test report." } - if (perfMode && !stageName.contains("Perf-Sanity")) { + if (perfMode) { basePerfFilename = stageName.contains("PyTorch") ? 
"base_perf_pytorch.csv" : "base_perf.csv" basePerfPath = "${llmSrc}/tests/integration/defs/perf/${basePerfFilename}" stage("Check perf result") { @@ -2724,7 +2754,7 @@ def runLLMTestlistOnPlatformImpl(pipeline, platform, testList, config=VANILLA_CO } } - if (perfMode && stageName.contains("Perf-Sanity")) { + if (stageName.contains("PerfSanity")) { stage ("Check perf result") { def perfCheckResult = sh( script: """ @@ -2733,10 +2763,9 @@ def runLLMTestlistOnPlatformImpl(pipeline, platform, testList, config=VANILLA_CO """, returnStatus: true ) - // TODO: Enable this when perf regression check is stable - // if (perfCheckResult != 0) { - // error "Performance regression detected and failing the build (exit code: ${perfCheckResult})" - // } + if (perfCheckResult != 0) { + error "Performance regression detected and failing the build (exit code: ${perfCheckResult})" + } } } } @@ -3100,7 +3129,7 @@ def launchTestJobs(pipeline, testFilter) "RTXPro6000D-4_GPUs-PyTorch-Post-Merge-2": ["rtx-pro-6000d-x4", "l0_rtx_pro_6000", 2, 2, 4], ] - parallelJobs = x86TestConfigs.collectEntries{key, values -> [key, [createKubernetesPodConfig(LLM_DOCKER_IMAGE, values[0], "amd64", values[4] ?: 1, key.contains("Perf")), { + parallelJobs = x86TestConfigs.collectEntries{key, values -> [key, [createKubernetesPodConfig(LLM_DOCKER_IMAGE, values[0], "amd64", values[4] ?: 1, key.contains("-Perf-")), { def config = VANILLA_CONFIG if (key.contains("single-device")) { config = SINGLE_DEVICE_CONFIG @@ -3111,7 +3140,7 @@ def launchTestJobs(pipeline, testFilter) if (key.contains("Pybind")) { config = PYBIND_CONFIG } - runLLMTestlistOnPlatform(pipeline, values[0], values[1], config, key.contains("Perf"), key, values[2], values[3]) + runLLMTestlistOnPlatform(pipeline, values[0], values[1], config, key.contains("-Perf-"), key, values[2], values[3]) }]]} fullSet = parallelJobs.keySet() @@ -3132,9 +3161,9 @@ def launchTestJobs(pipeline, testFilter) "DGX_B300-4_GPUs-PyTorch-Post-Merge-1": ["b300-x4", "l0_dgx_b300", 1, 2, 4], "DGX_B300-4_GPUs-PyTorch-Post-Merge-2": ["b300-x4", "l0_dgx_b300", 2, 2, 4], // Perf sanity post merge test - // "DGX_B200-4_GPUs-PyTorch-Perf-Sanity-Post-Merge-1": ["b200-x4", "l0_dgx_b200_perf_sanity", 1, 1, 4], - // "DGX_B200-8_GPUs-PyTorch-Perf-Sanity-Post-Merge-1": ["b200-x8", "l0_dgx_b200_perf_sanity", 1, 1, 8], - // "DGX_B300-4_GPUs-PyTorch-Perf-Sanity-Post-Merge-1": ["b300-x4", "l0_dgx_b300_perf_sanity", 1, 1, 4], + // "DGX_B200-4_GPUs-PyTorch-PerfSanity-Post-Merge-1": ["b200-x4", "l0_dgx_b200_perf_sanity", 1, 1, 4], + // "DGX_B200-8_GPUs-PyTorch-PerfSanity-Post-Merge-1": ["b200-x8", "l0_dgx_b200_perf_sanity", 1, 1, 8], + // "DGX_B300-4_GPUs-PyTorch-PerfSanity-Post-Merge-1": ["b300-x4", "l0_dgx_b300_perf_sanity", 1, 1, 4], ] fullSet += x86SlurmTestConfigs.keySet() @@ -3146,7 +3175,7 @@ def launchTestJobs(pipeline, testFilter) if (key.contains("llvm")) { config = LLVM_CONFIG } - runLLMTestlistOnSlurm(pipeline, values[0], values[1], config, key.contains("Perf"), key, values[2], values[3], values[4] ?: 1, values[5] ?: 1, values[6] ?: false) + runLLMTestlistOnSlurm(pipeline, values[0], values[1], config, key.contains("-Perf-"), key, values[2], values[3], values[4] ?: 1, values[5] ?: 1, values[6] ?: false) }]]} parallelJobs += parallelSlurmJobs @@ -3162,11 +3191,19 @@ def launchTestJobs(pipeline, testFilter) "GB200-4_GPUs-PyTorch-1": ["gb200-x4-oci", "l0_gb200_multi_gpus", 1, 2, 4], "GB200-4_GPUs-PyTorch-2": ["gb200-x4-oci", "l0_gb200_multi_gpus", 2, 2, 4], "GB200-4_GPUs-PyTorch-Post-Merge-1": ["gb200-x4-oci", 
"l0_gb200_multi_gpus", 1, 1, 4], - // Perf sanity post merge test - "GB200-4_GPUs-PyTorch-Perf-Sanity-Post-Merge-1": ["gb200-x4-oci", "l0_gb200_multi_gpus_perf_sanity", 1, 1, 4], // Disable GB300 stages due to nodes will be offline temporarily. // "GB300-PyTorch-1": ["gb300-single", "l0_gb300", 1, 1], // "GB300-4_GPUs-PyTorch-Post-Merge-1": ["gb300-x4", "l0_gb300_multi_gpus", 1, 1, 4], + // Perf sanity pre merge test + "GB200-4_GPUs-PyTorch-PerfSanity-2": ["gb200-x4-oci", "l0_gb200_multi_gpus_perf_sanity", 2, 6, 4], + "GB200-4_GPUs-PyTorch-PerfSanity-4": ["gb200-x4-oci", "l0_gb200_multi_gpus_perf_sanity", 4, 6, 4], + // Perf sanity post merge test + "GB200-4_GPUs-PyTorch-PerfSanity-Post-Merge-1": ["gb200-x4-oci", "l0_gb200_multi_gpus_perf_sanity", 1, 6, 4], + "GB200-4_GPUs-PyTorch-PerfSanity-Post-Merge-2": ["gb200-x4-oci", "l0_gb200_multi_gpus_perf_sanity", 2, 6, 4], + "GB200-4_GPUs-PyTorch-PerfSanity-Post-Merge-3": ["gb200-x4-oci", "l0_gb200_multi_gpus_perf_sanity", 3, 6, 4], + "GB200-4_GPUs-PyTorch-PerfSanity-Post-Merge-4": ["gb200-x4-oci", "l0_gb200_multi_gpus_perf_sanity", 4, 6, 4], + "GB200-4_GPUs-PyTorch-PerfSanity-Post-Merge-5": ["gb200-x4-oci", "l0_gb200_multi_gpus_perf_sanity", 5, 6, 4], + "GB200-4_GPUs-PyTorch-PerfSanity-Post-Merge-6": ["gb200-x4-oci", "l0_gb200_multi_gpus_perf_sanity", 6, 6, 4], ] fullSet += SBSASlurmTestConfigs.keySet() @@ -3178,13 +3215,15 @@ def launchTestJobs(pipeline, testFilter) "GB200-8_GPUs-2_Nodes-PyTorch-Post-Merge-1": ["gb200-oci-trtllm", "l0_gb200_multi_nodes", 1, 3, 8, 2], "GB200-8_GPUs-2_Nodes-PyTorch-Post-Merge-2": ["gb200-oci-trtllm", "l0_gb200_multi_nodes", 2, 3, 8, 2], "GB200-8_GPUs-2_Nodes-PyTorch-Post-Merge-3": ["gb200-oci-trtllm", "l0_gb200_multi_nodes", 3, 3, 8, 2], - // Perf sanity post merge aggr tests - "GB200-8_GPUs-2_Nodes-PyTorch-Perf-Sanity-Post-Merge-1": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes_001", 1, 1, 8, 2], - // Perf sanity post merge disagg tests - "GB200-12_GPUs-3_Nodes-PyTorch-Perf-Sanity-Disagg-Post-Merge-1": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes_001", 1, 1, 12, 3], - // "GB200-24_GPUs-6_Nodes-PyTorch-Perf-Sanity-Disagg-Post-Merge-1": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes_001", 1, 1, 24, 6], - // "GB200-24_GPUs-6_Nodes-PyTorch-Perf-Sanity-Disagg-Post-Merge-2": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes_002", 1, 1, 24, 6], - // "GB200-32_GPUs-8_Nodes-PyTorch-Perf-Sanity-Disagg-Post-Merge-1": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes_001", 1, 1, 32, 8], + // Perf sanity pre merge tests + "GB200-12_GPUs-3_Nodes-PyTorch-PerfSanity-Disagg-1": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes", 1, 1, 12, 3], + // Perf sanity post merge tests + "GB200-8_GPUs-2_Nodes-PyTorch-PerfSanity-Post-Merge-1": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes", 1, 2, 8, 2], + "GB200-8_GPUs-2_Nodes-PyTorch-PerfSanity-Post-Merge-2": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes", 2, 2, 8, 2], + "GB200-12_GPUs-3_Nodes-PyTorch-PerfSanity-Disagg-Post-Merge-1": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes", 1, 1, 12, 3], + // "GB200-24_GPUs-6_Nodes-PyTorch-PerfSanity-Disagg-Post-Merge-1": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes", 1, 2, 24, 6], + // "GB200-24_GPUs-6_Nodes-PyTorch-PerfSanity-Disagg-Post-Merge-2": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes", 2, 2, 
24, 6], + // "GB200-32_GPUs-8_Nodes-PyTorch-PerfSanity-Disagg-Post-Merge-1": ["gb200-oci-trtllm", "l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes", 1, 1, 32, 8], ] fullSet += multiNodesSBSAConfigs.keySet() @@ -3202,7 +3241,7 @@ def launchTestJobs(pipeline, testFilter) if (key.contains("llvm")) { config = LLVM_CONFIG } - runLLMTestlistOnSlurm(pipeline, values[0], values[1], config, key.contains("Perf"), key, values[2], values[3], values[4] ?: 1, values[5] ?: 1, values[6] ?: false) + runLLMTestlistOnSlurm(pipeline, values[0], values[1], config, key.contains("-Perf-"), key, values[2], values[3], values[4] ?: 1, values[5] ?: 1, values[6] ?: false) }]]} parallelJobs += parallelSlurmJobs @@ -3215,7 +3254,7 @@ def launchTestJobs(pipeline, testFilter) if (key.contains("llvm")) { config = LLVM_CONFIG } - runLLMTestlistOnSlurm(pipeline, values[0], values[1], config, key.contains("Perf"), key, values[2], values[3], values[4] ?: 1, values[5] ?: 2, values[6] ?: false) + runLLMTestlistOnSlurm(pipeline, values[0], values[1], config, key.contains("-Perf-"), key, values[2], values[3], values[4] ?: 1, values[5] ?: 2, values[6] ?: false) }]]} parallelJobs += parallelMultiNodesSBSAJobs diff --git a/jenkins/scripts/open_search_db.py b/jenkins/scripts/open_search_db.py index 681b7bce29b..d27557a1b20 100644 --- a/jenkins/scripts/open_search_db.py +++ b/jenkins/scripts/open_search_db.py @@ -51,6 +51,7 @@ JOB_MACHINE_PROJECT_NAME = f"{PROJECT_ROOT}-ci-{MODE}-job_machine_info" FAILED_STEP_PROJECT_NAME = f"{PROJECT_ROOT}-ci-{MODE}-failed_step_info" PR_PROJECT_NAME = f"{PROJECT_ROOT}-ci-{MODE}-pr_info" +PERF_SANITY_PROJECT_NAME = f"{PROJECT_ROOT}-ci-{MODE}-perf_sanity_info" READ_ACCESS_PROJECT_NAME = [ JOB_PROJECT_NAME, @@ -59,9 +60,12 @@ JOB_MACHINE_PROJECT_NAME, FAILED_STEP_PROJECT_NAME, PR_PROJECT_NAME, + PERF_SANITY_PROJECT_NAME, ] -WRITE_ACCESS_PROJECT_NAME = [] +WRITE_ACCESS_PROJECT_NAME = [ + PERF_SANITY_PROJECT_NAME, +] DISABLE_OPEN_SEARCH_DB_FOR_LOCAL_TEST = False diff --git a/jenkins/scripts/slurm_run.sh b/jenkins/scripts/slurm_run.sh index e86092b7ea2..8f1c60aff90 100755 --- a/jenkins/scripts/slurm_run.sh +++ b/jenkins/scripts/slurm_run.sh @@ -100,7 +100,7 @@ echo "Full Command: $pytestCommand" eval $pytestCommand echo "Rank${SLURM_PROCID} Pytest finished execution" -if [ $SLURM_PROCID -eq 0 ] && [ "$perfMode" = "true" ] && [[ "$stageName" != *Perf-Sanity* ]]; then +if [ $SLURM_PROCID -eq 0 ] && [ "$perfMode" = "true" ]; then if [[ "$stageName" == *PyTorch* ]]; then basePerfFilename="base_perf_pytorch.csv" else @@ -117,9 +117,3 @@ if [ $SLURM_PROCID -eq 0 ] && [ "$perfMode" = "true" ] && [[ "$stageName" != *Pe --files $stageName/perf_script_test_results.csv \ $basePerfPath fi - -if [ $SLURM_PROCID -eq 0 ] && [ "$perfMode" = "true" ] && [[ "$stageName" == *Perf-Sanity* ]]; then - echo "Check Perf-Sanity Result" - python3 $llmSrcNode/tests/integration/defs/perf/perf_regression_check.py \ - $jobWorkspace -fi diff --git a/tests/integration/defs/perf/open_search_db_utils.py b/tests/integration/defs/perf/open_search_db_utils.py index 87f0b0fed62..2aa38d3f5b9 100644 --- a/tests/integration/defs/perf/open_search_db_utils.py +++ b/tests/integration/defs/perf/open_search_db_utils.py @@ -29,12 +29,12 @@ os.path.join(os.path.dirname(__file__), '../../../..')) if _project_root not in sys.path: sys.path.insert(0, _project_root) -from jenkins.scripts.open_search_db import OpenSearchDB +from jenkins.scripts.open_search_db import (PERF_SANITY_PROJECT_NAME, + OpenSearchDB) -PROJECT_ROOT = "sandbox-temp-trtllm-ci-perf-v1" # 
"sandbox-trtllm-ci-perf" -TEST_INFO_PROJECT_NAME = f"{PROJECT_ROOT}-test_info" -PRE_MERGE_THRESHOLD = 0.1 -POST_MERGE_THRESHOLD = 0.05 +POC_PROJECT_NAME = "sandbox-temp-trtllm-ci-perf-v1-test_info" +USE_POC_DB = os.environ.get("USE_POC_DB", "false").lower() == "true" +TEST_INFO_PROJECT_NAME = POC_PROJECT_NAME if USE_POC_DB else PERF_SANITY_PROJECT_NAME # Metrics where larger is better MAXIMIZE_METRICS = [ @@ -431,23 +431,22 @@ def prepare_regressive_test_cases(history_baseline_dict, new_data_dict): Set it as regressive. """ regressive_data_list = [] - cmd_idxs = new_data_dict.keys() # Find regressive test cases - for cmd_idx in cmd_idxs: + for cmd_idx in new_data_dict: if history_baseline_dict[cmd_idx] is None: continue - baseline_data = history_baseline_dict[cmd_idx] + history_baseline = history_baseline_dict[cmd_idx] new_data = new_data_dict[cmd_idx] is_regressive = False regressive_metrics = [] # Check MAXIMIZE_METRICS (new should be >= baseline * (1 - threshold)) for metric in MAXIMIZE_METRICS: - if metric not in new_data or metric not in baseline_data: + if metric not in new_data or metric not in history_baseline: continue - threshold = get_threshold(baseline_data, metric) - baseline_value = baseline_data[metric] + threshold = get_threshold(history_baseline, metric) + baseline_value = history_baseline[metric] new_value = new_data[metric] # Regressive if new_value < baseline_value * (1 - threshold) if new_value < baseline_value * (1 - threshold): @@ -456,10 +455,10 @@ def prepare_regressive_test_cases(history_baseline_dict, new_data_dict): # Check MINIMIZE_METRICS (new should be <= baseline * (1 + threshold)) for metric in MINIMIZE_METRICS: - if metric not in new_data or metric not in baseline_data: + if metric not in new_data or metric not in history_baseline: continue - threshold = get_threshold(baseline_data, metric) - baseline_value = baseline_data[metric] + threshold = get_threshold(history_baseline, metric) + baseline_value = history_baseline[metric] new_value = new_data[metric] # Regressive if new_value > baseline_value * (1 + threshold) if new_value > baseline_value * (1 + threshold): @@ -471,9 +470,9 @@ def prepare_regressive_test_cases(history_baseline_dict, new_data_dict): regressive_data = new_data.copy() # Add baseline values and thresholds for all metrics for metric in MAXIMIZE_METRICS + MINIMIZE_METRICS: - if metric in baseline_data: + if metric in history_baseline: baseline_key = f"d_baseline_{metric[2:]}" - regressive_data[baseline_key] = baseline_data[metric] + regressive_data[baseline_key] = history_baseline[metric] # Copy all threshold keys from baseline metric_suffix = metric[2:] @@ -482,8 +481,8 @@ def prepare_regressive_test_cases(history_baseline_dict, new_data_dict): f"d_threshold_post_merge_{metric_suffix}", f"d_threshold_pre_merge_{metric_suffix}" ]: - if threshold_key in baseline_data: - regressive_data[threshold_key] = baseline_data[ + if threshold_key in history_baseline: + regressive_data[threshold_key] = history_baseline[ threshold_key] # Add regression info string @@ -495,7 +494,15 @@ def prepare_regressive_test_cases(history_baseline_dict, new_data_dict): return regressive_data_list -def prepare_baseline_data(history_data_dict, new_data_dict): +def _is_valid_baseline(baseline_data): + """Check if baseline data is valid (non-empty dict).""" + if isinstance(baseline_data, dict) and len(baseline_data) > 0: + return True + return False + + +def prepare_baseline_data(history_baseline_dict, history_data_dict, + new_data_dict): """ Calculate new baseline from 
history post-merge data and new data. Then return new baseline data. @@ -507,18 +514,42 @@ def prepare_baseline_data(history_data_dict, new_data_dict): # Calculate best metrics from history post-merge data and new data best_metrics = calculate_best_perf_result(history_data_dict[cmd_idx], new_data_dict[cmd_idx]) + + # Create new_baseline_data from new_data_dict and set b_is_baseline new_baseline_data = new_data_dict[cmd_idx].copy() new_baseline_data["b_is_baseline"] = True - # Add or update baseline metrics and thresholds - for metric, value in best_metrics.items(): - new_baseline_data[metric] = value + + # Initialize metric_threshold_dict with default thresholds for all metrics + metric_threshold_dict = {} + for metric in MAXIMIZE_METRICS + MINIMIZE_METRICS: metric_suffix = metric[2:] post_merge_key = f"d_threshold_post_merge_{metric_suffix}" pre_merge_key = f"d_threshold_pre_merge_{metric_suffix}" - new_baseline_data[post_merge_key] = new_baseline_data.get( - post_merge_key, POST_MERGE_THRESHOLD) - new_baseline_data[pre_merge_key] = new_baseline_data.get( - pre_merge_key, PRE_MERGE_THRESHOLD) + metric_threshold_dict[post_merge_key] = POST_MERGE_THRESHOLD + metric_threshold_dict[pre_merge_key] = PRE_MERGE_THRESHOLD + + # If history baseline is valid, extract thresholds and update metric_threshold_dict + history_baseline = history_baseline_dict[cmd_idx] + if _is_valid_baseline(history_baseline): + for metric in MAXIMIZE_METRICS + MINIMIZE_METRICS: + metric_suffix = metric[2:] + post_merge_key = f"d_threshold_post_merge_{metric_suffix}" + pre_merge_key = f"d_threshold_pre_merge_{metric_suffix}" + if post_merge_key in history_baseline: + metric_threshold_dict[post_merge_key] = history_baseline[ + post_merge_key] + if pre_merge_key in history_baseline: + metric_threshold_dict[pre_merge_key] = history_baseline[ + pre_merge_key] + + # Update new_baseline_data with best_metrics values + for metric, value in best_metrics.items(): + new_baseline_data[metric] = value + + # Add all thresholds to new_baseline_data + for threshold_key, threshold_value in metric_threshold_dict.items(): + new_baseline_data[threshold_key] = threshold_value + add_id(new_baseline_data) new_baseline_data_dict[cmd_idx] = new_baseline_data diff --git a/tests/integration/defs/perf/test_perf.py b/tests/integration/defs/perf/test_perf.py index 6074f2f310f..df4c72a8390 100644 --- a/tests/integration/defs/perf/test_perf.py +++ b/tests/integration/defs/perf/test_perf.py @@ -18,28 +18,20 @@ import os import re import shutil -import socket -import subprocess import sys from typing import Dict, List, NamedTuple import pytest import yaml from defs.common import get_cpp_benchmark -from defs.trt_test_alternative import (is_linux, is_windows, print_error, - print_info, print_warning) +from defs.trt_test_alternative import (is_linux, is_windows, print_info, + print_warning) from ..conftest import get_llm_root, llm_models_root, trt_environment -from .open_search_db_utils import (SCENARIO_MATCH_FIELDS, add_id, - get_history_data, get_job_info, - post_new_perf_data, prepare_baseline_data, - prepare_regressive_test_cases, - write_regressive_test_cases) from .pytorch_model_config import get_model_yaml_config from .sampler_options_config import get_sampler_options_config -from .utils import (AbstractPerfScriptTestClass, PerfAggrScriptTestCmds, - PerfBenchScriptTestCmds, PerfDisaggScriptTestCmds, - PerfMetricType, PerfMultiNodeDisaggScriptTestCmds, +from .utils import (AbstractPerfScriptTestClass, PerfBenchScriptTestCmds, + 
PerfDisaggScriptTestCmds, PerfMetricType, generate_test_nodes) if not hasattr(re, "Pattern"): @@ -502,598 +494,6 @@ class PerfTestMetric(NamedTuple): cmd_idx: int -def to_env_dict(env_vars: str) -> Dict[str, str]: - env = {} - for env_var in env_vars.split(): - if "=" in env_var: - key, value = env_var.split("=", 1) - env[key] = value - return env - - -class ServerConfig: - """ - Configurations of trtllm-server. - """ - - def __init__(self, server_config_data: dict, env_vars: str = ""): - # Extract required fields - self.mode = server_config_data.get('mode', 'e2e') - self.concurrency = server_config_data.get('concurrency', 1) - self.name = server_config_data['name'] - self.model_name = server_config_data['model_name'] - self.model_path = "" - self.env_vars = env_vars - - # Extract optional fields with defaults - self.tp = server_config_data.get('tensor_parallel_size', 1) - self.ep = server_config_data.get('moe_expert_parallel_size', 1) - self.pp = server_config_data.get('pipeline_parallel_size', 1) - self.cp = server_config_data.get('context_parallel_size', 1) - self.gpus = server_config_data.get('gpus', self.tp * self.cp * self.pp) - self.gpus_per_node = server_config_data.get('gpus_per_node', - 0) or self.gpus - self.max_num_tokens = server_config_data.get('max_num_tokens', 2048) - self.max_batch_size = server_config_data.get('max_batch_size', 512) - self.max_seq_len = server_config_data.get('max_seq_len', 0) - self.disable_overlap_scheduler = server_config_data.get( - 'disable_overlap_scheduler', False) - self.num_postprocess_workers = server_config_data.get( - 'num_postprocess_workers', 0) - self.stream_interval = server_config_data.get('stream_interval', 10) - self.attn_backend = server_config_data.get('attn_backend', "TRTLLM") - self.enable_chunked_prefill = server_config_data.get( - 'enable_chunked_prefill', False) - self.enable_attention_dp = server_config_data.get( - 'enable_attention_dp', False) - self.trust_remote_code = server_config_data.get('trust_remote_code', - False) - self.enable_lm_head_tp_in_adp = server_config_data.get( - 'enable_lm_head_tp_in_adp', False) - - # attention_dp_config - attention_dp_config = server_config_data.get('attention_dp_config', {}) - self.attention_dp_balance = attention_dp_config.get( - 'enable_balance', False) - self.batching_wait_iters = attention_dp_config.get( - 'batching_wait_iters', 0) - self.timeout_iters = attention_dp_config.get('timeout_iters', 60) - - # moe_config - moe_config = server_config_data.get('moe_config', {}) - self.moe_backend = moe_config.get('backend', "") - self.moe_max_num_tokens = moe_config.get('max_num_tokens', 0) - self.use_low_precision_moe_combine = moe_config.get( - 'use_low_precision_moe_combine', False) - load_balancer_config = moe_config.get('load_balancer', {}) - self.load_balancer_num_slots = load_balancer_config.get('num_slots', 0) - self.load_balancer_layer_updates_per_iter = load_balancer_config.get( - 'layer_updates_per_iter', 0) - - # cuda_graph_config - cuda_graph_config = server_config_data.get('cuda_graph_config', {}) - self.enable_cuda_graph = False - if cuda_graph_config: - self.enable_cuda_graph = True - self.enable_padding = cuda_graph_config.get('enable_padding', True) - self.cuda_graph_batch_sizes = cuda_graph_config.get( - 'batch_sizes', []) - self.cuda_graph_max_batch_size = cuda_graph_config.get( - 'max_batch_size', 0) - else: - self.enable_padding = True - self.cuda_graph_batch_sizes = [] - self.cuda_graph_max_batch_size = 0 - - # kv_cache_config - kv_cache_config = 
server_config_data.get('kv_cache_config', {}) - self.kv_cache_dtype = kv_cache_config.get('dtype', "fp8") - self.enable_block_reuse = kv_cache_config.get('enable_block_reuse', - False) - self.free_gpu_memory_fraction = kv_cache_config.get( - 'free_gpu_memory_fraction', 0.8) - - # cache_transceiver_config - cache_transceiver_config = server_config_data.get( - 'cache_transceiver_config', {}) - self.cache_transceiver_backend = cache_transceiver_config.get( - 'backend', "") - self.cache_transceiver_max_tokens_in_buffer = cache_transceiver_config.get( - 'max_tokens_in_buffer', 0) - - # speculative_config - speculative_config = server_config_data.get('speculative_config', {}) - self.spec_decoding_type = speculative_config.get('decoding_type', "") - self.num_nextn_predict_layers = speculative_config.get( - 'num_nextn_predict_layers', 0) - eagle3_value = speculative_config.get('eagle3_layers_to_capture', []) - if isinstance(eagle3_value, int): - self.eagle3_layers_to_capture = [eagle3_value] - elif isinstance(eagle3_value, list): - self.eagle3_layers_to_capture = eagle3_value - else: - self.eagle3_layers_to_capture = [] - self.max_draft_len = speculative_config.get('max_draft_len', 0) - self.speculative_model_dir = speculative_config.get( - 'speculative_model_dir', "") - - # match_mode: "config" (default, 40+ fields) or "scenario" (benchmark scenario fields for recipe testing) - # When match_mode is "scenario", baselines are matched by scenario identity - # (model, gpu, isl, osl, concurrency, num_gpus) instead of full config fields. - self.match_mode = server_config_data.get('match_mode', "config") - - # Store filtered config for extra_llm_api_config (exclude name, model_name, gpus, client_configs) - exclude_keys = [ - 'mode', 'concurrency', 'name', 'model_name', 'gpus', - 'gpus_per_node', 'client_configs' - ] - self.extra_llm_api_config_data = { - k: v - for k, v in server_config_data.items() if k not in exclude_keys - } - - def to_cmd(self, - output_dir: str, - numa_bind: bool = False, - disagg_serving_type: str = "") -> List[str]: - model_dir = get_model_dir(self.model_name) - self.model_path = model_dir if os.path.exists( - model_dir) else self.model_name - config_filename = f"extra-llm-api-config.{self.name}.yml" - config_path = os.path.join(output_dir, config_filename) - - numa_bind_cmd = [] - if numa_bind: - numa_bind_cmd = ["numactl", "-m 0,1"] - - cmd = numa_bind_cmd + [ - "trtllm-serve", self.model_path, "--backend", "pytorch", "--config", - config_path - ] - return cmd - - def to_env(self) -> Dict[str, str]: - return to_env_dict(self.env_vars) - - def to_match_keys(self) -> List[str]: - return [ - "s_mode", - "s_model_name", - "l_tp", - "l_ep", - "l_pp", - "l_cp", - "l_gpus_per_node", - "l_max_batch_size", - "b_disable_overlap_scheduler", - "l_num_postprocess_workers", - "s_attn_backend", - "b_enable_chunked_prefill", - "b_enable_attention_dp", - "b_enable_lm_head_tp_in_adp", - # attention_dp_config - "b_attention_dp_balance", - # moe_config - "s_moe_backend", - # cuda_graph_config - "b_enable_cuda_graph", - # kv_cache_config - "s_kv_cache_dtype", - # cache_transceiver_config - "s_cache_transceiver_backend" - # speculative_config - "s_spec_decoding_type", - "l_num_nextn_predict_layers", - ] - - def to_db_data(self) -> dict: - db_data = { - "s_mode": - self.mode, - "s_model_name": - self.model_name.lower(), - "l_gpus": - self.gpus, - "l_tp": - self.tp, - "l_ep": - self.ep, - "l_pp": - self.pp, - "l_cp": - self.cp, - "l_gpus_per_node": - self.gpus_per_node, - "l_max_num_tokens": - 
self.max_num_tokens, - "l_max_batch_size": - self.max_batch_size, - "l_max_seq_len": - self.max_seq_len, - "b_disable_overlap_scheduler": - self.disable_overlap_scheduler, - "l_num_postprocess_workers": - self.num_postprocess_workers, - "l_stream_interval": - self.stream_interval, - "s_attn_backend": - self.attn_backend, - "b_enable_chunked_prefill": - self.enable_chunked_prefill, - "b_enable_attention_dp": - self.enable_attention_dp, - "b_trust_remote_code": - self.trust_remote_code, - "b_enable_lm_head_tp_in_adp": - self.enable_lm_head_tp_in_adp, - # attention_dp_config - "b_attention_dp_balance": - self.attention_dp_balance, - "l_batching_wait_iters": - self.batching_wait_iters, - "l_timeout_iters": - self.timeout_iters, - # moe_config - "s_moe_backend": - self.moe_backend, - "l_moe_max_num_tokens": - self.moe_max_num_tokens, - "b_use_low_precision_moe_combine": - self.use_low_precision_moe_combine, - "l_load_balancer_num_slots": - self.load_balancer_num_slots, - "l_load_balancer_layer_updates_per_iter": - self.load_balancer_layer_updates_per_iter, - # cuda_graph_config - "b_enable_cuda_graph": - self.enable_cuda_graph, - "b_enable_padding": - self.enable_padding, - "l_cuda_graph_max_batch_size": - self.cuda_graph_max_batch_size, - "s_cuda_graph_batch_sizes": - ",".join(map(str, self.cuda_graph_batch_sizes)), - # kv_cache_config - "s_kv_cache_dtype": - self.kv_cache_dtype, - "b_enable_block_reuse": - self.enable_block_reuse, - "d_free_gpu_memory_fraction": - self.free_gpu_memory_fraction, - # cache_transceiver_config - "s_cache_transceiver_backend": - self.cache_transceiver_backend, - "l_cache_transceiver_max_tokens_in_buffer": - self.cache_transceiver_max_tokens_in_buffer, - # speculative_config - "s_spec_decoding_type": - self.spec_decoding_type, - "l_num_nextn_predict_layers": - self.num_nextn_predict_layers, - "s_eagle3_layers_to_capture": - ",".join(map(str, self.eagle3_layers_to_capture)), - "l_max_draft_len": - self.max_draft_len, - "s_speculative_model_dir": - self.speculative_model_dir, - "s_server_log_link": - "", - "s_server_env_var": - self.env_vars, - } - return db_data - - def generate_extra_llm_api_config(self) -> str: - """Generate extra-llm-api-config.yml content""" - # Make a copy to avoid modifying the original - config_data = dict(self.extra_llm_api_config_data) - - # Handle speculative_model_dir path conversion if it exists - if 'speculative_config' in config_data and 'speculative_model_dir' in config_data[ - 'speculative_config']: - spec_model_dir = config_data['speculative_config'][ - 'speculative_model_dir'] - if spec_model_dir: - config_data['speculative_config'][ - 'speculative_model_dir'] = os.path.join( - llm_models_root(), spec_model_dir) - - return yaml.dump(config_data, default_flow_style=False, sort_keys=False) - - -class ClientConfig: - """ - Configurations of benchmark client. 
- """ - - def __init__(self, - client_config_data: dict, - model_name: str, - env_vars: str = ""): - self.name = client_config_data.get('name', '') - self.model_name = model_name - self.concurrency = client_config_data.get('concurrency', 1) - self.iterations = client_config_data.get('iterations', 1) - self.isl = client_config_data.get('isl', 1024) - self.osl = client_config_data.get('osl', 1024) - self.random_range_ratio = client_config_data.get( - 'random_range_ratio', 0.0) - self.backend = client_config_data.get('backend', "openai") - self.use_chat_template = client_config_data.get('use_chat_template', - False) - self.streaming = client_config_data.get('streaming', True) - self.model_path = "" - self.env_vars = env_vars - - def to_cmd(self) -> List[str]: - model_dir = get_model_dir(self.model_name) - self.model_path = model_dir if os.path.exists( - model_dir) else self.model_name - dataset_path = get_dataset_path() - benchmark_cmd = [ - "python", - "-m", - "tensorrt_llm.serve.scripts.benchmark_serving", - "--model", - self.model_path, - "--tokenizer", - self.model_path, - "--dataset-name", - "random", - "--random-ids", - "--num-prompts", - str(self.concurrency * self.iterations), - "--max-concurrency", - str(self.concurrency), - "--random-input-len", - str(self.isl), - "--random-output-len", - str(self.osl), - "--random-range-ratio", - str(self.random_range_ratio), - "--trust-remote-code", - "--ignore-eos", - "--percentile-metrics", - "ttft,tpot,itl,e2el", - ] - if dataset_path and os.path.exists(dataset_path): - benchmark_cmd.append("--dataset-path") - benchmark_cmd.append(dataset_path) - if self.backend: - benchmark_cmd.append("--backend") - benchmark_cmd.append(self.backend) - if self.use_chat_template: - benchmark_cmd.append("--use-chat-template") - if not self.streaming: - benchmark_cmd.append("--non-streaming") - return benchmark_cmd - - def to_env(self) -> Dict[str, str]: - return to_env_dict(self.env_vars) - - def to_match_keys(self) -> List[str]: - return [ - "l_concurrency", - "l_iterations", - "l_isl", - "l_osl", - "d_random_range_ratio", - "s_backend", - "b_use_chat_template", - "b_streaming", - ] - - def to_db_data(self) -> dict: - """Convert ClientConfig to Database data""" - db_data = { - "l_concurrency": self.concurrency, - "l_iterations": self.iterations, - "l_isl": self.isl, - "l_osl": self.osl, - "d_random_range_ratio": self.random_range_ratio, - "s_backend": self.backend, - "b_use_chat_template": self.use_chat_template, - "b_streaming": self.streaming, - "s_client_log_link": "", - "s_client_env_vars": self.env_vars, - } - if self.backend: - db_data["s_backend"] = self.backend - if self.use_chat_template: - db_data["b_use_chat_template"] = self.use_chat_template - return db_data - - -def parse_select_pattern(select_pattern: str): - """Parse select pattern like 'r1_fp4_dep4,r1_fp4_tep4:con1_iter1_1024_1024,r1_fp4_tep4:con8_iter1_1024_1024' - - Format: - - ',' splits different server configs - - ':' means for this server, we choose specific clients - - If no ':', all clients are chosen for that server - - Returns: - - Dict with server name as key and either None (all clients) or set of client names as value - """ - execution_plan = {} - - parts = select_pattern.split(',') - for part in parts: - part = part.strip() - if not part: # Skip empty parts - continue - - if ':' in part: - # Format: "server_name:client_name" - server_name, client_name = part.split(':', 1) - server_name = server_name.strip() - client_name = client_name.strip() - - # Only add if not already set to 
None (all clients) - if server_name not in execution_plan: - execution_plan[server_name] = set() - - if execution_plan[server_name] is not None: - execution_plan[server_name].add(client_name) - else: - # Format: "server_name" - select all clients for this server - server_name = part.strip() - execution_plan[server_name] = None - - return execution_plan - - -def parse_aggr_config_file(config_file_path: str, select_pattern: str = None): - """Parse YAML configuration file and create ServerConfig and ClientConfig objects for aggregated server - - Args: - config_file_path: Path to YAML configuration file - select_pattern: Selection pattern string (e.g., "r1_fp4_dep4,r1_fp4_tep4:con1_iter1_1024_1024") - - Returns: - execution_plan: None (all servers/clients) or dict with server names as keys - server_configs: List of ServerConfig objects - server_client_configs: Dict with server id as key and list of ClientConfig as value - """ - # Parse selection pattern - if select_pattern: - execution_plan = parse_select_pattern(select_pattern) - else: - execution_plan = None - - with open(config_file_path, 'r') as f: - config = yaml.safe_load(f) - - metadata = config.get('metadata', {}) - environment = config.get('environment', {}) - hardware = config.get('hardware', {}) - gpus_per_node = hardware.get('gpus_per_node', 0) - - model_name = metadata.get('model_name', '') - server_env_var = environment.get('server_env_var', '') - client_env_var = environment.get('client_env_var', '') - - server_configs = [] - server_client_configs = {} - for server_config_data in config['server_configs']: - server_name = server_config_data['name'] - server_config_data[ - 'model_name'] = model_name if 'model_name' not in server_config_data else server_config_data[ - 'model_name'] - server_config_data['mode'] = 'e2e' - server_config_data['concurrency'] = -1 - server_config_data['gpus_per_node'] = gpus_per_node - - # Check if this server should be included based on execution_plan - if execution_plan is not None and server_name not in execution_plan: - continue - - server_config = ServerConfig(server_config_data, server_env_var) - server_id = len(server_configs) - server_configs.append(server_config) - - client_configs = [] - selected_client_names = execution_plan.get( - server_name) if execution_plan else None - - for client_config_data in server_config_data['client_configs']: - client_name = client_config_data['name'] - - # Check if this client should be included - if execution_plan is not None and selected_client_names is not None: - if client_name not in selected_client_names: - continue - - client_config = ClientConfig(client_config_data, - server_config_data['model_name'], - client_env_var) - client_configs.append(client_config) - - server_client_configs[server_id] = client_configs - - return execution_plan, server_configs, server_client_configs - - -def parse_multi_node_disagg_config_file(config_file_path: str, - select_pattern: str = None): - disagg_serving_type = os.environ.get("DISAGG_SERVING_TYPE", "BENCHMARK") - - # Read YAML config file - with open(config_file_path, 'r') as f: - config = yaml.safe_load(f) - - disagg_configs = [] - metadata = config.get('metadata', {}) - hardware = config.get('hardware', {}) - benchmark = config.get('benchmark', {}) - environment = config.get('environment', {}) - slurm_config = config.get('slurm', {}) - worker_config = config.get('worker_config', {}) - timeout = slurm_config.get('timeout', 7200) - numa_bind = slurm_config.get('numa_bind', False) - gpus_per_node = 
hardware.get('gpus_per_node', 0) - model_name = metadata.get('model_name', '') - assert model_name, "model_name is required in metadata section" - - benchmark_mode = benchmark.get('mode', 'e2e') - if "gen_only" in benchmark_mode: - hardware['num_ctx_servers'] = 0 - - worker_env_var = environment.get('worker_env_var', '') - server_env_var = environment.get('server_env_var', '') - client_env_var = environment.get('client_env_var', '') - - concurrency_str = benchmark.get('concurrency_list', '1') - if isinstance(concurrency_str, str): - concurrency = max(int(x) for x in concurrency_str.split()) - else: - concurrency = int(concurrency_str) - - ctx_server_config_data = { - 'mode': benchmark_mode, - 'concurrency': concurrency, - 'name': 'ctx', - 'model_name': model_name, - 'gpus_per_node': gpus_per_node, - **worker_config.get('ctx', {}) - } - gen_server_config_data = { - 'mode': benchmark_mode, - 'concurrency': concurrency, - 'name': 'gen', - 'model_name': model_name, - 'gpus_per_node': gpus_per_node, - **worker_config.get('gen', {}) - } - client_config_data = { - 'name': 'client', - 'concurrency': concurrency, - 'iterations': benchmark.get('multi_round', 1), - 'isl': benchmark.get('input_length', 1024), - 'osl': benchmark.get('output_length', 1024), - 'random_range_ratio': benchmark.get('benchmark_ratio', 0.0), - 'backend': 'openai', - 'use_chat_template': False, - 'streaming': benchmark.get('streaming', True), - } - disagg_config = { - 'disagg_serving_type': disagg_serving_type, - 'hostname': socket.gethostname(), - 'numa_bind': numa_bind, - 'timeout': timeout, - 'mode': benchmark_mode, - 'name': 'disagg_config', - 'model_name': model_name, - 'hardware': hardware, - 'ctx_server': ServerConfig(ctx_server_config_data, worker_env_var), - 'gen_server': ServerConfig(gen_server_config_data, worker_env_var), - 'server_env_var': server_env_var, - 'client': ClientConfig(client_config_data, model_name, client_env_var), - } - disagg_configs.append(disagg_config) - return disagg_configs - - class PerfTestConfig: """ Configurations defining the LLM perf test. @@ -1203,20 +603,6 @@ def __init__( self.ctx_server_workers = 0 self.gen_server_workers = 0 - # Used for perf sanity test - self.upload_to_db = False - self.config_file = None - self.gpu_type = None - self.config_dir = None - self.config_file = None - self.config_path = None - self.select_pattern = None - # Aggregated mode - self.server_configs = [] - self.server_client_configs = {} - # Multi-node disaggregated mode - self.disagg_configs = [] - def _to_string_disagg(self, entries: List[str]): entries.append(f"disagg_server") if self.ctx_tp_size > 1: @@ -1241,21 +627,6 @@ def to_string(self, custom_output_len: int = None, device_subtype: str = None) -> str: - # Used for perf sanity test - if self.config_file is not None: - entries = ["perf_sanity", self.config_file] - if "disagg" in self.config_file: - # For multi-node disagg, add disagg config name - if custom_server_name is not None: - entries.append(f"disagg:{custom_server_name}") - else: - # For aggr_server - if custom_server_name is not None: - entries.append(f"server:{custom_server_name}") - if custom_client_name is not None: - entries.append(f"client:{custom_client_name}") - return "-".join(entries) - # First, add the model name. entries = [self.model_name] @@ -1425,49 +796,6 @@ def load_from_str(self, test_param_labels) -> None: # Extract configs from test param labels. 
labels = test_param_labels.split("-") - def get_gpu_type() -> str: - try: - output = subprocess.check_output(["nvidia-smi", "-L"], - stderr=subprocess.DEVNULL, - text=True) - first_line = output.strip().split("\n")[0] - gpu_models = ["GB300", "GB200", "B300", "B200"] - for model in gpu_models: - if model in first_line: - if model.startswith("B") and not model.startswith("GB"): - return f"dgx_{model.lower()}" - return model.lower() - except (subprocess.CalledProcessError, FileNotFoundError, - IndexError): - print_error( - f"Failed to get GPU type: {subprocess.CalledProcessError}") - return "" - - if "perf_sanity" in labels[0]: - assert len(labels) > 1, "perf_sanity test must have a config file!" - is_disagg = "disagg" in labels[0] - self.upload_to_db = "upload" in labels[0] - self.gpu_type = get_gpu_type() - if is_disagg: - # For disagg, test name is like: perf_sanity_disagg-deepseek-r1-fp4_8k1k_ctx1_gen1_dep32_bs128_eplb0_mtp0_ccb-UCX - # labels[0] is perf_sanity_disagg, "-".join(labels[1:]) is config file base name - self.runtime = "multi_node_disagg_server" - self.config_dir = "tests/integration/defs/perf/disagg/test_configs/disagg/perf" - config_base = "-".join(labels[1:]) - self.config_file = f"{config_base}.yaml" if not config_base.endswith( - ".yaml") else config_base - self.select_pattern = None - else: - # For aggr, test name is like: perf_sanity_aggr-l0_dgx_b300-r1_fp8_dep8_mtp1_1k1k - # labels[0] is perf_sanity_aggr, labels[1] is config file base name, labels[2] is select_pattern (optional) - self.runtime = "aggr_server" - self.config_dir = "tests/scripts/perf-sanity" - config_base = labels[1] - self.config_file = f"{config_base}.yaml" if config_base and not config_base.endswith( - ".yaml") else config_base - self.select_pattern = labels[2] if len(labels) > 2 else None - return - self.model_name = labels.pop(0) # Check if device subtype is present (for autodeploy tests) @@ -1685,20 +1013,6 @@ def validate(self): [b >= 32 for b in self.batch_sizes] ), f"gpt_350m and bloom_560m with small BS are very unstable! Please increase to at least 32." - def set_aggr_server_configs(self) -> None: - """ - Set the server and client configs. - """ - _, self.server_configs, self.server_client_configs = parse_aggr_config_file( - self.config_path, self.select_pattern) - - def set_multi_node_disagg_server_configs(self) -> None: - """ - Set the multi-node disaggregated server configs. - """ - self.disagg_configs = parse_multi_node_disagg_config_file( - self.config_path, self.select_pattern) - def get_model_family(self) -> str: """ Get the model family of the current model. 
@@ -1787,13 +1101,6 @@ def set_runtime_configs(self, output_dir, perf_cache_fpath, gpu_clock_lock=None) -> None: - if self._config.runtime == "aggr_server" or self._config.runtime == "multi_node_disagg_server": - self._config.config_dir = os.getenv( - "TRTLLM_CONFIG_FOLDER", - os.path.join(llm_root, self._config.config_dir)) - self._config.config_path = os.path.join(self._config.config_dir, - self._config.config_file) - if self._config.runtime == "cpp": if not self._config.is_bert_like(): raise ValueError( @@ -1805,14 +1112,8 @@ def set_runtime_configs(self, llm_root) elif self._config.runtime == "bench": benchmark_script = "trtllm-bench" - elif self._config.runtime == "aggr_server": - benchmark_script = None - self._config.set_aggr_server_configs() elif self._config.runtime == "disagg_server": benchmark_script = None - elif self._config.runtime == "multi_node_disagg_server": - benchmark_script = None - self._config.set_multi_node_disagg_server_configs() else: raise RuntimeError(f"Invalid runtime {self._config.runtime}.") @@ -1840,76 +1141,6 @@ def set_runtime_configs(self, self._llm_root = llm_root self._gpu_clock_lock = gpu_clock_lock - def get_trtllm_aggr_commands(self, output_dir): - server_cmds = [] - client_cmds = [] - names = [] - for server_idx, client_configs in self._config.server_client_configs.items( - ): - server_config = self._config.server_configs[server_idx] - server_cmd = server_config.to_cmd(output_dir) - # Generate extra-llm-api-config.yml - config_content = server_config.generate_extra_llm_api_config() - config_filename = f"extra-llm-api-config.{server_config.name}.yml" - config_path = os.path.join(output_dir, config_filename) - with open(config_path, 'w') as f: - f.write(config_content) - for client_config in client_configs: - server_cmds.append(server_cmd) - client_cmd = client_config.to_cmd() - client_cmds.append(client_cmd) - names.append(f"{server_config.name}-{client_config.name}") - return server_cmds, client_cmds, names - - def get_trtllm_multi_node_disagg_commands(self, output_dir): - ctx_server_cmds = [] - gen_server_cmds = [] - disagg_server_cmds = [] - benchmark_cmds = [] - cmd_idx = 0 - for disagg_config in self._config.disagg_configs: - disagg_serving_type = disagg_config['disagg_serving_type'] - disagg_config['hostname'] - numa_bind = disagg_config['numa_bind'] - ctx_server_cmd = None - gen_server_cmd = None - disagg_server_cmd = None - benchmark_cmd = None - if "CTX" in disagg_serving_type or "GEN" in disagg_serving_type: - is_ctx = "CTX" in disagg_serving_type - server_config = disagg_config[ - 'ctx_server'] if is_ctx else disagg_config['gen_server'] - server_cmd = server_config.to_cmd(output_dir, numa_bind, - disagg_serving_type) - if is_ctx: - ctx_server_cmd = server_cmd - else: - gen_server_cmd = server_cmd - # Generate extra-llm-api-config.yml - config_content = server_config.generate_extra_llm_api_config() - config_filename = f"extra-llm-api-config.{server_config.name}.yml" - config_path = os.path.join(output_dir, config_filename) - with open(config_path, 'w') as f: - f.write(config_content) - elif "DISAGG_SERVER" in disagg_serving_type: - timeout = disagg_config['timeout'] - # Generate DISAGG server command if this is the DISAGG server node - disagg_server_cmd = [ - "trtllm-serve", "disaggregated", "-c", - f"{output_dir}/server_config.{cmd_idx}.yaml", "-t", - str(timeout), "-r", - str(timeout) - ] - elif "BENCHMARK" in disagg_serving_type: - # Generate benchmark command if this is the BENCHMARK server node - benchmark_cmd = 
disagg_config['client'].to_cmd() - ctx_server_cmds.append(ctx_server_cmd) - gen_server_cmds.append(gen_server_cmd) - disagg_server_cmds.append(disagg_server_cmd) - benchmark_cmds.append(benchmark_cmd) - cmd_idx += 1 - return ctx_server_cmds, gen_server_cmds, disagg_server_cmds, benchmark_cmds - def get_trtllm_build_command(self, engine_dir, checkpoint_dir) -> list: build_cmd = [ self._build_script, f"--output_dir={engine_dir}", @@ -2175,22 +1406,7 @@ def get_commands(self): # Whether this is python or cpp runtime perf test. is_python = self._config.runtime == "python" num_gpus = self._config.num_gpus - is_aggr = self._config.runtime == "aggr_server" is_disagg = self._config.runtime == "disagg_server" - is_multi_node_disagg = self._config.runtime == "multi_node_disagg_server" - perf_sanity_output_dir = os.path.join(self._output_dir, - self._test_param_labels) - if is_aggr: - if not os.path.exists(perf_sanity_output_dir): - os.makedirs(perf_sanity_output_dir, exist_ok=True) - server_cmds, client_cmds, names = self.get_trtllm_aggr_commands( - perf_sanity_output_dir) - return PerfAggrScriptTestCmds(server_cmds=server_cmds, - client_cmds=client_cmds, - names=names, - timeout=3600, - output_dir=perf_sanity_output_dir) - if is_disagg: ctx_cmd, gen_cmd = self._get_disagg_worker_deploy_command() server_cmd = self._get_disagg_server_deploy_command() @@ -2199,26 +1415,6 @@ def get_commands(self): return PerfDisaggScriptTestCmds(ctx_cmd, gen_cmd, server_cmd, client_cmd, benchmark_cmd) - if is_multi_node_disagg: - if not os.path.exists(perf_sanity_output_dir): - os.makedirs(perf_sanity_output_dir, exist_ok=True) - ctx_server_cmds, gen_server_cmds, disagg_server_cmds, benchmark_cmds = self.get_trtllm_multi_node_disagg_commands( - perf_sanity_output_dir) - return PerfMultiNodeDisaggScriptTestCmds( - ctx_server_cmds=ctx_server_cmds, - gen_server_cmds=gen_server_cmds, - disagg_server_cmds=disagg_server_cmds, - benchmark_cmds=benchmark_cmds, - timeout=self._config.disagg_configs[0]['timeout'], - hostname=self._config.disagg_configs[0]['hostname'], - disagg_serving_type=self._config.disagg_configs[0] - ['disagg_serving_type'], - num_ctx_servers=self._config.disagg_configs[0]['hardware'] - ['num_ctx_servers'], - num_gen_servers=self._config.disagg_configs[0]['hardware'] - ['num_gen_servers'], - output_dir=perf_sanity_output_dir) - if is_python and num_gpus > 1: # TODO: Fix https://nvbugs/4449875 pytest.skip( @@ -2444,8 +1640,6 @@ def run_metrics(self, llm_venv, gpu_clock_lock, session_data_writer, if self._current_cmd_idx in self._test_results: del self._test_results[self._current_cmd_idx] - self.upload_test_results_to_database() - finally: # Clean up engine dir after use. shutil.rmtree(self._get_engine_dir(), ignore_errors=True) @@ -2473,166 +1667,6 @@ def add_myelin_time_pass_to(input_env): raise RuntimeError(msg) - def upload_test_results_to_database(self): - """ - Upload the test results and baseline to database. - """ - - def add_prefix(key: str, prefix_name: str) -> dict: - type_prefix = key[0:2] # 'l_', 's_', 'b_', 'd_' - rest = key[2:] - return f"{type_prefix}{prefix_name}_{rest}" - - def add_list_prefix(config_list: List, prefix_name: str) -> List: - return [add_prefix(key, prefix_name) for key in config_list] - - def add_dict_prefix(config_dict: dict, prefix_name: str) -> dict: - return { - add_prefix(key, prefix_name): value - for key, value in config_dict.items() - } - - match_keys = [] - # Only aggr_server and multi_node_disagg_server will upload. 
- if self._config.runtime == "aggr_server": - job_config = get_job_info() - job_config["s_gpu_type"] = self._config.gpu_type - is_post_merge = job_config["b_is_post_merge"] - new_data_dict = {} - cmd_idx = 0 - for server_idx, client_configs in self._config.server_client_configs.items( - ): - server_config = self._config.server_configs[server_idx] - server_config_dict = server_config.to_db_data() - for client_config in client_configs: - client_config_dict = client_config.to_db_data() - # If cmd_idx not in self._test_results or some metrics missing, skip this cmd_idx - if cmd_idx not in self._test_results or not all( - metric_type in self._test_results[cmd_idx] - for metric_type in AGGR_SERVER_METRICS): - print_info( - f"Skipped posting command {cmd_idx} 's test results since some metrics are missing in test results." - ) - cmd_idx += 1 - continue - new_data = { - "s_runtime": - "multi_node_aggr_server" if server_config.gpus - != server_config.gpus_per_node else "aggr_server" - } - new_data.update(job_config) - new_data.update(server_config_dict) - new_data.update(client_config_dict) - for metric_type in AGGR_SERVER_METRICS: - new_data[ - f"d_{PERF_METRIC_STRING[metric_type]}"] = self._test_results[ - cmd_idx][metric_type] - add_id(new_data) - new_data_dict[cmd_idx] = new_data - cmd_idx += 1 - if not match_keys: - match_keys.append("s_runtime") - if server_config.match_mode == "scenario": - match_keys = SCENARIO_MATCH_FIELDS.copy() - else: - match_keys.extend(server_config.to_match_keys()) - match_keys.extend(client_config.to_match_keys()) - - elif self._config.runtime == "multi_node_disagg_server": - if self._config.disagg_configs[0][ - 'disagg_serving_type'] != "BENCHMARK": - return - job_config = get_job_info() - job_config["s_gpu_type"] = self._config.gpu_type - is_post_merge = job_config["b_is_post_merge"] - new_data_dict = {} - cmd_idx = 0 - for disagg_config in self._config.disagg_configs: - # If cmd_idx not in self._test_results or some metrics missing, skip this cmd_idx - if cmd_idx not in self._test_results or not all( - metric_type in self._test_results[cmd_idx] - for metric_type in AGGR_SERVER_METRICS): - print_info( - f"Skipped posting command {cmd_idx} 's test results since some metrics are missing in test results." 
- ) - cmd_idx += 1 - continue - # Get ctx_server and gen_server configs with prefixed keys - ctx_server_config_dict = disagg_config['ctx_server'].to_db_data( - ) - gen_server_config_dict = disagg_config['gen_server'].to_db_data( - ) - client_config_dict = disagg_config['client'].to_db_data() - ctx_server_config_dict = add_dict_prefix( - ctx_server_config_dict, 'ctx') - gen_server_config_dict = add_dict_prefix( - gen_server_config_dict, 'gen') - - hardware = disagg_config.get('hardware', {}) - num_ctx_servers = hardware.get('num_ctx_servers', 0) - num_gen_servers = hardware.get('num_gen_servers', 0) - new_data = { - "s_runtime": "multi_node_disagg_server", - "s_benchmark_mode": disagg_config['mode'], - "s_server_env_var": disagg_config['server_env_var'], - "l_num_ctx_servers": num_ctx_servers, - "l_num_gen_servers": num_gen_servers - } - new_data.update(job_config) - if num_ctx_servers > 0: - new_data.update(ctx_server_config_dict) - if num_gen_servers > 0: - new_data.update(gen_server_config_dict) - new_data.update(client_config_dict) - for metric_type in AGGR_SERVER_METRICS: - new_data[ - f"d_{PERF_METRIC_STRING[metric_type]}"] = self._test_results[ - cmd_idx][metric_type] - add_id(new_data) - new_data_dict[cmd_idx] = new_data - cmd_idx += 1 - if not match_keys: - match_keys.extend( - ["s_runtime", "l_num_ctx_servers", "l_num_gen_servers"]) - if num_ctx_servers > 0: - match_keys.extend( - add_list_prefix( - disagg_config['ctx_server'].to_match_keys(), - 'ctx')) - if num_gen_servers > 0: - match_keys.extend( - add_list_prefix( - disagg_config['gen_server'].to_match_keys(), - 'gen')) - match_keys.extend(disagg_config['client'].to_match_keys()) - else: - return - - # Get history data for each cmd_idx - history_baseline_dict, history_data_dict = get_history_data( - new_data_dict, self._config.gpu_type, match_keys) - # Prepare regressive test cases - regressive_data_list = prepare_regressive_test_cases( - history_baseline_dict, new_data_dict) - - if is_post_merge: - # Prepare new baseline data for post-merge - new_baseline_data_dict = prepare_baseline_data( - history_data_dict, new_data_dict) - else: - # Pre-merge does not need to upload baseline data - new_baseline_data_dict = None - - if self._config.upload_to_db: - # Upload the new perf data and baseline data to database - post_new_perf_data(new_baseline_data_dict, new_data_dict, - regressive_data_list) - - perf_result_output_dir = os.path.join(self._output_dir, - self._test_param_labels) - write_regressive_test_cases(regressive_data_list, new_data_dict, - perf_result_output_dir) - def _get_engine_dir(self) -> str: """ Get the engine directory to store the engine. @@ -2646,32 +1680,6 @@ def _get_metrics(self) -> List[PerfTestMetric]: Generate all the metric configs for the current test. """ metrics = [] - if self._config.runtime == "aggr_server": - cmd_idx = 0 - for server_idx, client_configs in self._config.server_client_configs.items( - ): - server_name = self._config.server_configs[server_idx].name - for client_config in client_configs: - for metric_type in AGGR_SERVER_METRICS: - metrics.append( - PerfTestMetric( - original_test_name=self._full_test_name, - metric_name=self._get_metric_name( - metric_type=metric_type, - server_name=server_name, - client_name=client_config.name), - metric_type=metric_type, - metric_regex=self._get_metric_regex( - metric_type), - metric_threshold=self._get_metric_threshold( - metric_type), - metric_abs_threshold=self. 
- _get_metric_abs_threshold(metric_type), - cmd_idx=cmd_idx, - )) - cmd_idx += 1 - return metrics - if self._config.runtime == "disagg_server": for metric_type in DISAGG_SERVER_METRICS: metrics.append( @@ -2689,28 +1697,6 @@ def _get_metrics(self) -> List[PerfTestMetric]: )) return metrics - if self._config.runtime == "multi_node_disagg_server": - cmd_idx = 0 - for disagg_config in self._config.disagg_configs: - config_name = disagg_config['name'] - for metric_type in AGGR_SERVER_METRICS: - metrics.append( - PerfTestMetric( - original_test_name=self._full_test_name, - metric_name=self._get_metric_name( - metric_type=metric_type, - disagg_config_name=config_name), - metric_type=metric_type, - metric_regex=self._get_metric_regex(metric_type), - metric_threshold=self._get_metric_threshold( - metric_type), - metric_abs_threshold=self._get_metric_abs_threshold( - metric_type), - cmd_idx=cmd_idx, - )) - cmd_idx += 1 - return metrics - # Build command is the first command. cmd_idx = 0 if self._config.runtime != "bench" else 1 if self._config.runtime == "bench": diff --git a/tests/integration/defs/perf/test_perf_sanity.py b/tests/integration/defs/perf/test_perf_sanity.py new file mode 100644 index 00000000000..8252c0fc518 --- /dev/null +++ b/tests/integration/defs/perf/test_perf_sanity.py @@ -0,0 +1,1472 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""TensorRT LLM perf sanity tests.""" + +import contextlib +import copy +import glob +import io +import os +import re +import socket +import subprocess +import time +from typing import Dict, List, NamedTuple, Tuple + +import pytest +import requests +import yaml +from test_common.http_utils import wait_for_endpoint_ready + +from defs.trt_test_alternative import print_error, print_info +from tensorrt_llm._utils import get_free_port + +from ..conftest import get_llm_root, llm_models_root +from .open_search_db_utils import ( + SCENARIO_MATCH_FIELDS, + add_id, + get_history_data, + get_job_info, + post_new_perf_data, + prepare_baseline_data, + prepare_regressive_test_cases, + write_regressive_test_cases, +) +from .utils import collect_and_clean_myelin_time + +# Model PATH of local dir synced from internal LLM models repo +MODEL_PATH_DICT = { + "deepseek_r1_fp8": "DeepSeek-R1/DeepSeek-R1", + "deepseek_r1_nvfp4": "DeepSeek-R1/DeepSeek-R1-FP4", + "deepseek_r1_0528_fp8": "DeepSeek-R1/DeepSeek-R1-0528/", + "deepseek_r1_0528_fp4": "DeepSeek-R1/DeepSeek-R1-0528-FP4/", + "deepseek_r1_0528_fp4_v2": "DeepSeek-R1/DeepSeek-R1-0528-FP4-v2/", + "gpt_oss_120b_fp4": "gpt_oss/gpt-oss-120b", +} + +SUPPORTED_GPU_TYPE = [ + "B200", + "B300", + "GB200", + "GB300", +] + +DEFAULT_TIMEOUT = 7200 + +# Regex patterns for parsing benchmark output metrics +# Key is the metric name used in database (e.g., "mean_e2el", "seq_throughput") +PERF_METRIC_LOG_QUERIES = { + "seq_throughput": re.compile(r"Request throughput \(req\/s\):\s+(-?[\d\.]+)"), + "token_throughput": re.compile(r"Output token throughput \(tok\/s\):\s+(-?[\d\.]+)"), + "total_token_throughput": re.compile(r"Total Token throughput \(tok\/s\):\s+(-?[\d\.]+)"), + "user_throughput": re.compile(r"User throughput \(tok\/s\):\s+(-?[\d\.]+)"), + "mean_ttft": re.compile(r"Mean TTFT \(ms\):\s+(-?[\d\.]+)"), + "median_ttft": re.compile(r"Median TTFT \(ms\):\s+(-?[\d\.]+)"), + "p99_ttft": re.compile(r"P99 TTFT \(ms\):\s+(-?[\d\.]+)"), + "mean_itl": re.compile(r"Mean ITL \(ms\):\s+(-?[\d\.]+)"), + "median_itl": re.compile(r"Median ITL \(ms\):\s+(-?[\d\.]+)"), + "p99_itl": re.compile(r"P99 ITL \(ms\):\s+(-?[\d\.]+)"), + "mean_tpot": re.compile(r"Mean TPOT \(ms\):\s+(-?[\d\.]+)"), + "median_tpot": re.compile(r"Median TPOT \(ms\):\s+(-?[\d\.]+)"), + "p99_tpot": re.compile(r"P99 TPOT \(ms\):\s+(-?[\d\.]+)"), + "mean_e2el": re.compile(r"Mean E2EL \(ms\):\s+(-?[\d\.]+)"), + "median_e2el": re.compile(r"Median E2EL \(ms\):\s+(-?[\d\.]+)"), + "p99_e2el": re.compile(r"P99 E2EL \(ms\):\s+(-?[\d\.]+)"), +} + + +def get_model_dir(model_name: str) -> str: + """Get model directory path from model name.""" + if model_name in MODEL_PATH_DICT: + return os.path.join(llm_models_root(), MODEL_PATH_DICT[model_name]) + return "" + + +def get_dataset_path() -> str: + """Get dataset path for benchmark.""" + return os.path.join(llm_models_root(), "datasets", "ShareGPT_V3_unfiltered_cleaned_split.json") + + +def to_env_dict(env_vars: str) -> Dict[str, str]: + """Convert env vars string to dict.""" + env = {} + for env_var in env_vars.split(): + if "=" in env_var: + key, value = env_var.split("=", 1) + env[key] = value + return env + + +def add_host_port_to_cmd(cmd: List[str], host: str, port: int) -> List[str]: + """Add host and port to command.""" + return cmd + ["--host", host, "--port", str(port)] + + +class ServerConfig: + """Configurations of trtllm-server.""" + + def __init__(self, server_config_data: dict, env_vars: str = ""): + # Extract required fields + self.mode = 
server_config_data.get("mode", "e2e") + self.concurrency = server_config_data.get("concurrency", 1) + self.name = server_config_data["name"] + self.model_name = server_config_data["model_name"] + self.model_path = "" + self.env_vars = env_vars + + # Extract optional fields with defaults + self.tp = server_config_data.get("tensor_parallel_size", 1) + self.ep = server_config_data.get("moe_expert_parallel_size", 1) + self.pp = server_config_data.get("pipeline_parallel_size", 1) + self.cp = server_config_data.get("context_parallel_size", 1) + self.gpus = server_config_data.get("gpus", self.tp * self.cp * self.pp) + self.gpus_per_node = server_config_data.get("gpus_per_node", 0) or self.gpus + self.max_num_tokens = server_config_data.get("max_num_tokens", 2048) + self.max_batch_size = server_config_data.get("max_batch_size", 512) + self.max_seq_len = server_config_data.get("max_seq_len", 0) + self.disable_overlap_scheduler = server_config_data.get("disable_overlap_scheduler", False) + self.num_postprocess_workers = server_config_data.get("num_postprocess_workers", 0) + self.stream_interval = server_config_data.get("stream_interval", 10) + self.attn_backend = server_config_data.get("attn_backend", "TRTLLM") + self.enable_chunked_prefill = server_config_data.get("enable_chunked_prefill", False) + self.enable_attention_dp = server_config_data.get("enable_attention_dp", False) + self.trust_remote_code = server_config_data.get("trust_remote_code", False) + self.enable_lm_head_tp_in_adp = server_config_data.get("enable_lm_head_tp_in_adp", False) + + # attention_dp_config + attention_dp_config = server_config_data.get("attention_dp_config", {}) + self.attention_dp_balance = attention_dp_config.get("enable_balance", False) + self.batching_wait_iters = attention_dp_config.get("batching_wait_iters", 0) + self.timeout_iters = attention_dp_config.get("timeout_iters", 60) + + # moe_config + moe_config = server_config_data.get("moe_config", {}) + self.moe_backend = moe_config.get("backend", "") + self.moe_max_num_tokens = moe_config.get("max_num_tokens", 0) + self.use_low_precision_moe_combine = moe_config.get("use_low_precision_moe_combine", False) + load_balancer_config = moe_config.get("load_balancer", {}) + self.load_balancer_num_slots = load_balancer_config.get("num_slots", 0) + self.load_balancer_layer_updates_per_iter = load_balancer_config.get( + "layer_updates_per_iter", 0 + ) + + # cuda_graph_config + cuda_graph_config = server_config_data.get("cuda_graph_config", {}) + self.enable_cuda_graph = False + if cuda_graph_config: + self.enable_cuda_graph = True + self.enable_padding = cuda_graph_config.get("enable_padding", True) + self.cuda_graph_batch_sizes = cuda_graph_config.get("batch_sizes", []) + self.cuda_graph_max_batch_size = cuda_graph_config.get("max_batch_size", 0) + else: + self.enable_padding = True + self.cuda_graph_batch_sizes = [] + self.cuda_graph_max_batch_size = 0 + + # kv_cache_config + kv_cache_config = server_config_data.get("kv_cache_config", {}) + self.kv_cache_dtype = kv_cache_config.get("dtype", "fp8") + self.enable_block_reuse = kv_cache_config.get("enable_block_reuse", False) + self.free_gpu_memory_fraction = kv_cache_config.get("free_gpu_memory_fraction", 0.8) + + # cache_transceiver_config + cache_transceiver_config = server_config_data.get("cache_transceiver_config", {}) + self.cache_transceiver_backend = cache_transceiver_config.get("backend", "") + self.cache_transceiver_max_tokens_in_buffer = cache_transceiver_config.get( + "max_tokens_in_buffer", 0 + ) + + # 
speculative_config + speculative_config = server_config_data.get("speculative_config", {}) + self.spec_decoding_type = speculative_config.get("decoding_type", "") + self.num_nextn_predict_layers = speculative_config.get("num_nextn_predict_layers", 0) + eagle3_value = speculative_config.get("eagle3_layers_to_capture", []) + if isinstance(eagle3_value, int): + self.eagle3_layers_to_capture = [eagle3_value] + elif isinstance(eagle3_value, list): + self.eagle3_layers_to_capture = eagle3_value + else: + self.eagle3_layers_to_capture = [] + self.max_draft_len = speculative_config.get("max_draft_len", 0) + self.speculative_model_dir = speculative_config.get("speculative_model_dir", "") + + # match_mode: "config" (default) or "scenario" + self.match_mode = server_config_data.get("match_mode", "config") + + # Store filtered config for extra_llm_api_config + exclude_keys = [ + "mode", + "concurrency", + "name", + "model_name", + "gpus", + "gpus_per_node", + "client_configs", + ] + self.extra_llm_api_config_data = { + k: v for k, v in server_config_data.items() if k not in exclude_keys + } + + def to_cmd( + self, output_dir: str, numa_bind: bool = False, disagg_serving_type: str = "" + ) -> List[str]: + """Generate server command.""" + model_dir = get_model_dir(self.model_name) + self.model_path = model_dir if os.path.exists(model_dir) else self.model_name + config_filename = f"extra-llm-api-config.{self.name}.yml" + config_path = os.path.join(output_dir, config_filename) + + numa_bind_cmd = [] + if numa_bind: + numa_bind_cmd = ["numactl", "-m 0,1"] + + cmd = numa_bind_cmd + [ + "trtllm-serve", + self.model_path, + "--backend", + "pytorch", + "--config", + config_path, + ] + return cmd + + def to_env(self) -> Dict[str, str]: + return to_env_dict(self.env_vars) + + def to_match_keys(self) -> List[str]: + return [ + "s_mode", + "s_model_name", + "l_tp", + "l_ep", + "l_pp", + "l_cp", + "l_gpus_per_node", + "l_max_batch_size", + "b_disable_overlap_scheduler", + "l_num_postprocess_workers", + "s_attn_backend", + "b_enable_chunked_prefill", + "b_enable_attention_dp", + "b_enable_lm_head_tp_in_adp", + # attention_dp_config + "b_attention_dp_balance", + # moe_config + "s_moe_backend", + # cuda_graph_config + "b_enable_cuda_graph", + # kv_cache_config + "s_kv_cache_dtype", + # cache_transceiver_config + "s_cache_transceiver_backend", + # speculative_config + "s_spec_decoding_type", + "l_num_nextn_predict_layers", + ] + + def to_db_data(self) -> dict: + """Convert ServerConfig to database data.""" + db_data = { + "s_mode": self.mode, + "s_model_name": self.model_name.lower(), + "l_gpus": self.gpus, + "l_tp": self.tp, + "l_ep": self.ep, + "l_pp": self.pp, + "l_cp": self.cp, + "l_gpus_per_node": self.gpus_per_node, + "l_max_num_tokens": self.max_num_tokens, + "l_max_batch_size": self.max_batch_size, + "l_max_seq_len": self.max_seq_len, + "b_disable_overlap_scheduler": self.disable_overlap_scheduler, + "l_num_postprocess_workers": self.num_postprocess_workers, + "l_stream_interval": self.stream_interval, + "s_attn_backend": self.attn_backend, + "b_enable_chunked_prefill": self.enable_chunked_prefill, + "b_enable_attention_dp": self.enable_attention_dp, + "b_trust_remote_code": self.trust_remote_code, + "b_enable_lm_head_tp_in_adp": self.enable_lm_head_tp_in_adp, + # attention_dp_config + "b_attention_dp_balance": self.attention_dp_balance, + "l_batching_wait_iters": self.batching_wait_iters, + "l_timeout_iters": self.timeout_iters, + # moe_config + "s_moe_backend": self.moe_backend, + "l_moe_max_num_tokens": 
self.moe_max_num_tokens, + "b_use_low_precision_moe_combine": self.use_low_precision_moe_combine, + "l_load_balancer_num_slots": self.load_balancer_num_slots, + "l_load_balancer_layer_updates_per_iter": self.load_balancer_layer_updates_per_iter, + # cuda_graph_config + "b_enable_cuda_graph": self.enable_cuda_graph, + "b_enable_padding": self.enable_padding, + "l_cuda_graph_max_batch_size": self.cuda_graph_max_batch_size, + "s_cuda_graph_batch_sizes": ",".join(map(str, self.cuda_graph_batch_sizes)), + # kv_cache_config + "s_kv_cache_dtype": self.kv_cache_dtype, + "b_enable_block_reuse": self.enable_block_reuse, + "d_free_gpu_memory_fraction": self.free_gpu_memory_fraction, + # cache_transceiver_config + "s_cache_transceiver_backend": self.cache_transceiver_backend, + "l_cache_transceiver_max_tokens_in_buffer": self.cache_transceiver_max_tokens_in_buffer, + # speculative_config + "s_spec_decoding_type": self.spec_decoding_type, + "l_num_nextn_predict_layers": self.num_nextn_predict_layers, + "s_eagle3_layers_to_capture": ",".join(map(str, self.eagle3_layers_to_capture)), + "l_max_draft_len": self.max_draft_len, + "s_speculative_model_dir": self.speculative_model_dir, + "s_server_log_link": "", + "s_server_env_var": self.env_vars, + } + return db_data + + def generate_extra_llm_api_config(self) -> str: + """Generate extra-llm-api-config.yml content.""" + config_data = dict(self.extra_llm_api_config_data) + + # Handle speculative_model_dir path conversion + if ( + "speculative_config" in config_data + and "speculative_model_dir" in config_data["speculative_config"] + ): + spec_model_dir = config_data["speculative_config"]["speculative_model_dir"] + if spec_model_dir: + config_data["speculative_config"]["speculative_model_dir"] = os.path.join( + llm_models_root(), spec_model_dir + ) + + return yaml.dump(config_data, default_flow_style=False, sort_keys=False) + + +class ClientConfig: + """Configurations of benchmark client.""" + + def __init__(self, client_config_data: dict, model_name: str, env_vars: str = ""): + self.name = client_config_data.get("name", "") + self.model_name = model_name + self.concurrency = client_config_data.get("concurrency", 1) + self.iterations = client_config_data.get("iterations", 1) + self.isl = client_config_data.get("isl", 1024) + self.osl = client_config_data.get("osl", 1024) + self.random_range_ratio = client_config_data.get("random_range_ratio", 0.0) + self.backend = client_config_data.get("backend", "openai") + self.use_chat_template = client_config_data.get("use_chat_template", False) + self.streaming = client_config_data.get("streaming", True) + self.model_path = "" + self.env_vars = env_vars + + def to_cmd(self) -> List[str]: + """Generate benchmark command.""" + model_dir = get_model_dir(self.model_name) + self.model_path = model_dir if os.path.exists(model_dir) else self.model_name + dataset_path = get_dataset_path() + benchmark_cmd = [ + "python", + "-m", + "tensorrt_llm.serve.scripts.benchmark_serving", + "--model", + self.model_path, + "--tokenizer", + self.model_path, + "--dataset-name", + "random", + "--random-ids", + "--num-prompts", + str(self.concurrency * self.iterations), + "--max-concurrency", + str(self.concurrency), + "--random-input-len", + str(self.isl), + "--random-output-len", + str(self.osl), + "--random-range-ratio", + str(self.random_range_ratio), + "--trust-remote-code", + "--ignore-eos", + "--percentile-metrics", + "ttft,tpot,itl,e2el", + ] + if dataset_path and os.path.exists(dataset_path): + benchmark_cmd.append("--dataset-path") + 
benchmark_cmd.append(dataset_path) + if self.backend: + benchmark_cmd.append("--backend") + benchmark_cmd.append(self.backend) + if self.use_chat_template: + benchmark_cmd.append("--use-chat-template") + if not self.streaming: + benchmark_cmd.append("--non-streaming") + return benchmark_cmd + + def to_env(self) -> Dict[str, str]: + return to_env_dict(self.env_vars) + + def to_match_keys(self) -> List[str]: + return [ + "l_concurrency", + "l_iterations", + "l_isl", + "l_osl", + "d_random_range_ratio", + "s_backend", + "b_use_chat_template", + "b_streaming", + ] + + def to_db_data(self) -> dict: + """Convert ClientConfig to database data.""" + db_data = { + "l_concurrency": self.concurrency, + "l_iterations": self.iterations, + "l_isl": self.isl, + "l_osl": self.osl, + "d_random_range_ratio": self.random_range_ratio, + "s_backend": self.backend, + "b_use_chat_template": self.use_chat_template, + "b_streaming": self.streaming, + "s_client_log_link": "", + "s_client_env_vars": self.env_vars, + } + if self.backend: + db_data["s_backend"] = self.backend + if self.use_chat_template: + db_data["b_use_chat_template"] = self.use_chat_template + return db_data + + +class DisaggConfig: + """Configurations for disaggregated server.""" + + def __init__( + self, + disagg_serving_type: str, + hostname: str, + numa_bind: bool, + timeout: int, + mode: str, + model_name: str, + hardware: dict, + server_env_var: str, + ): + self.disagg_serving_type = disagg_serving_type + self.hostname = hostname + self.numa_bind = numa_bind + self.timeout = timeout + self.mode = mode + self.model_name = model_name + self.hardware = hardware + self.server_env_var = server_env_var + self.num_ctx_servers = hardware.get("num_ctx_servers", 0) + self.num_gen_servers = hardware.get("num_gen_servers", 0) + + +class AggrTestCmds(NamedTuple): + """Commands for aggregated server perf sanity tests.""" + + server_cmds: List[List[str]] + client_cmds: Dict[int, List[List[str]]] + timeout: int + output_dir: str + + def run_cmd(self, server_idx: int) -> List[str]: + """Run all clients for a server and return outputs.""" + outputs = [] + server_proc = None + server_cmd = self.server_cmds[server_idx] + + try: + server_hostname = "localhost" + server_port = get_free_port() + server_cmd_with_port = add_host_port_to_cmd(server_cmd, server_hostname, server_port) + + server_file_path = os.path.join(self.output_dir, f"trtllm-serve.{server_idx}.log") + + print_info(f"Starting server. cmd is {server_cmd_with_port}") + with open(server_file_path, "w") as server_ctx: + server_proc = subprocess.Popen( + server_cmd_with_port, + stdout=server_ctx, + stderr=subprocess.STDOUT, + env=copy.deepcopy(os.environ), + ) + + wait_for_endpoint_ready( + f"http://{server_hostname}:{server_port}/health", timeout=self.timeout + ) + + # Run all clients for this server + for client_idx, client_cmd in enumerate(self.client_cmds[server_idx]): + client_file_path = os.path.join( + self.output_dir, f"trtllm-benchmark.{server_idx}.{client_idx}.log" + ) + + client_cmd_with_port = add_host_port_to_cmd( + client_cmd, server_hostname, server_port + ) + print_info(f"Starting client. 
cmd is {client_cmd_with_port}") + + output = subprocess.check_output( + client_cmd_with_port, + stderr=subprocess.STDOUT, + env=copy.deepcopy(os.environ), + ).decode() + + with open(client_file_path, "w") as client_ctx: + client_ctx.write(output) + + outputs.append(output) + + finally: + if server_proc: + server_proc.terminate() + server_proc.wait() + + return outputs + + def get_cmd_str(self, server_idx: int) -> List[str]: + return ["aggr_server tests, please check config files"] + + +class DisaggTestCmds(NamedTuple): + """Commands for multi-node disaggregated server perf sanity tests.""" + + server_cmds: List[Tuple[List[str], List[str], List[str]]] + client_cmds: Dict[int, List[List[str]]] + timeout: int + hostname: str + disagg_serving_type: str + num_ctx_servers: int + num_gen_servers: int + output_dir: str + + def _generate_hostname_file(self, server_idx: int, port: int): + """Create hostname file for coordination.""" + hostnames_dir = os.path.join(self.output_dir, f"hostnames-{server_idx}") + if not os.path.exists(hostnames_dir): + os.makedirs(hostnames_dir, exist_ok=True) + hostname_file = os.path.join(hostnames_dir, f"{self.disagg_serving_type}.txt") + with open(hostname_file, "w") as f: + f.write(f"{self.hostname}:{port}") + + def _generate_disagg_server_config(self, server_idx: int, disagg_server_port: int) -> str: + """Generate disagg server config from hostname files.""" + print_info(f"Generating disagg server config for server index {server_idx}") + hostnames_folder = os.path.join(self.output_dir, f"hostnames-{server_idx}") + expected_count = self.num_ctx_servers + self.num_gen_servers + start_time = time.time() + hostnames = [] + + while True: + elapsed_time = time.time() - start_time + print_info( + f"Waiting for hostnames in {hostnames_folder}, " + f"elapsed time: {elapsed_time}s, current: {len(hostnames)}, " + f"expected: {expected_count}" + ) + if elapsed_time > self.timeout: + print_error(f"Time out. 
Hostnames files are not ready after {self.timeout}s") + break + time.sleep(10) + if not os.path.exists(hostnames_folder): + continue + hostnames = os.listdir(hostnames_folder) + if len(hostnames) >= expected_count: + break + + print_info(f"All hostnames found in {hostnames_folder} after elapsed time: {elapsed_time}s") + + # Read ctx and gen hostnames + ctx_hostnames = [] + gen_hostnames = [] + for hostname_file in hostnames: + hostname_file_path = os.path.join(hostnames_folder, hostname_file) + with open(hostname_file_path, "r") as f: + hostname_port = f.read().strip() + if hostname_file.startswith("CTX"): + ctx_hostnames.append(hostname_port) + elif hostname_file.startswith("GEN"): + gen_hostnames.append(hostname_port) + + server_config = { + "hostname": self.hostname, + "port": disagg_server_port, + "backend": "pytorch", + "context_servers": { + "num_instances": self.num_ctx_servers, + "urls": ctx_hostnames, + }, + "generation_servers": { + "num_instances": self.num_gen_servers, + "urls": gen_hostnames, + }, + } + config_path = os.path.join(self.output_dir, f"server_config.{server_idx}.yaml") + with open(config_path, "w") as f: + yaml.dump(server_config, f) + print_info(f"Server config file {config_path} generated") + return config_path + + def _get_disagg_server_hostname_and_port(self, server_idx: int) -> Tuple[str, int]: + """Wait for and read disagg server config.""" + config_path = os.path.join(self.output_dir, f"server_config.{server_idx}.yaml") + start_time = time.time() + while True: + if os.path.exists(config_path): + print_info(f"Server config file found: {config_path}") + break + elapsed_time = time.time() - start_time + if elapsed_time > self.timeout: + print_error(f"Server config file {config_path} not found after {self.timeout}s") + break + print_info(f"Waiting for server config file, elapsed time: {elapsed_time}s") + time.sleep(10) + + with open(config_path, "r") as f: + server_config = yaml.safe_load(f) + return server_config["hostname"], server_config["port"] + + def wait_for_benchmark_ready(self, benchmark_status_file: str): + """Wait for benchmark to complete.""" + start_time = time.time() + while True: + if os.path.exists(benchmark_status_file): + print_info( + f"Benchmark status file found, terminating server {self.disagg_serving_type}" + ) + break + elapsed_time = time.time() - start_time + print_info(f"Waiting for benchmark status file, elapsed time: {elapsed_time}s") + if elapsed_time > self.timeout: + print_error(f"Timeout waiting for benchmark status file after {self.timeout}s") + break + time.sleep(10) + + def wait_for_endpoint_ready(self, url: str): + """Wait for endpoint to be ready.""" + start = time.monotonic() + while True: + elapsed_time = time.monotonic() - start + if elapsed_time > self.timeout: + print_error( + f"Timeout waiting for endpoint {url} to be ready after {self.timeout} seconds" + ) + break + print_info(f"Waiting for endpoint {url} to be ready, elapsed time: {elapsed_time}s") + try: + time.sleep(10) + if requests.get(url).status_code == 200: + print_info(f"endpoint {url} is ready") + return + except Exception as err: + print_info(f"endpoint {url} is not ready, with exception: {err}") + + def run_cmd(self, server_idx: int) -> List[str]: + """Run commands for a server and return outputs.""" + outputs = [] + benchmark_status_file = os.path.join(self.output_dir, f"benchmark_status.{server_idx}.txt") + port = get_free_port() + + ctx_cmd, gen_cmd, disagg_cmd = self.server_cmds[server_idx] + + if "CTX" in self.disagg_serving_type or "GEN" in 
self.disagg_serving_type: + self._generate_hostname_file(server_idx, port) + server_file_path = os.path.join( + self.output_dir, f"trtllm-serve.{server_idx}.{self.disagg_serving_type}.log" + ) + is_ctx = "CTX" in self.disagg_serving_type + server_cmd = ctx_cmd if is_ctx else gen_cmd + server_cmd = add_host_port_to_cmd(server_cmd, self.hostname, port) + + try: + print_info( + f"Starting server. disagg_serving_type: {self.disagg_serving_type} cmd is {server_cmd}" + ) + with open(server_file_path, "w") as server_ctx: + server_proc = subprocess.Popen( + server_cmd, + stdout=server_ctx, + stderr=subprocess.STDOUT, + env=copy.deepcopy(os.environ), + ) + self.wait_for_benchmark_ready(benchmark_status_file) + finally: + print_info(f"Server {self.disagg_serving_type} stopped") + server_proc.terminate() + server_proc.wait() + + elif self.disagg_serving_type == "DISAGG_SERVER": + disagg_server_file_path = os.path.join( + self.output_dir, f"trtllm-serve.{server_idx}.{self.disagg_serving_type}.log" + ) + + try: + self._generate_disagg_server_config(server_idx, port) + print_info(f"Starting disagg server. cmd is {disagg_cmd}") + with open(disagg_server_file_path, "w") as disagg_server_ctx: + disagg_server_proc = subprocess.Popen( + disagg_cmd, + stdout=disagg_server_ctx, + stderr=subprocess.STDOUT, + env=copy.deepcopy(os.environ), + ) + self.wait_for_benchmark_ready(benchmark_status_file) + finally: + print_info(f"Disagg server {self.disagg_serving_type} stopped") + disagg_server_proc.terminate() + disagg_server_proc.wait() + + elif self.disagg_serving_type == "BENCHMARK": + try: + disagg_server_hostname, disagg_server_port = ( + self._get_disagg_server_hostname_and_port(server_idx) + ) + self.wait_for_endpoint_ready( + f"http://{disagg_server_hostname}:{disagg_server_port}/health" + ) + + # Run all clients for this server + for client_idx, client_cmd in enumerate(self.client_cmds[server_idx]): + benchmark_file_path = os.path.join( + self.output_dir, f"trtllm-benchmark.{server_idx}.{client_idx}.log" + ) + + client_cmd_with_port = add_host_port_to_cmd( + client_cmd, disagg_server_hostname, disagg_server_port + ) + print_info(f"Starting benchmark. cmd is {client_cmd_with_port}") + + output = subprocess.check_output( + client_cmd_with_port, + env=copy.deepcopy(os.environ), + stderr=subprocess.STDOUT, + ).decode() + + with open(benchmark_file_path, "w") as benchmark_ctx: + benchmark_ctx.write(output) + outputs.append(output) + + finally: + with open(benchmark_status_file, "w") as status_file: + status_file.write("Done") + + return outputs + + def get_cmd_str(self, server_idx: int) -> List[str]: + return ["multi-node disaggregated server tests, please check config files"] + + +def parse_select_pattern(select_pattern: str) -> List[int]: + """Parse select pattern into list of server indices. + + Args: + select_pattern: Can be a single index "1" or a range "2-3". + + Returns: + List of server indices. 
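+
+    Example (illustrative): parse_select_pattern("1") -> [1]; parse_select_pattern("2-3") -> [2, 3].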
+ """ + if "-" in select_pattern: + parts = select_pattern.split("-") + start_idx = int(parts[0]) + end_idx = int(parts[1]) + return list(range(start_idx, end_idx + 1)) + else: + return [int(select_pattern)] + + +class PerfSanityTestConfig: + """Configuration for perf sanity tests.""" + + def __init__(self, test_case_name: str, output_dir: str): + self._output_dir = output_dir + self._test_results: Dict[int, Dict[str, float]] = {} + self._perf_results: Dict[int, List[Dict[str, float]]] = {} + + # Parse test case name + self.parse_test_case_name(test_case_name) + + def parse_test_case_name(self, test_case_name: str): + """Parse test case name into components.""" + self._test_param_labels = test_case_name + + # Extract configs from test param labels + labels = self._test_param_labels.split("-") + + def get_gpu_type() -> str: + try: + output = subprocess.check_output( + ["nvidia-smi", "-L"], stderr=subprocess.DEVNULL, text=True + ) + first_line = output.strip().split("\n")[0] + gpu_models = SUPPORTED_GPU_TYPE + for model in gpu_models: + if model in first_line: + if model.startswith("B") and not model.startswith("GB"): + return f"dgx_{model.lower()}" + return model.lower() + except (subprocess.CalledProcessError, FileNotFoundError, IndexError): + print_error("Failed to get GPU type") + return "" + + assert len(labels) > 1, "perf_sanity test must have a config file!" + is_disagg = "disagg" in labels[0] + self.upload_to_db = "upload" in labels[0] + self.gpu_type = get_gpu_type() + + if is_disagg: + # For disagg: disagg_upload-deepseek-r1-fp4_8k1k_ctx1_gen1_dep32_bs128_eplb0_mtp0_ccb-UCX + self.runtime = "multi_node_disagg_server" + self.config_dir = "tests/integration/defs/perf/disagg/test_configs/disagg/perf" + config_base = "-".join(labels[1:]) + self.config_file = ( + f"{config_base}.yaml" if not config_base.endswith(".yaml") else config_base + ) + self.select_pattern = None + else: + # For aggr: aggr_upload-config_yml-0 or aggr_upload-config_yml-0-5 + self.runtime = "aggr_server" + self.config_dir = "tests/scripts/perf-sanity" + config_base = labels[1] + self.config_file = ( + f"{config_base}.yaml" + if config_base and not config_base.endswith(".yaml") + else config_base + ) + # select_pattern can be "0" (single) or "0-5" (range) + self.select_pattern = "-".join(labels[2:]) if len(labels) > 2 else None + + self.config_dir = os.getenv( + "TRTLLM_CONFIG_FOLDER", os.path.join(get_llm_root(), self.config_dir) + ) + + # Initialize server configs + self.server_configs: List = [] + self.server_client_configs: Dict[int, List[ClientConfig]] = {} + + def parse_config_file(self): + """Parse config file based on runtime.""" + config_file_path = os.path.join(self.config_dir, self.config_file) + + if self.runtime == "aggr_server": + self._parse_aggr_config_file(config_file_path) + elif self.runtime == "multi_node_disagg_server": + self._parse_disagg_config_file(config_file_path) + + def _parse_aggr_config_file(self, config_file_path: str): + """Parse YAML config file for aggregated server.""" + # Parse selection pattern (server indices) + if self.select_pattern: + selected_server_indices = parse_select_pattern(self.select_pattern) + else: + selected_server_indices = None + + with open(config_file_path, "r") as f: + config = yaml.safe_load(f) + + metadata = config.get("metadata", {}) + environment = config.get("environment", {}) + hardware = config.get("hardware", {}) + gpus_per_node = hardware.get("gpus_per_node", 0) + + model_name = metadata.get("model_name", "") + server_env_var = 
environment.get("server_env_var", "") + client_env_var = environment.get("client_env_var", "") + + server_configs = [] + server_client_configs = {} + + for server_idx, server_config_data in enumerate(config["server_configs"]): + # Check if this server should be included based on selected_server_indices + if ( + selected_server_indices is not None + and (server_idx + 1) not in selected_server_indices + ): + continue + + server_config_data["model_name"] = ( + model_name + if "model_name" not in server_config_data + else server_config_data["model_name"] + ) + server_config_data["mode"] = "e2e" + server_config_data["concurrency"] = -1 + server_config_data["gpus_per_node"] = gpus_per_node + + server_config = ServerConfig(server_config_data, server_env_var) + server_id = len(server_configs) + server_configs.append(server_config) + + client_configs = [] + for client_config_data in server_config_data["client_configs"]: + client_config = ClientConfig( + client_config_data, server_config_data["model_name"], client_env_var + ) + client_configs.append(client_config) + + server_client_configs[server_id] = client_configs + + self.server_configs = server_configs + self.server_client_configs = server_client_configs + + def _parse_disagg_config_file(self, config_file_path: str): + """Parse YAML config file for disaggregated server.""" + disagg_serving_type = os.environ.get("DISAGG_SERVING_TYPE", "BENCHMARK") + + with open(config_file_path, "r") as f: + config = yaml.safe_load(f) + + metadata = config.get("metadata", {}) + hardware = config.get("hardware", {}) + benchmark = config.get("benchmark", {}) + environment = config.get("environment", {}) + slurm_config = config.get("slurm", {}) + worker_config = config.get("worker_config", {}) + + timeout = slurm_config.get("timeout", DEFAULT_TIMEOUT) + numa_bind = slurm_config.get("numa_bind", False) + gpus_per_node = hardware.get("gpus_per_node", 0) + model_name = metadata.get("model_name", "") + assert model_name, "model_name is required in metadata section" + + benchmark_mode = benchmark.get("mode", "e2e") + if "gen_only" in benchmark_mode: + hardware["num_ctx_servers"] = 0 + + worker_env_var = environment.get("worker_env_var", "") + server_env_var = environment.get("server_env_var", "") + client_env_var = environment.get("client_env_var", "") + + # Parse concurrency_list - can be string or list + concurrency_str = benchmark.get("concurrency_list", "1") + if isinstance(concurrency_str, str): + concurrency_values = [int(x) for x in concurrency_str.split()] + elif isinstance(concurrency_str, list): + concurrency_values = [int(x) for x in concurrency_str] + else: + concurrency_values = [int(concurrency_str)] + + # Gen only mode only runs max concurrency + if "gen_only" in benchmark_mode: + concurrency_values = [max(concurrency_values)] + + # Create ctx server config + ctx_server_config_data = { + "mode": benchmark_mode, + "concurrency": max(concurrency_values), + "name": "ctx", + "model_name": model_name, + "gpus_per_node": gpus_per_node, + **worker_config.get("ctx", {}), + } + + # Create gen server config + gen_server_config_data = { + "mode": benchmark_mode, + "concurrency": max(concurrency_values), + "name": "gen", + "model_name": model_name, + "gpus_per_node": gpus_per_node, + **worker_config.get("gen", {}), + } + + ctx_server_config = ServerConfig(ctx_server_config_data, worker_env_var) + gen_server_config = ServerConfig(gen_server_config_data, worker_env_var) + + # Create disagg config + disagg_config = DisaggConfig( + 
disagg_serving_type=disagg_serving_type, + hostname=socket.gethostname(), + numa_bind=numa_bind, + timeout=timeout, + mode=benchmark_mode, + model_name=model_name, + hardware=hardware, + server_env_var=server_env_var, + ) + + # server_configs is a list with one element (tuple of ctx, gen, disagg config) + self.server_configs = [(ctx_server_config, gen_server_config, disagg_config)] + + # Create client configs for each concurrency value + client_configs = [] + for concurrency in concurrency_values: + client_config_data = { + "name": f"client_con{concurrency}", + "concurrency": concurrency, + "iterations": benchmark.get("multi_round", 1), + "isl": benchmark.get("input_length", 1024), + "osl": benchmark.get("output_length", 1024), + "random_range_ratio": benchmark.get("benchmark_ratio", 0.0), + "backend": "openai", + "use_chat_template": False, + "streaming": benchmark.get("streaming", True), + } + client_config = ClientConfig(client_config_data, model_name, client_env_var) + client_configs.append(client_config) + + self.server_client_configs = {0: client_configs} + + def get_commands(self): + """Get commands based on runtime.""" + perf_sanity_output_dir = os.path.join(self._output_dir, self._test_param_labels) + os.makedirs(perf_sanity_output_dir, exist_ok=True) + + if self.runtime == "aggr_server": + return self._get_aggr_commands(perf_sanity_output_dir) + elif self.runtime == "multi_node_disagg_server": + return self._get_disagg_commands(perf_sanity_output_dir) + + def _get_aggr_commands(self, output_dir: str): + """Get commands for aggregated server.""" + server_cmds = [] + client_cmds = {} + + for server_idx, client_configs in self.server_client_configs.items(): + server_config = self.server_configs[server_idx] + server_cmd = server_config.to_cmd(output_dir) + + # Generate extra-llm-api-config.yml + config_content = server_config.generate_extra_llm_api_config() + config_filename = f"extra-llm-api-config.{server_config.name}.yml" + config_path = os.path.join(output_dir, config_filename) + with open(config_path, "w") as f: + f.write(config_content) + + server_cmds.append(server_cmd) + client_cmds[server_idx] = [] + + for client_config in client_configs: + client_cmd = client_config.to_cmd() + client_cmds[server_idx].append(client_cmd) + + return AggrTestCmds( + server_cmds=server_cmds, + client_cmds=client_cmds, + timeout=DEFAULT_TIMEOUT, + output_dir=output_dir, + ) + + def _get_disagg_commands(self, output_dir: str): + """Get commands for disaggregated server.""" + server_cmds = [] + client_cmds = {} + + for server_idx, (ctx_config, gen_config, disagg_config) in enumerate(self.server_configs): + numa_bind = disagg_config.numa_bind + timeout = disagg_config.timeout + disagg_serving_type = disagg_config.disagg_serving_type + + # Generate ctx server command + ctx_cmd = ctx_config.to_cmd(output_dir, numa_bind, "CTX") + if "CTX" in disagg_serving_type: + config_content = ctx_config.generate_extra_llm_api_config() + config_path = os.path.join(output_dir, "extra-llm-api-config.ctx.yml") + with open(config_path, "w") as f: + f.write(config_content) + + # Generate gen server command + gen_cmd = gen_config.to_cmd(output_dir, numa_bind, "GEN") + if "GEN" in disagg_serving_type: + config_content = gen_config.generate_extra_llm_api_config() + config_path = os.path.join(output_dir, "extra-llm-api-config.gen.yml") + with open(config_path, "w") as f: + f.write(config_content) + + # Generate disagg server command + disagg_cmd = [ + "trtllm-serve", + "disaggregated", + "-c", + 
f"{output_dir}/server_config.{server_idx}.yaml", + "-t", + str(timeout), + "-r", + str(timeout), + ] + + server_cmds.append((ctx_cmd, gen_cmd, disagg_cmd)) + + # Add client commands + client_cmds[server_idx] = [] + for client_config in self.server_client_configs[server_idx]: + client_cmd = client_config.to_cmd() + client_cmds[server_idx].append(client_cmd) + + disagg_config = self.server_configs[0][2] + return DisaggTestCmds( + server_cmds=server_cmds, + client_cmds=client_cmds, + timeout=disagg_config.timeout, + hostname=disagg_config.hostname, + disagg_serving_type=disagg_config.disagg_serving_type, + num_ctx_servers=disagg_config.num_ctx_servers, + num_gen_servers=disagg_config.num_gen_servers, + output_dir=output_dir, + ) + + def run_ex(self, commands) -> Dict[int, List[str]]: + """Run commands and collect outputs.""" + outputs = {} + + for server_idx in range(len(commands.server_cmds)): + try: + with io.StringIO() as buf: + with contextlib.redirect_stdout(buf): + server_outputs = commands.run_cmd(server_idx) + for output in server_outputs: + print(collect_and_clean_myelin_time(output)) + + # Check for errors in each output + for output in server_outputs: + self._check_benchmark_output_for_errors(output) + + print(buf.getvalue()) + + outputs[server_idx] = server_outputs + + except Exception as e: + print_error(f"Test command failed for server {server_idx}. Error: {e}") + if isinstance(e, subprocess.CalledProcessError): + print_error("--- stdout ---") + if e.stdout: + print_error(e.stdout.decode() if isinstance(e.stdout, bytes) else e.stdout) + print_error("--------------") + outputs[server_idx] = [] + + return outputs + + def _check_benchmark_output_for_errors(self, output: str) -> None: + """Check whether the benchmark output contains error messages.""" + if not output: + return + + # Check for non-zero failed requests + failed_requests_match = re.search(r"Failed requests:\s+(\d+)", output) + if failed_requests_match: + failed_count = int(failed_requests_match.group(1)) + if failed_count > 0: + print_error(f"Benchmark output contains {failed_count} failed requests.") + raise Exception(f"Benchmark has {failed_count} failed requests") + + # Check for explicit failure markers + if "!FAILED REQUESTS!" in output or "!CHECK LOG FOR ERRORS!" 
in output: + print_error("Benchmark output contains failure markers.") + raise Exception("Benchmark output contains failure markers") + + def get_perf_result(self, outputs: Dict[int, List[str]]): + """Parse performance results from outputs.""" + self._perf_results = {} + + for server_idx, server_outputs in outputs.items(): + self._perf_results[server_idx] = [] + + for output in server_outputs: + metrics = {} + for metric_type, regex in PERF_METRIC_LOG_QUERIES.items(): + regex_matches = [regex.search(line) for line in output.split("\n")] + for match in regex_matches: + if match: + value = None + for i in range(1, len(match.groups()) + 1): + if match.group(i) is not None: + value = match.group(i) + break + if value is not None: + metrics[metric_type] = float(value) + break + + self._perf_results[server_idx].append(metrics) + + # Also populate _test_results for upload (flattened view) + cmd_idx = 0 + for server_idx in sorted(self._perf_results.keys()): + for client_metrics in self._perf_results[server_idx]: + self._test_results[cmd_idx] = client_metrics + cmd_idx += 1 + + def upload_test_results_to_database(self): + """Upload test results and baseline to database.""" + + def add_prefix(key: str, prefix_name: str) -> str: + type_prefix = key[0:2] + rest = key[2:] + return f"{type_prefix}{prefix_name}_{rest}" + + def add_list_prefix(config_list: List, prefix_name: str) -> List: + return [add_prefix(key, prefix_name) for key in config_list] + + def add_dict_prefix(config_dict: dict, prefix_name: str) -> dict: + return {add_prefix(key, prefix_name): value for key, value in config_dict.items()} + + match_keys = [] + + if self.runtime == "aggr_server": + job_config = get_job_info() + job_config["s_gpu_type"] = self.gpu_type + is_post_merge = job_config["b_is_post_merge"] + new_data_dict = {} + cmd_idx = 0 + + for server_idx, client_configs in self.server_client_configs.items(): + server_config = self.server_configs[server_idx] + server_config_dict = server_config.to_db_data() + + for client_config in client_configs: + client_config_dict = client_config.to_db_data() + + # Skip if metrics missing + if cmd_idx not in self._test_results or not all( + metric_name in self._test_results[cmd_idx] + for metric_name in PERF_METRIC_LOG_QUERIES + ): + print_info( + f"Skipped posting command {cmd_idx}'s test results since some metrics are missing." 
+ ) + cmd_idx += 1 + continue + + new_data = { + "s_runtime": "multi_node_aggr_server" + if server_config.gpus != server_config.gpus_per_node + else "aggr_server" + } + new_data.update(job_config) + new_data.update(server_config_dict) + new_data.update(client_config_dict) + + for metric_name in PERF_METRIC_LOG_QUERIES: + if metric_name in self._test_results[cmd_idx]: + new_data[f"d_{metric_name}"] = self._test_results[cmd_idx][metric_name] + + add_id(new_data) + new_data_dict[cmd_idx] = new_data + cmd_idx += 1 + + if not match_keys: + match_keys.append("s_runtime") + if server_config.match_mode == "scenario": + match_keys = SCENARIO_MATCH_FIELDS.copy() + else: + match_keys.extend(server_config.to_match_keys()) + match_keys.extend(client_config.to_match_keys()) + + elif self.runtime == "multi_node_disagg_server": + # Only BENCHMARK node uploads + if self.server_configs[0][2].disagg_serving_type != "BENCHMARK": + return + + job_config = get_job_info() + job_config["s_gpu_type"] = self.gpu_type + is_post_merge = job_config["b_is_post_merge"] + new_data_dict = {} + cmd_idx = 0 + + for server_idx, (ctx_config, gen_config, disagg_config) in enumerate( + self.server_configs + ): + for client_config in self.server_client_configs[server_idx]: + # Skip if metrics missing + if cmd_idx not in self._test_results or not all( + metric_name in self._test_results[cmd_idx] + for metric_name in PERF_METRIC_LOG_QUERIES + ): + print_info( + f"Skipped posting command {cmd_idx}'s test results since some metrics are missing." + ) + cmd_idx += 1 + continue + + # Get server configs with prefixed keys + ctx_server_config_dict = add_dict_prefix(ctx_config.to_db_data(), "ctx") + gen_server_config_dict = add_dict_prefix(gen_config.to_db_data(), "gen") + client_config_dict = client_config.to_db_data() + + num_ctx_servers = disagg_config.num_ctx_servers + num_gen_servers = disagg_config.num_gen_servers + + new_data = { + "s_runtime": "multi_node_disagg_server", + "s_benchmark_mode": disagg_config.mode, + "s_server_env_var": disagg_config.server_env_var, + "l_num_ctx_servers": num_ctx_servers, + "l_num_gen_servers": num_gen_servers, + } + new_data.update(job_config) + + if num_ctx_servers > 0: + new_data.update(ctx_server_config_dict) + if num_gen_servers > 0: + new_data.update(gen_server_config_dict) + new_data.update(client_config_dict) + + for metric_name in PERF_METRIC_LOG_QUERIES: + if metric_name in self._test_results[cmd_idx]: + new_data[f"d_{metric_name}"] = self._test_results[cmd_idx][metric_name] + + add_id(new_data) + new_data_dict[cmd_idx] = new_data + cmd_idx += 1 + + if not match_keys: + match_keys.extend(["s_runtime", "l_num_ctx_servers", "l_num_gen_servers"]) + if num_ctx_servers > 0: + match_keys.extend(add_list_prefix(ctx_config.to_match_keys(), "ctx")) + if num_gen_servers > 0: + match_keys.extend(add_list_prefix(gen_config.to_match_keys(), "gen")) + match_keys.extend(client_config.to_match_keys()) + else: + return + + if not new_data_dict: + print_info("No data to upload to database.") + return + + # Get history data for each cmd_idx + history_baseline_dict, history_data_dict = get_history_data( + new_data_dict, self.gpu_type, match_keys + ) + + # Prepare regressive test cases + regressive_data_list = prepare_regressive_test_cases(history_baseline_dict, new_data_dict) + + if is_post_merge: + # Prepare new baseline data for post-merge + new_baseline_data_dict = prepare_baseline_data( + history_baseline_dict, history_data_dict, new_data_dict + ) + else: + # Pre-merge does not need to upload baseline data 
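+            # (baselines are refreshed only by post-merge runs; pre-merge runs just compare against the stored baseline)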
+ new_baseline_data_dict = None + + if self.upload_to_db: + # Upload the new perf data and baseline data to database + post_new_perf_data(new_baseline_data_dict, new_data_dict, regressive_data_list) + + perf_result_output_dir = os.path.join(self._output_dir, self._test_param_labels) + write_regressive_test_cases(regressive_data_list, new_data_dict, perf_result_output_dir) + + +# Perf sanity test case parameters +AGG_TEST_TYPES = ["aggr_upload", "aggr"] +DISAGG_TEST_TYPES = ["disagg_upload", "disagg"] + +AGGR_CONFIG_FOLDER = "tests/scripts/perf-sanity" +DISAGG_CONFIG_FOLDER = "tests/integration/defs/perf/disagg/test_configs/disagg/perf" + + +def get_server_config_count(yaml_path: str) -> int: + """Read a YAML file and return the number of server_configs.""" + try: + with open(yaml_path, "r") as f: + data = yaml.safe_load(f) + if data and "server_configs" in data: + return len(data["server_configs"]) + except Exception: + pass + return 0 + + +def get_yaml_files_with_counts(directory: str) -> Dict[str, int]: + """Scan directory for YAML files and return dict of {basename: server_config_count}.""" + yaml_files = glob.glob(os.path.join(directory, "*.yaml")) + result = {} + for yaml_path in sorted(yaml_files): + basename = os.path.splitext(os.path.basename(yaml_path))[0] + count = get_server_config_count(yaml_path) + result[basename] = count + return result + + +def get_aggr_test_cases() -> List[str]: + """Generate aggr test cases based on actual server_config counts in YAML files.""" + llm_root = get_llm_root() + aggr_config_dir = os.path.join(llm_root, AGGR_CONFIG_FOLDER) + yaml_counts = get_yaml_files_with_counts(aggr_config_dir) + + test_cases = [] + for config_yml, count in yaml_counts.items(): + for test_type in AGG_TEST_TYPES: + # Case without select_pattern + test_cases.append(f"{test_type}-{config_yml}") + + # Cases with single server index (1-based) + for server_idx in range(1, count + 1): + test_cases.append(f"{test_type}-{config_yml}-{server_idx}") + + # Cases with range indices + for start_idx in range(1, count + 1): + for end_idx in range(start_idx + 1, count + 1): + test_cases.append(f"{test_type}-{config_yml}-{start_idx}-{end_idx}") + + return test_cases + + +def get_disagg_test_cases() -> List[str]: + """Generate disagg test cases.""" + llm_root = get_llm_root() + disagg_config_dir = os.path.join(llm_root, DISAGG_CONFIG_FOLDER) + yaml_files = glob.glob(os.path.join(disagg_config_dir, "*.yaml")) + basenames = sorted([os.path.splitext(os.path.basename(f))[0] for f in yaml_files]) + + test_cases = [] + for config_yml in basenames: + for test_type in DISAGG_TEST_TYPES: + test_cases.append(f"{test_type}-{config_yml}") + + return test_cases + + +# Generate all test case combinations +# For aggr: {test_type}-{config_yml}, {test_type}-{config_yml}-{server_idx}, +# {test_type}-{config_yml}-{start_idx}-{end_idx} +# For disagg: {test_type}-{config_yml} +PERF_SANITY_TEST_CASES = ( + get_aggr_test_cases() + + get_disagg_test_cases() + + [ + "aggr_upload-config", + "disagg_upload-config", + "disagg_upload-config_3_nodes", + "disagg_upload-config_6_nodes", + ] +) + + +@pytest.mark.parametrize("perf_sanity_test_case", PERF_SANITY_TEST_CASES) +def test_e2e(output_dir, perf_sanity_test_case): + # Create config and parse test case name + config = PerfSanityTestConfig(perf_sanity_test_case, output_dir) + + # Parse config file to get server_configs and server_client_configs + config.parse_config_file() + + # Get commands + commands = config.get_commands() + + # Run commands and collect outputs + 
outputs = config.run_ex(commands) + + # For disagg mode, only BENCHMARK node parses results and uploads + if config.runtime == "multi_node_disagg_server": + disagg_config = config.server_configs[0][2] + if disagg_config.disagg_serving_type != "BENCHMARK": + print_info( + f"Disagg serving type is {disagg_config.disagg_serving_type}, skipping perf result parsing and upload." + ) + return + + # Parse performance results + config.get_perf_result(outputs) + + # Upload results to database + config.upload_test_results_to_database() diff --git a/tests/integration/defs/perf/utils.py b/tests/integration/defs/perf/utils.py index 9f2ed7bb32f..386138c0903 100644 --- a/tests/integration/defs/perf/utils.py +++ b/tests/integration/defs/perf/utils.py @@ -19,22 +19,17 @@ import os import re import subprocess -import time from datetime import datetime from enum import Enum from pathlib import Path from typing import Dict, List, NamedTuple, Optional -import requests -import yaml from _pytest.nodes import Item from _pytest.python import Function from defs.trt_test_alternative import (check_output, popen, print_error, print_info) from test_common.http_utils import wait_for_endpoint_ready -from tensorrt_llm._utils import get_free_port - from ..common import get_trt_llm_lib_dir, venv_mpi_check_output from ..local_venv import PythonVenvRunnerImpl from ..test_list_parser import parse_test_list @@ -243,55 +238,6 @@ def get_cmd_str(self, cmd_idx) -> List[str]: return cmd_str -class PerfAggrScriptTestCmds(NamedTuple): - server_cmds: List[List[str]] - client_cmds: List[List[str]] - names: List[str] - timeout: int - output_dir: str - - def run_cmd(self, cmd_idx: int, venv) -> str: - output = "" - server_proc = None - server_file_path = os.path.join( - self.output_dir, f"trtllm-serve.{self.names[cmd_idx]}.log") - client_file_path = os.path.join( - self.output_dir, f"trtllm-benchmark.{self.names[cmd_idx]}.log") - try: - server_hostname = "localhost" - server_port = get_free_port() - server_cmd = add_host_port_to_cmd(self.server_cmds[cmd_idx], - server_hostname, server_port) - print_info(f"Starting server. cmd is {server_cmd}") - with open(server_file_path, 'w') as server_ctx: - server_proc = subprocess.Popen( - server_cmd, - stdout=server_ctx, - stderr=subprocess.STDOUT, - env=copy.deepcopy(os.environ), - ) - wait_for_endpoint_ready( - f"http://{server_hostname}:{server_port}/health", - timeout=self.timeout) - client_cmd = add_host_port_to_cmd(self.client_cmds[cmd_idx], - server_hostname, server_port) - print_info(f"Starting client. 
cmd is {client_cmd}") - output = subprocess.check_output( - client_cmd, - stderr=subprocess.STDOUT, - env=copy.deepcopy(os.environ), - ).decode() - with open(client_file_path, 'w') as client_ctx: - client_ctx.write(output) - finally: - server_proc.terminate() - server_proc.wait() - return output - - def get_cmd_str(self, cmd_idx) -> List[str]: - return ["aggr_server tests, please check config files"] - - class PerfDisaggScriptTestCmds(NamedTuple): ctx_cmd: str gen_cmd: str @@ -341,249 +287,6 @@ def get_cmd_str(self, cmd_idx) -> List[str]: return ["disaggregated server tests, please check config files"] -class PerfMultiNodeDisaggScriptTestCmds(NamedTuple): - ctx_server_cmds: List[List[str]] - gen_server_cmds: List[List[str]] - disagg_server_cmds: List[List[str]] - benchmark_cmds: List[List[str]] - timeout: int - hostname: str - disagg_serving_type: str - num_ctx_servers: int - num_gen_servers: int - output_dir: str - - def _generate_hostname_file(self, cmd_idx: int, port: int): - # Create hostnames directory - hostnames_dir = os.path.join(self.output_dir, f"hostnames-{cmd_idx}") - if not os.path.exists(hostnames_dir): - os.makedirs(hostnames_dir, exist_ok=True) - hostname_file = os.path.join(hostnames_dir, - f"{self.disagg_serving_type}.txt") - with open(hostname_file, 'w') as f: - f.write(f"{self.hostname}:{port}") - - def _generate_disagg_server_config(self, cmd_idx: int, - disagg_server_port: int) -> str: - print_info( - f"Generating disagg server config for command index {cmd_idx}") - hostnames_folder = os.path.join(self.output_dir, f"hostnames-{cmd_idx}") - expected_count = self.num_ctx_servers + self.num_gen_servers - start_time = time.time() - hostnames = [] - while True: - elapsed_time = time.time() - start_time - print_info( - f"Waiting for hostnames in {hostnames_folder}, elapsed time: {elapsed_time}s, current: {len(hostnames)}, expected: {expected_count}" - ) - if elapsed_time > self.timeout: - print_error( - f"Time out. 
Hostnames files are not ready after {self.timeout}s" - ) - time.sleep(10) - if not os.path.exists(hostnames_folder): - continue - hostnames = os.listdir(hostnames_folder) - if len(hostnames) >= expected_count: - break - print_info( - f"All hostnames found in {hostnames_folder} after elapsed time: {elapsed_time}s" - ) - - # Read ctx and gen hostnames - ctx_hostnames = [] - gen_hostnames = [] - for hostname_file in hostnames: - hostname_file_path = os.path.join(hostnames_folder, hostname_file) - with open(hostname_file_path, 'r') as f: - hostname_port = f.read().strip() - hostname = hostname_port.split(":")[0] - port = hostname_port.split(":")[1] - print_info( - f"Hostname File: {hostname_file_path} Hostname: {hostname_port} Port: {port}" - ) - if hostname_file.startswith("CTX"): - ctx_hostnames.append(hostname_port) - elif hostname_file.startswith("GEN"): - gen_hostnames.append(hostname_port) - - server_config = { - 'hostname': self.hostname, - 'port': disagg_server_port, - 'backend': 'pytorch', - 'context_servers': { - 'num_instances': self.num_ctx_servers, - 'urls': ctx_hostnames, - }, - 'generation_servers': { - 'num_instances': self.num_gen_servers, - 'urls': gen_hostnames, - } - } - config_path = os.path.join(self.output_dir, - f"server_config.{cmd_idx}.yaml") - with open(config_path, 'w') as f: - yaml.dump(server_config, f) - print_info(f"Server config file {config_path} generated") - return config_path - - def _get_disagg_server_hostname_and_port(self, cmd_idx: int) -> tuple: - config_path = os.path.join(self.output_dir, - f"server_config.{cmd_idx}.yaml") - start_time = time.time() - while True: - if os.path.exists(config_path): - print_info(f"Server config file found: {config_path}") - break - elapsed_time = time.time() - start_time - if elapsed_time > self.timeout: - print_error( - f"Server config file {config_path} not found after {self.timeout}s" - ) - print_info( - f"Waiting for server config file, elapsed time: {elapsed_time}s" - ) - time.sleep(10) # Check every 10 seconds - - # Read server config to get hostname and port - with open(config_path, 'r') as f: - server_config = yaml.safe_load(f) - disagg_server_hostname = server_config['hostname'] - disagg_server_port = server_config['port'] - return disagg_server_hostname, disagg_server_port - - def wait_for_benchmark_ready(self, - benchmark_status_file: str, - timeout: int = 7200): - start_time = time.time() - while True: - if os.path.exists(benchmark_status_file): - print_info( - f"Benchmark status file found, terminating server {self.disagg_serving_type}" - ) - break - elapsed_time = time.time() - start_time - print_info( - f"Waiting for benchmark status file, elapsed time: {elapsed_time}s" - ) - if elapsed_time > timeout: - print_error( - f"Timeout waiting for benchmark status file after {timeout}s, terminating server {self.disagg_serving_type}" - ) - break - time.sleep(10) # Check every 10 seconds - - def wait_for_endpoint_ready(self, url: str, timeout: int = 7200): - start = time.monotonic() - while True: - elapsed_time = time.monotonic() - start - if elapsed_time > timeout: - print_error( - f"Timeout waiting for endpoint {url} to be ready after {timeout} seconds" - ) - break - print_info( - f"Waiting for endpoint {url} to be ready, elapsed time: {elapsed_time}s" - ) - try: - time.sleep(10) - if requests.get(url).status_code == 200: - print_info(f"endpoint {url} is ready") - return - except Exception as err: - print_info( - f"endpoint {url} is not ready, with exception: {err}") - print_error( - f"Endpoint {url} did not 
become ready within {timeout} seconds") - - def run_cmd(self, cmd_idx: int, venv) -> str: - output = "" - server_proc = None - benchmark_status_file = os.path.join(self.output_dir, - f"benchmark_status.{cmd_idx}.txt") - port = get_free_port() - if "CTX" in self.disagg_serving_type or "GEN" in self.disagg_serving_type: - self._generate_hostname_file(cmd_idx, port) - server_file_path = os.path.join( - self.output_dir, - f"trtllm-serve.{cmd_idx}.{self.disagg_serving_type}.log") - is_ctx = "CTX" in self.disagg_serving_type - server_cmd = self.ctx_server_cmds[ - cmd_idx] if is_ctx else self.gen_server_cmds[cmd_idx] - server_cmd = add_host_port_to_cmd(server_cmd, self.hostname, port) - try: - print_info( - f"Starting server. disagg_serving_type: {self.disagg_serving_type} cmd is {server_cmd}" - ) - with open(server_file_path, 'w') as server_ctx: - server_proc = subprocess.Popen( - server_cmd, - stdout=server_ctx, - stderr=subprocess.STDOUT, - env=copy.deepcopy(os.environ), - ) - self.wait_for_benchmark_ready(benchmark_status_file, - timeout=self.timeout) - finally: - print_info(f"Server {self.disagg_serving_type} stopped") - server_proc.terminate() - server_proc.wait() - elif self.disagg_serving_type == "DISAGG_SERVER": - disagg_server_file_path = os.path.join( - self.output_dir, - f"trtllm-serve.{cmd_idx}.{self.disagg_serving_type}.log") - disagg_server_cmd = self.disagg_server_cmds[cmd_idx] - try: - self._generate_disagg_server_config(cmd_idx, port) - print_info( - f"Starting disagg server. disagg_serving_type: {self.disagg_serving_type} disagg server cmd is {disagg_server_cmd}" - ) - with open(disagg_server_file_path, 'w') as disagg_server_ctx: - disagg_server_proc = subprocess.Popen( - disagg_server_cmd, - stdout=disagg_server_ctx, - stderr=subprocess.STDOUT, - env=copy.deepcopy(os.environ), - ) - self.wait_for_benchmark_ready(benchmark_status_file, - timeout=self.timeout) - finally: - print_info(f"Disagg server {self.disagg_serving_type} stopped") - disagg_server_proc.terminate() - disagg_server_proc.wait() - elif self.disagg_serving_type == "BENCHMARK": - benchmark_file_path = os.path.join( - self.output_dir, f"trtllm-benchmark.{cmd_idx}.log") - try: - disagg_server_hostname, disagg_server_port = self._get_disagg_server_hostname_and_port( - cmd_idx) - benchmark_cmd = add_host_port_to_cmd( - self.benchmark_cmds[cmd_idx], disagg_server_hostname, - disagg_server_port) - self.wait_for_endpoint_ready( - f"http://{disagg_server_hostname}:{disagg_server_port}/health", - timeout=self.timeout, - ) - print_info( - f"Starting benchmark. disagg_serving_type: {self.disagg_serving_type} benchmark cmd is {benchmark_cmd}" - ) - output = subprocess.check_output( - benchmark_cmd, - env=copy.deepcopy(os.environ), - stderr=subprocess.STDOUT).decode() - with open(benchmark_file_path, 'w') as benchmark_ctx: - benchmark_ctx.write(output) - finally: - with open(benchmark_status_file, 'w') as status_file: - status_file.write("Done") - return output - - def get_cmd_str(self, cmd_idx) -> List[str]: - return [ - "multi-node disaggregated server tests, please check config files" - ] - - class AbstractPerfScriptTestClass(abc.ABC): """ Abstract class for all script-based perf tests. 
@@ -715,14 +418,6 @@ def run_ex(self, cmd_str = commands.get_cmd_str(cmd_idx) is_prepare_dataset_cmd = 'prepare_dataset' in cmd_str or "prepare-dataset" in cmd_str - is_perf_sanity_test = "perf_sanity" in full_test_name - - is_disagg_server = False - if self._config.runtime == "multi_node_disagg_server": - disagg_serving_type = self._config.disagg_configs[0][ - 'disagg_serving_type'] - is_disagg_server = disagg_serving_type != "BENCHMARK" - # Start the timer. self._start_timestamp = datetime.utcnow() try: @@ -730,8 +425,7 @@ def run_ex(self, # Capture the stdout from _gpu_clock_lock because the pipeline JUnit update script tries to parse # the log to find the GPU clocks. with io.StringIO() as buf: - # Perf-sanity test doesn't lock gpu clock - if self._gpu_clock_lock and not is_perf_sanity_test: + if self._gpu_clock_lock: # Lock GPU clock and start monitoring. with contextlib.redirect_stdout( buf), self._gpu_clock_lock, tmpDir: @@ -746,7 +440,7 @@ def run_ex(self, print(collect_and_clean_myelin_time(output)) # Check whether output has error message - if not is_prepare_dataset_cmd and is_perf_sanity_test: + if not is_prepare_dataset_cmd: self._check_benchmark_output_for_errors(output) # Print the output log to stdout and cache it. @@ -793,10 +487,6 @@ def run_ex(self, f"skip writing perf result when calling generating dataset in trtllm-bench." ) outputs.pop(cmd_idx) - elif is_disagg_server: - print_info( - f"skip writing perf result when running disagg's worker or server." - ) else: self._perf_result = self.get_perf_result(outputs) @@ -818,11 +508,6 @@ def _write_result(self, full_test_name: str, Store the test results in the _test_results. Write the test results and GPU monitoring data to the output csv and/or yaml files. """ - # Store the test result - if cmd_idx not in self._test_results: - self._test_results[cmd_idx] = {} - self._test_results[cmd_idx][metric_type] = self._perf_result - # Get GPU monitoring data self._gpu_monitor_data = self._gpu_clock_lock.get_state_data() if not self._gpu_monitor_data: diff --git a/tests/integration/test_lists/test-db/l0_dgx_b200_perf_sanity.yml b/tests/integration/test_lists/test-db/l0_dgx_b200_perf_sanity.yml index 4bf4f6ce67d..5f28deb1f88 100644 --- a/tests/integration/test_lists/test-db/l0_dgx_b200_perf_sanity.yml +++ b/tests/integration/test_lists/test-db/l0_dgx_b200_perf_sanity.yml @@ -15,9 +15,7 @@ l0_dgx_b200_perf_sanity: backend: pytorch orchestrator: mpi tests: - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp8_blackwell-r1_fp8_dep8_mtp1_1k1k] TIMEOUT (180) - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp8_blackwell-r1_fp8_tep8_mtp3_1k1k] TIMEOUT (180) - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp8_blackwell-r1_fp8_tp8_mtp3_1k1k] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[aggr_upload-deepseek_r1_fp8_blackwell-1-3] TIMEOUT (180) - condition: ranges: @@ -34,8 +32,4 @@ l0_dgx_b200_perf_sanity: backend: pytorch orchestrator: mpi tests: - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp4_v2_blackwell-r1_fp4_v2_dep4_mtp1_1k1k] TIMEOUT (180) - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp4_v2_blackwell-r1_fp4_v2_tep4_mtp3_1k1k] TIMEOUT (180) - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp4_v2_blackwell-r1_fp4_v2_tp4_mtp3_1k1k] TIMEOUT (180) - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-gpt_oss_fp4_blackwell-gpt_oss_fp4_dep2,gpt_oss_fp4_dep4] TIMEOUT (180) - - 
perf/test_perf.py::test_perf[perf_sanity_aggr_upload-gpt_oss_fp4_blackwell-gpt_oss_fp4_tp4_eagle3] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[aggr_upload-deepseek_r1_fp4_v2_blackwell-1-3] TIMEOUT (180) diff --git a/tests/integration/test_lists/test-db/l0_dgx_b300_perf_sanity.yml b/tests/integration/test_lists/test-db/l0_dgx_b300_perf_sanity.yml index d90907d9b40..d7beb771f1e 100644 --- a/tests/integration/test_lists/test-db/l0_dgx_b300_perf_sanity.yml +++ b/tests/integration/test_lists/test-db/l0_dgx_b300_perf_sanity.yml @@ -16,9 +16,7 @@ l0_dgx_b300_perf_sanity: backend: pytorch orchestrator: mpi tests: - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp8_blackwell-r1_fp8_dep8_mtp1_1k1k] TIMEOUT (180) - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp8_blackwell-r1_fp8_tep8_mtp3_1k1k] TIMEOUT (180) - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp8_blackwell-r1_fp8_tp8_mtp3_1k1k] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[aggr_upload-deepseek_r1_fp8_blackwell-1-3] TIMEOUT (180) - condition: ranges: @@ -36,6 +34,4 @@ l0_dgx_b300_perf_sanity: backend: pytorch orchestrator: mpi tests: - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp4_v2_blackwell-r1_fp4_v2_dep4_mtp1_1k1k] TIMEOUT (180) - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp4_v2_blackwell-r1_fp4_v2_tep4_mtp3_1k1k] TIMEOUT (180) - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp4_v2_blackwell-r1_fp4_v2_tp4_mtp3_1k1k] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[aggr_upload-deepseek_r1_fp4_v2_blackwell-1-3] TIMEOUT (180) diff --git a/tests/integration/test_lists/test-db/l0_gb200_multi_gpus_perf_sanity.yml b/tests/integration/test_lists/test-db/l0_gb200_multi_gpus_perf_sanity.yml index e06e1877250..06265a5ba58 100644 --- a/tests/integration/test_lists/test-db/l0_gb200_multi_gpus_perf_sanity.yml +++ b/tests/integration/test_lists/test-db/l0_gb200_multi_gpus_perf_sanity.yml @@ -14,6 +14,9 @@ l0_gb200_multi_gpus_perf_sanity: stage: post_merge backend: pytorch tests: - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp4_v2_grace_blackwell-r1_fp4_v2_dep4_mtp1_1k1k] - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp4_v2_grace_blackwell-r1_fp4_v2_tep4_mtp3_1k1k] - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp4_v2_grace_blackwell-r1_fp4_v2_tp4_mtp3_1k1k] + - perf/test_perf_sanity.py::test_e2e[aggr_upload-deepseek_r1_fp4_v2_grace_blackwell-1-3] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[aggr_upload-deepseek_r1_fp4_v2_grace_blackwell-4-6] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[aggr_upload-deepseek_r1_fp4_v2_grace_blackwell-7-9] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[aggr_upload-gpt_oss_120b_fp4_grace_blackwell-1-2] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[aggr_upload-gpt_oss_120b_fp4_grace_blackwell-3-4] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[aggr_upload-gpt_oss_120b_fp4_grace_blackwell-5] TIMEOUT (180) diff --git a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes_001.yml b/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes.yml similarity index 50% rename from tests/integration/test_lists/test-db/l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes_001.yml rename to tests/integration/test_lists/test-db/l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes.yml index ad69e70c867..31302a8f2a0 100644 --- 
a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes_001.yml +++ b/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes.yml @@ -1,5 +1,5 @@ version: 0.0.1 -l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes_001: +l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes: - condition: ranges: # 2 nodes with each node has 4 GPUs @@ -13,4 +13,5 @@ l0_gb200_multi_nodes_aggr_perf_sanity_2_nodes_001: stage: post_merge backend: pytorch tests: - - perf/test_perf.py::test_perf[perf_sanity_aggr_upload-deepseek_r1_fp4_v2_2_nodes_grace_blackwell-r1_fp4_v2_dep8_mtp1] + - perf/test_perf_sanity.py::test_e2e[aggr_upload-deepseek_r1_fp4_v2_2_nodes_grace_blackwell-1] + - perf/test_perf_sanity.py::test_e2e[aggr_upload-deepseek_r1_fp4_v2_2_nodes_grace_blackwell-2] diff --git a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes_001.yml b/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes.yml similarity index 56% rename from tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes_001.yml rename to tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes.yml index 456bb7a48ed..eb0aeebd900 100644 --- a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes_001.yml +++ b/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes.yml @@ -1,5 +1,5 @@ version: 0.0.1 -l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes_001: +l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes: - condition: ranges: # 3 nodes with each node has 4 GPUs @@ -13,4 +13,4 @@ l0_gb200_multi_nodes_disagg_perf_sanity_3_nodes_001: stage: post_merge backend: pytorch tests: - - perf/test_perf.py::test_perf[perf_sanity_disagg_upload-deepseek-r1-fp4_1k1k_ctx1_gen1_dep8_bs768_eplb0_mtp0_ccb-UCX] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[disagg_upload-deepseek-r1-fp4_1k1k_ctx1_gen1_dep8_bs768_eplb0_mtp0_ccb-UCX] TIMEOUT (90) diff --git a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes.yml b/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes.yml new file mode 100644 index 00000000000..55ad5690c65 --- /dev/null +++ b/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes.yml @@ -0,0 +1,17 @@ +version: 0.0.1 +l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes: +- condition: + ranges: + # 6 nodes with each node has 4 GPUs + system_gpu_count: + gte: 24 + lte: 24 + wildcards: + gpu: + - '*gb200*' + terms: + stage: post_merge + backend: pytorch + tests: + - perf/test_perf_sanity.py::test_e2e[disagg_upload-deepseek-r1-fp4_1k1k_ctx2_gen1_dep16_bs128_eplb0_mtp3_ccb-UCX] TIMEOUT (90) + - perf/test_perf_sanity.py::test_e2e[disagg_upload-deepseek-r1-fp4_1k1k_ctx2_gen1_dep16_bs128_eplb0_mtp3_ccb-NIXL] TIMEOUT (90) diff --git a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes_001.yml b/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes_001.yml deleted file mode 100644 index 3e34d0cb219..00000000000 --- a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes_001.yml +++ /dev/null @@ -1,16 +0,0 @@ -version: 0.0.1 -l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes_001: -- condition: - ranges: - # 6 nodes with each node has 4 GPUs - system_gpu_count: - gte: 24 - lte: 24 - wildcards: - gpu: - - '*gb200*' - terms: - stage: post_merge - backend: pytorch - 
tests: - - perf/test_perf.py::test_perf[perf_sanity_disagg_upload-deepseek-r1-fp4_1k1k_ctx2_gen1_dep16_bs128_eplb0_mtp3_ccb-UCX] TIMEOUT (180) diff --git a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes_002.yml b/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes_002.yml deleted file mode 100644 index 273790a2180..00000000000 --- a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes_002.yml +++ /dev/null @@ -1,16 +0,0 @@ -version: 0.0.1 -l0_gb200_multi_nodes_disagg_perf_sanity_6_nodes_002: -- condition: - ranges: - # 6 nodes with each node has 4 GPUs - system_gpu_count: - gte: 24 - lte: 24 - wildcards: - gpu: - - '*gb200*' - terms: - stage: post_merge - backend: pytorch - tests: - - perf/test_perf.py::test_perf[perf_sanity_disagg_upload-deepseek-r1-fp4_1k1k_ctx2_gen1_dep16_bs128_eplb0_mtp3_ccb-NIXL] TIMEOUT (180) diff --git a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes_001.yml b/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes.yml similarity index 56% rename from tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes_001.yml rename to tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes.yml index b4784d07368..196c76a6697 100644 --- a/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes_001.yml +++ b/tests/integration/test_lists/test-db/l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes.yml @@ -1,5 +1,5 @@ version: 0.0.1 -l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes_001: +l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes: - condition: ranges: # 8 nodes with each node has 4 GPUs @@ -13,4 +13,4 @@ l0_gb200_multi_nodes_disagg_perf_sanity_8_nodes_001: stage: post_merge backend: pytorch tests: - - perf/test_perf.py::test_perf[perf_sanity_disagg_upload-deepseek-r1-fp4_8k1k_ctx1_gen1_dep32_bs128_eplb0_mtp3_ccb-UCX] TIMEOUT (180) + - perf/test_perf_sanity.py::test_e2e[disagg_upload-deepseek-r1-fp4_8k1k_ctx1_gen1_dep32_bs128_eplb0_mtp3_ccb-UCX] TIMEOUT (90) diff --git a/tests/scripts/perf-sanity/deepseek_r1_fp4_v2_2_nodes_grace_blackwell.yaml b/tests/scripts/perf-sanity/deepseek_r1_fp4_v2_2_nodes_grace_blackwell.yaml index 1a5c5e5212b..ea29a5fecac 100644 --- a/tests/scripts/perf-sanity/deepseek_r1_fp4_v2_2_nodes_grace_blackwell.yaml +++ b/tests/scripts/perf-sanity/deepseek_r1_fp4_v2_2_nodes_grace_blackwell.yaml @@ -37,6 +37,7 @@ server_configs: osl: 1024 random_range_ratio: 0.2 backend: "openai" + - name: "r1_fp4_v2_tep8_mtp3" model_name: "deepseek_r1_0528_fp4_v2" trust_remote_code: true diff --git a/tests/scripts/perf-sanity/gpt_oss_120b_fp4_blackwell.yaml b/tests/scripts/perf-sanity/gpt_oss_120b_fp4_grace_blackwell.yaml similarity index 57% rename from tests/scripts/perf-sanity/gpt_oss_120b_fp4_blackwell.yaml rename to tests/scripts/perf-sanity/gpt_oss_120b_fp4_grace_blackwell.yaml index 1696347f0fd..dc464cfbd9e 100644 --- a/tests/scripts/perf-sanity/gpt_oss_120b_fp4_blackwell.yaml +++ b/tests/scripts/perf-sanity/gpt_oss_120b_fp4_grace_blackwell.yaml @@ -4,6 +4,37 @@ metadata: - B200 - B300 server_configs: + - name: "gpt_oss_fp4_dep4_1k8k" + model_name: "gpt_oss_120b_fp4" + tensor_parallel_size: 4 + moe_expert_parallel_size: 4 + pipeline_parallel_size: 1 + max_batch_size: 640 + max_num_tokens: 20000 + attn_backend: "TRTLLM" + enable_attention_dp: true + attention_dp_config: + enable_balance: true + moe_config: + backend: 
'TRTLLM' + cuda_graph_config: + enable_padding: true + max_batch_size: 640 + kv_cache_config: + dtype: 'fp8' + enable_block_reuse: false + free_gpu_memory_fraction: 0.8 + num_postprocess_workers: 4 + stream_interval: 20 + client_configs: + - name: "con2560_iter5_1k8k" + concurrency: 2560 + iterations: 5 + isl: 1024 + osl: 8192 + random_range_ratio: 0.8 + backend: "openai" + - name: "gpt_oss_fp4_dep2_1k1k" model_name: "gpt_oss_120b_fp4" tensor_parallel_size: 2 @@ -29,28 +60,26 @@ server_configs: client_configs: - name: "con2048_iter5_1k1k" concurrency: 2048 - iterations: 5 + iterations: 10 isl: 1024 osl: 1024 - random_range_ratio: 0.2 + random_range_ratio: 0.8 backend: "openai" - - name: "gpt_oss_fp4_dep4_1k1k" + - name: "gpt_oss_fp4_tep2_1k8k" model_name: "gpt_oss_120b_fp4" - tensor_parallel_size: 4 - moe_expert_parallel_size: 4 + tensor_parallel_size: 2 + moe_expert_parallel_size: 2 pipeline_parallel_size: 1 - max_batch_size: 512 + max_batch_size: 128 max_num_tokens: 20000 attn_backend: "TRTLLM" - enable_attention_dp: true - attention_dp_config: - enable_balance: true + enable_attention_dp: false moe_config: backend: 'TRTLLM' cuda_graph_config: enable_padding: true - max_batch_size: 512 + max_batch_size: 128 kv_cache_config: dtype: 'fp8' enable_block_reuse: false @@ -58,12 +87,41 @@ server_configs: num_postprocess_workers: 4 stream_interval: 20 client_configs: - - name: "con2048_iter5_1k1k" - concurrency: 2048 - iterations: 5 + - name: "con128_iter10_1k8k" + concurrency: 128 + iterations: 10 isl: 1024 - osl: 1024 - random_range_ratio: 0.2 + osl: 8192 + random_range_ratio: 0.8 + backend: "openai" + + - name: "gpt_oss_fp4_tp2_1k8k" + model_name: "gpt_oss_120b_fp4" + tensor_parallel_size: 2 + moe_expert_parallel_size: 1 + pipeline_parallel_size: 1 + max_batch_size: 8 + max_num_tokens: 20000 + attn_backend: "TRTLLM" + enable_attention_dp: false + moe_config: + backend: 'TRTLLM' + cuda_graph_config: + enable_padding: true + max_batch_size: 8 + kv_cache_config: + dtype: 'fp8' + enable_block_reuse: false + free_gpu_memory_fraction: 0.8 + num_postprocess_workers: 4 + stream_interval: 20 + client_configs: + - name: "con8_iter10_1k8k" + concurrency: 8 + iterations: 10 + isl: 1024 + osl: 8192 + random_range_ratio: 0.8 backend: "openai" - name: "gpt_oss_fp4_tp4_eagle3_1k1k"
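
The renamed `gpt_oss_120b_fp4_grace_blackwell.yaml` above follows the perf-sanity config schema used throughout these files: a top-level `server_configs` list in which each named `trtllm-serve` configuration (parallelism, batching, KV-cache and MoE options) carries one or more `client_configs` describing the load to drive against it (concurrency, iterations, ISL/OSL, random range ratio, backend). The sketch below shows how a harness might expand such a file into (server, client) benchmark cases; `load_cases` and the case-naming scheme are illustrative assumptions, not the actual `test_perf_sanity.py` implementation.

```python
# Hypothetical sketch: expand a perf-sanity YAML into (server, client) cases.
# The schema fields mirror the config files above; the helper itself is illustrative.
from typing import Iterator, Tuple

import yaml


def load_cases(config_path: str) -> Iterator[Tuple[str, dict, dict]]:
    """Yield (case_name, server_cfg, client_cfg) tuples from a perf-sanity YAML."""
    with open(config_path) as f:
        config = yaml.safe_load(f)
    for server_cfg in config.get("server_configs", []):
        for client_cfg in server_cfg.get("client_configs", []):
            # e.g. "gpt_oss_fp4_dep4_1k8k/con2560_iter5_1k8k"
            case_name = f"{server_cfg['name']}/{client_cfg['name']}"
            yield case_name, server_cfg, client_cfg


if __name__ == "__main__":
    for name, server, client in load_cases(
            "tests/scripts/perf-sanity/gpt_oss_120b_fp4_grace_blackwell.yaml"):
        print(name,
              f"tp={server['tensor_parallel_size']}",
              f"concurrency={client['concurrency']}",
              f"isl={client['isl']} osl={client['osl']}")
```

Read together with the test-db changes above, the trailing indices in the new IDs (for example `test_e2e[aggr_upload-gpt_oss_120b_fp4_grace_blackwell-1-2]`) appear to select subsets of these `server_configs` entries, replacing the per-config `test_perf.py` parametrizations that the diff removes.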