
Commit f8b2a8f

[None][chore] Support multiple job submission at the same time (#10492)
Signed-off-by: FredricZ-2007 <[email protected]> Co-authored-by: FredricZ-2007 <[email protected]>
1 parent b85c447 commit f8b2a8f

File tree

5 files changed: +365 -35 lines changed


tests/integration/defs/perf/disagg/README.md

Lines changed: 141 additions & 4 deletions
@@ -132,6 +132,141 @@ poetry run pytest --disagg test_disagg.py -s -vv -m accuracy
poetry run pytest --disagg test_disagg.py -s -vv -k "deepseek-r1-fp4_1k1k"
```

## Batch Job Submission

The framework supports automatic batch job submission to maximize parallelism in SLURM cluster environments. Instead of submitting jobs one by one, it groups test cases into batches and submits an entire batch the first time any test in that batch runs.

### Quick Start

**Default batch size (5 jobs per batch):**

```bash
# Run all tests with default batching
poetry run pytest --disagg test_disagg.py -s -vv

# Run with a test list
poetry run pytest --disagg test_disagg.py -s -vv --disagg-test-list=./testlist/all.txt
```

**Custom batch size:**

```bash
# Set the batch size via the command line
poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=10

# Set the batch size via an environment variable
export DISAGG_BATCH_SIZE=20
poetry run pytest --disagg test_disagg.py -s -vv

# Submit all jobs at once (unlimited batch)
poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=0
```

### How Batch Submission Works

```
Pytest Collection Phase:
  - Collects all test cases (e.g., 100 tests)
  - BatchManager splits them into batches (e.g., 20 batches of 5)

Pytest Execution Phase:
  Test 0 runs:
    -> Triggers submission of Batch 0 (jobs 0-4)
    -> Waits for job 0 to complete

  Tests 1-4 run:
    -> Batch 0 already submitted; directly wait for completion

  Test 5 runs:
    -> Triggers submission of Batch 1 (jobs 5-9)
    -> Waits for job 5 to complete

  ... and so on
```
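
The flow above can be sketched as a minimal, self-contained model (illustrative only: `LazyBatcher` and `submit_job` are hypothetical stand-ins for the real `BatchManager` and SLURM submission):

```python
def submit_job(test_id):
    """Hypothetical stand-in for the real SLURM submission call."""
    return f"job-for-{test_id}"


class LazyBatcher:
    """Sketch of lazy batch submission: a whole batch is submitted the
    first time any test in that batch asks for its job ID."""

    def __init__(self, test_ids, batch_size=5):
        self.test_ids = list(test_ids)
        # 0 or None means "one unlimited batch"
        self.batch_size = batch_size or len(self.test_ids)
        self.submitted = set()   # batch numbers already submitted
        self.job_ids = {}        # test_id -> job_id

    def get_job_id(self, test_id):
        idx = self.test_ids.index(test_id)
        batch = idx // self.batch_size
        if batch not in self.submitted:
            start = batch * self.batch_size
            # Submit every job in the batch at once
            for tid in self.test_ids[start:start + self.batch_size]:
                self.job_ids[tid] = submit_job(tid)
            self.submitted.add(batch)
        return self.job_ids[test_id]
```

Calling `get_job_id` for any test in a not-yet-submitted batch triggers submission of the whole batch; later calls for tests in the same batch just read the cached job IDs.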

### Key Benefits

- **Parallel Execution**: All jobs in a batch run simultaneously on the SLURM cluster
- **Reduced Wait Time**: Total time ≈ MAX(job time) instead of SUM(job times)
- **Automatic Management**: No need to manually split test lists
- **Lazy Loading**: Batches are submitted only when needed

### Configuration Options

**Priority**: Command-line option > environment variable > default (5)

**Examples:**

```bash
# Small batch for quick testing
poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=3 \
    --disagg-test-list=./testlist/debug.txt

# Large batch for production
poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=50 \
    --disagg-test-list=./testlist/all.txt

# Submit all at once
poetry run pytest --disagg test_disagg.py -s -vv --disagg-batch-size=0
```
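
The priority order can be sketched as a small resolver (illustrative; `resolve_batch_size` is a hypothetical helper mirroring the logic of the `batch_manager` fixture in `conftest.py`):

```python
import os


def resolve_batch_size(cli_value=None, default=5):
    """Resolve batch size: CLI option > DISAGG_BATCH_SIZE env var > default."""
    if cli_value is not None:
        return cli_value
    env_value = os.getenv("DISAGG_BATCH_SIZE")
    if env_value:
        try:
            return int(env_value)
        except ValueError:
            pass  # fall through to the default on a malformed value
    return default
```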

### Timeout Configuration

The default timeout for waiting for job completion is **10 hours (36000 seconds)**, which accounts for:
- SLURM queue wait time
- Job execution time
- A buffer for unexpected delays

### Performance Comparison

**Before (sequential submission):**

```
Case 1: submit + wait (1.5h) = 1.5h
Case 2: submit + wait (1.5h) = 1.5h
Case 3: submit + wait (1.5h) = 1.5h
...
Total: 50 × 1.5h = 75 hours
```

**After (batch submission, batch_size=50):**

```
Batch 0 (50 jobs): submitted in parallel
Case 1: wait (1.5h)
Cases 2-50: wait (0s, already done)

Total: ~1.5 hours
```

**Speedup: ~50x** (assuming the cluster has capacity to run all 50 jobs concurrently)
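
The arithmetic behind the comparison, as an idealized model that assumes uniform job times and enough cluster capacity to run a whole batch concurrently:

```python
def sequential_hours(n_jobs, hours_per_job):
    # Jobs are submitted and awaited one at a time, so times add up.
    return n_jobs * hours_per_job


def batched_hours(n_jobs, hours_per_job, batch_size):
    # Each batch runs fully in parallel; batches run back to back.
    n_batches = -(-n_jobs // batch_size)  # ceiling division
    return n_batches * hours_per_job
```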

### Troubleshooting

**Check BatchManager initialization:**

```
======================================================================
Batch Manager Initialized
Batch size: 5 jobs per batch
======================================================================

Total test configs: 20
Total batches: 4
```

**Monitor batch submission:**

```
======================================================================
Submitting Batch 0
Range: [0:5] (5 jobs)
======================================================================

[  1/5] Job 1234 <- test_config_id_1
[  2/5] Job 1235 <- test_config_id_2
...
```

**If jobs time out frequently:**
- Check the SLURM queue status
- Reduce the batch size to avoid resource contention
- Verify that the timeout (36000s) is sufficient for your workload

## Test Naming Convention

Tests are automatically named using the format:
@@ -193,6 +328,7 @@ Test results are saved to:
- `GPU_TYPE`: Current GPU type (default: GB200)
- `OUTPUT_PATH`: Directory for test results and logs
- `WORK_DIR`: Working directory for benchmark execution
- `DISAGG_BATCH_SIZE`: Default batch size for job submission (default: 5)
- `DEBUG_MODE`: Enable debug mode (set to "1" to skip job submission)
- `DEBUG_JOB_ID`: Job ID to use in debug mode

@@ -212,10 +348,11 @@ The framework consists of:

1. **ConfigLoader**: Scans and loads YAML configurations
2. **ConfigValidator**: Validates configuration correctness
3. **BatchManager**: Manages batch job submission for parallel execution
4. **JobManager**: Handles SLURM job submission and monitoring
5. **LogParser**: Extracts metrics from benchmark logs
6. **TestCaseTracker**: Tracks test execution timing
7. **ResultSaver**: Saves results to CSV

## Benefits

tests/integration/defs/perf/disagg/conftest.py

Lines changed: 190 additions & 3 deletions
@@ -1,9 +1,11 @@
"""Pytest configuration for disagg tests.

Only collects tests in this directory when --disagg parameter is provided.
Provides batch job submission capability to improve parallelism.
"""

import os

import pytest
from utils.logger import logger

@@ -23,6 +25,15 @@ def pytest_addoption(parser):
        help="Path to a file containing test IDs (one per line) to run. "
        "Example: pytest --disagg --disagg-test-list=testlist/testlist_gb200.txt",
    )
    parser.addoption(
        "--disagg-batch-size",
        action="store",
        type=int,
        default=None,
        help="Number of jobs to submit per batch. Default: from env DISAGG_BATCH_SIZE or 5. "
        "Set to 0 for unlimited (submit all at once). "
        "Example: pytest --disagg --disagg-batch-size=10",
    )


def pytest_collect_directory(path, parent):
@@ -45,7 +56,6 @@ def pytest_collect_directory(path, parent):
        return True

    # With --disagg parameter, proceed with normal collection
    return None


@@ -88,7 +98,7 @@ def pytest_collection_modifyitems(config, items):

    for item in items:
        # item.nodeid is the full test identifier like:
        # "test_disagg.py::TestDisaggBenchmark::test_benchmark[deepseek-r1-fp4:1k1k:...]"
        if item.nodeid in wanted_tests:
            selected.append(item)
        else:
@@ -112,3 +122,180 @@ def pytest_collection_modifyitems(config, items):
        logger.warning(f"Please check that the test IDs in {test_list_file} are correct.")

    logger.info(f"{'=' * 70}\n")


class BatchManager:
    """Batch job submission manager for disagg tests.

    Automatically splits test cases into batches and submits them on-demand
    to maximize parallelism in SLURM cluster environments.

    Key features:
    - Lazy batch submission: only submits when needed
    - Configurable batch size via CLI or environment variable
    - Maintains job_id mapping for all submitted jobs
    """

    def __init__(self, batch_size=5):
        """Initialize batch manager.

        Args:
            batch_size: Number of jobs per batch. None or 0 means unlimited
                (submit all at once). Default is 5 if not specified.
        """
        # Normalize batch_size: None, 0, or negative means unlimited
        if batch_size is None or batch_size <= 0:
            self.batch_size = None
        else:
            self.batch_size = batch_size

        self.submitted_batches = set()  # Track which batch numbers have been submitted
        self.job_mapping = {}  # Map test_id -> SLURM job_id
        self.all_configs = []  # Ordered list of all test configs

        logger.info(f"\n{'=' * 70}")
        logger.info("Batch Manager Initialized")
        if self.batch_size:
            logger.info(f"Batch size: {self.batch_size} jobs per batch")
        else:
            logger.info("Batch size: unlimited (submit all at once)")
        logger.info(f"{'=' * 70}\n")

    def add_config(self, test_config):
        """Add a test configuration to the manager.

        Called during initialization to build the ordered list of configs.

        Args:
            test_config: TestConfig object to add
        """
        self.all_configs.append(test_config)

    def get_job_id(self, test_config):
        """Get SLURM job ID for a test config, submitting its batch if needed.

        This is the main entry point. It:
        1. Determines which batch the test belongs to
        2. Submits the entire batch if not already submitted
        3. Returns the job_id for this specific test

        Args:
            test_config: TestConfig object to get job_id for

        Returns:
            str: SLURM job ID, or None if submission failed
        """
        # Find the index of this config in the ordered list
        try:
            idx = next(
                i for i, c in enumerate(self.all_configs) if c.test_id == test_config.test_id
            )
        except StopIteration:
            logger.error(f"Config not found in manager: {test_config.test_id}")
            return None

        # Calculate which batch this test belongs to
        if self.batch_size:
            batch_num = idx // self.batch_size
        else:
            batch_num = 0  # All tests in one batch

        # Submit the batch if not already submitted
        if batch_num not in self.submitted_batches:
            self._submit_batch(batch_num)

        # Return the cached job_id
        return self.job_mapping.get(test_config.test_id)

    def _submit_batch(self, batch_num):
        """Submit all jobs in a specific batch.

        Args:
            batch_num: Batch number to submit (0-indexed)
        """
        from execution.executor import JobManager

        # Calculate batch range
        if self.batch_size:
            start_idx = batch_num * self.batch_size
            end_idx = min(start_idx + self.batch_size, len(self.all_configs))
        else:
            start_idx = 0
            end_idx = len(self.all_configs)

        batch_configs = self.all_configs[start_idx:end_idx]

        logger.info(f"\n{'=' * 70}")
        logger.info(f"Submitting Batch {batch_num}")
        logger.info(f"Range: [{start_idx}:{end_idx}] ({len(batch_configs)} jobs)")
        logger.info(f"{'=' * 70}\n")

        # Submit all jobs in this batch
        success_count = 0
        for i, config in enumerate(batch_configs, 1):
            try:
                success, job_id = JobManager.submit_test_job(config)
                if success and job_id:
                    self.job_mapping[config.test_id] = job_id
                    success_count += 1
                    # Truncate test_id for display
                    display_id = (
                        config.test_id[:60] + "..." if len(config.test_id) > 60 else config.test_id
                    )
                    logger.success(f" [{i:3d}/{len(batch_configs)}] Job {job_id} <- {display_id}")
                else:
                    self.job_mapping[config.test_id] = None
                    logger.error(f" [{i:3d}/{len(batch_configs)}] Failed: {config.test_id[:50]}")
            except Exception as e:
                self.job_mapping[config.test_id] = None
                logger.error(f" [{i:3d}/{len(batch_configs)}] Error: {e}")

        # Mark batch as submitted
        self.submitted_batches.add(batch_num)

        logger.info(f"\n{'=' * 70}")
        logger.success(
            f"Batch {batch_num} Complete: {success_count}/{len(batch_configs)} succeeded"
        )
        logger.info(f"{'=' * 70}\n")


@pytest.fixture(scope="session")
def batch_manager(request):
    """Provide the batch manager fixture for test methods.

    This session-scoped fixture creates and initializes the BatchManager
    with all collected test configs.

    Returns:
        BatchManager: Initialized batch manager instance
    """
    # Get batch size from CLI option or environment variable
    batch_size = request.config.getoption("--disagg-batch-size")
    if batch_size is None:
        env_batch_size = os.getenv("DISAGG_BATCH_SIZE")
        if env_batch_size:
            try:
                batch_size = int(env_batch_size)
            except ValueError:
                logger.warning(f"Invalid DISAGG_BATCH_SIZE: {env_batch_size}, using default 5")
                batch_size = 5
        else:
            batch_size = 5  # Default batch size

    # Create batch manager
    manager = BatchManager(batch_size=batch_size)

    # Extract all test configs from collected items
    for item in request.session.items:
        if hasattr(item, "callspec") and "test_config" in item.callspec.params:
            manager.add_config(item.callspec.params["test_config"])

    # Log statistics
    logger.info(f"Total test configs: {len(manager.all_configs)}")
    if manager.batch_size:
        total_batches = (len(manager.all_configs) + manager.batch_size - 1) // manager.batch_size
        logger.info(f"Total batches: {total_batches}")
    logger.info("")

    return manager
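
The `total_batches` expression in the fixture is integer ceiling division; a standalone check of the formula (no project imports needed):

```python
import math


def total_batches(n_configs, batch_size):
    # Equivalent to math.ceil(n_configs / batch_size) using integer arithmetic,
    # matching the (n + batch_size - 1) // batch_size expression in the fixture.
    return (n_configs + batch_size - 1) // batch_size


assert total_batches(20, 5) == 4   # matches the "20 configs, 4 batches" log above
assert total_batches(21, 5) == 5   # a partial batch still counts as a batch
```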
