Skip to content

Commit 3531cc1

Browse files
committed
feat(consume): create pre_hash subgroups w/max size for better xdist balancing
1 parent 9ee616b commit 3531cc1

File tree

2 files changed

+167
-4
lines changed

2 files changed

+167
-4
lines changed

src/pytest_plugins/consume/consume.py

Lines changed: 154 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
1-
"""A pytest plugin providing common functionality for consuming test fixtures."""
1+
"""
2+
A pytest plugin providing common functionality for consuming test fixtures.
23
4+
Features:
5+
- Downloads and caches test fixtures from various sources (local, URL, release).
6+
- Manages test case generation from fixture files.
7+
- Provides xdist load balancing for large pre-allocation groups (enginex simulator).
8+
"""
9+
10+
import logging
311
import re
412
import sys
513
import tarfile
614
from dataclasses import dataclass
715
from io import BytesIO
816
from pathlib import Path
9-
from typing import List, Tuple
17+
from typing import Dict, List, Tuple
1018
from urllib.parse import urlparse
1119

1220
import platformdirs
@@ -15,18 +23,125 @@
1523
import rich
1624

1725
from cli.gen_index import generate_fixtures_index
18-
from ethereum_test_fixtures import BaseFixture
26+
from ethereum_test_fixtures import BaseFixture, BlockchainEngineXFixture
1927
from ethereum_test_fixtures.consume import IndexFile, TestCases
2028
from ethereum_test_forks import get_forks, get_relative_fork_markers, get_transition_forks
2129
from ethereum_test_tools.utility.versioning import get_current_commit_hash_or_tag
2230

2331
from .releases import ReleaseTag, get_release_page_url, get_release_url, is_release_url, is_url
2432

33+
logger = logging.getLogger(__name__)
34+
2535
CACHED_DOWNLOADS_DIRECTORY = (
2636
Path(platformdirs.user_cache_dir("ethereum-execution-spec-tests")) / "cached_downloads"
2737
)
2838

2939

