145 changes: 141 additions & 4 deletions tests/integration/defs/perf/disagg/README.md
@@ -132,6 +132,141 @@ poetry run pytest --disagg test_disagg.py -s -vv -m accuracy
poetry run pytest --disagg test_disagg.py -s -vv -k "deepseek-r1-fp4_1k1k"
```

## Batch Job Submission

The framework supports automatic batch job submission to maximize parallelism on SLURM clusters. Instead of submitting jobs one at a time, it groups test cases into batches and submits each batch on demand.

### Quick Start

**Default batch size (5 jobs per batch):**
```bash
# Run all tests with default batching
poetry run pytest --disagg test_disagg.py -s -vv

# Run with test list
poetry run pytest --disagg test_disagg.py -s -vv --disagg-test-list=./testlist/all.txt
```

**Custom batch size:**
```bash
# Set batch size via command line
poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=10

# Set batch size via environment variable
export DISAGG_BATCH_SIZE=20
poetry run pytest --disagg test_disagg.py -s -vv

# Submit all jobs at once (unlimited batch)
poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=0
```

### How Batch Submission Works

```
Pytest Collection Phase:
- Collects all test cases (e.g., 100 tests)
- BatchManager splits them into batches (e.g., 20 batches of 5)

Pytest Execution Phase:
Test 0 runs:
-> Triggers submission of Batch 0 (jobs 0-4)
-> Waits for job 0 to complete

Tests 1-4 run:
-> Batch 0 already submitted; just wait for completion

Test 5 runs:
-> Triggers submission of Batch 1 (jobs 5-9)
-> Waits for job 5 to complete

... and so on
```
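
In code terms, a test's position in the collected list determines its batch. The sketch below mirrors the index arithmetic used by `BatchManager.get_job_id` and `_submit_batch` in `conftest.py` (submission and logging omitted):

```python
def batch_bounds(test_index: int, batch_size: int, total: int):
    """Return (batch_num, start, end) for the batch containing test_index.

    batch_size <= 0 is treated as unlimited: everything lands in batch 0.
    """
    if batch_size <= 0:
        return 0, 0, total
    batch_num = test_index // batch_size
    start = batch_num * batch_size
    end = min(start + batch_size, total)
    return batch_num, start, end

# 100 collected tests, batch size 5 -> test 7 belongs to batch 1 (jobs 5-9)
print(batch_bounds(7, 5, 100))  # (1, 5, 10)
```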

### Key Benefits

- **Parallel Execution**: All jobs in a batch run simultaneously on the SLURM cluster
- **Reduced Wait Time**: Wall time per batch ≈ MAX(job time) instead of SUM(job times)
- **Automatic Management**: No need to manually split test lists
- **Lazy Submission**: A batch is only submitted when its first test runs (see the sketch below)

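On the test side, batching is exposed through the session-scoped `batch_manager` fixture added in `conftest.py`. The snippet below is only an illustrative sketch (the real `test_benchmark` body is not part of this change); it shows the intended pattern of asking the manager for a job ID, which lazily submits the whole batch on first use:

```python
class TestDisaggBenchmark:
    def test_benchmark(self, test_config, batch_manager):
        # The first test of a batch triggers submission of the entire batch;
        # later tests in the same batch get their cached job_id immediately.
        job_id = batch_manager.get_job_id(test_config)
        assert job_id is not None, f"SLURM submission failed for {test_config.test_id}"
        # ... wait for the job and collect metrics
        # (handled by JobManager / LogParser / ResultSaver in the framework).
```
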
### Configuration Options

**Priority**: Command line option > Environment variable > Default (5)

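This order is implemented in the `batch_manager` fixture in `conftest.py`; a condensed sketch of the resolution logic:

```python
import os

def resolve_batch_size(cli_value):
    """CLI option wins, then DISAGG_BATCH_SIZE, then the default of 5."""
    if cli_value is not None:
        return cli_value
    env_value = os.getenv("DISAGG_BATCH_SIZE")
    if env_value:
        try:
            return int(env_value)
        except ValueError:
            pass  # malformed value: fall through to the default
    return 5
```
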
**Examples:**

```bash
# Small batch for quick testing
poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=3 \
--disagg-test-list=./testlist/debug.txt

# Large batch for production
poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=50 \
--disagg-test-list=./testlist/all.txt

# Submit all at once
poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=0
```

### Timeout Configuration

The default timeout for waiting for job completion is **10 hours (36000 seconds)**, which accounts for:
- SLURM queue wait time
- Job execution time
- Buffer for delays
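
The waiting itself is handled by the framework's `JobManager`; the loop below is only an illustrative sketch of how such a deadline can be enforced by polling SLURM's `squeue` (the command and its `-h`/`-j` flags are standard SLURM, but this is not the framework's actual implementation):

```python
import subprocess
import time

TIMEOUT_S = 36000  # 10 hours, matching the framework default
POLL_S = 60

def wait_for_slurm_job(job_id: str, timeout_s: int = TIMEOUT_S) -> bool:
    """Return True once job_id has left the SLURM queue, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        # `squeue -h -j <id>` prints one line while the job is pending/running
        # and nothing once it has finished (or aged out of the queue).
        out = subprocess.run(
            ["squeue", "-h", "-j", job_id],
            capture_output=True,
            text=True,
        ).stdout.strip()
        if not out:
            return True
        time.sleep(POLL_S)
    return False
```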

### Performance Comparison

**Before (Sequential Submission):**
```
Case 1: submit + wait (1.5h) = 1.5h
Case 2: submit + wait (1.5h) = 1.5h
Case 3: submit + wait (1.5h) = 1.5h
...
Total: 50 × 1.5h = 75 hours
```

**After (Batch Submission, batch_size=50):**
```
Batch 0 (50 jobs): submitted in parallel
Case 1: wait (1.5h)
Cases 2-50: wait (~0s, already done)

Total: ~1.5 hours
```

**Speedup: 50x**
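
The estimate generalizes: if every job in a batch really runs concurrently and SLURM queue wait is negligible, total wall time scales with the number of batches rather than the number of jobs. A rough back-of-the-envelope model (an assumption, not a measurement):

```python
import math

def estimated_wall_hours(num_jobs: int, batch_size: int, job_hours: float) -> float:
    """Jobs inside a batch run in parallel; batches run back to back."""
    if batch_size <= 0:  # 0 = unlimited, i.e. a single batch
        return job_hours
    return math.ceil(num_jobs / batch_size) * job_hours

print(estimated_wall_hours(50, 1, 1.5))   # 75.0 h (sequential submission)
print(estimated_wall_hours(50, 50, 1.5))  # 1.5 h  (one batch of 50)
```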

### Troubleshooting

**Check BatchManager initialization:**
```
======================================================================
Batch Manager Initialized
Batch size: 5 jobs per batch
======================================================================

Total test configs: 20
Total batches: 4
```

**Monitor batch submission:**
```
======================================================================
Submitting Batch 0
Range: [0:5] (5 jobs)
======================================================================

[ 1/5] Job 1234 <- test_config_id_1
[ 2/5] Job 1235 <- test_config_id_2
...
```

**If jobs time out frequently:**
- Check the SLURM queue status (e.g. `squeue -u $USER`)
- Consider reducing the batch size to avoid resource contention
- Verify that the timeout (36000s) is sufficient for your workload

## Test Naming Convention

Tests are automatically named using the format:
@@ -193,6 +328,7 @@ Test results are saved to:
- `GPU_TYPE`: Current GPU type (default: GB200)
- `OUTPUT_PATH`: Directory for test results and logs
- `WORK_DIR`: Working directory for benchmark execution
- `DISAGG_BATCH_SIZE`: Default batch size for job submission (default: 5)
- `DEBUG_MODE`: Enable debug mode (set to "1" to skip job submission)
- `DEBUG_JOB_ID`: Job ID to use in debug mode

@@ -212,10 +348,11 @@ The framework consists of:

1. **ConfigLoader**: Scans and loads YAML configurations
2. **ConfigValidator**: Validates configuration correctness
3. **JobManager**: Handles SLURM job submission and monitoring
4. **LogParser**: Extracts metrics from benchmark logs
5. **TestCaseTracker**: Tracks test execution timing
6. **ResultSaver**: Saves results to CSV
3. **BatchManager**: Manages batch job submission for parallel execution
4. **JobManager**: Handles SLURM job submission and monitoring
5. **LogParser**: Extracts metrics from benchmark logs
6. **TestCaseTracker**: Tracks test execution timing
7. **ResultSaver**: Saves results to CSV

## Benefits

193 changes: 190 additions & 3 deletions tests/integration/defs/perf/disagg/conftest.py
@@ -1,9 +1,11 @@
"""Pytest configuration for disagg tests.

Only collects tests in this directory when --disagg parameter is provided.
Can share options like --disagg-test-list defined in this conftest.py.
Provides batch job submission capability to improve parallelism.
"""

import os

import pytest
from utils.logger import logger

@@ -23,6 +25,15 @@ def pytest_addoption(parser):
help="Path to a file containing test IDs (one per line) to run. "
"Example: pytest --disagg --disagg-test-list=testlist/testlist_gb200.txt",
)
parser.addoption(
"--disagg-batch-size",
action="store",
type=int,
default=None,
help="Number of jobs to submit per batch. Default: from env DISAGG_BATCH_SIZE or 5. "
"Set to 0 for unlimited (submit all at once). "
"Example: pytest --disagg --disagg-batch-size=10",
)


def pytest_collect_directory(path, parent):
@@ -45,7 +56,6 @@ def pytest_collect_directory(path, parent):
return True

# With --disagg parameter, proceed with normal collection
# Can subsequently use --disagg-test-list and other options from main conftest.py for filtering
return None


@@ -88,7 +98,7 @@

for item in items:
# item.nodeid is the full test identifier like:
# "test_disagg_simple.py::TestDisaggBenchmark::test_benchmark[deepseek-r1-fp4:1k1k:...]"
# "test_disagg.py::TestDisaggBenchmark::test_benchmark[deepseek-r1-fp4:1k1k:...]"
if item.nodeid in wanted_tests:
selected.append(item)
else:
@@ -112,3 +122,180 @@ def pytest_collection_modifyitems(config, items):
logger.warning(f"Please check that the test IDs in {test_list_file} are correct.")

logger.info(f"{'=' * 70}\n")


class BatchManager:
"""Batch job submission manager for disagg tests.

Automatically splits test cases into batches and submits them on-demand
to maximize parallelism in SLURM cluster environments.

Key features:
- Lazy batch submission: only submits when needed
- Configurable batch size via CLI or environment variable
- Maintains job_id mapping for all submitted jobs
"""

def __init__(self, batch_size=5):
"""Initialize batch manager.

Args:
batch_size: Number of jobs per batch. None or 0 means unlimited (submit all at once).
Default is 5 if not specified.
"""
# Normalize batch_size: None, 0, or negative means unlimited
if batch_size is None or batch_size <= 0:
self.batch_size = None
else:
self.batch_size = batch_size

self.submitted_batches = set() # Track which batch numbers have been submitted
self.job_mapping = {} # Map test_id -> SLURM job_id
self.all_configs = [] # Ordered list of all test configs

logger.info(f"\n{'=' * 70}")
logger.info("Batch Manager Initialized")
if self.batch_size:
logger.info(f"Batch size: {self.batch_size} jobs per batch")
else:
logger.info("Batch size: unlimited (submit all at once)")
logger.info(f"{'=' * 70}\n")

def add_config(self, test_config):
"""Add a test configuration to the manager.

Called during initialization to build the ordered list of configs.

Args:
test_config: TestConfig object to add
"""
self.all_configs.append(test_config)

def get_job_id(self, test_config):
"""Get SLURM job ID for a test config, submitting batch if needed.

This is the main entry point. It:
1. Determines which batch the test belongs to
2. Submits the entire batch if not already submitted
3. Returns the job_id for this specific test

Args:
test_config: TestConfig object to get job_id for

Returns:
str: SLURM job ID, or None if submission failed
"""
# Find the index of this config in the ordered list
try:
idx = next(
i for i, c in enumerate(self.all_configs) if c.test_id == test_config.test_id
)
except StopIteration:
logger.error(f"Config not found in manager: {test_config.test_id}")
return None

# Calculate which batch this test belongs to
if self.batch_size:
batch_num = idx // self.batch_size
else:
batch_num = 0 # All tests in one batch

# Submit the batch if not already submitted
if batch_num not in self.submitted_batches:
self._submit_batch(batch_num)

# Return the cached job_id
return self.job_mapping.get(test_config.test_id)

def _submit_batch(self, batch_num):
"""Submit all jobs in a specific batch.

Args:
batch_num: Batch number to submit (0-indexed)
"""
from execution.executor import JobManager

# Calculate batch range
if self.batch_size:
start_idx = batch_num * self.batch_size
end_idx = min(start_idx + self.batch_size, len(self.all_configs))
else:
start_idx = 0
end_idx = len(self.all_configs)

batch_configs = self.all_configs[start_idx:end_idx]

logger.info(f"\n{'=' * 70}")
logger.info(f"Submitting Batch {batch_num}")
logger.info(f"Range: [{start_idx}:{end_idx}] ({len(batch_configs)} jobs)")
logger.info(f"{'=' * 70}\n")

# Submit all jobs in this batch
success_count = 0
for i, config in enumerate(batch_configs, 1):
try:
success, job_id = JobManager.submit_test_job(config)
if success and job_id:
self.job_mapping[config.test_id] = job_id
success_count += 1
# Truncate test_id for display
display_id = (
config.test_id[:60] + "..." if len(config.test_id) > 60 else config.test_id
)
logger.success(f" [{i:3d}/{len(batch_configs)}] Job {job_id} <- {display_id}")
else:
self.job_mapping[config.test_id] = None
logger.error(f" [{i:3d}/{len(batch_configs)}] Failed: {config.test_id[:50]}")
except Exception as e:
self.job_mapping[config.test_id] = None
logger.error(f" [{i:3d}/{len(batch_configs)}] Error: {e}")

# Mark batch as submitted
self.submitted_batches.add(batch_num)

logger.info(f"\n{'=' * 70}")
logger.success(
f"Batch {batch_num} Complete: {success_count}/{len(batch_configs)} succeeded"
)
logger.info(f"{'=' * 70}\n")


@pytest.fixture(scope="session")
def batch_manager(request):
"""Provide batch manager fixture for test methods.

This session-scoped fixture creates and initializes the BatchManager
with all collected test configs.

Returns:
BatchManager: Initialized batch manager instance
"""
# Get batch size from CLI option or environment variable
batch_size = request.config.getoption("--disagg-batch-size")
if batch_size is None:
env_batch_size = os.getenv("DISAGG_BATCH_SIZE")
if env_batch_size:
try:
batch_size = int(env_batch_size)
except ValueError:
logger.warning(f"Invalid DISAGG_BATCH_SIZE: {env_batch_size}, using default 5")
batch_size = 5
else:
batch_size = 5 # Default batch size

# Create batch manager
manager = BatchManager(batch_size=batch_size)

# Extract all test configs from collected items
for item in request.session.items:
if hasattr(item, "callspec") and "test_config" in item.callspec.params:
manager.add_config(item.callspec.params["test_config"])

# Log statistics
logger.info(f"Total test configs: {len(manager.all_configs)}")
if manager.batch_size:
total_batches = (len(manager.all_configs) + manager.batch_size - 1) // manager.batch_size
logger.info(f"Total batches: {total_batches}")
logger.info("")

return manager