Skip to content

Commit 776b315

Browse files
committed
feat(consume): add --enginex-max-group-size to load-balance xdist
This create pre_hash subgroups w/max size for better xdist balancing. - Replace function name check with fixture format detection to properly distinguish between engine and enginex simulators. Both use the same test function name but process different fixture formats: - Engine simulator: "blockchain_test_engine" → standard parametrization - EngineX simulator: "blockchain_test_engine_x" → enhanced parametrization with xdist group splitting and load balancing - Fixes unit test failures by checking for 'test_case' parameter presence instead of maintaining an allowlist of function names.
1 parent 50a37e6 commit 776b315

File tree

7 files changed

+822
-219
lines changed

7 files changed

+822
-219
lines changed

src/pytest_plugins/consume/consume.py

Lines changed: 192 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
1-
"""A pytest plugin providing common functionality for consuming test fixtures."""
1+
"""
2+
A pytest plugin providing common functionality for consuming test fixtures.
23
4+
Features:
5+
- Downloads and caches test fixtures from various sources (local, URL, release).
6+
- Manages test case generation from fixture files.
7+
- Provides xdist load balancing for large pre-allocation groups (enginex simulator).
8+
"""
9+
10+
import logging
311
import re
412
import sys
513
import tarfile
614
from dataclasses import dataclass
715
from io import BytesIO
816
from pathlib import Path
9-
from typing import List, Optional, Tuple
17+
from typing import Dict, List, Optional, Tuple
1018
from urllib.parse import urlparse
1119

1220
import platformdirs
@@ -22,11 +30,118 @@
2230

2331
from .releases import ReleaseTag, get_release_page_url, get_release_url, is_release_url, is_url
2432

33+
logger = logging.getLogger(__name__)
34+
2535
CACHED_DOWNLOADS_DIRECTORY = (
2636
Path(platformdirs.user_cache_dir("ethereum-execution-spec-tests")) / "cached_downloads"
2737
)
2838

2939

