Skip to content

Commit 7f72f59

Browse files
committed
feat(consume): create pre_hash subgroups w/max size for better xdist balancing
1 parent c4f95f9 commit 7f72f59

File tree

4 files changed

+172
-7
lines changed

4 files changed

+172
-7
lines changed

src/cli/pytest_commands/consume.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def engine() -> None:
124124
def enginex(enginex_fcu_frequency: int, pytest_args: List[str], **_kwargs) -> None:
125125
"""Client consumes Engine X Fixtures via the Engine API."""
126126
command_name = "enginex"
127-
command_paths = get_command_paths(command_name, is_hive=True)
127+
static_test_paths = get_static_test_paths(command_name, is_hive=True)
128128

129129
# Validate the frequency parameter
130130
if enginex_fcu_frequency < 0:
@@ -133,7 +133,9 @@ def enginex(enginex_fcu_frequency: int, pytest_args: List[str], **_kwargs) -> No
133133
# Add the FCU frequency to pytest args as a custom config option
134134
pytest_args_with_fcu = [f"--enginex-fcu-frequency={enginex_fcu_frequency}"] + list(pytest_args)
135135

136-
consume_cmd = ConsumeCommand(command_paths, is_hive=True, command_name=command_name)
136+
consume_cmd = create_consume_command(
137+
static_test_paths=static_test_paths, is_hive=True, command_name=command_name
138+
)
137139
consume_cmd.execute(pytest_args_with_fcu)
138140

139141

src/cli/pytest_commands/processors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def process_args(self, args: List[str]) -> List[str]:
110110
if self.command_name == "engine":
111111
modified_args.extend(["-p", "pytest_plugins.consume.simulators.engine.conftest"])
112112
elif self.command_name == "enginex":
113-
modified_args.extend(["-p", "pytest_plugins.consume.simulators.enginex.conftest"])
113+
modified_args.extend(["-p", "pytest_plugins.consume.simulators.engine_reorg.conftest"])
114114
elif self.command_name == "rlp":
115115
modified_args.extend(["-p", "pytest_plugins.consume.simulators.rlp.conftest"])
116116
else:

src/pytest_plugins/consume/consume.py

Lines changed: 154 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
1-
"""A pytest plugin providing common functionality for consuming test fixtures."""
1+
"""
2+
A pytest plugin providing common functionality for consuming test fixtures.
23
4+
Features:
5+
- Downloads and caches test fixtures from various sources (local, URL, release).
6+
- Manages test case generation from fixture files.
7+
- Provides xdist load balancing for large pre-allocation groups (enginex simulator).
8+
"""
9+
10+
import logging
311
import re
412
import sys
513
import tarfile
614
from dataclasses import dataclass
715
from io import BytesIO
816
from pathlib import Path
9-
from typing import List, Optional, Tuple
17+
from typing import Dict, List, Optional, Tuple
1018
from urllib.parse import urlparse
1119

1220
import platformdirs
@@ -15,18 +23,125 @@
1523
import rich
1624

1725
from cli.gen_index import generate_fixtures_index
18-
from ethereum_test_fixtures import BaseFixture
26+
from ethereum_test_fixtures import BaseFixture, BlockchainEngineXFixture
1927
from ethereum_test_fixtures.consume import IndexFile, TestCases
2028
from ethereum_test_forks import get_forks, get_relative_fork_markers, get_transition_forks
2129
from ethereum_test_tools.utility.versioning import get_current_commit_hash_or_tag
2230

2331
from .releases import ReleaseTag, get_release_page_url, get_release_url, is_release_url, is_url
2432

33+
logger = logging.getLogger(__name__)
34+
2535
CACHED_DOWNLOADS_DIRECTORY = (
2636
Path(platformdirs.user_cache_dir("ethereum-execution-spec-tests")) / "cached_downloads"
2737
)
2838

2939