40+
class XDistGroupMapper:
41+
"""
42+
Maps test cases to xdist groups, splitting large pre-allocation groups into sub-groups.
43+
44+
This class helps improve load balancing when using pytest-xdist with --dist=loadgroup
45+
by breaking up large pre-allocation groups (e.g., 1000+ tests) into smaller virtual
46+
sub-groups while maintaining the constraint that tests from the same pre-allocation
47+
group must run on the same worker.
48+
"""
49+
50+
def __init__(self, max_group_size: int = 400):
51+
"""Initialize the mapper with a maximum group size."""
52+
self.max_group_size = max_group_size
53+
self.group_sizes: Dict[str, int] = {}
54+
self.test_to_subgroup: Dict[str, int] = {}
55+
self._built = False
56+
57+
def build_mapping(self, test_cases: TestCases) -> None:
58+
"""
59+
Build the mapping of test cases to sub-groups.
60+
61+
This analyzes all test cases and determines which pre-allocation groups
62+
need to be split into sub-groups based on the max_group_size.
63+
"""
64+
if self._built:
65+
return
66+
67+
# Count tests per pre-allocation group
68+
for test_case in test_cases:
69+
if hasattr(test_case, "pre_hash") and test_case.pre_hash:
70+
pre_hash = test_case.pre_hash
71+
self.group_sizes[pre_hash] = self.group_sizes.get(pre_hash, 0) + 1
72+
73+
# Assign sub-groups for large groups
74+
group_counters: Dict[str, int] = {}
75+
for test_case in test_cases:
76+
if hasattr(test_case, "pre_hash") and test_case.pre_hash:
77+
pre_hash = test_case.pre_hash
78+
group_size = self.group_sizes[pre_hash]
79+
80+
if group_size <= self.max_group_size:
81+
# Small group, no sub-group needed
82+
self.test_to_subgroup[test_case.id] = 0
83+
else:
84+
# Large group, assign to sub-group using round-robin
85+
counter = group_counters.get(pre_hash, 0)
86+
sub_group = counter // self.max_group_size
87+
self.test_to_subgroup[test_case.id] = sub_group
88+
group_counters[pre_hash] = counter + 1
89+
90+
self._built = True
91+
92+
# Log summary of large groups
93+
large_groups = [
94+
(pre_hash, size)
95+
for pre_hash, size in self.group_sizes.items()
96+
if size > self.max_group_size
97+
]
98+
if large_groups:
99+
logger.info(
100+
f"Found {len(large_groups)} pre-allocation groups larger than "
101+
f"{self.max_group_size} tests that will be split for better load balancing"
102+
)
103+
104+
def get_xdist_group_name(self, test_case) -> str:
105+
"""
106+
Get the xdist group name for a test case.
107+
108+
For small groups, returns the pre_hash as-is.
109+
For large groups, returns "{pre_hash}:{sub_group_index}".
110+
"""
111+
if not hasattr(test_case, "pre_hash") or not test_case.pre_hash:
112+
# No pre_hash, use test ID as fallback
113+
return test_case.id
114+
115+
pre_hash = test_case.pre_hash
116+
group_size = self.group_sizes.get(pre_hash, 0)
117+
118+
if group_size <= self.max_group_size:
119+
# Small group, use pre_hash as-is
120+
return pre_hash
121+
122+
# Large group, include sub-group index
123+
sub_group = self.test_to_subgroup.get(test_case.id, 0)
124+
return f"{pre_hash}:{sub_group}"
125+
126+
def get_split_statistics(self) -> Dict[str, Dict[str, int]]:
127+
"""
128+
Get statistics about how groups were split.
129+
130+
Returns a dict with information about each pre-allocation group
131+
and how many sub-groups it was split into.
132+
"""
133+
stats = {}
134+
for pre_hash, size in self.group_sizes.items():
135+
if size > self.max_group_size:
136+
num_subgroups = (size + self.max_group_size - 1) // self.max_group_size
137+
stats[pre_hash] = {
138+
"total_tests": size,
139+
"num_subgroups": num_subgroups,
140+
"tests_per_subgroup": size // num_subgroups,
141+
}
142+
return stats
143+
144+
30145
def default_input() -> str:
31146
"""
32147
Directory (default) to consume generated test fixtures from. Defined as a
@@ -345,6 +460,29 @@ def pytest_configure(config): # noqa: D103
345460
index = IndexFile.model_validate_json(index_file.read_text())
346461
config.test_cases = index.test_cases
347462

463+
# Create XDistGroupMapper for enginex simulator if needed
464+
if (
465+
hasattr(config, "_supported_fixture_formats")
466+
and BlockchainEngineXFixture.format_name in config._supported_fixture_formats
467+
):
468+
max_group_size = getattr(config, "enginex_max_group_size", 400)
469+
config.xdist_group_mapper = XDistGroupMapper(max_group_size)
470+
config.xdist_group_mapper.build_mapping(config.test_cases)
471+
472+
# Log statistics about group splitting
473+
split_stats = config.xdist_group_mapper.get_split_statistics()
474+
if split_stats:
475+
rich.print("[bold yellow]Pre-allocation group splitting for load balancing:[/]")
476+
for pre_hash, stats in split_stats.items():
477+
rich.print(
478+
f" Group {pre_hash[:8]}: {stats['total_tests']} tests → "
479+
f"{stats['num_subgroups']} sub-groups "
480+
f"(~{stats['tests_per_subgroup']} tests each)"
481+
)
482+
rich.print(f" Max group size: {max_group_size}")
483+
else:
484+
config.xdist_group_mapper = None
485+
348486
for fixture_format in BaseFixture.formats.values():
349487
config.addinivalue_line(
350488
"markers",
@@ -416,6 +554,7 @@ def pytest_generate_tests(metafunc):
416554
return
417555

418556
test_cases = metafunc.config.test_cases
557+
xdist_group_mapper = getattr(metafunc.config, "xdist_group_mapper", None)
419558
param_list = []
420559
for test_case in test_cases:
421560
if test_case.format.format_name not in metafunc.config._supported_fixture_formats:
@@ -427,12 +566,23 @@ def pytest_generate_tests(metafunc):
427566
if hasattr(test_case, "pre_hash") and test_case.pre_hash:
428567
test_id = f"{test_case.id}[{test_case.pre_hash[:8]}]"
429568

569+
# Determine xdist group name
570+
if xdist_group_mapper and hasattr(test_case, "pre_hash") and test_case.pre_hash:
571+
# Use the mapper to get potentially split group name
572+
xdist_group_name = xdist_group_mapper.get_xdist_group_name(test_case)
573+
elif hasattr(test_case, "pre_hash") and test_case.pre_hash:
574+
# No mapper or not enginex, use pre_hash directly
575+
xdist_group_name = test_case.pre_hash
576+
else:
577+
# No pre_hash, use test ID
578+
xdist_group_name = test_case.id
579+
430580
param = pytest.param(
431581
test_case,
432582
id=test_id,
433583
marks=[getattr(pytest.mark, m) for m in fork_markers]
434584
+ [getattr(pytest.mark, test_case.format.format_name)]
435-
+ [pytest.mark.xdist_group(name=test_case.pre_hash)],
585+
+ [pytest.mark.xdist_group(name=xdist_group_name)],
436586
)
437587
param_list.append(param)
438588

src/pytest_plugins/consume/simulators/enginex/conftest.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ def pytest_addoption(parser):
3939
"pre-allocation group."
4040
),
4141
)
42+
enginex_group.addoption(
43+
"--enginex-max-group-size",
44+
action="store",
45+
type=int,
46+
default=400,
47+
help=(
48+
"Maximum number of tests per xdist group. Large pre-allocation groups will be "
49+
"split into virtual sub-groups to improve load balancing. Default: 400."
50+
),
51+
)
4252

4353

4454
def pytest_configure(config):
@@ -48,6 +58,9 @@ def pytest_configure(config):
4858
# Store FCU frequency on config for access by fixtures
4959
config.enginex_fcu_frequency = config.getoption("--enginex-fcu-frequency", 1)
5060

61+
# Store max group size on config for access during test generation
62+
config.enginex_max_group_size = config.getoption("--enginex-max-group-size", 400)
63+
5164

5265
@pytest.fixture(scope="module")
5366
def test_suite_name() -> str:

0 commit comments

Comments
 (0)