40+
class XDistGroupMapper:
41+
"""
42+
Maps test cases to xdist groups, splitting large pre-allocation groups into sub-groups.
43+
44+
This class helps improve load balancing when using pytest-xdist with --dist=loadgroup
45+
by breaking up large pre-allocation groups (e.g., 1000+ tests) into smaller virtual
46+
sub-groups while maintaining the constraint that tests from the same pre-allocation
47+
group must run on the same worker.
48+
"""
49+
50+
def __init__(self, max_group_size: int = 100):
51+
"""Initialize the mapper with a maximum group size."""
52+
self.max_group_size = max_group_size
53+
self.group_sizes: Dict[str, int] = {}
54+
self.test_to_subgroup: Dict[str, int] = {}
55+
self._built = False
56+
57+
def build_mapping(self, test_cases: TestCases) -> None:
58+
"""
59+
Build the mapping of test cases to sub-groups.
60+
61+
This analyzes all test cases and determines which pre-allocation groups
62+
need to be split into sub-groups based on the max_group_size.
63+
"""
64+
if self._built:
65+
return
66+
67+
# Count tests per pre-allocation group
68+
for test_case in test_cases:
69+
if hasattr(test_case, "pre_hash") and test_case.pre_hash:
70+
pre_hash = test_case.pre_hash
71+
self.group_sizes[pre_hash] = self.group_sizes.get(pre_hash, 0) + 1
72+
73+
# Assign sub-groups for large groups
74+
group_counters: Dict[str, int] = {}
75+
for test_case in test_cases:
76+
if hasattr(test_case, "pre_hash") and test_case.pre_hash:
77+
pre_hash = test_case.pre_hash
78+
group_size = self.group_sizes[pre_hash]
79+
80+
if group_size <= self.max_group_size:
81+
# Small group, no sub-group needed
82+
self.test_to_subgroup[test_case.id] = 0
83+
else:
84+
# Large group, assign to sub-group using round-robin
85+
counter = group_counters.get(pre_hash, 0)
86+
sub_group = counter // self.max_group_size
87+
self.test_to_subgroup[test_case.id] = sub_group
88+
group_counters[pre_hash] = counter + 1
89+
90+
self._built = True
91+
92+
# Log summary of large groups
93+
large_groups = [
94+
(pre_hash, size)
95+
for pre_hash, size in self.group_sizes.items()
96+
if size > self.max_group_size
97+
]
98+
if large_groups:
99+
logger.info(
100+
f"Found {len(large_groups)} pre-allocation groups larger than "
101+
f"{self.max_group_size} tests that will be split for better load balancing"
102+
)
103+
104+
def get_xdist_group_name(self, test_case) -> str:
105+
"""
106+
Get the xdist group name for a test case.
107+
108+
For small groups, returns the pre_hash as-is.
109+
For large groups, returns "{pre_hash}:{sub_group_index}".
110+
"""
111+
if not hasattr(test_case, "pre_hash") or not test_case.pre_hash:
112+
# No pre_hash, use test ID as fallback
113+
return test_case.id
114+
115+
pre_hash = test_case.pre_hash
116+
group_size = self.group_sizes.get(pre_hash, 0)
117+
118+
if group_size <= self.max_group_size:
119+
# Small group, use pre_hash as-is
120+
return pre_hash
121+
122+
# Large group, include sub-group index
123+
sub_group = self.test_to_subgroup.get(test_case.id, 0)
124+
return f"{pre_hash}:{sub_group}"
125+
126+
def get_split_statistics(self) -> Dict[str, Dict[str, int]]:
127+
"""
128+
Get statistics about how groups were split.
129+
130+
Returns a dict with information about each pre-allocation group
131+
and how many sub-groups it was split into.
132+
"""
133+
stats = {}
134+
for pre_hash, size in self.group_sizes.items():
135+
if size > self.max_group_size:
136+
num_subgroups = (size + self.max_group_size - 1) // self.max_group_size
137+
stats[pre_hash] = {
138+
"total_tests": size,
139+
"num_subgroups": num_subgroups,
140+
"average_tests_per_subgroup": size // num_subgroups,
141+
}
142+
return stats
143+
144+
30145
def default_input() -> str:
31146
"""
32147
Directory (default) to consume generated test fixtures from. Defined as a
@@ -419,6 +534,28 @@ def pytest_configure(config): # noqa: D103
419534
index = IndexFile.model_validate_json(index_file.read_text())
420535
config.test_cases = index.test_cases
421536

537+
# Create XDistGroupMapper for enginex simulator if enginex options are present
538+
try:
539+
max_group_size = config.getoption("--enginex-max-group-size", None)
540+
if max_group_size is not None:
541+
config.xdist_group_mapper = XDistGroupMapper(max_group_size)
542+
config.xdist_group_mapper.build_mapping(config.test_cases)
543+
544+
split_stats = config.xdist_group_mapper.get_split_statistics()
545+
if split_stats and config.option.verbose >= 1:
546+
rich.print("[bold yellow]Pre-allocation group splitting for load balancing:[/]")
547+
for pre_hash, stats in split_stats.items():
548+
rich.print(
549+
f" Group {pre_hash[:8]}: {stats['total_tests']} tests → "
550+
f"{stats['num_subgroups']} sub-groups "
551+
f"(~{stats['average_tests_per_subgroup']} tests each)"
552+
)
553+
rich.print(f" Max group size: {max_group_size}")
554+
else:
555+
config.xdist_group_mapper = None
556+
except ValueError:
557+
config.xdist_group_mapper = None
558+
422559
for fixture_format in BaseFixture.formats.values():
423560
config.addinivalue_line(
424561
"markers",
@@ -485,29 +622,70 @@ def pytest_generate_tests(metafunc):
485622
"""
486623
Generate test cases for every test fixture in all the JSON fixture files
487624
within the specified fixtures directory, or read from stdin if the directory is 'stdin'.
625+
626+
This function only applies to the test_blockchain_via_engine test function
627+
to avoid conflicts with other consume simulators.
488628
"""
489629
if "cache" in sys.argv:
490630
return
491631

632+
# Only apply to functions that have a 'test_case' parameter (consume test functions)
633+
if "test_case" not in metafunc.fixturenames:
634+
return
635+
492636
test_cases = metafunc.config.test_cases
637+
xdist_group_mapper = getattr(metafunc.config, "xdist_group_mapper", None)
493638
param_list = []
639+
640+
# Check if this is an enginex simulator (has enginex-specific enhancements)
641+
is_enginex_function = (
642+
hasattr(metafunc.config, "_supported_fixture_formats")
643+
and "blockchain_test_engine_x" in metafunc.config._supported_fixture_formats
644+
)
494645
for test_case in test_cases:
495-
if test_case.format.format_name not in metafunc.config._supported_fixture_formats:
646+
# Check if _supported_fixture_formats is set, if not allow all formats
647+
supported_formats = getattr(metafunc.config, "_supported_fixture_formats", None)
648+
if supported_formats and test_case.format.format_name not in supported_formats:
496649
continue
650+
497651
fork_markers = get_relative_fork_markers(test_case.fork, strict_mode=False)
498652

499-
# Append pre_hash (first 8 chars) to test ID for easier selection with --sim.limit
653+
# Basic test ID and markers (used by all consume tests)
500654
test_id = test_case.id
501-
if hasattr(test_case, "pre_hash") and test_case.pre_hash:
502-
test_id = f"{test_case.id}[{test_case.pre_hash[:8]}]"
503-
504-
param = pytest.param(
505-
test_case,
506-
id=test_id,
507-
marks=[getattr(pytest.mark, m) for m in fork_markers]
508-
+ [getattr(pytest.mark, test_case.format.format_name)]
509-
+ [pytest.mark.xdist_group(name=test_case.pre_hash)],
510-
)
655+
markers = [getattr(pytest.mark, m) for m in fork_markers] + [
656+
getattr(pytest.mark, test_case.format.format_name)
657+
]
658+
659+
# Apply enginex-specific enhancements only for enginex functions
660+
if is_enginex_function:
661+
# Determine xdist group name for enginex load balancing
662+
if xdist_group_mapper and hasattr(test_case, "pre_hash") and test_case.pre_hash:
663+
# Use the mapper to get potentially split group name
664+
xdist_group_name = xdist_group_mapper.get_xdist_group_name(test_case)
665+
elif hasattr(test_case, "pre_hash") and test_case.pre_hash:
666+
# No mapper or not enginex, use pre_hash directly
667+
xdist_group_name = test_case.pre_hash
668+
else:
669+
# No pre_hash, use test ID
670+
xdist_group_name = test_case.id
671+
672+
# Create enhanced test ID showing the xdist group name for easier identification
673+
if hasattr(test_case, "pre_hash") and test_case.pre_hash:
674+
# Show first 8 chars of xdist group name (includes sub-group if split)
675+
group_display = (
676+
xdist_group_name[:8] if len(xdist_group_name) > 8 else xdist_group_name
677+
)
678+
# If it's a split group (contains ':'), show that clearly
679+
if ":" in xdist_group_name:
680+
# Extract sub-group number for display
681+
pre_hash_part, sub_group = xdist_group_name.split(":", 1)
682+
group_display = f"{pre_hash_part[:8]}:{sub_group}"
683+
test_id = f"{test_case.id}[{group_display}]"
684+
685+
# Add xdist group marker for load balancing
686+
markers.append(pytest.mark.xdist_group(name=xdist_group_name))
687+
688+
param = pytest.param(test_case, id=test_id, marks=markers)
511689
param_list.append(param)
512690

513691
metafunc.parametrize("test_case", param_list)

src/pytest_plugins/consume/simulators/enginex/conftest.py

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def pytest_addoption(parser):
3131
enginex_group = parser.getgroup("enginex", "EngineX simulator options")
3232
enginex_group.addoption(
3333
"--enginex-fcu-frequency",
34+
dest="enginex_fcu_frequency",
3435
action="store",
3536
type=int,
3637
default=0,
@@ -40,15 +41,23 @@ def pytest_addoption(parser):
4041
"pre-allocation group."
4142
),
4243
)
44+
enginex_group.addoption(
45+
"--enginex-max-group-size",
46+
dest="enginex_max_group_size",
47+
action="store",
48+
type=int,
49+
default=100,
50+
help=(
51+
"Maximum number of tests per xdist group. Large pre-allocation groups will be "
52+
"split into virtual sub-groups to improve load balancing. Default: 100."
53+
),
54+
)
4355

4456

4557
def pytest_configure(config):
4658
"""Set the supported fixture formats and store enginex configuration."""
4759
config._supported_fixture_formats = [BlockchainEngineXFixture.format_name]
4860

49-
# Store FCU frequency on config for access by fixtures
50-
config.enginex_fcu_frequency = config.getoption("--enginex-fcu-frequency", 1)
51-
5261

5362
@pytest.fixture(scope="module")
5463
def test_suite_name() -> str:
@@ -67,11 +76,11 @@ def test_suite_description() -> str:
6776

6877
def pytest_collection_modifyitems(session, config, items):
6978
"""
70-
Build pre-allocation group test counts during collection phase.
79+
Build group test counts during collection phase.
7180
7281
This hook analyzes all collected test items to determine how many tests
73-
belong to each pre-allocation group, enabling automatic client cleanup
74-
when all tests in a group are complete.
82+
belong to each group (pre-allocation groups or xdist subgroups), enabling
83+
automatic client cleanup when all tests in a group are complete.
7584
"""
7685
# Only process items for enginex simulator
7786
if not hasattr(config, "_supported_fixture_formats"):
@@ -89,28 +98,46 @@ def pytest_collection_modifyitems(session, config, items):
8998
if hasattr(item, "callspec") and "test_case" in item.callspec.params:
9099
test_case = item.callspec.params["test_case"]
91100
if hasattr(test_case, "pre_hash"):
92-
pre_hash = test_case.pre_hash
93-
group_counts[pre_hash] = group_counts.get(pre_hash, 0) + 1
101+
# Get group identifier from xdist marker if available
102+
group_identifier = None
103+
for marker in item.iter_markers("xdist_group"):
104+
if hasattr(marker, "kwargs") and "name" in marker.kwargs:
105+
group_identifier = marker.kwargs["name"]
106+
break
107+
108+
# Fallback to pre_hash if no xdist marker (sequential execution)
109+
if group_identifier is None:
110+
group_identifier = test_case.pre_hash
111+
112+
group_counts[group_identifier] = group_counts.get(group_identifier, 0) + 1
94113

95114
# Store on session for later retrieval by test_tracker fixture
96115
session._pre_alloc_group_counts = group_counts
97-
logger.info(
98-
f"Collected {len(group_counts)} pre-allocation groups with tests: {dict(group_counts)}"
99-
)
116+
logger.info(f"Collected {len(group_counts)} groups with tests: {dict(group_counts)}")
100117
else:
101118
# Update tracker directly if it exists
102119
group_counts = {}
103120
for item in items:
104121
if hasattr(item, "callspec") and "test_case" in item.callspec.params:
105122
test_case = item.callspec.params["test_case"]
106123
if hasattr(test_case, "pre_hash"):
107-
pre_hash = test_case.pre_hash
108-
group_counts[pre_hash] = group_counts.get(pre_hash, 0) + 1
124+
# Get group identifier from xdist marker if available
125+
group_identifier = None
126+
for marker in item.iter_markers("xdist_group"):
127+
if hasattr(marker, "kwargs") and "name" in marker.kwargs:
128+
group_identifier = marker.kwargs["name"]
129+
break
130+
131+
# Fallback to pre_hash if no xdist marker (sequential execution)
132+
if group_identifier is None:
133+
group_identifier = test_case.pre_hash
134+
135+
group_counts[group_identifier] = group_counts.get(group_identifier, 0) + 1
109136

110-
for pre_hash, count in group_counts.items():
111-
test_tracker.set_group_test_count(pre_hash, count)
137+
for group_identifier, count in group_counts.items():
138+
test_tracker.set_group_test_count(group_identifier, count)
112139

113-
logger.info(f"Updated test tracker with {len(group_counts)} pre-allocation groups")
140+
logger.info(f"Updated test tracker with {len(group_counts)} groups")
114141

115142

116143
@pytest.fixture(scope="function")

0 commit comments

Comments
 (0)