40+
class XDistGroupMapper:
    """
    Map test cases to xdist group names, splitting large pre-allocation groups.

    Test cases that share a ``pre_hash`` belong to the same pre-allocation
    group. When such a group holds more than ``max_group_size`` tests it is
    divided into evenly sized, contiguous sub-groups named
    ``"{pre_hash}:{index}"`` so that pytest-xdist (``--dist=loadgroup``) can
    schedule the sub-groups independently. Groups at or below the limit keep
    the bare ``pre_hash`` as their group name.
    """

    def __init__(self, max_group_size: int = 400):
        """
        Initialize the mapper with a maximum group size.

        Args:
            max_group_size: Upper bound on the number of tests placed in a
                single xdist group. Must be at least 1.

        Raises:
            ValueError: If ``max_group_size`` is smaller than 1 (a zero or
                negative limit cannot be used to size sub-groups).

        """
        if max_group_size < 1:
            raise ValueError(f"max_group_size must be at least 1, got {max_group_size}")
        self.max_group_size = max_group_size
        # Number of test cases seen per pre_hash.
        self.group_sizes: Dict[str, int] = {}
        # Sub-group index assigned to each test case, keyed by test case id.
        self.test_to_subgroup: Dict[str, int] = {}
        self._built = False

    def build_mapping(self, test_cases: "TestCases") -> None:
        """
        Build the mapping of test cases to sub-groups.

        Counts the tests in every pre-allocation group and then assigns each
        test a sub-group index. Test cases without a (truthy) ``pre_hash`` are
        ignored. Calling this method again after a successful build is a
        no-op.

        Args:
            test_cases: Iterable of test cases; it is iterated twice, so it
                must be re-iterable (not a one-shot generator).

        """
        if self._built:
            return

        # First pass: count tests per pre-allocation group.
        for test_case in test_cases:
            pre_hash = getattr(test_case, "pre_hash", None)
            if pre_hash:
                self.group_sizes[pre_hash] = self.group_sizes.get(pre_hash, 0) + 1

        # Second pass: assign sub-group indices.
        group_counters: Dict[str, int] = {}
        for test_case in test_cases:
            pre_hash = getattr(test_case, "pre_hash", None)
            if not pre_hash:
                continue
            group_size = self.group_sizes[pre_hash]

            if group_size <= self.max_group_size:
                # Small group, no sub-group needed.
                self.test_to_subgroup[test_case.id] = 0
                continue

            # Large group: spread the tests evenly over the minimum number of
            # sub-groups that respects max_group_size. Scaling the running
            # position keeps each sub-group contiguous and within one test of
            # the others in size, instead of filling sub-groups to the limit
            # and leaving a tiny remainder (e.g. 400 + 1).
            position = group_counters.get(pre_hash, 0)
            num_subgroups = -(-group_size // self.max_group_size)  # ceiling division
            self.test_to_subgroup[test_case.id] = position * num_subgroups // group_size
            group_counters[pre_hash] = position + 1

        self._built = True

        # Log summary of large groups.
        num_large_groups = sum(
            1 for size in self.group_sizes.values() if size > self.max_group_size
        )
        if num_large_groups:
            logging.getLogger(__name__).info(
                "Found %d pre-allocation groups larger than "
                "%d tests that will be split for better load balancing",
                num_large_groups,
                self.max_group_size,
            )

    def get_xdist_group_name(self, test_case) -> str:
        """
        Get the xdist group name for a test case.

        Returns the test case id when it has no ``pre_hash``, the bare
        ``pre_hash`` for groups within the size limit (or unknown to the
        mapper), and ``"{pre_hash}:{sub_group_index}"`` for large groups.
        """
        pre_hash = getattr(test_case, "pre_hash", None)
        if not pre_hash:
            # No pre_hash, use test ID as fallback.
            return test_case.id

        if self.group_sizes.get(pre_hash, 0) <= self.max_group_size:
            # Small (or never counted) group, use pre_hash as-is.
            return pre_hash

        # Large group, include sub-group index; tests that were not part of
        # the built mapping fall back to sub-group 0.
        sub_group = self.test_to_subgroup.get(test_case.id, 0)
        return f"{pre_hash}:{sub_group}"

    def get_split_statistics(self) -> Dict[str, Dict[str, int]]:
        """
        Get statistics about how groups were split.

        Returns a dict keyed by ``pre_hash`` containing, for every group that
        exceeds ``max_group_size``, its ``total_tests``, the ``num_subgroups``
        it was split into and the (floor) ``tests_per_subgroup``. Groups that
        were not split are omitted.
        """
        stats: Dict[str, Dict[str, int]] = {}
        for pre_hash, size in self.group_sizes.items():
            if size > self.max_group_size:
                num_subgroups = (size + self.max_group_size - 1) // self.max_group_size
                stats[pre_hash] = {
                    "total_tests": size,
                    "num_subgroups": num_subgroups,
                    "tests_per_subgroup": size // num_subgroups,
                }
        return stats
143+
144+
30145
def default_input() -> str:
31146
"""
32147
Directory (default) to consume generated test fixtures from. Defined as a
@@ -357,6 +472,29 @@ def pytest_configure(config): # noqa: D103
357472
index = IndexFile.model_validate_json(index_file.read_text())
358473
config.test_cases = index.test_cases
359474

475+
# Create XDistGroupMapper for enginex simulator if needed
476+
if (
477+
hasattr(config, "_supported_fixture_formats")
478+
and BlockchainEngineXFixture.format_name in config._supported_fixture_formats
479+
):
480+
max_group_size = getattr(config, "enginex_max_group_size", 400)
481+
config.xdist_group_mapper = XDistGroupMapper(max_group_size)
482+
config.xdist_group_mapper.build_mapping(config.test_cases)
483+
484+
# Log statistics about group splitting
485+
split_stats = config.xdist_group_mapper.get_split_statistics()
486+
if split_stats:
487+
rich.print("[bold yellow]Pre-allocation group splitting for load balancing:[/]")
488+
for pre_hash, stats in split_stats.items():
489+
rich.print(
490+
f" Group {pre_hash[:8]}: {stats['total_tests']} tests → "
491+
f"{stats['num_subgroups']} sub-groups "
492+
f"(~{stats['tests_per_subgroup']} tests each)"
493+
)
494+
rich.print(f" Max group size: {max_group_size}")
495+
else:
496+
config.xdist_group_mapper = None
497+
360498
for fixture_format in BaseFixture.formats.values():
361499
config.addinivalue_line(
362500
"markers",
@@ -428,6 +566,7 @@ def pytest_generate_tests(metafunc):
428566
return
429567

430568
test_cases = metafunc.config.test_cases
569+
xdist_group_mapper = getattr(metafunc.config, "xdist_group_mapper", None)
431570
param_list = []
432571
for test_case in test_cases:
433572
if test_case.format.format_name not in metafunc.config._supported_fixture_formats:
@@ -439,12 +578,23 @@ def pytest_generate_tests(metafunc):
439578
if hasattr(test_case, "pre_hash") and test_case.pre_hash:
440579
test_id = f"{test_case.id}[{test_case.pre_hash[:8]}]"
441580

581+
# Determine xdist group name
582+
if xdist_group_mapper and hasattr(test_case, "pre_hash") and test_case.pre_hash:
583+
# Use the mapper to get potentially split group name
584+
xdist_group_name = xdist_group_mapper.get_xdist_group_name(test_case)
585+
elif hasattr(test_case, "pre_hash") and test_case.pre_hash:
586+
# No mapper or not enginex, use pre_hash directly
587+
xdist_group_name = test_case.pre_hash
588+
else:
589+
# No pre_hash, use test ID
590+
xdist_group_name = test_case.id
591+
442592
param = pytest.param(
443593
test_case,
444594
id=test_id,
445595
marks=[getattr(pytest.mark, m) for m in fork_markers]
446596
+ [getattr(pytest.mark, test_case.format.format_name)]
447-
+ [pytest.mark.xdist_group(name=test_case.pre_hash)],
597+
+ [pytest.mark.xdist_group(name=xdist_group_name)],
448598
)
449599
param_list.append(param)
450600

src/pytest_plugins/consume/simulators/enginex/conftest.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ def pytest_addoption(parser):
3939
"pre-allocation group."
4040
),
4141
)
42+
enginex_group.addoption(
43+
"--enginex-max-group-size",
44+
action="store",
45+
type=int,
46+
default=400,
47+
help=(
48+
"Maximum number of tests per xdist group. Large pre-allocation groups will be "
49+
"split into virtual sub-groups to improve load balancing. Default: 400."
50+
),
51+
)
4252

4353

4454
def pytest_configure(config):
@@ -48,6 +58,9 @@ def pytest_configure(config):
4858
# Store FCU frequency on config for access by fixtures
4959
config.enginex_fcu_frequency = config.getoption("--enginex-fcu-frequency", 1)
5060

61+
# Store max group size on config for access during test generation
62+
config.enginex_max_group_size = config.getoption("--enginex-max-group-size", 400)
63+
5164

5265
@pytest.fixture(scope="module")
5366
def test_suite_name() -> str:

0 commit comments

Comments
 (0)