diff --git a/tests/integration/defs/perf/disagg/README.md b/tests/integration/defs/perf/disagg/README.md index 28ba839c6e7..5921900b707 100644 --- a/tests/integration/defs/perf/disagg/README.md +++ b/tests/integration/defs/perf/disagg/README.md @@ -132,6 +132,141 @@ poetry run pytest --disagg test_disagg.py -s -vv -m accuracy poetry run pytest --disagg test_disagg.py -s -vv -k "deepseek-r1-fp4_1k1k" ``` +## Batch Job Submission + +The framework supports automatic batch job submission to maximize parallelism in SLURM cluster environments. Instead of submitting jobs one by one, it groups test cases into batches and submits each batch on demand, the first time one of its tests runs. + +### Quick Start + +**Default batch size (5 jobs per batch):** +```bash +# Run all tests with default batching +poetry run pytest --disagg test_disagg.py -s -vv + +# Run with a test list +poetry run pytest --disagg test_disagg.py -s -vv --disagg-test-list=./testlist/all.txt +``` + +**Custom batch size:** +```bash +# Set batch size via command line +poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=10 + +# Set batch size via environment variable +export DISAGG_BATCH_SIZE=20 +poetry run pytest --disagg test_disagg.py -s -vv + +# Submit all jobs at once (unlimited batch) +poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=0 +``` + +### How Batch Submission Works + +``` +Pytest Collection Phase: +  - Collects all test cases (e.g., 100 tests) +  - BatchManager splits them into batches (e.g., 20 batches of 5) + +Pytest Execution Phase: +  Test 0 runs: +    -> Triggers submission of Batch 0 (jobs 0-4) +    -> Waits for job 0 to complete + +  Tests 1-4 run: +    -> Batch 0 already submitted; just wait for completion + +  Test 5 runs: +    -> Triggers submission of Batch 1 (jobs 5-9) +    -> Waits for job 5 to complete + +  ... and so on +``` + +### Key Benefits + +- **Parallel Execution**: All jobs in a batch run simultaneously on the SLURM cluster +- **Reduced Wait Time**: Total time ≈ MAX(job time) instead of SUM(job times) +- **Automatic Management**: No need to manually split test lists +- **Lazy Submission**: A batch is only submitted once one of its tests is reached + +### Configuration Options + +**Priority**: Command-line option > Environment variable > Default (5) + +**Examples:** + +```bash +# Small batch for quick testing +poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=3 \ +    --disagg-test-list=./testlist/debug.txt + +# Large batch for production +poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=50 \ +    --disagg-test-list=./testlist/all.txt + +# Submit all at once +poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=0 +``` + +### Timeout Configuration + +The default timeout for waiting for job completion is **10 hours (36000 seconds)**, which accounts for: +- SLURM queue wait time +- Job execution time +- Buffer for unexpected delays + +### Performance Comparison + +**Before (Sequential Submission):** +``` +Case 1: submit + wait (1.5h) = 1.5h +Case 2: submit + wait (1.5h) = 1.5h +Case 3: submit + wait (1.5h) = 1.5h +...
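+Case 50: submit + wait (1.5h) = 1.5h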
+Total: 50 × 1.5h = 75 hours +``` + +**After (Batch Submission, batch_size=50):** +``` +Batch 0 (50 jobs): submitted in parallel +  Case 1: wait (1.5h) +  Cases 2-50: wait (0s, already done) + +Total: ~1.5 hours +``` + +**Speedup: 50x** + +### Troubleshooting + +**Check BatchManager initialization:** +``` +====================================================================== +Batch Manager Initialized +Batch size: 5 jobs per batch +====================================================================== + +Total test configs: 20 +Total batches: 4 +``` + +**Monitor batch submission:** +``` +====================================================================== +Submitting Batch 0 +Range: [0:5] (5 jobs) +====================================================================== + +  [  1/5] Job 1234 <- test_config_id_1 +  [  2/5] Job 1235 <- test_config_id_2 +  ... +``` + +**If jobs time out frequently:** +- Check the SLURM queue status +- Consider reducing the batch size to avoid resource contention +- Verify that the timeout (36000s) is sufficient for your workload + ## Test Naming Convention Tests are automatically named using the format: @@ -193,6 +328,7 @@ Test results are saved to: - `GPU_TYPE`: Current GPU type (default: GB200) - `OUTPUT_PATH`: Directory for test results and logs - `WORK_DIR`: Working directory for benchmark execution +- `DISAGG_BATCH_SIZE`: Default batch size for job submission (default: 5) - `DEBUG_MODE`: Enable debug mode (set to "1" to skip job submission) - `DEBUG_JOB_ID`: Job ID to use in debug mode @@ -212,10 +348,11 @@ The framework consists of: 1. **ConfigLoader**: Scans and loads YAML configurations 2. **ConfigValidator**: Validates configuration correctness -3. **JobManager**: Handles SLURM job submission and monitoring -4. **LogParser**: Extracts metrics from benchmark logs -5. **TestCaseTracker**: Tracks test execution timing -6. **ResultSaver**: Saves results to CSV +3. **BatchManager**: Manages batch job submission for parallel execution +4. **JobManager**: Handles SLURM job submission and monitoring +5. **LogParser**: Extracts metrics from benchmark logs +6. **TestCaseTracker**: Tracks test execution timing +7. **ResultSaver**: Saves results to CSV ## Benefits diff --git a/tests/integration/defs/perf/disagg/conftest.py b/tests/integration/defs/perf/disagg/conftest.py index 2dabeda1cd9..a4b88542dfd 100644 --- a/tests/integration/defs/perf/disagg/conftest.py +++ b/tests/integration/defs/perf/disagg/conftest.py @@ -1,9 +1,11 @@ """Pytest configuration for disagg tests. Only collects tests in this directory when --disagg parameter is provided. -Can share options like --disagg-test-list defined in this conftest.py. +Provides batch job submission capability to improve parallelism. """ +import os + import pytest from utils.logger import logger @@ -23,6 +25,15 @@ def pytest_addoption(parser): help="Path to a file containing test IDs (one per line) to run. " "Example: pytest --disagg --disagg-test-list=testlist/testlist_gb200.txt", ) +    parser.addoption( +        "--disagg-batch-size", +        action="store", +        type=int, +        default=None, +        help="Number of jobs to submit per batch. Default: from env DISAGG_BATCH_SIZE or 5. " +        "Set to 0 for unlimited (submit all at once). 
" + "Example: pytest --disagg --disagg-batch-size=10", + ) def pytest_collect_directory(path, parent): @@ -45,7 +56,6 @@ def pytest_collect_directory(path, parent): return True # With --disagg parameter, proceed with normal collection - # Can subsequently use --disagg-test-list and other options from main conftest.py for filtering return None @@ -88,7 +98,7 @@ def pytest_collection_modifyitems(config, items): for item in items: # item.nodeid is the full test identifier like: - # "test_disagg_simple.py::TestDisaggBenchmark::test_benchmark[deepseek-r1-fp4:1k1k:...]" + # "test_disagg.py::TestDisaggBenchmark::test_benchmark[deepseek-r1-fp4:1k1k:...]" if item.nodeid in wanted_tests: selected.append(item) else: @@ -112,3 +122,180 @@ def pytest_collection_modifyitems(config, items): logger.warning(f"Please check that the test IDs in {test_list_file} are correct.") logger.info(f"{'=' * 70}\n") + + +class BatchManager: + """Batch job submission manager for disagg tests. + + Automatically splits test cases into batches and submits them on-demand + to maximize parallelism in SLURM cluster environments. + + Key features: + - Lazy batch submission: only submits when needed + - Configurable batch size via CLI or environment variable + - Maintains job_id mapping for all submitted jobs + """ + + def __init__(self, batch_size=5): + """Initialize batch manager. + + Args: + batch_size: Number of jobs per batch. None or 0 means unlimited (submit all at once). + Default is 5 if not specified. + """ + # Normalize batch_size: None, 0, or negative means unlimited + if batch_size is None or batch_size <= 0: + self.batch_size = None + else: + self.batch_size = batch_size + + self.submitted_batches = set() # Track which batch numbers have been submitted + self.job_mapping = {} # Map test_id -> SLURM job_id + self.all_configs = [] # Ordered list of all test configs + + logger.info(f"\n{'=' * 70}") + logger.info("Batch Manager Initialized") + if self.batch_size: + logger.info(f"Batch size: {self.batch_size} jobs per batch") + else: + logger.info("Batch size: unlimited (submit all at once)") + logger.info(f"{'=' * 70}\n") + + def add_config(self, test_config): + """Add a test configuration to the manager. + + Called during initialization to build the ordered list of configs. + + Args: + test_config: TestConfig object to add + """ + self.all_configs.append(test_config) + + def get_job_id(self, test_config): + """Get SLURM job ID for a test config, submitting batch if needed. + + This is the main entry point. It: + 1. Determines which batch the test belongs to + 2. Submits the entire batch if not already submitted + 3. Returns the job_id for this specific test + + Args: + test_config: TestConfig object to get job_id for + + Returns: + str: SLURM job ID, or None if submission failed + """ + # Find the index of this config in the ordered list + try: + idx = next( + i for i, c in enumerate(self.all_configs) if c.test_id == test_config.test_id + ) + except StopIteration: + logger.error(f"Config not found in manager: {test_config.test_id}") + return None + + # Calculate which batch this test belongs to + if self.batch_size: + batch_num = idx // self.batch_size + else: + batch_num = 0 # All tests in one batch + + # Submit the batch if not already submitted + if batch_num not in self.submitted_batches: + self._submit_batch(batch_num) + + # Return the cached job_id + return self.job_mapping.get(test_config.test_id) + + def _submit_batch(self, batch_num): + """Submit all jobs in a specific batch. 
+ + Args: + batch_num: Batch number to submit (0-indexed) + """ + from execution.executor import JobManager + + # Calculate batch range + if self.batch_size: + start_idx = batch_num * self.batch_size + end_idx = min(start_idx + self.batch_size, len(self.all_configs)) + else: + start_idx = 0 + end_idx = len(self.all_configs) + + batch_configs = self.all_configs[start_idx:end_idx] + + logger.info(f"\n{'=' * 70}") + logger.info(f"Submitting Batch {batch_num}") + logger.info(f"Range: [{start_idx}:{end_idx}] ({len(batch_configs)} jobs)") + logger.info(f"{'=' * 70}\n") + + # Submit all jobs in this batch + success_count = 0 + for i, config in enumerate(batch_configs, 1): + try: + success, job_id = JobManager.submit_test_job(config) + if success and job_id: + self.job_mapping[config.test_id] = job_id + success_count += 1 + # Truncate test_id for display + display_id = ( + config.test_id[:60] + "..." if len(config.test_id) > 60 else config.test_id + ) + logger.success(f" [{i:3d}/{len(batch_configs)}] Job {job_id} <- {display_id}") + else: + self.job_mapping[config.test_id] = None + logger.error(f" [{i:3d}/{len(batch_configs)}] Failed: {config.test_id[:50]}") + except Exception as e: + self.job_mapping[config.test_id] = None + logger.error(f" [{i:3d}/{len(batch_configs)}] Error: {e}") + + # Mark batch as submitted + self.submitted_batches.add(batch_num) + + logger.info(f"\n{'=' * 70}") + logger.success( + f"Batch {batch_num} Complete: {success_count}/{len(batch_configs)} succeeded" + ) + logger.info(f"{'=' * 70}\n") + + +@pytest.fixture(scope="session") +def batch_manager(request): + """Provide batch manager fixture for test methods. + + This session-scoped fixture creates and initializes the BatchManager + with all collected test configs. + + Returns: + BatchManager: Initialized batch manager instance + """ + # Get batch size from CLI option or environment variable + batch_size = request.config.getoption("--disagg-batch-size") + if batch_size is None: + env_batch_size = os.getenv("DISAGG_BATCH_SIZE") + if env_batch_size: + try: + batch_size = int(env_batch_size) + except ValueError: + logger.warning(f"Invalid DISAGG_BATCH_SIZE: {env_batch_size}, using default 5") + batch_size = 5 + else: + batch_size = 5 # Default batch size + + # Create batch manager + manager = BatchManager(batch_size=batch_size) + + # Extract all test configs from collected items + for item in request.session.items: + if hasattr(item, "callspec") and "test_config" in item.callspec.params: + manager.add_config(item.callspec.params["test_config"]) + + # Log statistics + logger.info(f"Total test configs: {len(manager.all_configs)}") + if manager.batch_size: + total_batches = (len(manager.all_configs) + manager.batch_size - 1) // manager.batch_size + logger.info(f"Total batches: {total_batches}") + logger.info("") + + return manager diff --git a/tests/integration/defs/perf/disagg/execution/executor.py b/tests/integration/defs/perf/disagg/execution/executor.py index d454765c536..547b63aa8c4 100644 --- a/tests/integration/defs/perf/disagg/execution/executor.py +++ b/tests/integration/defs/perf/disagg/execution/executor.py @@ -114,7 +114,9 @@ def submit_shell_job( logger.debug(f"Script: {script_path}") logger.debug(f"Log file: {output_log_file}") - output = exec_cmd_with_output(sbatch_args, timeout=60) + # Use check=False to allow submission even with Kerberos warnings + # (mimics submit.py behavior) + output = exec_cmd_with_output(sbatch_args, timeout=60, check=False) job_id = output.strip() # Parse job ID (--parsable returns just 
the job ID) diff --git a/tests/integration/defs/perf/disagg/execution/subprocess_utils.py b/tests/integration/defs/perf/disagg/execution/subprocess_utils.py index 9ab77714267..39a3f0ac4b9 100644 --- a/tests/integration/defs/perf/disagg/execution/subprocess_utils.py +++ b/tests/integration/defs/perf/disagg/execution/subprocess_utils.py @@ -33,19 +33,22 @@ def exec_cmd(*popenargs, timeout: Optional[float] = None, **kwargs) -> int: return result.returncode -def exec_cmd_with_output(*popenargs, timeout: Optional[float] = None, **kwargs) -> str: +def exec_cmd_with_output( + *popenargs, timeout: Optional[float] = None, check: bool = True, **kwargs +) -> str: """Execute command and return output as string. Args: *popenargs: Command and arguments timeout: Timeout in seconds + check: If True, raise CalledProcessError on non-zero exit code (default: True) **kwargs: Additional subprocess arguments Returns: stdout as string (decoded from bytes) Raises: - subprocess.CalledProcessError: If command returns non-zero exit code + subprocess.CalledProcessError: If check=True and command returns non-zero exit code subprocess.TimeoutExpired: If timeout is reached """ result = subprocess.run( @@ -53,11 +56,15 @@ def exec_cmd_with_output(*popenargs, timeout: Optional[float] = None, **kwargs) stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout, - check=True, + check=check, **kwargs, ) - # Log stderr if it exists + # Log stderr if it exists (as warning if check=False, as error if check=True) if result.stderr: - stderr_output = result.stderr.decode() - logger.error(f"Command stderr: {stderr_output}") + stderr_output = result.stderr.decode().strip() + if stderr_output: + if check: + logger.error(f"Command stderr: {stderr_output}") + else: + logger.warning(f"Command stderr: {stderr_output}") return result.stdout.decode() diff --git a/tests/integration/defs/perf/disagg/test_disagg.py b/tests/integration/defs/perf/disagg/test_disagg.py index 39008ca11a1..b60ba851967 100644 --- a/tests/integration/defs/perf/disagg/test_disagg.py +++ b/tests/integration/defs/perf/disagg/test_disagg.py @@ -62,7 +62,7 @@ class TestDisaggBenchmark: @pytest.mark.perf @pytest.mark.parametrize("test_config", PERF_TEST_CASES) - def test_benchmark(self, request, test_config: TestConfig): + def test_benchmark(self, request, batch_manager, test_config: TestConfig): """Performance benchmark test for YAML configurations.""" full_test_name = request.node.name @@ -101,15 +101,14 @@ def test_benchmark(self, request, test_config: TestConfig): ) job_id = EnvManager.get_debug_job_id() else: - # Submit job using JobManager - success, job_id = JobManager.submit_test_job(test_config) + # Get job_id from batch manager (auto-submits batch if needed) + job_id = batch_manager.get_job_id(test_config) # Validate submission result - assert success, f"Job submission failed: {test_config.test_id}" - assert job_id, "Unable to get job ID" + assert job_id, f"Failed to get job_id for {test_config.test_id}" - # Wait for completion (timeout/early failure handled inside) - JobManager.wait_for_completion(job_id, 7200, test_config, check_early_failure=True) + # Wait for completion (timeout: 10 hours = 36000 seconds) + JobManager.wait_for_completion(job_id, 36000, test_config, check_early_failure=True) # End tracking test case test_tracker.end_test_case() @@ -136,7 +135,7 @@ def test_benchmark(self, request, test_config: TestConfig): @pytest.mark.accuracy @pytest.mark.parametrize("test_config", ACCURACY_TEST_CASES) - def test_accuracy(self, request, test_config: 
TestConfig): + def test_accuracy(self, request, batch_manager, test_config: TestConfig): """Accuracy test for YAML configurations.""" full_test_name = request.node.name @@ -179,15 +178,14 @@ def test_accuracy(self, request, test_config: TestConfig): ) job_id = EnvManager.get_debug_job_id() else: - # Submit job using JobManager - success, job_id = JobManager.submit_test_job(test_config) + # Get job_id from batch manager (auto-submits batch if needed) + job_id = batch_manager.get_job_id(test_config) # Validate submission result - assert success, f"Job submission failed: {test_config.test_id}" - assert job_id, "Unable to get job ID" + assert job_id, f"Failed to get job_id for {test_config.test_id}" - # Wait for completion (timeout/early failure handled inside) - JobManager.wait_for_completion(job_id, 10800, test_config, check_early_failure=True) + # Wait for completion (timeout: 10 hours = 36000 seconds) + JobManager.wait_for_completion(job_id, 36000, test_config, check_early_failure=True) # End tracking test case test_tracker.end_test_case() @@ -216,7 +214,7 @@ def test_accuracy(self, request, test_config: TestConfig): @pytest.mark.stress @pytest.mark.parametrize("test_config", STRESS_TEST_CASES) - def test_stress(self, request, test_config: TestConfig): + def test_stress(self, request, batch_manager, test_config: TestConfig): """Stress test combining performance benchmarks and accuracy validation. This test type is designed for stress testing scenarios where both @@ -265,15 +263,14 @@ def test_stress(self, request, test_config: TestConfig): ) job_id = EnvManager.get_debug_job_id() else: - # Submit job using JobManager - success, job_id = JobManager.submit_test_job(test_config) + # Get job_id from batch manager (auto-submits batch if needed) + job_id = batch_manager.get_job_id(test_config) # Validate submission result - assert success, f"Job submission failed: {test_config.test_id}" - assert job_id, "Unable to get job ID" + assert job_id, f"Failed to get job_id for {test_config.test_id}" - # Wait for completion (longer timeout for stress tests: 4 hours) - JobManager.wait_for_completion(job_id, 10800, test_config, check_early_failure=True) + # Wait for completion (timeout: 10 hours = 36000 seconds) + JobManager.wait_for_completion(job_id, 36000, test_config, check_early_failure=True) # End tracking test case test_tracker.end_test_case()
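**Reviewer note (illustration, not part of the change):** the lazy batching implemented by `BatchManager.get_job_id()` / `_submit_batch()` in the conftest.py hunk above boils down to mapping each test's collection index to a batch via integer division, submitting the whole batch the first time one of its tests runs, and serving every later test from a cache. A minimal self-contained sketch follows; `MiniBatchManager` and the `submit` stub are hypothetical stand-ins for the real `BatchManager` and `JobManager.submit_test_job`:

```python
# Hypothetical sketch of the lazy batch-submission pattern used in the diff.
class MiniBatchManager:
    def __init__(self, configs, submit, batch_size=5):
        self.configs = list(configs)          # ordered as pytest collected them
        self.submit = submit                  # callable: config -> job_id
        self.batch_size = batch_size or None  # None/0 -> one unlimited batch
        self.submitted = set()                # batch numbers already sent
        self.jobs = {}                        # config -> job_id cache

    def get_job_id(self, config):
        idx = self.configs.index(config)
        batch = idx // self.batch_size if self.batch_size else 0
        if batch not in self.submitted:
            # First test of this batch: submit the whole batch at once.
            if self.batch_size:
                start = batch * self.batch_size
                end = min(start + self.batch_size, len(self.configs))
            else:
                start, end = 0, len(self.configs)
            for cfg in self.configs[start:end]:
                self.jobs[cfg] = self.submit(cfg)
            self.submitted.add(batch)
        return self.jobs.get(config)


# 12 tests, batch_size=5 -> batches [0:5], [5:10], [10:12].
mgr = MiniBatchManager([f"t{i}" for i in range(12)], submit=lambda c: f"job-{c}")
assert mgr.get_job_id("t0") == "job-t0"  # triggers submission of batch 0
assert mgr.get_job_id("t3") == "job-t3"  # batch 0 cached, nothing new submitted
assert mgr.get_job_id("t7") == "job-t7"  # triggers submission of batch 1
assert "t10" not in mgr.jobs             # batch 2 not submitted yet
```

This is why total wall time approaches the longest single job in a batch rather than the sum: the first test pays the submission cost for all of its batch peers, SLURM runs the jobs concurrently, and the remaining tests only wait on results.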