diff --git a/tests/integration-tests/configs/develop.yaml b/tests/integration-tests/configs/develop.yaml
index 4094547047..590ade4bd1 100644
--- a/tests/integration-tests/configs/develop.yaml
+++ b/tests/integration-tests/configs/develop.yaml
@@ -857,6 +857,10 @@ test-suites:
           instances: [ "c5n.18xlarge" ]
           oss: [{{ NO_ROCKY_OS_X86_0 }}] # ParallelCluster does not release official Rocky images. Skip the test.
           schedulers: [ "slurm" ]
+        - regions: [ {{ c5_xlarge_CAPACITY_RESERVATION_510_INSTANCES_2_HOURS_YESPG_NO_ROCKY_OS_X86_1 }} ]
+          instances: [ "c5.xlarge" ]
+          oss: [ {{ NO_ROCKY_OS_X86_1 }} ] # ParallelCluster does not release official Rocky images. Skip the test.
+          schedulers: [ "slurm" ]
   test_starccm.py::test_starccm:
     dimensions:
       - regions: [ {{ c5n_18xlarge_CAPACITY_RESERVATION_35_INSTANCES_2_HOURS_YESPG_NO_ROCKY_OS_X86_0 }} ]
diff --git a/tests/integration-tests/tests/common/osu_common.py b/tests/integration-tests/tests/common/osu_common.py
index 03f7a003fd..f982f7c815 100644
--- a/tests/integration-tests/tests/common/osu_common.py
+++ b/tests/integration-tests/tests/common/osu_common.py
@@ -39,7 +39,7 @@ def compile_osu(mpi_variant, remote_command_executor):
     )
 
 
-def run_individual_osu_benchmark(
+def run_individual_osu_benchmark(  # noqa: C901
     mpi_version,
     benchmark_group,
     benchmark_name,
@@ -95,15 +95,36 @@ def run_individual_osu_benchmark(
         num_of_processes_per_node=slots_per_instance,
         network_interfaces_count=network_interfaces_count,
     )
-    if partition:
-        result = scheduler_commands.submit_script(
-            str(submission_script), slots=slots, partition=partition, nodes=num_instances
-        )
-    else:
-        result = scheduler_commands.submit_script(str(submission_script), slots=slots, nodes=num_instances)
-    job_id = scheduler_commands.assert_job_submitted(result.stdout)
-    scheduler_commands.wait_job_completed(job_id, timeout=timeout)
-    scheduler_commands.assert_job_succeeded(job_id)
+
+    def submit_job():
+        if partition:
+            result = scheduler_commands.submit_script(
+                str(submission_script), slots=slots, partition=partition, nodes=num_instances
+            )
+        else:
+            result = scheduler_commands.submit_script(str(submission_script), slots=slots, nodes=num_instances)
+        return scheduler_commands.assert_job_submitted(result.stdout)
+
+    job_id = submit_job()
+    for attempt in range(2):
+        try:
+            scheduler_commands.wait_job_completed(job_id, timeout=timeout)
+        except Exception:
+            if attempt == 0:
+                logging.warning(f"wait_job_completed failed for job {job_id}, canceling and retrying")
+                scheduler_commands.cancel_job(job_id)
+                job_id = submit_job()
+                continue
+            raise
+        try:
+            scheduler_commands.assert_job_succeeded(job_id)
+            break
+        except Exception:
+            if attempt == 0:
+                logging.warning(f"assert_job_succeeded failed for job {job_id}, retrying")
+                job_id = submit_job()
+                continue
+            raise
     output = remote_command_executor.run_remote_command(f"cat /shared/{benchmark_name}.out").stdout
     return job_id, output
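Editor's note: the nested try/except flow above is easier to follow in isolation. Below is a minimal sketch of the same "submit, wait, retry at most once" behavior; the callables are stand-ins for the `scheduler_commands` methods used in the patch, and the helper name is hypothetical.

```python
import logging


def submit_wait_retry_once(submit, wait, check, cancel):
    """Submit a job; if waiting or the success check fails, resubmit once, then give up."""
    job_id = submit()
    for attempt in range(2):
        try:
            wait(job_id)
        except Exception:
            if attempt == 0:
                logging.warning("wait failed for %s; canceling and resubmitting", job_id)
                cancel(job_id)
                job_id = submit()
                continue
            raise
        try:
            check(job_id)
            break  # the job succeeded on this attempt
        except Exception:
            if attempt == 0:
                logging.warning("%s did not succeed; resubmitting", job_id)
                job_id = submit()
                continue
            raise
    return job_id
```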
diff --git a/tests/integration-tests/tests/performance_tests/README.md b/tests/integration-tests/tests/performance_tests/README.md
index 0c03ec5a8f..ec6d600eda 100644
--- a/tests/integration-tests/tests/performance_tests/README.md
+++ b/tests/integration-tests/tests/performance_tests/README.md
@@ -1,8 +1,145 @@
-# Performance Test
+# Performance Tests
 
-Performance tests allow you to compare the performance of a given cluster configuration with respect to a pre-defined baseline.
+## Available Test Suites
 
-The outcomes of a performance test are:
+| Test | Purpose | Instance Requirements |
+|------|---------|----------------------|
+| `test_osu.py` | MPI latency and bandwidth benchmarks | EFA-enabled (c5n.18xlarge, p5, p6) |
+| `test_starccm.py` | Real CFD workload performance | EFA-enabled (hpc6a.48xlarge) |
+| `test_scaling.py` | Cluster scale-up/down timing | Any instance type |
+| `test_startup_time.py` | Compute node bootstrap time | Any instance type |
+| `test_simple.py` | Job scheduling metrics | Any instance type |
+| `test_openfoam.py` | OpenFOAM CFD workload | EFA-enabled |
+
+---
+
+## Choosing the Right Test Configuration
+
+### Instance Type Selection
+
+| Goal | Recommended Instance | Why |
+|------|----------------------|-----|
+| MPI latency/bandwidth baseline | EFA-enabled (c5n.18xlarge) | Measures true network performance |
+| System-level jitter detection | Any (e.g. c5.xlarge) | Issue is CPU-bound, not network-bound |
+| Scaling/bootstrap tests | Any (e.g. c5.large) | Network performance not relevant |
+
+### Node Count Selection
+
+| Benchmark Type | Recommended Nodes | Rationale |
+|----------------|-------------------|-----------|
+| pt2pt (osu_latency, osu_bibw) | 2 | Only 2 ranks communicate |
+| Collective (osu_allreduce, osu_barrier) | 200-500 | More nodes increase probability of detecting issues |
+| StarCCM/OpenFOAM | 8-32 | Matches typical customer usage; diminishing returns beyond |
+| Scaling tests | 1000+ | Tests scheduler and infrastructure at scale |
+
+### Placement Group
+
+| Scenario | Use Placement Group? | Why |
+|----------|---------------------|-----|
+| MPI performance baseline | Yes | Reduces network variance, cleaner signal |
+| System jitter detection | Yes | Lower baseline makes jitter more visible |
+| Scaling tests | Optional | May hit capacity limits at large scale |
+
+---
+
+## OSU Benchmarks Deep Dive
+
+### Benchmark Categories
+
+**Point-to-Point (pt2pt)**
+- `osu_latency`: Measures round-trip latency between 2 ranks
+- `osu_bibw`: Measures bidirectional bandwidth between 2 ranks
+- Always uses exactly 2 nodes regardless of cluster size
+
+**Collective**
+- `osu_allreduce`: All ranks contribute and receive the result
+- `osu_allgather`: All ranks gather data from all other ranks
+- `osu_barrier`: Pure synchronization (most sensitive to jitter)
+- `osu_bcast`: One-to-all broadcast
+- `osu_alltoall`: All-to-all personalized exchange
+- Performance depends on the **slowest node** - scales with node count
+
+### When to Use Each
+
+| Issue to Detect | Best Benchmark | Node Count |
+|-----------------|----------------|------------|
+| EFA driver regression | osu_latency, osu_bibw | 2 |
+| Network baseline | osu_latency | 2 |
+| System daemon interference | osu_allreduce, osu_barrier | 200-500 |
+| MPI library scaling bugs | osu_allreduce | 100+ |
+| Multi-NIC bandwidth | osu_mbw_mr | 2 |
+
+---
+
+## Detecting System-Level Performance Issues
+
+Some performance regressions are caused by system-level interference (daemons, background processes) rather than network issues.
+
+### Characteristics of System-Level Issues
+
+- Affects collective operations more than pt2pt
+- More visible at scale (more nodes = higher probability of hitting the issue)
+- Causes latency spikes/jitter rather than sustained degradation
+- May be periodic (e.g., processes running on timers)
+
+### Recommended Test Strategy
+
+1. **Use collective benchmarks** (osu_allreduce, osu_barrier) - they're bottlenecked by the slowest node
+2. **Scale to 200-500 nodes** - increases probability of detection
+3. **Run multiple iterations** - captures variance
+4. **Measure percentiles (p95, p99)** - not just averages
+5. **Use placement group** - reduces network noise, makes system jitter more visible
+
+### Example: Detecting Periodic Daemon Impact
+
+If a daemon runs every 60 seconds on each node, and each time it consumes 1 second (a rough model is sketched below):
+- With 2 nodes: ~3% chance of hitting it during a benchmark
+- With 100 nodes: ~81% chance
+- With 500 nodes: ~99.98% chance
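+
+This is only a back-of-the-envelope estimate, not measured data: it assumes the nodes are uncorrelated and that, at any given instant, each node has a 1-in-60 chance of being inside the daemon's 1-second window. Under those assumptions the detection probability is (the helper below is purely illustrative):
+
+```python
+def detection_probability(num_nodes, duty_cycle=1 / 60):
+    """Chance that at least one node is running the daemon at a given instant."""
+    return 1 - (1 - duty_cycle) ** num_nodes
+
+for nodes in (2, 100, 500):
+    print(f"{nodes} nodes: {detection_probability(nodes):.2%}")  # ~3.31%, ~81.37%, ~99.98%
+```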
+
+---
+
+## StarCCM and Real Workload Tests
+
+### When StarCCM is Appropriate
+
+| Use Case | Appropriate? |
+|----------|--------------|
+| Validating real HPC performance | Yes |
+| Detecting network regressions | Yes |
+| Detecting system jitter | No (metric too coarse) |
+
+### Scaling Considerations
+
+Current baselines (8/16/32 nodes) are sufficient for most regression detection. Scaling to 100+ nodes makes it harder to maintain stable baselines.
+
+---
+
+## NCCL Tests
+
+NCCL tests measure GPU-to-GPU communication performance.
+
+### When NCCL Tests Are Useful
+
+| Issue Type | NCCL Useful? |
+|------------|--------------|
+| EFA driver regression | Yes |
+| NCCL library bugs | Yes |
+| GPU driver issues | Yes |
+| System daemon interference | No (GPU ops are async from CPU) |
+
+### Current Configuration
+
+- Runs on 2 GPU nodes (p4d, p5, p6)
+- Measures `all_reduce_perf` bandwidth
+- Validates multi-NIC EFA configuration
+
+---
+
+## Job Scheduling Metrics
+
+The outcomes of the job scheduling metrics test are:
 1. statistics from the observed metrics
 2. box-plots comparing the candidate configuration under tests with respect to the baseline
 3. test failure if the candidate configuration under test
diff --git a/tests/integration-tests/tests/performance_tests/common.py b/tests/integration-tests/tests/performance_tests/common.py
index 3d3b447034..3441d0f940 100644
--- a/tests/integration-tests/tests/performance_tests/common.py
+++ b/tests/integration-tests/tests/performance_tests/common.py
@@ -255,7 +255,7 @@ def _log_output_performance_difference(node, performance_degradation, observed_v
     )
 
 
-def push_result_to_dynamodb(name, result, instance, os, mpi_variation=None):
+def push_result_to_dynamodb(name, result, instance, os, mpi_variation=None, num_instances=None):
     reporting_region = METADATA_DEFAULT_REGION
     logging.info(f"Metadata reporting region {reporting_region}")
     # Create the metadata table in case it doesn't exist
@@ -275,6 +275,7 @@ def push_result_to_dynamodb(name, result, instance, os, mpi_variation=None):
         "result": str(result),
         "pcluster_version": f"v{get_installed_parallelcluster_version()}",
         "mpi_variation": str(mpi_variation),
+        "num_instances": num_instances,
     }
 
     # Put item in the table
diff --git a/tests/integration-tests/tests/performance_tests/test_osu.py b/tests/integration-tests/tests/performance_tests/test_osu.py
index d7d712f917..a4cb198883 100644
--- a/tests/integration-tests/tests/performance_tests/test_osu.py
+++ b/tests/integration-tests/tests/performance_tests/test_osu.py
@@ -9,21 +9,19 @@
 # or in the "LICENSE.txt" file accompanying this file.
 # This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
 # See the License for the specific language governing permissions and limitations under the License.
-import json
 import logging
 import re
 
-import boto3
 import pytest
 from assertpy import assert_that
 from remote_command_executor import RemoteCommandExecutor
+from utils import get_instance_info
 
 from tests.common.assertions import assert_no_errors_in_logs
 from tests.common.osu_common import run_individual_osu_benchmark
 from tests.common.utils import (
     fetch_instance_slots,
     get_capacity_reservation_id,
-    get_installed_parallelcluster_version,
     run_system_analyzer,
     write_file,
 )
@@ -34,12 +32,10 @@
 @pytest.mark.usefixtures("serial_execution_by_instance")
-@pytest.mark.parametrize("in_place_update_on_fleet_enabled", ["true", "false"])
 def test_osu(
     os,
     region,
     scheduler,
-    in_place_update_on_fleet_enabled,
     instance,
     pcluster_config_reader,
     clusters_factory,
@@ -50,25 +46,21 @@ def test_osu(
     scheduler_commands_factory,
     request,
 ):
-    if in_place_update_on_fleet_enabled == "true":
-        message = "Skipping the test as we want to compare performance when cfn-hup is disabled"
-        logging.warn(message)
-        pytest.skip(message)
-
-    if instance not in OSU_BENCHMARKS_INSTANCES:
-        raise Exception(
-            f"OSU benchmarks can't be run on instance {instance}. "
-            f"Only these instances are supported: {OSU_BENCHMARKS_INSTANCES}"
-        )
-
-    max_queue_size = 32
+    instance_info = get_instance_info(instance)
+    instance_memory = instance_info["MemoryInfo"]["SizeInMiB"]
+    instance_efa_supported = instance_info["NetworkInfo"]["EfaSupported"]
+    if instance_memory <= 16384:
+        # For smaller instance types, run a large cluster. The head node needs to be large enough to handle the cluster.
+        max_queue_size = 500
+        head_node_instance_type = "c5n.18xlarge"
+    else:
+        # For larger instance types, run a small cluster. The head node uses the same instance type as the compute nodes.
+        max_queue_size = 32
+        head_node_instance_type = instance
     capacity_type = "ONDEMAND"
     capacity_reservation_id = None
     placement_group_enabled = True
-    chef_attributes_dict = {"cluster": {"in_place_update_on_fleet_enabled": in_place_update_on_fleet_enabled}}
-    extra_chef_attributes = json.dumps(chef_attributes_dict)
-
     if instance in ["p6-b200.48xlarge", "p5en.48xlarge"]:
         max_queue_size = 2
         capacity_type = "CAPACITY_BLOCK"
@@ -84,11 +76,12 @@ def test_osu(
     slots_per_instance = fetch_instance_slots(region, instance, multithreading_disabled=True)
 
     cluster_config = pcluster_config_reader(
+        head_node_instance_type=head_node_instance_type,
+        instance_efa_supported=instance_efa_supported,
         max_queue_size=max_queue_size,
         capacity_type=capacity_type,
         capacity_reservation_id=capacity_reservation_id,
         placement_group_enabled=placement_group_enabled,
-        extra_chef_attributes=extra_chef_attributes,
     )
     cluster = clusters_factory(cluster_config)
     remote_command_executor = RemoteCommandExecutor(cluster)
@@ -102,20 +95,22 @@ def test_osu(
     # Run OSU benchmarks in efa-enabled queue.
     for mpi_version in mpi_variants:
-        benchmark_failures.extend(
-            _test_osu_benchmarks_pt2pt(
-                mpi_version,
-                remote_command_executor,
-                scheduler_commands,
-                test_datadir,
-                output_dir,
-                os,
-                instance,
-                network_interfaces_count,
-                slots_per_instance,
-                partition="efa-enabled",
+        if max_queue_size < 40:
+            # pt2pt benchmarks only make sense when the number of nodes is small
+            benchmark_failures.extend(
+                _test_osu_benchmarks_pt2pt(
+                    mpi_version,
+                    remote_command_executor,
+                    scheduler_commands,
+                    test_datadir,
+                    output_dir,
+                    os,
+                    instance,
+                    network_interfaces_count,
+                    slots_per_instance,
+                    partition="efa-enabled",
+                )
             )
-        )
         benchmark_failures.extend(
             _test_osu_benchmarks_collective(
                 mpi_version,
@@ -183,7 +178,7 @@ def _test_osu_benchmarks_pt2pt(
             test_datadir,
         )
         failures = _check_osu_benchmarks_results(
-            test_datadir, output_dir, os, instance, mpi_version, benchmark_name, output
+            test_datadir, output_dir, os, instance, mpi_version, benchmark_name, num_instances, output
        )
        if failures > accepted_number_of_failures:
            failed_benchmarks.append(f"{mpi_version}-{benchmark_name}")
@@ -209,7 +204,12 @@ def _test_osu_benchmarks_collective(
     failed_benchmarks = []
     benchmark_group = "collective"
-    for benchmark_name in ["osu_allgather", "osu_bcast", "osu_allreduce", "osu_alltoall"]:
+    benchmark_names = ["osu_allgather", "osu_bcast", "osu_allreduce", "osu_barrier"]
+    if num_instances < 40:
+        # The all-to-all benchmark has time complexity of O(n^2), where n is the number of instances,
+        # so we only run it for small clusters.
+        benchmark_names.append("osu_alltoall")
+    for benchmark_name in benchmark_names:
         _, output = run_individual_osu_benchmark(
             mpi_version,
             benchmark_group,
             benchmark_name,
             remote_command_executor,
             scheduler_commands,
             num_instances,
             slots_per_instance,
             network_interfaces_count,
             test_datadir,
-            timeout=24,
+            timeout=24 + num_instances * 0.1,
         )
         failures = _check_osu_benchmarks_results(
-            test_datadir, output_dir, os, instance, mpi_version, benchmark_name, output
+            test_datadir, output_dir, os, instance, mpi_version, benchmark_name, num_instances, output
         )
         if failures > accepted_number_of_failures:
             failed_benchmarks.append(f"{mpi_version}-{benchmark_name}")
@@ -303,7 +303,9 @@ def _test_osu_benchmarks_multiple_bandwidth(
     assert_that(float(max_bandwidth)).is_greater_than(expected_bandwidth)
 
 
-def _check_osu_benchmarks_results(test_datadir, output_dir, os, instance, mpi_version, benchmark_name, output):
+def _check_osu_benchmarks_results(
+    test_datadir, output_dir, os, instance, mpi_version, benchmark_name, num_instances, output
+):
     logging.info(output)
     write_file(
         dirname=f"{output_dir}/osu-results",
@@ -312,71 +314,55 @@ def _check_osu_benchmarks_results(test_datadir, output_dir, os, instance, mpi_ve
     )
     # Check avg latency for all packet sizes
     failures = 0
-    metric_data = []
-    metric_namespace = "ParallelCluster/test_efa"
     evaluation_output = ""
-    result = re.findall(r"(\d+)\s+(\d+)\.", output)
-    push_result_to_dynamodb(f"OSU_{benchmark_name}", result, instance, os, mpi_version)
-    for packet_size, value in result:
-        with open(
-            str(test_datadir / "osu_benchmarks" / "results" / os / instance / mpi_version / benchmark_name),
-            encoding="utf-8",
-        ) as result:
-            previous_result_match = re.search(rf"{packet_size}\s+(\d+)\.", result.read())
-            previous_result = previous_result_match.group(1) if previous_result_match else None
-
-        if previous_result is None:
-            logging.warning(f"Previous result for {benchmark_name} with packet size {packet_size} not found")
-            continue
-
-        if benchmark_name == "osu_bibw":
-            # Invert logic because osu_bibw is in MB/s
-            tolerated_value = float(previous_result) - (float(previous_result) * 0.2)
-            is_failure = int(value) < tolerated_value
-        else:
-            multiplier = 0.3 if benchmark_name == "osu_latency" else 0.2
-            tolerated_value = float(previous_result) + max(float(previous_result) * multiplier, 10)
-
-            is_failure = int(value) > tolerated_value
-
-        percentage_diff = (float(value) - float(tolerated_value)) / float(tolerated_value) * 100
-
-        outcome = "DEGRADATION" if is_failure else "IMPROVEMENT"
-
-        message = (
-            f"{outcome} : {mpi_version} - {benchmark_name} - packet size {packet_size}: "
-            f"tolerated: {tolerated_value}, current: {value}, percentage_diff: {percentage_diff}%"
-        )
-
-        evaluation_output += f"\n{message}"
-
-        dimensions = {
-            "PclusterVersion": get_installed_parallelcluster_version(),
-            "MpiVariant": mpi_version,
-            "Instance": instance,
-            "OsuBenchmarkName": benchmark_name,
-            "PacketSize": packet_size,
-            "OperatingSystem": os,
-        }
-        metric_data.append(
-            {
-                "MetricName": "Latency",
-                "Dimensions": [{"Name": name, "Value": str(value)} for name, value in dimensions.items()],
-                "Value": int(value),
-                "Unit": "Microseconds",
-            }
-        )
-
-        if is_failure:
-            failures = failures + 1
-            logging.error(message)
-        else:
-            logging.info(message)
-    write_file(
-        dirname=f"{output_dir}/osu-results",
-        filename=f"{os}-{instance}-{mpi_version}-{benchmark_name}-evaluation.out",
-        content=evaluation_output,
-    )
-    boto3.client("cloudwatch").put_metric_data(Namespace=metric_namespace, MetricData=metric_data)
+    if benchmark_name == "osu_barrier":
+        # osu_barrier outputs only a single latency value without packet size
+        match = re.search(r"^\s+(\d+\.\d+)\s*$", output, re.MULTILINE)
+        result = match.group(1)
+    else:
+        result = re.findall(r"(\d+)\s+(\d+)\.", output)
+    push_result_to_dynamodb(f"OSU_{benchmark_name}", result, instance, os, mpi_version, num_instances)
+    baseline_file_path = test_datadir / "osu_benchmarks" / "results" / os / instance / mpi_version / benchmark_name
+    if baseline_file_path.exists():
+        for packet_size, value in result:
+            with open(str(baseline_file_path), encoding="utf-8") as result:
+                previous_result_match = re.search(rf"{packet_size}\s+(\d+)\.", result.read())
+                previous_result = previous_result_match.group(1) if previous_result_match else None
+
+            if previous_result is None:
+                logging.warning(f"Previous result for {benchmark_name} with packet size {packet_size} not found")
+                continue
+
+            if benchmark_name == "osu_bibw":
+                # Invert logic because osu_bibw is in MB/s
+                tolerated_value = float(previous_result) - (float(previous_result) * 0.2)
+                is_failure = int(value) < tolerated_value
+            else:
+                multiplier = 0.3 if benchmark_name == "osu_latency" else 0.2
+                tolerated_value = float(previous_result) + max(float(previous_result) * multiplier, 10)
+
+                is_failure = int(value) > tolerated_value
+
+            percentage_diff = (float(value) - float(tolerated_value)) / float(tolerated_value) * 100
+
+            outcome = "DEGRADATION" if is_failure else "IMPROVEMENT"
+
+            message = (
+                f"{outcome} : {mpi_version} - {benchmark_name} - packet size {packet_size}: "
+                f"tolerated: {tolerated_value}, current: {value}, percentage_diff: {percentage_diff}%"
+            )
+
+            evaluation_output += f"\n{message}"
+
+            if is_failure:
+                failures = failures + 1
+                logging.error(message)
+            else:
+                logging.info(message)
+        write_file(
+            dirname=f"{output_dir}/osu-results",
+            filename=f"{os}-{instance}-{mpi_version}-{benchmark_name}-evaluation.out",
+            content=evaluation_output,
+        )
     return failures
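Editor's note: a small worked example of the acceptance rule and the scaled timeout used above. The numbers are made up; only the formulas come from the code in this patch.

```python
# Hypothetical baseline and current measurement for a latency benchmark (microseconds).
previous_result = 20.0
value = 28

# osu_latency tolerates +30%, the other latency benchmarks +20%, with an absolute floor of 10 us.
multiplier = 0.3
tolerated_value = previous_result + max(previous_result * multiplier, 10)  # 20 + max(6, 10) = 30.0
is_failure = value > tolerated_value  # False: 28 <= 30

# osu_bibw reports bandwidth (MB/s), so the comparison is inverted:
# the run fails if it drops more than 20% below the baseline.
bibw_baseline = 10000.0
bibw_tolerated = bibw_baseline - bibw_baseline * 0.2  # 8000.0

# Collective benchmarks scale their timeout with the cluster size.
print(24 + 500 * 0.1)  # 74.0 for a 500-node run
```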
diff --git a/tests/integration-tests/tests/performance_tests/test_osu/test_osu/pcluster.config.yaml b/tests/integration-tests/tests/performance_tests/test_osu/test_osu/pcluster.config.yaml
index 511630d55b..639cdfa7aa 100644
--- a/tests/integration-tests/tests/performance_tests/test_osu/test_osu/pcluster.config.yaml
+++ b/tests/integration-tests/tests/performance_tests/test_osu/test_osu/pcluster.config.yaml
@@ -1,7 +1,7 @@
 Image:
   Os: {{ os }}
 HeadNode:
-  InstanceType: {{ instance }}
+  InstanceType: {{ head_node_instance_type }}
   Networking:
     SubnetId: {{ public_subnet_id }}
   Ssh:
@@ -32,12 +32,11 @@ Scheduling:
           MaxCount: {{ max_queue_size }}
           MinCount: {{ max_queue_size }}
           DisableSimultaneousMultithreading: true
+          {% if instance_efa_supported %}
           Efa:
             Enabled: true
+          {% endif %}
 SharedStorage:
   - MountDir: /shared
     Name: name1
     StorageType: Ebs
-DevSettings:
-  Cookbook:
-    ExtraChefAttributes: '{{ extra_chef_attributes }}'
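Editor's note: not part of the patch, just an illustration of how the new template variables drive the conditional block above. Plain `jinja2` is used here as a stand-in for the test framework's `pcluster_config_reader`; the fragment and the values are hypothetical.

```python
from jinja2 import Template

# Only the Efa block is conditional; the head node type is injected directly.
fragment = Template(
    "InstanceType: {{ head_node_instance_type }}\n"
    "DisableSimultaneousMultithreading: true\n"
    "{% if instance_efa_supported %}"
    "Efa:\n"
    "  Enabled: true\n"
    "{% endif %}"
)
# EFA-capable compute fleet: the Efa block is emitted.
print(fragment.render(head_node_instance_type="c5n.18xlarge", instance_efa_supported=True))
# Non-EFA fleet (e.g. c5.xlarge compute nodes): the block is omitted entirely.
print(fragment.render(head_node_instance_type="c5n.18xlarge", instance_efa_supported=False))
```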