diff --git a/benchmarks/profiler/deploy/profile_sla_moe_dgdr.yaml b/benchmarks/profiler/deploy/profile_sla_moe_dgdr.yaml index 0a54963a16..58a7893d32 100644 --- a/benchmarks/profiler/deploy/profile_sla_moe_dgdr.yaml +++ b/benchmarks/profiler/deploy/profile_sla_moe_dgdr.yaml @@ -15,10 +15,6 @@ spec: profilingConfig: profilerImage: "nvcr.io/nvidia/ai-dynamo/sglang-runtime:0.6.1" config: - # Engine configuration - engine: - is_moe_model: true # Enable MoE model support (uses TEP/DEP instead of TP) - # Sweep/profiling configuration sweep: # Standard online profiling (not using AI Configurator) diff --git a/benchmarks/profiler/profile_endpoint.py b/benchmarks/profiler/profile_endpoint.py index 63f0daf0d9..e850a7a86b 100644 --- a/benchmarks/profiler/profile_endpoint.py +++ b/benchmarks/profiler/profile_endpoint.py @@ -5,6 +5,7 @@ import logging import os +from benchmarks.profiler.utils.defaults import EngineType from benchmarks.profiler.utils.profile_decode import profile_decode from benchmarks.profiler.utils.profile_prefill import profile_prefill @@ -91,7 +92,11 @@ os.makedirs(args.work_dir, exist_ok=True) if args.tokenizer_path == "": args.tokenizer_path = args.model_name - if args.mode == "prefill": + + # Convert string mode to EngineType + mode = EngineType(args.mode) + + if mode == EngineType.PREFILL: profile_prefill( args.work_dir, args.model_name, @@ -101,7 +106,7 @@ args.max_context_length, args.interpolation_granularity, ) - elif args.mode == "decode": + elif mode == EngineType.DECODE: assert args.max_kv_tokens > 0, "max_kv_tokens must be provided for decode" profile_decode( args.work_dir, @@ -115,4 +120,4 @@ args.attention_dp_size, ) else: - raise ValueError(f"Invalid mode: {args.mode}") + raise ValueError(f"Invalid mode: {mode}") diff --git a/benchmarks/profiler/profile_sla.py b/benchmarks/profiler/profile_sla.py index aa7ef2cce5..5ac710c2c0 100644 --- a/benchmarks/profiler/profile_sla.py +++ b/benchmarks/profiler/profile_sla.py @@ -17,12 +17,19 @@ import logging import math import os +from dataclasses import dataclass, field import numpy as np import yaml from benchmarks.profiler.utils.aiperf import benchmark_decode, benchmark_prefill from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS +from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import ( + ParallelizationMapping, + apply_parallel_mapping_to_config, + get_candidate_parallel_mappings, +) +from benchmarks.profiler.utils.defaults import EngineType from benchmarks.profiler.utils.dgd_generation import generate_dgd_config_with_planner from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator from benchmarks.profiler.utils.plot import ( @@ -30,12 +37,6 @@ plot_pd_joint_results, plot_prefill_performance, ) -from benchmarks.profiler.utils.profile_cache import ( - check_decode_results_exist, - check_prefill_results_exist, - load_existing_decode_results, - load_existing_prefill_results, -) from benchmarks.profiler.utils.profile_decode import ( get_num_request_range, profile_decode, @@ -52,6 +53,65 @@ ) from dynamo.planner.defaults import WORKER_COMPONENT_NAMES + +@dataclass +class PrefillProfileData: + """Container for prefill profiling results.""" + + num_gpus: list[int] = field(default_factory=list) + ttft: list[float] = field(default_factory=list) + thpt_per_gpu: list[float] = field(default_factory=list) + parallel_mapping_labels: list[str] = field(default_factory=list) + parallel_mappings: list[ParallelizationMapping] = field(default_factory=list) + + def add_data( + 
self, + num_gpus: int, + ttft: float, + thpt_per_gpu: float, + parallel_mapping_label: str, + parallel_mapping: ParallelizationMapping, + ) -> None: + """Add a complete data point to the profile data.""" + self.num_gpus.append(num_gpus) + self.ttft.append(ttft) + self.thpt_per_gpu.append(thpt_per_gpu) + self.parallel_mapping_labels.append(parallel_mapping_label) + self.parallel_mappings.append(parallel_mapping) + + +@dataclass +class DecodeProfileData: + """Container for decode profiling results.""" + + num_gpus: list[int] = field(default_factory=list) + itl: list[float] = field(default_factory=list) + thpt_per_gpu: list[float] = field(default_factory=list) + concurrency: list[int] = field(default_factory=list) + kv_cache_size: list[int] = field(default_factory=list) + parallel_mapping_labels: list[str] = field(default_factory=list) + parallel_mappings: list[ParallelizationMapping] = field(default_factory=list) + + def add_data( + self, + num_gpus: int, + itl: float, + thpt_per_gpu: float, + concurrency: int, + kv_cache_size: int, + parallel_mapping_label: str, + parallel_mapping: ParallelizationMapping, + ) -> None: + """Add a complete data point to the profile data.""" + self.num_gpus.append(num_gpus) + self.itl.append(itl) + self.thpt_per_gpu.append(thpt_per_gpu) + self.concurrency.append(concurrency) + self.kv_cache_size.append(kv_cache_size) + self.parallel_mapping_labels.append(parallel_mapping_label) + self.parallel_mappings.append(parallel_mapping) + + logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) console_handler = logging.StreamHandler() @@ -73,7 +133,7 @@ async def run_profile(args): try: # Log MoE model support - if args.is_moe_model: + if args.model_info.is_moe: logger.info( "MoE (Mixture of Experts) model profiling, sweeping TEP size for prefill and DEP size for decode" ) @@ -102,28 +162,7 @@ async def run_profile(args): for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1) if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine ] - if args.is_moe_model: - # Filter GPU counts to only include divisors of num_experts - if hasattr(args, "num_experts") and args.num_experts is not None: - original_counts = profile_num_gpus.copy() - profile_num_gpus = [ - gpu_count - for gpu_count in profile_num_gpus - if args.num_experts % gpu_count == 0 - ] - if not profile_num_gpus: - error_msg = ( - f"No valid GPU counts found that divide evenly into num_experts={args.num_experts}. " - f"Original candidates were {original_counts}. 
" - f"Valid divisors in range would be: {[d for d in range(args.min_num_gpus_per_engine, args.max_num_gpus_per_engine + 1) if args.num_experts % d == 0]}" - ) - logger.error(error_msg) - raise ValueError(error_msg) - if len(profile_num_gpus) < len(original_counts): - logger.info( - f"Filtered GPU counts from {original_counts} to {profile_num_gpus} " - f"(only divisors of num_experts={args.num_experts})" - ) + if args.model_info.is_moe: logger.info(f"Profiling MoE GPU counts (TEP/DEP): {profile_num_gpus}") else: logger.info(f"Profiling dense model GPU counts (TP): {profile_num_gpus}") @@ -132,17 +171,30 @@ async def run_profile(args): model_name = config_modifier.get_model_name(config) - # Log skip behavior - if args.force_rerun: + # Determine sweep max context length: allow user-provided cap to override model's if smaller + use_specified_max_context_len = getattr(args, "max_context_length", None) + model_max_context_len = args.model_info.max_context_length + if not use_specified_max_context_len and not model_max_context_len: + raise ValueError( + "No max_context_length available from args.max_context_length or model_info from HF config" + ) + elif not use_specified_max_context_len: + sweep_max_context_length = model_max_context_len logger.info( - "Force rerun enabled - will re-run all tests even if results exist" + f"Using model's maximum context length: {model_max_context_len}" ) - elif args.skip_existing_results: + elif not model_max_context_len: + sweep_max_context_length = use_specified_max_context_len logger.info( - "Skip existing results enabled - will skip TP sizes with existing results" + f"Using user-provided max_context_length: {use_specified_max_context_len}" ) else: - logger.info("Skip existing results disabled - will re-run all tests") + sweep_max_context_length = min( + use_specified_max_context_len, model_max_context_len + ) + logger.info( + f"Using minimum of user-provided and model's maximum context length: {sweep_max_context_length}" + ) if args.use_ai_configurator: if not args.aic_system: @@ -173,309 +225,262 @@ async def run_profile(args): ) # first profile prefill - prefill_num_gpus = [] - prefill_ttft = [] - prefill_thpt_per_gpu = [] + prefill_data = PrefillProfileData() logger.info("Profiling prefill...") - prefill_config = config_modifier.convert_config( - config, "prefill", is_moe_model=args.is_moe_model + base_prefill_config = config_modifier.convert_config( + config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe ) frontend_port = config_modifier.get_port(config) itl: float | None = None thpt_per_gpu: float | None = None for num_gpus in profile_num_gpus: logger.info(f"Profiling prefill with {num_gpus} GPUs...") + candidate_mappings = get_candidate_parallel_mappings( + num_gpus, args.model_info, EngineType.PREFILL + ) - # Check if results already exist for this GPU count - if ( - args.skip_existing_results - and not args.force_rerun - and check_prefill_results_exist(args.output_dir, num_gpus, args.isl) - ): - logger.info( - f"Skipping prefill {num_gpus} GPU(s) - results already exist" - ) - ttft, thpt_per_gpu = load_existing_prefill_results( - args.output_dir, num_gpus, args.isl + for mapping in candidate_mappings: + # Apply parallel mapping to config + prefill_config = apply_parallel_mapping_to_config( + base_prefill_config, + mapping, + EngineType.PREFILL, + config_modifier, + args.num_gpus_per_node, ) - if ttft is not None and thpt_per_gpu is not None: - prefill_num_gpus.append(num_gpus) - prefill_ttft.append(ttft) - 
prefill_thpt_per_gpu.append(thpt_per_gpu) - logger.info( - f"Loaded existing prefill results: {num_gpus} GPU TTFT={ttft:.2f}ms, throughput={thpt_per_gpu:.2f} tokens/s/GPU" - ) - continue + logger.info(f"Dynamo config: {prefill_config}") - if args.is_moe_model: - prefill_config = config_modifier.set_config_tep_size( - prefill_config, num_gpus, args.num_gpus_per_node + # Work dir includes mapping label (safe chars only) + parallel_mapping_tag = ( + mapping.label().replace("=", "").replace("/", "_") ) - else: - prefill_config = config_modifier.set_config_tp_size( - prefill_config, num_gpus + work_dir = ( + f"{args.output_dir}/prefill_{num_gpus}gpus_{parallel_mapping_tag}" ) - logger.info(f"Dynamo config: {prefill_config}") - - work_dir = f"{args.output_dir}/prefill_{num_gpus}gpus" - os.makedirs(work_dir, exist_ok=True) - - prefill_config_fn = f"{work_dir}/config.yaml" - with open(prefill_config_fn, "w") as f: - yaml.dump(prefill_config, f) - - ttft = None - if args.dry_run: - logger.info("Skipping deployment creation in dry run mode") - elif args.use_ai_configurator: - logger.info("Using ai-configurator to estimate prefill latency.") - perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf( - args.isl, - tp_size=num_gpus, - ) - ttft = perf_dict["context_latency"] - logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms") - else: - client = DynamoDeploymentClient( - namespace=args.namespace, - base_log_dir=work_dir, - model_name=model_name, - service_name=args.service_name, - frontend_port=frontend_port, - deployment_name=prefill_config["metadata"]["name"], - ) - logger.info(f"Created client with service_name: {client.service_name}") - deployment_clients.append(client) # Track for cleanup - await client.create_deployment(prefill_config_fn) - logger.info("Waiting for deployment to be ready...") - await client.wait_for_deployment_ready() - logger.info("Deployment is ready") - - logger.info("Getting deployment logs...") - await client.get_deployment_logs() - logger.info( - f"Logs have been saved to {client.base_log_dir / client.deployment_name}" - ) - - # run ai-perf - base_url = client.get_service_url() - ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}" - aiperf_result = benchmark_prefill( - args.isl, - ai_perf_artifact_dir, - model_name, - model_name, - base_url=base_url, - ) - if aiperf_result is not None: - ttft = aiperf_result["time_to_first_token"]["avg"] - - logger.info("Cleaning up deployment...") - await client.delete_deployment() - deployment_clients.remove(client) - logger.info("Deployment deleted") + os.makedirs(work_dir, exist_ok=True) + + prefill_config_fn = f"{work_dir}/config.yaml" + with open(prefill_config_fn, "w") as f: + yaml.dump(prefill_config, f) + + ttft = None + if args.dry_run: + logger.info("Skipping deployment creation in dry run mode") + elif args.use_ai_configurator: + logger.info("Using ai-configurator to estimate prefill latency.") + perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf( + args.isl, + tp_size=(mapping.tp or num_gpus), + ) + ttft = perf_dict["context_latency"] + logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms") + else: + client = DynamoDeploymentClient( + namespace=args.namespace, + base_log_dir=work_dir, + model_name=model_name, + service_name=args.service_name, + frontend_port=frontend_port, + deployment_name=prefill_config["metadata"]["name"], + ) + logger.info( + f"Created client with service_name: {client.service_name}" + ) + deployment_clients.append(client) # Track for cleanup + await 
client.create_deployment(prefill_config_fn) + logger.info("Waiting for deployment to be ready...") + await client.wait_for_deployment_ready() + logger.info("Deployment is ready") + + logger.info("Getting deployment logs...") + await client.get_deployment_logs() + logger.info( + f"Logs have been saved to {client.base_log_dir / client.deployment_name}" + ) - if ttft is not None: - prefill_num_gpus.append(num_gpus) - prefill_ttft.append(ttft) - prefill_thpt_per_gpu.append(args.isl / ttft / num_gpus * 1000) + # run ai-perf + base_url = client.get_service_url() + ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}" + aiperf_result = benchmark_prefill( + args.isl, + ai_perf_artifact_dir, + model_name, + model_name, + base_url=base_url, + ) + if aiperf_result is not None: + ttft = aiperf_result["time_to_first_token"]["avg"] + + logger.info("Cleaning up deployment...") + await client.delete_deployment() + deployment_clients.remove(client) + logger.info("Deployment deleted") + + if ttft is not None: + prefill_data.add_data( + num_gpus=num_gpus, + ttft=ttft, + thpt_per_gpu=args.isl / ttft / num_gpus * 1000, + parallel_mapping_label=mapping.label(), + parallel_mapping=mapping, + ) # Plot the results as a 2D scatter plot - prefill_results = None - if prefill_num_gpus and prefill_ttft and prefill_thpt_per_gpu: - prefill_results = (prefill_num_gpus, prefill_ttft, prefill_thpt_per_gpu) - plot_prefill_performance(prefill_results, args.ttft, args.output_dir) + if prefill_data.num_gpus and prefill_data.ttft and prefill_data.thpt_per_gpu: + plot_prefill_performance(prefill_data, args.ttft, args.output_dir) # then profile decode - decode_num_gpus = [] - decode_itl = [] - decode_thpt_per_gpu = [] - decode_concurrency = [] - decode_kv_cache_size = [] - decode_results = [] # Store partial results for plotting later + decode_data = DecodeProfileData() logger.info("Profiling decode...") - decode_config = config_modifier.convert_config( - config, "decode", is_moe_model=args.is_moe_model + base_decode_config = config_modifier.convert_config( + config, EngineType.DECODE, is_moe_model=args.model_info.is_moe ) for num_gpus in profile_num_gpus: logger.info(f"Profiling decode with {num_gpus} GPUs...") + candidate_mappings = get_candidate_parallel_mappings( + num_gpus, args.model_info, EngineType.DECODE + ) - # Check if results already exist for this GPU count - if ( - args.skip_existing_results - and not args.force_rerun - and check_decode_results_exist( - args.output_dir, num_gpus, args.isl, args.osl - ) - ): - logger.info( - f"Skipping decode {num_gpus} GPU(s) - results already exist" - ) - existing_results = load_existing_decode_results( - args.output_dir, num_gpus, args.isl, args.osl + for mapping in candidate_mappings: + # Apply parallel mapping to config + decode_config = apply_parallel_mapping_to_config( + base_decode_config, + mapping, + EngineType.DECODE, + config_modifier, + args.num_gpus_per_node, ) - if existing_results: - # Add existing results to our arrays - engine_decode_itl = [] - engine_decode_thpt_per_gpu = [] - for itl, thpt_per_gpu, concurrency in existing_results: - decode_num_gpus.append(num_gpus) - decode_itl.append(itl) - decode_thpt_per_gpu.append(thpt_per_gpu) - decode_concurrency.append(concurrency) - # We need to get kv_cache_size from existing logs or estimate it - estimated_kv_cache = max( - 100000, concurrency * (args.isl + args.osl) * 2 - ) # Conservative estimate - decode_kv_cache_size.append(estimated_kv_cache) - engine_decode_itl.append(itl) - 
engine_decode_thpt_per_gpu.append(thpt_per_gpu) - - # Store results for plotting - decode_results.append( - (num_gpus, engine_decode_itl, engine_decode_thpt_per_gpu) - ) - logger.info( - f"Loaded {len(existing_results)} existing decode results for {num_gpus} GPU(s)" - ) - continue + logger.info(f"Dynamo config: {decode_config}") - if args.is_moe_model: - decode_config = config_modifier.set_config_dep_size( - decode_config, num_gpus, args.num_gpus_per_node + parallel_mapping_tag = ( + mapping.label() + .replace("=", "") + .replace("/", "_") # safe chars for directory ) - else: - decode_config = config_modifier.set_config_tp_size( - decode_config, num_gpus + work_dir = ( + f"{args.output_dir}/decode_{num_gpus}gpus_{parallel_mapping_tag}" ) - logger.info(f"Dynamo config: {decode_config}") + os.makedirs(work_dir, exist_ok=True) - work_dir = f"{args.output_dir}/decode_{num_gpus}gpus" - os.makedirs(work_dir, exist_ok=True) + decode_config_fn = f"{work_dir}/config.yaml" + with open(decode_config_fn, "w") as f: + yaml.dump(decode_config, f) - decode_config_fn = f"{work_dir}/config.yaml" - with open(decode_config_fn, "w") as f: - yaml.dump(decode_config, f) + if args.dry_run: + logger.info("Skipping deployment creation in dry run mode") - if args.dry_run: - logger.info("Skipping deployment creation in dry run mode") - - elif args.use_ai_configurator: - # Compute max_concurrency and max_kv_tokens to know which - # num_request to sweep over. - max_concurrency = ai_configurator_perf_estimator.get_max_batch_size( - args.isl, args.osl, tp_size=num_gpus - ) - max_kv_tokens = max_concurrency * (args.isl + args.osl) - - else: - client = DynamoDeploymentClient( - namespace=args.namespace, - base_log_dir=work_dir, - model_name=model_name, - service_name=args.service_name, - frontend_port=frontend_port, - deployment_name=decode_config["metadata"]["name"], - ) - deployment_clients.append(client) # Track for cleanup - await client.create_deployment(decode_config_fn) - logger.info("Waiting for deployment to be ready...") - await client.wait_for_deployment_ready() - logger.info("Deployment is ready") - - logger.info("Getting deployment logs...") - await client.get_deployment_logs() - logger.info( - f"Logs have been saved to {client.base_log_dir / client.deployment_name}" - ) + elif args.use_ai_configurator: + # Compute max_concurrency and max_kv_tokens to know which + # num_request to sweep over. + max_concurrency = ai_configurator_perf_estimator.get_max_batch_size( + args.isl, args.osl, tp_size=(mapping.tp or num_gpus) + ) + max_kv_tokens = max_concurrency * (args.isl + args.osl) + + else: + client = DynamoDeploymentClient( + namespace=args.namespace, + base_log_dir=work_dir, + model_name=model_name, + service_name=args.service_name, + frontend_port=frontend_port, + deployment_name=decode_config["metadata"]["name"], + ) + deployment_clients.append(client) # Track for cleanup + await client.create_deployment(decode_config_fn) + logger.info("Waiting for deployment to be ready...") + await client.wait_for_deployment_ready() + logger.info("Deployment is ready") + + logger.info("Getting deployment logs...") + await client.get_deployment_logs() + logger.info( + f"Logs have been saved to {client.base_log_dir / client.deployment_name}" + ) - # Compute max_concurrency and max_kv_tokens to know which - # num_request to sweep over. 
- # For MoE models, attention_dp_size = DEP size (num_gpus), for dense models = 1 - attention_dp_size = num_gpus if args.is_moe_model else 1 - max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log( - f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log", - attention_dp_size=attention_dp_size, - ) - max_concurrency = max_kv_tokens // (args.isl + args.osl) - - if not args.dry_run: - attention_dp_size = num_gpus if args.is_moe_model else 1 - sweep_num_request = get_num_request_range( - attention_dp_size, - max_concurrency, - args.decode_interpolation_granularity, - ) - logger.info( - f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}" - ) + # Compute max_concurrency and max_kv_tokens to know which + # num_request to sweep over. + attention_dp_size = mapping.get_attn_dp_size(num_gpus) + max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log( + f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log", + attention_dp_size=attention_dp_size, + ) + max_concurrency = max_kv_tokens // (args.isl + args.osl) + + if not args.dry_run: + attention_dp_size = mapping.get_attn_dp_size(num_gpus) + sweep_num_request = get_num_request_range( + attention_dp_size, + max_concurrency, + args.decode_interpolation_granularity, + ) + logger.info( + f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}" + ) - engine_decode_itl = [] - engine_decode_thpt_per_gpu = [] - for num_request in sweep_num_request: - itl = thpt_per_gpu = None - if args.use_ai_configurator: - logger.info("Using ai-configurator to estimate decode latency.") - perf_dict = ai_configurator_perf_estimator.estimate_perf( - args.isl, - args.osl, - num_request, - mode="decode", - tp_size=num_gpus, - ) - - itl = perf_dict["tpot"] - thpt_per_gpu = perf_dict["tokens/s/gpu"] - logger.info(f"Estimated decode ITL: {itl:.2f}ms") - logger.info( - f"Estimated decode throughput per GPU: {thpt_per_gpu:.2f} tokens/s/GPU" - ) - else: - base_url = client.get_service_url() - ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}" - aiperf_result = benchmark_decode( - args.isl, - args.osl, - num_request, - ai_perf_artifact_dir, - model_name, - model_name, - base_url=base_url, - ) - if aiperf_result is not None: - itl = aiperf_result["inter_token_latency"]["avg"] - thpt_per_gpu = ( - aiperf_result["output_token_throughput"]["avg"] - / num_gpus + for num_request in sweep_num_request: + itl = thpt_per_gpu = None + if args.use_ai_configurator: + logger.info( + "Using ai-configurator to estimate decode latency." 
+ ) + perf_dict = ai_configurator_perf_estimator.estimate_perf( + args.isl, + args.osl, + num_request, + mode=EngineType.DECODE, + tp_size=(mapping.tp or num_gpus), ) - if itl is not None and thpt_per_gpu is not None: - engine_decode_itl.append(itl) - engine_decode_thpt_per_gpu.append(thpt_per_gpu) - decode_num_gpus.append(num_gpus) - decode_itl.append(itl) - decode_thpt_per_gpu.append(thpt_per_gpu) - decode_concurrency.append(num_request) - decode_kv_cache_size.append(max_kv_tokens) - - # Store partial results for plotting later - decode_results.append( - (num_gpus, engine_decode_itl, engine_decode_thpt_per_gpu) - ) + itl = perf_dict["tpot"] + thpt_per_gpu = perf_dict["tokens/s/gpu"] + logger.info(f"Estimated decode ITL: {itl:.2f}ms") + logger.info( + f"Estimated decode throughput per GPU: {thpt_per_gpu:.2f} tokens/s/GPU" + ) + else: + base_url = client.get_service_url() + ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}" + aiperf_result = benchmark_decode( + args.isl, + args.osl, + num_request, + ai_perf_artifact_dir, + model_name, + model_name, + base_url=base_url, + ) + if aiperf_result is not None: + itl = aiperf_result["inter_token_latency"]["avg"] + thpt_per_gpu = ( + aiperf_result["output_token_throughput"]["avg"] + / num_gpus + ) + + if itl is not None and thpt_per_gpu is not None: + decode_data.add_data( + num_gpus=num_gpus, + itl=itl, + thpt_per_gpu=thpt_per_gpu, + concurrency=num_request, + kv_cache_size=max_kv_tokens, + parallel_mapping_label=mapping.label(), + parallel_mapping=mapping, + ) - if not args.dry_run and not args.use_ai_configurator: - logger.info("Cleaning up deployment...") - await client.delete_deployment() - deployment_clients.remove(client) - logger.info("Deployment deleted") + if not args.dry_run and not args.use_ai_configurator: + logger.info("Cleaning up deployment...") + await client.delete_deployment() + deployment_clients.remove(client) + logger.info("Deployment deleted") # Plot all decode results after profiling is complete - if decode_results: - plot_decode_performance(decode_results, args.itl, args.output_dir) + if decode_data.num_gpus: + plot_decode_performance(decode_data, args.itl, args.output_dir) - if prefill_results and decode_results: + if prefill_data.num_gpus and decode_data.num_gpus: plot_pd_joint_results( - args.isl, args.osl, prefill_results, decode_results, args.output_dir + args.isl, args.osl, prefill_data, decode_data, args.output_dir ) if args.dry_run: @@ -483,100 +488,77 @@ async def run_profile(args): else: logger.info("Analyzing results and generate recommendations...") # Safety guards: no results → exit early with a clear message - if not (prefill_num_gpus and prefill_ttft and prefill_thpt_per_gpu): + if not prefill_data.num_gpus: logger.error("No prefill results produced; skipping recommendations.") - # select best tp size for prefill - if min(prefill_ttft) > args.ttft: + # select best parallel mapping for prefill + if min(prefill_data.ttft) > args.ttft: logger.info( "No TP size satisfies the TTFT requirement, please try a smaller model or a more powerful GPU SKU" ) - selected_prefill_idx = int(np.argmin(np.array(prefill_ttft))) + selected_prefill_idx = int(np.argmin(np.array(prefill_data.ttft))) else: valid_indices = [ - i for i, ttft in enumerate(prefill_ttft) if ttft <= args.ttft + i for i, ttft in enumerate(prefill_data.ttft) if ttft <= args.ttft ] # Among valid TP sizes, select the one with highest throughput per GPU - valid_thpts = [prefill_thpt_per_gpu[i] for i in 
valid_indices] + valid_thpts = [prefill_data.thpt_per_gpu[i] for i in valid_indices] max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))] selected_prefill_idx = max_thpt_idx logger.info( - f"Suggested number of GPUs for prefill: {prefill_num_gpus[selected_prefill_idx]} (TTFT {prefill_ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)" + f"Suggested prefill parallel mapping: {prefill_data.parallel_mapping_labels[selected_prefill_idx]} on {prefill_data.num_gpus[selected_prefill_idx]} GPU(s) (TTFT {prefill_data.ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_data.thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)" ) - # scale up if estimated TTFT is 120% of target TTFT - prefill_queue_size_upper_bound = max( - 0.1, args.ttft * 1.2 / prefill_ttft[selected_prefill_idx] - 1 - ) - # scale down if estimated TTFT is 80% of target TTFT - prefill_queue_size_lower_bound = max( - 0.1, args.ttft * 0.8 / prefill_ttft[selected_prefill_idx] - 1 - ) - logger.info( - f"Suggested planner upper/lower bound for prefill queue size: {prefill_queue_size_upper_bound:.2f}/{prefill_queue_size_lower_bound:.2f}" - ) - - # select best gpu count for decode - if not ( - decode_num_gpus - and decode_itl - and decode_thpt_per_gpu - and decode_concurrency - and decode_kv_cache_size - ): + # select best parallel mapping for decode + if not decode_data.num_gpus: logger.error("No decode results produced; skipping recommendations.") return - if min(decode_itl) > args.itl: + if min(decode_data.itl) > args.itl: logger.info( "No TP size satisfies the ITL requirement, please try a smaller model or a more powerful GPU SKU" ) - selected_decode_idx = int(np.argmin(np.array(decode_itl))) + selected_decode_idx = int(np.argmin(np.array(decode_data.itl))) else: valid_indices = [ - i for i, itl in enumerate(decode_itl) if itl <= args.itl + i for i, itl in enumerate(decode_data.itl) if itl <= args.itl ] # Among valid TP sizes, select the one with highest throughput per GPU - valid_thpts = [decode_thpt_per_gpu[i] for i in valid_indices] + valid_thpts = [decode_data.thpt_per_gpu[i] for i in valid_indices] max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))] selected_decode_idx = max_thpt_idx logger.info( - f"Suggested number of GPUs for decode: {decode_num_gpus[selected_decode_idx]} (ITL {decode_itl[selected_decode_idx]:.2f} ms, throughput {decode_thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)" - ) - - # calculate kv cache utlization for the selected TP and concurrency - selected_decode_kv_cache_utilization = ( - decode_concurrency[selected_decode_idx] - * (args.isl + (args.osl / 2)) - / decode_kv_cache_size[selected_decode_idx] - ) - # set a +- 20% range for the kv cache utilization - logger.info( - f"Suggested planner upper/lower bound for decode kv cache utilization: {min(1, selected_decode_kv_cache_utilization + 0.2):.2f}/{max(0.1, selected_decode_kv_cache_utilization - 0.2):.2f}" + f"Suggested decode parallel mapping: {decode_data.parallel_mapping_labels[selected_decode_idx]} on {decode_data.num_gpus[selected_decode_idx]} GPU(s) (ITL {decode_data.itl[selected_decode_idx]:.2f} ms, throughput {decode_data.thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)" ) if args.dry_run: # use min value for prefill and decode GPU counts - prefill_num_gpus = [args.min_num_gpus_per_engine] - decode_num_gpus = [args.min_num_gpus_per_engine] + prefill_data.num_gpus = [args.min_num_gpus_per_engine] + decode_data.num_gpus = [args.min_num_gpus_per_engine] + 
prefill_data.parallel_mappings = [ + ParallelizationMapping(tp=args.min_num_gpus_per_engine) + ] + decode_data.parallel_mappings = [ + ParallelizationMapping(tp=args.min_num_gpus_per_engine) + ] selected_prefill_idx = 0 selected_decode_idx = 0 - # interpolate ISL - TTFT with best prefill GPU count - best_prefill_gpus = prefill_num_gpus[selected_prefill_idx] + # interpolate ISL - TTFT with best prefill parallel mapping + best_prefill_gpus = prefill_data.num_gpus[selected_prefill_idx] + best_prefill_mapping = prefill_data.parallel_mappings[selected_prefill_idx] logger.info( - f"Profiling prefill under best {best_prefill_gpus} GPU(s) with different ISL..." + f"Profiling prefill under best {best_prefill_gpus} GPU(s) with parallel mapping [{best_prefill_mapping.label()}] with different ISL..." ) prefill_config = config_modifier.convert_config( - config, "prefill", is_moe_model=args.is_moe_model + config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe + ) + prefill_config = apply_parallel_mapping_to_config( + prefill_config, + best_prefill_mapping, + EngineType.PREFILL, + config_modifier, + args.num_gpus_per_node, ) - if args.is_moe_model: - prefill_config = config_modifier.set_config_tep_size( - prefill_config, best_prefill_gpus, args.num_gpus_per_node - ) - else: - prefill_config = config_modifier.set_config_tp_size( - prefill_config, best_prefill_gpus - ) logger.info(f"Dynamo config: {prefill_config}") work_dir = f"{args.output_dir}/selected_prefill_interpolation" @@ -592,10 +574,10 @@ async def run_profile(args): profile_prefill_aiconfigurator( work_dir, best_prefill_gpus, # num_gpus - args.max_context_length, + sweep_max_context_length, args.prefill_interpolation_granularity, ai_configurator_perf_estimator, - tp_size=best_prefill_gpus, + tp_size=(best_prefill_mapping.tp or best_prefill_gpus), ) else: client = DynamoDeploymentClient( @@ -635,7 +617,7 @@ async def run_profile(args): model_name, base_url, best_prefill_gpus, - args.max_context_length, + sweep_max_context_length, args.prefill_interpolation_granularity, ) @@ -644,17 +626,22 @@ async def run_profile(args): deployment_clients.remove(client) logger.info("Deployment deleted") - # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode GPU count - best_decode_gpus = decode_num_gpus[selected_decode_idx] - logger.info(f"Profiling decode with {best_decode_gpus} GPUs...") - if args.is_moe_model: - decode_config = config_modifier.set_config_dep_size( - decode_config, best_decode_gpus, args.num_gpus_per_node - ) - else: - decode_config = config_modifier.set_config_tp_size( - decode_config, best_decode_gpus - ) + # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode parallel mapping + best_decode_gpus = decode_data.num_gpus[selected_decode_idx] + best_decode_mapping = decode_data.parallel_mappings[selected_decode_idx] + logger.info( + f"Profiling decode with {best_decode_gpus} GPUs with parallel mapping [{best_decode_mapping.label()}]..." 
+ ) + decode_config = config_modifier.convert_config( + config, EngineType.DECODE, is_moe_model=args.model_info.is_moe + ) + decode_config = apply_parallel_mapping_to_config( + decode_config, + best_decode_mapping, + EngineType.DECODE, + config_modifier, + args.num_gpus_per_node, + ) logger.info(f"Dynamo config: {decode_config}") work_dir = f"{args.output_dir}/selected_decode_interpolation" @@ -667,20 +654,19 @@ async def run_profile(args): if args.dry_run: logger.info("Skipping deployment creation in dry run mode") elif args.use_ai_configurator: - # For MoE models, attention_dp_size = DEP size (best_decode_gpus), for dense models = 1 - attention_dp_size = best_decode_gpus if args.is_moe_model else 1 + attention_dp_size = best_decode_mapping.get_attn_dp_size(best_decode_gpus) max_kv_tokens = ai_configurator_perf_estimator.get_max_kv_tokens( - args.isl, args.osl, tp_size=best_decode_gpus + args.isl, args.osl, tp_size=(best_decode_mapping.tp or best_decode_gpus) ) profile_decode_aiconfigurator( work_dir, best_decode_gpus, # num_gpus max_kv_tokens, - args.max_context_length, + sweep_max_context_length, args.decode_interpolation_granularity, ai_configurator_perf_estimator, attention_dp_size, - tp_size=best_decode_gpus, + tp_size=(best_decode_mapping.tp or best_decode_gpus), ) else: client = DynamoDeploymentClient( @@ -703,8 +689,7 @@ async def run_profile(args): f"Logs have been saved to {client.base_log_dir / client.deployment_name}" ) - # For MoE models, attention_dp_size = DEP size (best_decode_gpus), for dense models = 1 - attention_dp_size = best_decode_gpus if args.is_moe_model else 1 + attention_dp_size = best_decode_mapping.get_attn_dp_size(best_decode_gpus) max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log( f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log", attention_dp_size=attention_dp_size, @@ -719,7 +704,7 @@ async def run_profile(args): base_url, best_decode_gpus, max_kv_tokens, - args.max_context_length, + sweep_max_context_length, args.decode_interpolation_granularity, attention_dp_size, ) @@ -737,7 +722,7 @@ async def run_profile(args): best_decode_gpus=best_decode_gpus, output_dir=args.output_dir, args=args, - is_moe_model=args.is_moe_model, + is_moe_model=args.model_info.is_moe, num_gpus_per_node=args.num_gpus_per_node, ) logger.info(f"Final DGD config with planner: {config}") diff --git a/benchmarks/profiler/utils/config.py b/benchmarks/profiler/utils/config.py index 6360cc6c1a..fd28543aba 100644 --- a/benchmarks/profiler/utils/config.py +++ b/benchmarks/profiler/utils/config.py @@ -17,7 +17,7 @@ import logging import math import shlex -from typing import Literal, Optional, Protocol +from typing import Optional from pydantic import BaseModel @@ -378,69 +378,3 @@ def update_image(config: dict, image: str) -> dict: logger.debug(f"Updated image for {service_name} to {image}") return cfg.model_dump() - - -class ConfigModifierProtocol(Protocol): - @classmethod - def convert_config( - cls, - config: dict, - target: Literal["prefill", "decode"], - is_moe_model: bool = False, - ) -> dict: - ... - - @classmethod - def set_config_tp_size( - cls, - config: dict, - tp_size: int, - component_type: SubComponentType = SubComponentType.DECODE, - ) -> dict: - ... - - @classmethod - def set_config_tep_size( - cls, - config: dict, - tep_size: int, - num_gpus_per_node: int, - component_type: SubComponentType = SubComponentType.DECODE, - ) -> dict: - ... 
- - @classmethod - def set_config_dep_size( - cls, - config: dict, - dep_size: int, - num_gpus_per_node: int, - component_type: SubComponentType = SubComponentType.DECODE, - ) -> dict: - ... - - @classmethod - def get_model_name(cls, config: dict) -> str: - ... - - @classmethod - def get_port(cls, config: dict) -> int: - ... - - @classmethod - def get_kv_cache_size_from_dynamo_log( - cls, dynamo_log_fn: str, attention_dp_size: int = 1 - ) -> int: - ... - - @classmethod - def load_default_config(cls) -> dict: - ... - - @classmethod - def update_model(cls, config: dict, model_name: str) -> dict: - ... - - @classmethod - def update_image(cls, config: dict, image: str) -> dict: - ... diff --git a/benchmarks/profiler/utils/config_modifiers/__init__.py b/benchmarks/profiler/utils/config_modifiers/__init__.py index 80ebdeb5f7..cd33c7d08c 100644 --- a/benchmarks/profiler/utils/config_modifiers/__init__.py +++ b/benchmarks/profiler/utils/config_modifiers/__init__.py @@ -16,7 +16,9 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from benchmarks.profiler.utils.config import ConfigModifierProtocol + from benchmarks.profiler.utils.config_modifiers.protocol import ( + ConfigModifierProtocol, + ) from benchmarks.profiler.utils.config_modifiers.sglang import SGLangConfigModifier from benchmarks.profiler.utils.config_modifiers.trtllm import TrtllmConfigModifier diff --git a/benchmarks/profiler/utils/config_modifiers/parallelization_mapping.py b/benchmarks/profiler/utils/config_modifiers/parallelization_mapping.py new file mode 100644 index 0000000000..8e80fd5d9e --- /dev/null +++ b/benchmarks/profiler/utils/config_modifiers/parallelization_mapping.py @@ -0,0 +1,221 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import copy +import logging +from dataclasses import dataclass +from enum import Enum + +from benchmarks.profiler.utils.defaults import EngineType +from benchmarks.profiler.utils.model_info import ModelInfo + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) +formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S" +) +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + + +class ParallelizationStrategy(Enum): + """Enum for parallelization strategy types.""" + + TP = "TP" + TEP = "TEP" + DEP = "DEP" + + +@dataclass(frozen=True) +class ParallelizationMapping: + """ + Represents parallelization mapping of configs + """ + + tp: int | None = None + tep: int | None = None + dep: int | None = None + + def label(self) -> str: + if self.tp is not None: + return f"{ParallelizationStrategy.TP.value}={self.tp}" + if self.tep is not None: + return f"{ParallelizationStrategy.TEP.value}={self.tep}" + if self.dep is not None: + return f"{ParallelizationStrategy.DEP.value}={self.dep}" + return "default" + + def get_tp_size(self) -> int: + """ + Get the effective TP size for KV heads splitting. + Both TP and TEP split KV heads, DEP doesn't (returns 1). + """ + if self.tp is not None: + return self.tp + if self.tep is not None: + return self.tep + return 1 # DEP has TP split of 1 + + def get_expert_split(self) -> int: + """ + Get the effective expert split size. + Both TEP and DEP split experts, TP doesn't (returns 1). 
+ """ + if self.tep is not None: + return self.tep + if self.dep is not None: + return self.dep + return 1 # TP has expert split of 1 + + def get_attn_dp_size(self, num_gpus: int) -> int: + """ + Get the attention data parallelism size. + DEP uses data parallelism for attention (returns num_gpus). + TP and TEP don't use data parallelism for attention (returns 1). + + Args: + num_gpus: Total number of GPUs being used + + Returns: + The attention data parallelism size + """ + if self.dep is not None: + return num_gpus + return 1 # TP and TEP have attention DP size of 1 + + +def _check_divisibility( + value: int | None, + divisor: int, + value_name: str, + divisor_name: str, + mapping_label: str, +) -> bool: + """ + Check if value is divisible by divisor. + Returns True if valid (or value is None), False if invalid. + + Args: + value: The value to check (e.g., num_kv_heads, num_experts) + divisor: The divisor to check against + value_name: Name of the value for error messages + divisor_name: Name of the divisor for error messages (e.g., "tp_size", "expert_split") + mapping_label: Label of the mapping for error messages + """ + if value is None: + logger.warning( + f"Skipping {value_name} divisibility check for {mapping_label}: {value_name} is unknown" + ) + return True + + if divisor > 1 and int(value) % divisor != 0: + logger.warning( + f"Invalid mapping {mapping_label}: {value_name}={value} not divisible by {divisor_name}={divisor}" + ) + return False + + return True + + +def _validate_intermediate_size( + mapping: ParallelizationMapping, + intermediate_size: int | None, + quant_block: int | None, +) -> bool: + """ + Validate intermediate size and quantization block for TP and TEP strategies. + Checks: + - intermediate_size % tp_size == 0 + - (intermediate_size // tp_size) divides quant_block (if quant_block is known) + """ + tp_size = mapping.get_tp_size() + + # Check basic divisibility + if not _check_divisibility( + intermediate_size, tp_size, "intermediate_size", "tp_size", mapping.label() + ): + return False + + # Additional check for quantization block constraint + if intermediate_size is not None and quant_block is not None and tp_size > 1: + per_shard = int(intermediate_size) // tp_size + if not _check_divisibility( + per_shard, quant_block, "per_shard", "quant_block", mapping.label() + ): + return False + + return True + + +def get_candidate_parallel_mappings( + num_gpus: int, model_info: ModelInfo, phase: str +) -> list[ParallelizationMapping]: + """ + Return a list of candidate parallelization mappings for a given GPU count and phase, + verified against model properties. 
+
+    Verification rules:
+    - TP and TEP must divide num_kv_heads (if available)
+    - TEP and DEP must divide num_experts (if available)
+    """
+    is_moe = bool(model_info.is_moe)
+    num_kv_heads = model_info.num_kv_heads
+    num_experts = model_info.num_experts
+    intermediate_size = model_info.intermediate_size
+    quant_block = model_info.quantization_block_size
+
+    candidates: list[ParallelizationMapping] = []
+    if is_moe:
+        if phase == EngineType.PREFILL:
+            candidates = [ParallelizationMapping(tep=num_gpus)]
+        elif phase == EngineType.DECODE:
+            candidates = [
+                ParallelizationMapping(dep=num_gpus),
+                ParallelizationMapping(tep=num_gpus),
+            ]
+    else:
+        candidates = [ParallelizationMapping(tp=num_gpus)]
+
+    # Verify candidates against model constraints
+    verified: list[ParallelizationMapping] = []
+    for m in candidates:
+        # Check KV heads divisibility
+        if not _check_divisibility(
+            num_kv_heads, m.get_tp_size(), "num_kv_heads", "tp_size", m.label()
+        ):
+            continue
+
+        # Check experts divisibility
+        if not _check_divisibility(
+            num_experts, m.get_expert_split(), "num_experts", "expert_split", m.label()
+        ):
+            continue
+
+        # Check intermediate size and quantization block
+        if not _validate_intermediate_size(m, intermediate_size, quant_block):
+            continue
+
+        verified.append(m)
+
+    return verified
+
+
+def apply_parallel_mapping_to_config(
+    base_config: dict,
+    mapping: ParallelizationMapping,
+    phase: str,
+    config_modifier,
+    num_gpus_per_node: int | None,
+) -> dict:
+    cfg = copy.deepcopy(base_config)
+    if mapping.tp is not None:
+        cfg = config_modifier.set_config_tp_size(cfg, mapping.tp)
+    elif mapping.tep is not None:
+        # TEP is a candidate for both MoE prefill and MoE decode sweeps
+        cfg = config_modifier.set_config_tep_size(cfg, mapping.tep, num_gpus_per_node)
+    elif phase == EngineType.DECODE and mapping.dep is not None:
+        # DEP is only generated as a candidate for MoE decode
+        cfg = config_modifier.set_config_dep_size(cfg, mapping.dep, num_gpus_per_node)
+    return cfg
diff --git a/benchmarks/profiler/utils/config_modifiers/protocol.py b/benchmarks/profiler/utils/config_modifiers/protocol.py
new file mode 100644
index 0000000000..61f47c8278
--- /dev/null
+++ b/benchmarks/profiler/utils/config_modifiers/protocol.py
@@ -0,0 +1,85 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Protocol
+
+from benchmarks.profiler.utils.defaults import EngineType
+from dynamo.planner.defaults import SubComponentType
+
+
+class ConfigModifierProtocol(Protocol):
+    @classmethod
+    def convert_config(
+        cls,
+        config: dict,
+        target: EngineType,
+        is_moe_model: bool = False,
+    ) -> dict:
+        ...
+
+    @classmethod
+    def set_config_tp_size(
+        cls,
+        config: dict,
+        tp_size: int,
+        component_type: SubComponentType = SubComponentType.DECODE,
+    ) -> dict:
+        ...
+ + @classmethod + def set_config_tep_size( + cls, + config: dict, + tep_size: int, + num_gpus_per_node: int, + component_type: SubComponentType = SubComponentType.DECODE, + ) -> dict: + ... + + @classmethod + def set_config_dep_size( + cls, + config: dict, + dep_size: int, + num_gpus_per_node: int, + component_type: SubComponentType = SubComponentType.DECODE, + ) -> dict: + ... + + @classmethod + def get_model_name(cls, config: dict) -> str: + ... + + @classmethod + def get_port(cls, config: dict) -> int: + ... + + @classmethod + def get_kv_cache_size_from_dynamo_log( + cls, dynamo_log_fn: str, attention_dp_size: int = 1 + ) -> int: + ... + + @classmethod + def load_default_config(cls) -> dict: + ... + + @classmethod + def update_model(cls, config: dict, model_name: str) -> dict: + ... + + @classmethod + def update_image(cls, config: dict, image: str) -> dict: + ... diff --git a/benchmarks/profiler/utils/config_modifiers/sglang.py b/benchmarks/profiler/utils/config_modifiers/sglang.py index 332d58e85c..7df6b058cb 100644 --- a/benchmarks/profiler/utils/config_modifiers/sglang.py +++ b/benchmarks/profiler/utils/config_modifiers/sglang.py @@ -3,7 +3,6 @@ import logging import re -from typing import Literal import yaml @@ -22,6 +21,7 @@ from benchmarks.profiler.utils.defaults import ( DEFAULT_MODEL_NAME, DYNAMO_RUN_DEFAULT_PORT, + EngineType, ) from dynamo.planner.defaults import SubComponentType @@ -82,7 +82,7 @@ def update_image(cls, config, image: str) -> dict: def convert_config( cls, config: dict, - target: Literal["prefill", "decode"], + target: EngineType, is_moe_model: bool = False, ) -> dict: cfg = Config.model_validate(config) @@ -94,7 +94,7 @@ def convert_config( if "Planner" in cfg.spec.services: del cfg.spec.services["Planner"] - if target == "prefill": + if target == EngineType.PREFILL: # Get service names by inferring from subComponentType first prefill_service_name = get_service_name_by_type( cfg, "sglang", SubComponentType.PREFILL @@ -131,7 +131,7 @@ def convert_config( worker_service.extraPodSpec.mainContainer.args = args - elif target == "decode": + elif target == EngineType.DECODE: # Get service names by inferring from subComponentType first prefill_service_name = get_service_name_by_type( cfg, "sglang", SubComponentType.PREFILL @@ -292,6 +292,12 @@ def get_model_name(cls, config: dict) -> str: return DEFAULT_MODEL_NAME args = break_arguments(args) + # Check for --model-path first (primary argument for SGLang) + for i, arg in enumerate(args): + if arg == "--model-path" and i + 1 < len(args): + return args[i + 1] + + # Fall back to --served-model-name if --model-path not found for i, arg in enumerate(args): if arg == "--served-model-name" and i + 1 < len(args): return args[i + 1] diff --git a/benchmarks/profiler/utils/config_modifiers/trtllm.py b/benchmarks/profiler/utils/config_modifiers/trtllm.py index 020b7efca7..2548eb1942 100644 --- a/benchmarks/profiler/utils/config_modifiers/trtllm.py +++ b/benchmarks/profiler/utils/config_modifiers/trtllm.py @@ -4,7 +4,6 @@ import json import logging import re -from typing import Literal import yaml @@ -24,6 +23,7 @@ from benchmarks.profiler.utils.defaults import ( DEFAULT_MODEL_NAME, DYNAMO_RUN_DEFAULT_PORT, + EngineType, ) from dynamo.planner.defaults import SubComponentType @@ -84,7 +84,7 @@ def update_image(cls, config, image: str) -> dict: def convert_config( cls, config: dict, - target: Literal["prefill", "decode"], + target: EngineType, is_moe_model: bool = False, ) -> dict: if is_moe_model: @@ -101,7 +101,7 @@ def 
convert_config( if "Planner" in cfg.spec.services: del cfg.spec.services["Planner"] - if target == "prefill": + if target == EngineType.PREFILL: # Get service names by inferring from subComponentType first prefill_service_name = get_service_name_by_type( cfg, "trtllm", SubComponentType.PREFILL @@ -157,7 +157,7 @@ def convert_config( worker_service.extraPodSpec.mainContainer.args = args - elif target == "decode": + elif target == EngineType.DECODE: # Get service names by inferring from subComponentType first prefill_service_name = get_service_name_by_type( cfg, "trtllm", SubComponentType.PREFILL diff --git a/benchmarks/profiler/utils/config_modifiers/vllm.py b/benchmarks/profiler/utils/config_modifiers/vllm.py index c0f004d580..a05bc1758c 100644 --- a/benchmarks/profiler/utils/config_modifiers/vllm.py +++ b/benchmarks/profiler/utils/config_modifiers/vllm.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: Apache-2.0 import logging -from typing import Literal import yaml @@ -20,6 +19,7 @@ from benchmarks.profiler.utils.defaults import ( DEFAULT_MODEL_NAME, DYNAMO_RUN_DEFAULT_PORT, + EngineType, ) from dynamo.planner.defaults import SubComponentType @@ -79,7 +79,7 @@ def update_image(cls, config, image: str) -> dict: def convert_config( cls, config: dict, - target: Literal["prefill", "decode"], + target: EngineType, is_moe_model: bool = False, ) -> dict: if is_moe_model: @@ -96,7 +96,7 @@ def convert_config( if "Planner" in cfg.spec.services: del cfg.spec.services["Planner"] - if target == "prefill": + if target == EngineType.PREFILL: # Get service names by inferring from subComponentType first prefill_service_name = get_service_name_by_type( cfg, "vllm", SubComponentType.PREFILL @@ -133,7 +133,7 @@ def convert_config( worker_service.extraPodSpec.mainContainer.args = args - elif target == "decode": + elif target == EngineType.DECODE: # Get service names by inferring from subComponentType first prefill_service_name = get_service_name_by_type( cfg, "vllm", SubComponentType.PREFILL diff --git a/benchmarks/profiler/utils/defaults.py b/benchmarks/profiler/utils/defaults.py index 75695c2187..b6c63310d4 100644 --- a/benchmarks/profiler/utils/defaults.py +++ b/benchmarks/profiler/utils/defaults.py @@ -13,9 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from enum import Enum + DEFAULT_MODEL_NAME = "Qwen/Qwen3-0.6B" DYNAMO_RUN_DEFAULT_PORT = 8000 # set a decode maximum concurrency due to limits of profiling tools # for MoE models with attn-dp, we might hit this limit DECODE_MAX_CONCURRENCY = 2000 + + +class EngineType(str, Enum): + PREFILL = "prefill" + DECODE = "decode" diff --git a/benchmarks/profiler/utils/model_info.py b/benchmarks/profiler/utils/model_info.py index 7542a10283..d92675b04e 100644 --- a/benchmarks/profiler/utils/model_info.py +++ b/benchmarks/profiler/utils/model_info.py @@ -5,6 +5,7 @@ from typing import Optional, Union from huggingface_hub import model_info +from pydantic import BaseModel from transformers import AutoConfig DTYPE_BYTES_MAP = { @@ -103,10 +104,21 @@ def get_model_weight_size( return get_model_weight_size_from_hub(str(model_name_or_path)) +class ModelInfo(BaseModel): + model_size: float + architecture: str + is_moe: bool + max_context_length: Optional[int] = None + num_experts: Optional[int] = None + intermediate_size: Optional[int] = None + num_kv_heads: Optional[int] = None + quantization_block_size: Optional[int] = None + + def get_model_info( model_name_or_path: Union[str, Path], trust_remote_code: bool = False, -) -> dict: +) -> ModelInfo: model_size = get_model_weight_size(model_name_or_path) config = AutoConfig.from_pretrained( @@ -114,10 +126,11 @@ def get_model_info( trust_remote_code=trust_remote_code, ) - if config.architectures[0] in MOE_ARCHITECTURES: - config.is_moe = True + architecture = config.architectures[0] + if architecture in MOE_ARCHITECTURES: + is_moe = True else: - config.is_moe = False + is_moe = False # Detect max context length from config # Different models use different attribute names for max context length @@ -132,7 +145,7 @@ def get_model_info( # Detect number of experts for MoE models # Different models use different attribute names num_experts = None - if config.is_moe: + if is_moe: expert_attrs = [ "n_routed_experts", # DeepSeek V3/R1 "num_local_experts", # Mixtral, Qwen @@ -145,12 +158,78 @@ def get_model_info( num_experts = value break - return { - "model_size": model_size, - "is_moe": config.is_moe, - "max_context_length": max_context_length, - "num_experts": num_experts, - } + # Detect intermediate size (FFN hidden dimension) + intermediate_size = None + intermediate_attrs = [ + "intermediate_size", # Most common (BERT, LLaMA, etc.) + "ffn_dim", # Some transformer models + ] + for attr in intermediate_attrs: + if hasattr(config, attr): + value = getattr(config, attr) + if value is not None: + intermediate_size = value + break + + # Detect number of key-value heads (for GQA) + num_kv_heads = None + kv_head_attrs = [ + "num_key_value_heads", # LLaMA 2/3, Mistral, etc. 
+ "num_kv_heads", # Alternative name + ] + for attr in kv_head_attrs: + if hasattr(config, attr): + value = getattr(config, attr) + if value is not None: + num_kv_heads = value + break + # If not found, check if it equals num_attention_heads (standard MHA) + if num_kv_heads is None and hasattr(config, "num_attention_heads"): + num_kv_heads = config.num_attention_heads + + # Detect quantization block size + quantization_block_size = None + if hasattr(config, "quantization_config"): + quant_config = config.quantization_config + if isinstance(quant_config, dict): + # Check for common quantization block size attributes + quantization_block_size = ( + quant_config.get("weight_block_size") + or quant_config.get("block_size") + or quant_config.get("group_size") + or quant_config.get("q_group_size") + ) + elif quant_config is not None: + # Handle object-based quantization config + for attr in [ + "weight_block_size", + "block_size", + "group_size", + "q_group_size", + ]: + if hasattr(quant_config, attr): + value = getattr(quant_config, attr) + if value is not None: + quantization_block_size = value + break + + # Handle case where block size is a list (e.g., [128, 128] for [input, output] block sizes) + if ( + isinstance(quantization_block_size, list) + and len(quantization_block_size) > 0 + ): + quantization_block_size = max(quantization_block_size) + + return ModelInfo( + model_size=model_size, + architecture=architecture, + is_moe=is_moe, + max_context_length=max_context_length, + num_experts=num_experts, + intermediate_size=intermediate_size, + num_kv_heads=num_kv_heads, + quantization_block_size=quantization_block_size, + ) if __name__ == "__main__": diff --git a/benchmarks/profiler/utils/plot.py b/benchmarks/profiler/utils/plot.py index 753c5a856f..10c7077022 100644 --- a/benchmarks/profiler/utils/plot.py +++ b/benchmarks/profiler/utils/plot.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from collections import defaultdict import matplotlib.pyplot as plt import numpy as np @@ -33,22 +34,27 @@ logger.addHandler(console_handler) -def plot_prefill_performance(prefill_results, target_ttft, output_dir): +def plot_prefill_performance(prefill_data, target_ttft, output_dir): """ - Plot prefill performance as a 2D scatter plot with GPU count annotations. + Plot prefill performance as a 2D scatter plot with GPU count and mapping annotations. 
Args: - prefill_results: tuple of (prefill_num_gpu, prefill_ttft, prefill_thpt_per_gpu) + prefill_data: PrefillProfileData instance containing profiling results target_ttft: target TTFT value for the vertical line output_dir: directory to save the plot """ - prefill_num_gpu, prefill_ttft, prefill_thpt_per_gpu = prefill_results plt.figure(figsize=(10, 6)) - plt.scatter(prefill_ttft, prefill_thpt_per_gpu, s=100) - for i, num_gpu in enumerate(prefill_num_gpu): + plt.scatter(prefill_data.ttft, prefill_data.thpt_per_gpu, s=100) + for i, num_gpu in enumerate(prefill_data.num_gpus): + label_suffix = ( + f" [{prefill_data.parallel_mapping_labels[i]}]" + if prefill_data.parallel_mapping_labels + and i < len(prefill_data.parallel_mapping_labels) + else "" + ) plt.annotate( - f"{num_gpu} GPU(s)", - (prefill_ttft[i], prefill_thpt_per_gpu[i]), + f"{num_gpu} GPU(s){label_suffix}", + (prefill_data.ttft[i], prefill_data.thpt_per_gpu[i]), xytext=(10, 0), textcoords="offset points", fontsize=10, @@ -70,19 +76,46 @@ def plot_prefill_performance(prefill_results, target_ttft, output_dir): plt.close() -def plot_decode_performance(decode_results, target_itl, output_dir): +def plot_decode_performance(decode_data, target_itl, output_dir): """ Plot decode performance with multiple GPU count lines. Args: - decode_results: list of tuples (num_gpu, itl_list, thpt_per_gpu_list) + decode_data: DecodeProfileData instance containing profiling results target_itl: target ITL value for the vertical line output_dir: directory to save the plot """ plt.figure(figsize=(10, 6)) - for num_gpu, itl_list, thpt_per_gpu_list in decode_results: - plt.plot(itl_list, thpt_per_gpu_list, label=f"{num_gpu} GPU(s)") + # Group data by (num_gpus, parallel_mapping_label) combination + grouped_data: defaultdict[tuple[int, str], dict[str, list[float]]] = defaultdict( + lambda: {"itl": [], "thpt": []} + ) + + for i in range(len(decode_data.num_gpus)): + num_gpu = decode_data.num_gpus[i] + label = ( + decode_data.parallel_mapping_labels[i] + if decode_data.parallel_mapping_labels + else "" + ) + key = (num_gpu, label) + grouped_data[key]["itl"].append(decode_data.itl[i]) + grouped_data[key]["thpt"].append(decode_data.thpt_per_gpu[i]) + + # Plot each group as a line + for (num_gpu, parallel_mapping_label), data in sorted(grouped_data.items()): + if parallel_mapping_label: + label = f"{num_gpu} GPU(s) [{parallel_mapping_label}]" + else: + label = f"{num_gpu} GPU(s)" + + # Sort by ITL for proper line plotting + sorted_pairs = sorted(zip(data["itl"], data["thpt"])) + itl_sorted = [x[0] for x in sorted_pairs] + thpt_sorted = [x[1] for x in sorted_pairs] + + plt.plot(itl_sorted, thpt_sorted, label=label, marker="o") plt.axvline( x=target_itl, color="r", linestyle="--", label=f"Target ITL: {target_itl} ms" @@ -253,18 +286,24 @@ def plot_decode_3d_surface( plt.close() -def plot_pd_joint_results(isl, osl, prefill_results, decode_results, output_dir): +def plot_pd_joint_results(isl, osl, prefill_data, decode_data, output_dir): + """ + Plot joint prefill and decode results showing cost per 1000 requests under different SLA. 
+ + Args: + isl: input sequence length + osl: output sequence length + prefill_data: PrefillProfileData instance containing profiling results + decode_data: DecodeProfileData instance containing profiling results + output_dir: directory to save the plot + """ GPU_COST_PER_HOUR = 3.0 # $3/hour # compute pareto front for prefill - p_ttft, p_thpt = compute_pareto(prefill_results[1], prefill_results[2]) + p_ttft, p_thpt = compute_pareto(prefill_data.ttft, prefill_data.thpt_per_gpu) # compute pareto front for decode - _d_itl, _d_thpt = [], [] - for _d_result in decode_results: - _d_itl.extend(_d_result[1]) - _d_thpt.extend(_d_result[2]) - d_itl, d_thpt = compute_pareto(_d_itl, _d_thpt) + d_itl, d_thpt = compute_pareto(decode_data.itl, decode_data.thpt_per_gpu) # convert to cost per thousand requests p_ttft = np.array(p_ttft) diff --git a/benchmarks/profiler/utils/profile_cache.py b/benchmarks/profiler/utils/profile_cache.py deleted file mode 100644 index b9e0fc9fae..0000000000 --- a/benchmarks/profiler/utils/profile_cache.py +++ /dev/null @@ -1,138 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import glob -import json -import logging -import os -import re -from typing import List, Optional, Tuple - -logger = logging.getLogger(__name__) - - -def check_prefill_results_exist(output_dir: str, tp_size: int, isl: int) -> bool: - """Check if prefill results already exist for a given TP size.""" - work_dir = f"{output_dir}/prefill_tp{tp_size}" - result_file = f"{work_dir}/aiperf_isl{isl}/*/profile_export_aiperf.json" - - # Check if the work directory exists - if not os.path.exists(work_dir): - return False - - # Look for the aiperf result file - result_files = glob.glob(result_file) - if not result_files: - return False - - # Verify the result file has valid data - try: - with open(result_files[0], "r") as f: - data = json.load(f) - # Check if it has the required metrics - if "time_to_first_token" in data and "avg" in data["time_to_first_token"]: - logger.info( - f"Found existing prefill results for TP{tp_size} at {result_files[0]}" - ) - return True - except (json.JSONDecodeError, KeyError, FileNotFoundError): - pass - - return False - - -def check_decode_results_exist( - output_dir: str, tp_size: int, isl: int, osl: int -) -> bool: - """Check if decode results already exist for a given TP size.""" - work_dir = f"{output_dir}/decode_tp{tp_size}" - - # Check if the work directory exists - if not os.path.exists(work_dir): - return False - - # Look for at least one decode result file - result_pattern = ( - f"{work_dir}/aiperf_request*_isl{isl}_osl{osl}_n*/*/profile_export_aiperf.json" - ) - result_files = glob.glob(result_pattern) - - if not result_files: - return False - - # Verify at least one result file has valid data - try: - with open(result_files[0], "r") as f: - data = json.load(f) - # Check if it has the required metrics - if "inter_token_latency" in 
data and "avg" in data["inter_token_latency"]: - logger.info( - f"Found existing decode results for TP{tp_size} at {result_files[0]} (and {len(result_files)-1} others)" - ) - return True - except (json.JSONDecodeError, KeyError, FileNotFoundError): - pass - - return False - - -def load_existing_prefill_results( - output_dir: str, tp_size: int, isl: int -) -> Tuple[Optional[float], Optional[float]]: - """Load existing prefill results from disk.""" - work_dir = f"{output_dir}/prefill_tp{tp_size}" - result_file = f"{work_dir}/aiperf_isl{isl}/*/profile_export_aiperf.json" - - result_files = glob.glob(result_file) - if result_files: - try: - with open(result_files[0], "r") as f: - data = json.load(f) - ttft = data["time_to_first_token"]["avg"] - thpt_per_gpu = isl / ttft / tp_size * 1000 - return ttft, thpt_per_gpu - except (json.JSONDecodeError, KeyError, FileNotFoundError): - pass - return None, None - - -def load_existing_decode_results( - output_dir: str, tp_size: int, isl: int, osl: int -) -> List[Tuple[float, float, int]]: - """Load existing decode results from disk.""" - work_dir = f"{output_dir}/decode_tp{tp_size}" - - result_pattern = ( - f"{work_dir}/aiperf_request*_isl{isl}_osl{osl}_n*/*/profile_export_aiperf.json" - ) - result_files = glob.glob(result_pattern) - - decode_results = [] - for result_file in result_files: - try: - with open(result_file, "r") as f: - data = json.load(f) - itl = data["inter_token_latency"]["avg"] - thpt_per_gpu = data["output_token_throughput"]["avg"] / tp_size - - # Extract concurrency from filename - match = re.search(r"aiperf_request(\d+)_", result_file) - if match: - concurrency = int(match.group(1)) - decode_results.append((itl, thpt_per_gpu, concurrency)) - except (json.JSONDecodeError, KeyError, FileNotFoundError): - continue - - return decode_results diff --git a/benchmarks/profiler/utils/profiler_argparse.py b/benchmarks/profiler/utils/profiler_argparse.py index 5ae7b18bf1..b42d96880a 100644 --- a/benchmarks/profiler/utils/profiler_argparse.py +++ b/benchmarks/profiler/utils/profiler_argparse.py @@ -76,8 +76,6 @@ def create_profiler_parser() -> argparse.Namespace: max_num_gpus_per_engine: Int (maximum number of GPUs per engine, default: 0) num_gpus_per_node: Int (number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size, default: 0) sweep: - skip_existing_results: Boolean (skip TP sizes that already have results in the output directory, default: False) - force_rerun: Boolean (force re-running all tests even if results already exist (overrides --skip-existing-results), default: False) prefill_interpolation_granularity: Int (how many samples to benchmark to interpolate TTFT under different ISL, default: 16) decode_interpolation_granularity: Int (how many samples to benchmark to interpolate ITL under different active kv cache size and decode context length, default: 6) use_ai_configurator: Boolean (use ai-configurator to estimate benchmarking results instead of running actual deployment, default: False) @@ -158,26 +156,20 @@ def create_profiler_parser() -> argparse.Namespace: parser.add_argument( "--min-num-gpus-per-engine", type=int, - default=config.get("hardware", {}).get("min_num_gpus_per_engine", 1), + default=config.get("hardware", {}).get("min_num_gpus_per_engine", 0), help="minimum number of GPUs per engine", ) parser.add_argument( "--max-num-gpus-per-engine", type=int, - default=config.get("hardware", {}).get("max_num_gpus_per_engine", 8), + default=config.get("hardware", 
{}).get("max_num_gpus_per_engine", 0), help="maximum number of GPUs per engine", ) parser.add_argument( - "--skip-existing-results", - action="store_true", - default=config.get("sweep", {}).get("skip_existing_results", False), - help="Skip TP sizes that already have results in the output directory", - ) - parser.add_argument( - "--force-rerun", - action="store_true", - default=config.get("sweep", {}).get("force_rerun", False), - help="Force re-running all tests even if results already exist (overrides --skip-existing-results)", + "--num-gpus-per-node", + type=int, + default=config.get("hardware", {}).get("num_gpus_per_node", 0), + help="Number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size", ) parser.add_argument( "--isl", @@ -235,19 +227,6 @@ def create_profiler_parser() -> argparse.Namespace: default=config.get("sweep", {}).get("dry_run", False), help="Dry run the profile job", ) - parser.add_argument( - "--is-moe-model", - action="store_true", - dest="is_moe_model", - default=config.get("engine", {}).get("is_moe_model", False), - help="Enable MoE (Mixture of Experts) model support, use TEP for prefill and DEP for decode", - ) - parser.add_argument( - "--num-gpus-per-node", - type=int, - default=config.get("hardware", {}).get("num_gpus_per_node", 8), - help="Number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size", - ) parser.add_argument( "--enable-gpu-discovery", action="store_true", @@ -311,9 +290,5 @@ def create_profiler_parser() -> argparse.Namespace: if not args.model and not args.config: parser.error("--model or --config is required (provide at least one)") - # Run auto-generation if GPU discovery is enabled - # This will override any manually specified hardware parameters - if args.enable_gpu_discovery: - auto_generate_search_space(args) - + auto_generate_search_space(args) return args diff --git a/benchmarks/profiler/utils/search_space_autogen.py b/benchmarks/profiler/utils/search_space_autogen.py index dfe6fc7cd5..0869430044 100644 --- a/benchmarks/profiler/utils/search_space_autogen.py +++ b/benchmarks/profiler/utils/search_space_autogen.py @@ -9,7 +9,7 @@ import yaml from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS -from benchmarks.profiler.utils.model_info import get_model_info +from benchmarks.profiler.utils.model_info import ModelInfo, get_model_info from deploy.utils.gpu_inventory import get_gpu_summary logger = logging.getLogger(__name__) @@ -23,7 +23,9 @@ logger.addHandler(console_handler) MODEL_GPU_MEM_FRAC_MAX = 0.9 -MOE_MODEL_MAX_NUM_GPUS = 32 + +# for MoE models, we sweep up to number of GPUs that can hold 8x the model weights +MOE_MODEL_MAX_NUM_GPU_FACTOR = 8 def auto_generate_search_space(args: argparse.Namespace) -> None: @@ -31,17 +33,16 @@ def auto_generate_search_space(args: argparse.Namespace) -> None: args.backend ] # args.backend is already validated in argparse - # first check if config file exists - if args.model: - if not args.config: - # modify config file from default config file - logger.info("DGD config file not provided, using default config file") - config = config_modifier.load_default_config() - - else: - with open(args.config, "r") as f: - config = yaml.safe_load(f) + # first get the config + if not args.config: + # modify config file from default config file + logger.info("DGD config file not provided, using default config file") + config = config_modifier.load_default_config() + else: + with open(args.config, 
"r") as f: + config = yaml.safe_load(f) + if args.model: logger.info(f"Updating model in DGD config file to {args.model}") config = config_modifier.update_model(config, args.model) if args.dgd_image: @@ -55,62 +56,84 @@ def auto_generate_search_space(args: argparse.Namespace) -> None: yaml.dump(config, f) args.config = config_fn - # now determine the search space - model_info = None - if args.model: - logger.info(f"Getting model info for {args.model}...") - model_info = get_model_info(args.model) - - num_experts_str = ( - f", num_experts={model_info['num_experts']}" - if model_info.get("num_experts") - else "" - ) - logger.info( - f"Model {args.model} has size {model_info['model_size']}, is_moe={model_info['is_moe']}, and max_context_length={model_info['max_context_length']}{num_experts_str}" - ) - args.is_moe_model = model_info["is_moe"] # type: ignore[assignment] - args.max_context_length = model_info["max_context_length"] # type: ignore[assignment] - - if ( - args.min_num_gpus_per_engine == 0 - or args.max_num_gpus_per_engine == 0 - or args.num_gpus_per_node == 0 - ): - if not args.model: - # TODO: get model info provided DGD config - error_msg = "No model provided, cannot auto-generate GPU search space. Please provide `--model` or GPU info" - logger.error(error_msg) - raise RuntimeError(error_msg) - - logger.info("Getting GPU info from k8s cluster...") - gpu_info = get_gpu_summary() - logger.info( - f"Cluster has {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node with {gpu_info['vram']} VRAM" - ) - - # model_info should be set by now (checked above), but mypy needs explicit verification - assert model_info is not None, "model_info must be set when model is provided" - - min_gpu = math.ceil( - model_info["model_size"] / MODEL_GPU_MEM_FRAC_MAX / gpu_info["vram"] # type: ignore[operator] - ) - max_gpu = ( - gpu_info["gpus_per_node"] # type: ignore[misc] - if not model_info["is_moe"] - else MOE_MODEL_MAX_NUM_GPUS - ) - if min_gpu > max_gpu: - error_msg = f"No valid GPU configuration found for model {args.model} on the cluster with {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node" - logger.error(error_msg) - raise RuntimeError(error_msg) - - logger.info( - f"Auto-generated search space for model {args.model} on the cluster with {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node: {min_gpu} to {max_gpu}" - ) - args.min_num_gpus_per_engine = min_gpu - args.max_num_gpus_per_engine = max_gpu - args.num_gpus_per_node = gpu_info["gpus_per_node"] # type: ignore[assignment] - args.num_experts = model_info.get("num_experts") # type: ignore[assignment] + # get model info and update args + model_info: ModelInfo | None = None + if not args.model: + # get the model name from config + args.model = config_modifier.get_model_name(config) + logger.info(f"Getting model info for {args.model}...") + model_info = get_model_info(args.model) + + num_experts_str = ( + f", num_experts={model_info.num_experts}" + if model_info.num_experts is not None + else "" + ) + logger.info( + f"Model {args.model} has size {model_info.model_size}, is_moe={model_info.is_moe}, and max_context_length={model_info.max_context_length}{num_experts_str}" + ) + args.model_info = model_info + # now determine the search space + if args.enable_gpu_discovery: + if ( + args.min_num_gpus_per_engine == 0 + or args.max_num_gpus_per_engine == 0 + or args.num_gpus_per_node == 0 + ): + if not args.model: + # TODO: get model info provided DGD config + error_msg = "No model provided, cannot auto-generate GPU search 
space. Please provide `--model` or GPU info"
+                logger.error(error_msg)
+                raise RuntimeError(error_msg)
+
+            logger.info("Getting GPU info from k8s cluster...")
+            gpu_info = get_gpu_summary()
+            logger.info(
+                f"Cluster has {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node with {gpu_info['vram']} VRAM"
+            )
+
+            # model_info should be set by now (checked above), but mypy needs explicit verification
+            assert (
+                model_info is not None
+            ), "model_info must be set when model is provided"
+
+            vram_mib = int(gpu_info["vram"])  # type: ignore[call-overload]
+            gpus_per_node = int(gpu_info["gpus_per_node"])  # type: ignore[call-overload]
+
+            min_gpu = math.ceil(
+                model_info.model_size / MODEL_GPU_MEM_FRAC_MAX / vram_mib
+            )
+            if not model_info.is_moe:
+                max_gpu = gpus_per_node
+            else:
+                max_gpu = max(min_gpu * MOE_MODEL_MAX_NUM_GPU_FACTOR, gpus_per_node)
+            if min_gpu > max_gpu:
+                error_msg = f"No valid GPU configuration found for model {args.model} on the cluster with {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node"
+                logger.error(error_msg)
+                raise RuntimeError(error_msg)
+
+            logger.info(
+                f"Auto-generated search space for model {args.model} on the cluster with {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node: {min_gpu} to {max_gpu}"
+            )
+            args.min_num_gpus_per_engine = min_gpu
+            args.max_num_gpus_per_engine = max_gpu
+            args.num_gpus_per_node = gpus_per_node  # type: ignore[assignment]
+    else:
+        # use default values for GPUs
+        if args.min_num_gpus_per_engine == 0:
+            logger.warning(
+                "GPU discovery is disabled and min_num_gpus_per_engine is not specified, setting to 1"
+            )
+            args.min_num_gpus_per_engine = 1
+        if args.max_num_gpus_per_engine == 0:
+            logger.warning(
+                "GPU discovery is disabled and max_num_gpus_per_engine is not specified, setting to 4"
+            )
+            args.max_num_gpus_per_engine = 4
+        if args.num_gpus_per_node == 0:
+            logger.warning(
+                "GPU discovery is disabled and num_gpus_per_node is not specified, setting to 8"
+            )
+            args.num_gpus_per_node = 8
     return
diff --git a/deploy/cloud/operator/config/samples/nvidia.com_v1alpha1_dynamographdeploymentrequest.yaml b/deploy/cloud/operator/config/samples/nvidia.com_v1alpha1_dynamographdeploymentrequest.yaml
index 4c0e2982d0..a7ffc60023 100644
--- a/deploy/cloud/operator/config/samples/nvidia.com_v1alpha1_dynamographdeploymentrequest.yaml
+++ b/deploy/cloud/operator/config/samples/nvidia.com_v1alpha1_dynamographdeploymentrequest.yaml
@@ -37,8 +37,7 @@ spec:

     # Engine configuration
     engine:
-      max_context_length: 16384 # Maximum context length supported by the model
-      is_moe_model: false # Enable MoE model support (uses TEP/DEP instead of TP)
+      max_context_length: 16384 # if provided, overrides the model's max context length

     # Hardware configuration
     hardware:
diff --git a/deploy/cloud/operator/internal/controller/dynamographdeploymentrequest_controller_test.go b/deploy/cloud/operator/internal/controller/dynamographdeploymentrequest_controller_test.go
index 1440b24488..53c1fcd8cc 100644
--- a/deploy/cloud/operator/internal/controller/dynamographdeploymentrequest_controller_test.go
+++ b/deploy/cloud/operator/internal/controller/dynamographdeploymentrequest_controller_test.go
@@ -852,7 +852,7 @@ var _ = Describe("DGDR Helper Functions", func() {
					ProfilingConfig: nvidiacomv1alpha1.ProfilingConfigSpec{
						Config: createTestConfig(map[string]interface{}{
							"sweep": map[string]interface{}{
-								"force_rerun": true,
+								"prefill_interpolation_granularity": 16,
							},
						}),
					},
diff --git a/docs/planner/sla_planner_quickstart.md
b/docs/planner/sla_planner_quickstart.md index e504a16758..13a564e9f5 100644 --- a/docs/planner/sla_planner_quickstart.md +++ b/docs/planner/sla_planner_quickstart.md @@ -315,7 +315,8 @@ profilingConfig: # Profiling sweep settings (optional) sweep: - force_rerun: false + prefill_interpolation_granularity: 16 # Number of samples for prefill ISL sweep + decode_interpolation_granularity: 6 # Number of samples for decode sweep ``` > **Note**: `engine.config` is a **file path** to a DGD YAML file, not inline configuration. Use ConfigMapRef (recommended) or leave it unset to auto-generate. diff --git a/tests/profiler/test_profile_sla_aiconfigurator.py b/tests/profiler/test_profile_sla_aiconfigurator.py index 769140a910..14b624e272 100644 --- a/tests/profiler/test_profile_sla_aiconfigurator.py +++ b/tests/profiler/test_profile_sla_aiconfigurator.py @@ -18,6 +18,19 @@ sys.path.insert(0, str(project_root)) from benchmarks.profiler.profile_sla import run_profile # noqa: E402 +from benchmarks.profiler.utils.model_info import ModelInfo # noqa: E402 + + +# Override the logger fixture from conftest.py to prevent directory creation +@pytest.fixture(autouse=True) +def logger(request): + """Override the logger fixture to prevent test directory creation. + + This replaces the logger fixture from tests/conftest.py that creates + directories named after each test. + """ + # Simply do nothing - no directories created, no file handlers added + yield class TestProfileSlaAiconfigurator: @@ -41,11 +54,9 @@ def __init__(self): self.osl = 500 self.ttft = 50 self.itl = 10 - self.max_context_length = 16384 self.prefill_interpolation_granularity = 16 self.decode_interpolation_granularity = 6 self.service_name = "" - self.is_moe_model = False self.dry_run = False self.use_ai_configurator = True self.aic_system = "h200_sxm" @@ -54,6 +65,13 @@ def __init__(self): self.aic_backend_version = "0.20.0" self.num_gpus_per_node = 8 self.deploy_after_profile = False + # Provide minimal model_info to avoid HF queries + self.model_info = ModelInfo( + model_size=16384.0, + architecture="TestArchitecture", + is_moe=False, + max_context_length=16384, + ) return Args() diff --git a/tests/profiler/test_profile_sla_dryrun.py b/tests/profiler/test_profile_sla_dryrun.py index eaf0a3c9de..f0b77853c8 100644 --- a/tests/profiler/test_profile_sla_dryrun.py +++ b/tests/profiler/test_profile_sla_dryrun.py @@ -19,6 +19,7 @@ sys.path.insert(0, str(project_root)) from benchmarks.profiler.profile_sla import run_profile # noqa: E402 +from benchmarks.profiler.utils.model_info import ModelInfo # noqa: E402 from benchmarks.profiler.utils.search_space_autogen import ( # noqa: E402 auto_generate_search_space, ) @@ -63,7 +64,6 @@ def __init__(self): self.prefill_interpolation_granularity = 16 self.decode_interpolation_granularity = 6 self.service_name = "" - self.is_moe_model = False self.dry_run = True self.use_ai_configurator = False self.aic_system = None @@ -72,6 +72,13 @@ def __init__(self): self.aic_backend_version = None self.num_gpus_per_node = 8 self.deploy_after_profile = False + # Provide minimal model_info to avoid HF queries + self.model_info = ModelInfo( + model_size=16384.0, + architecture="TestArchitecture", + is_moe=False, + max_context_length=self.max_context_length, + ) return Args() @@ -99,7 +106,6 @@ def __init__(self): self.prefill_interpolation_granularity = 16 self.decode_interpolation_granularity = 6 self.service_name = "" - self.is_moe_model = False self.dry_run = True self.use_ai_configurator = False self.aic_system = None @@ 
-108,6 +114,12 @@ def __init__(self): self.aic_backend_version = None self.num_gpus_per_node = 8 self.deploy_after_profile = False + self.model_info = ModelInfo( + model_size=16384.0, + architecture="TestArchitecture", + is_moe=False, + max_context_length=self.max_context_length, + ) return Args() @@ -149,7 +161,6 @@ def __init__(self): self.prefill_interpolation_granularity = 16 self.decode_interpolation_granularity = 6 self.service_name = "" - self.is_moe_model = False self.dry_run = True self.use_ai_configurator = False self.aic_system = None @@ -158,6 +169,12 @@ def __init__(self): self.aic_backend_version = None self.num_gpus_per_node = 8 self.deploy_after_profile = False + self.model_info = ModelInfo( + model_size=16384.0, + architecture="TestArchitecture", + is_moe=False, + max_context_length=self.max_context_length, + ) return Args() @@ -192,7 +209,6 @@ def __init__(self): self.prefill_interpolation_granularity = 16 self.decode_interpolation_granularity = 6 self.service_name = "" - self.is_moe_model = True self.dry_run = True self.use_ai_configurator = False self.aic_system = None @@ -201,6 +217,13 @@ def __init__(self): self.aic_backend_version = None self.num_gpus_per_node = 8 self.deploy_after_profile = False + self.model_info = ModelInfo( + model_size=65536.0, + architecture="TestMoEArchitecture", + is_moe=True, + max_context_length=self.max_context_length, + num_experts=16, + ) return Args() @@ -224,11 +247,12 @@ def mock_h100_gpu_info(self): @pytest.fixture def mock_model_info(self): """Mock model info for DeepSeek-R1-Distill-Llama-8B.""" - return { - "model_size": 16384, # 16GB model in MiB - "is_moe": False, - "max_context_length": 16384, # 16K tokens - } + return ModelInfo( + model_size=16384.0, # 16GB model in MiB + architecture="LlamaForCausalLM", + is_moe=False, + max_context_length=16384, + ) @pytest.fixture def vllm_args_with_model_autogen(self): @@ -242,12 +266,9 @@ def __init__(self): self.namespace = "test-namespace" self.model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" # Specify model for autogen self.dgd_image = "" - self.min_num_gpus_per_engine = ( - 1 # Will be overridden by auto-generation - ) - self.max_num_gpus_per_engine = ( - 8 # Will be overridden by auto-generation - ) + # Set to 0 to trigger auto-generation path + self.min_num_gpus_per_engine = 0 + self.max_num_gpus_per_engine = 0 self.skip_existing_results = False self.force_rerun = False self.isl = 3000 @@ -258,15 +279,16 @@ def __init__(self): self.prefill_interpolation_granularity = 16 self.decode_interpolation_granularity = 6 self.service_name = "" - self.is_moe_model = False self.dry_run = True self.use_ai_configurator = False self.aic_system = None self.aic_model_name = None self.aic_backend = "" self.aic_backend_version = None - self.num_gpus_per_node = 8 # Will be overridden by auto-generation + # Set to 0 to trigger auto-generation path + self.num_gpus_per_node = 0 self.deploy_after_profile = False + self.enable_gpu_discovery = True return Args() @@ -308,12 +330,8 @@ def __init__(self): self.namespace = "test-namespace" self.model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" # Specify model for autogen self.dgd_image = "" - self.min_num_gpus_per_engine = ( - 1 # Will be overridden by auto-generation - ) - self.max_num_gpus_per_engine = ( - 8 # Will be overridden by auto-generation - ) + self.min_num_gpus_per_engine = 0 + self.max_num_gpus_per_engine = 0 self.skip_existing_results = False self.force_rerun = False self.isl = 3000 @@ -324,15 +342,15 @@ def __init__(self): 
self.prefill_interpolation_granularity = 16 self.decode_interpolation_granularity = 6 self.service_name = "" - self.is_moe_model = False self.dry_run = True self.use_ai_configurator = False self.aic_system = None self.aic_model_name = None self.aic_backend = "" self.aic_backend_version = None - self.num_gpus_per_node = 8 # Will be overridden by auto-generation + self.num_gpus_per_node = 0 self.deploy_after_profile = False + self.enable_gpu_discovery = True return Args() @@ -374,12 +392,8 @@ def __init__(self): self.namespace = "test-namespace" self.model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" # Specify model for autogen self.dgd_image = "" - self.min_num_gpus_per_engine = ( - 1 # Will be overridden by auto-generation - ) - self.max_num_gpus_per_engine = ( - 8 # Will be overridden by auto-generation - ) + self.min_num_gpus_per_engine = 0 + self.max_num_gpus_per_engine = 0 self.skip_existing_results = False self.force_rerun = False self.isl = 3000 @@ -390,15 +404,15 @@ def __init__(self): self.prefill_interpolation_granularity = 16 self.decode_interpolation_granularity = 6 self.service_name = "" - self.is_moe_model = False self.dry_run = True self.use_ai_configurator = False self.aic_system = None self.aic_model_name = None self.aic_backend = "" self.aic_backend_version = None - self.num_gpus_per_node = 8 # Will be overridden by auto-generation + self.num_gpus_per_node = 0 self.deploy_after_profile = False + self.enable_gpu_discovery = True return Args()
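
Reviewer note (not part of the patch): the reworked `auto_generate_search_space` derives the GPU sweep range from the model footprint and the cluster shape when GPU discovery is enabled. Below is a minimal standalone Python sketch of that arithmetic; the `gpu_search_space` helper name and the model-size/VRAM/node numbers are hypothetical and used only for illustration.

```python
# Minimal sketch of the search-space arithmetic from search_space_autogen.py.
# The constants mirror the patch; the example inputs are hypothetical.
import math

MODEL_GPU_MEM_FRAC_MAX = 0.9          # allow weights to use at most 90% of GPU memory
MOE_MODEL_MAX_NUM_GPU_FACTOR = 8      # MoE sweep covers up to 8x the weight footprint


def gpu_search_space(model_size_mib: float, vram_mib: int, gpus_per_node: int, is_moe: bool):
    """Return (min_gpu, max_gpu) that the profiler would sweep over."""
    min_gpu = math.ceil(model_size_mib / MODEL_GPU_MEM_FRAC_MAX / vram_mib)
    if not is_moe:
        max_gpu = gpus_per_node
    else:
        max_gpu = max(min_gpu * MOE_MODEL_MAX_NUM_GPU_FACTOR, gpus_per_node)
    if min_gpu > max_gpu:
        raise RuntimeError("no valid GPU configuration for this model on this cluster")
    return min_gpu, max_gpu


# ~16 GiB dense model on 80 GiB GPUs, 8 per node -> (1, 8)
print(gpu_search_space(16384.0, 81920, 8, is_moe=False))
# ~640 GiB MoE model on 80 GiB GPUs, 8 per node -> (9, 72)
print(gpu_search_space(655360.0, 81920, 8, is_moe=True))
```

The MoE bound scales with the model's own weight footprint rather than a fixed ceiling, matching the patch's replacement of the hard `MOE_MODEL_MAX_NUM_GPUS` cap with the `MOE_MODEL_MAX_NUM_GPU_FACTOR`-based limit.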