diff --git a/src/cli/pytest_commands/consume.py b/src/cli/pytest_commands/consume.py index 5e14413c80b..15b23afa0bf 100644 --- a/src/cli/pytest_commands/consume.py +++ b/src/cli/pytest_commands/consume.py @@ -44,9 +44,13 @@ def get_command_logic_test_paths(command_name: str, is_hive: bool) -> List[Path] command_logic_test_paths = [ base_path / "simulators" / "simulator_logic" / f"test_via_{cmd}.py" for cmd in commands ] - elif command_name in ["engine", "rlp"]: + elif command_name in ["engine", "enginex"]: command_logic_test_paths = [ - base_path / "simulators" / "simulator_logic" / f"test_via_{command_name}.py" + base_path / "simulators" / "simulator_logic" / "test_via_engine.py" + ] + elif command_name == "rlp": + command_logic_test_paths = [ + base_path / "simulators" / "simulator_logic" / "test_via_rlp.py" ] elif command_name == "direct": command_logic_test_paths = [base_path / "direct" / "test_via_direct.py"] @@ -103,13 +107,19 @@ def rlp() -> None: @consume_command(is_hive=True) def engine() -> None: - """Client consumes via the Engine API.""" + """Client consumes Engine Fixtures via the Engine API.""" + pass + + +@consume_command(is_hive=True) +def enginex() -> None: + """Client consumes Engine X Fixtures via the Engine API.""" pass @consume_command(is_hive=True) def hive() -> None: - """Client consumes via all available hive methods (rlp, engine).""" + """Client consumes via rlp & engine hive methods.""" pass diff --git a/src/cli/pytest_commands/processors.py b/src/cli/pytest_commands/processors.py index e7948962704..71a13152a5f 100644 --- a/src/cli/pytest_commands/processors.py +++ b/src/cli/pytest_commands/processors.py @@ -101,6 +101,12 @@ def process_args(self, args: List[str]) -> List[str]: if self.command_name == "engine": modified_args.extend(["-p", "pytest_plugins.consume.simulators.engine.conftest"]) + elif self.command_name == "enginex": + modified_args.extend(["-p", "pytest_plugins.consume.simulators.enginex.conftest"]) + if ( + self._has_parallelism_flag(args) or "-n" in modified_args + ) and "--dist" not in modified_args: + modified_args.extend(["--dist=loadgroup"]) elif self.command_name == "rlp": modified_args.extend(["-p", "pytest_plugins.consume.simulators.rlp.conftest"]) else: diff --git a/src/pytest_plugins/consume/consume.py b/src/pytest_plugins/consume/consume.py index ffe2cecb9e1..3906fb63bfc 100644 --- a/src/pytest_plugins/consume/consume.py +++ b/src/pytest_plugins/consume/consume.py @@ -1,12 +1,20 @@ -"""A pytest plugin providing common functionality for consuming test fixtures.""" +""" +A pytest plugin providing common functionality for consuming test fixtures. +Features: +- Downloads and caches test fixtures from various sources (local, URL, release). +- Manages test case generation from fixture files. +- Provides xdist load balancing for large pre-allocation groups (enginex simulator). 
+""" + +import logging import re import sys import tarfile from dataclasses import dataclass from io import BytesIO from pathlib import Path -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from urllib.parse import urlparse import platformdirs @@ -22,11 +30,118 @@ from .releases import ReleaseTag, get_release_page_url, get_release_url, is_release_url, is_url +logger = logging.getLogger(__name__) + CACHED_DOWNLOADS_DIRECTORY = ( Path(platformdirs.user_cache_dir("ethereum-execution-spec-tests")) / "cached_downloads" ) +class XDistGroupMapper: + """ + Maps test cases to xdist groups, splitting large pre-allocation groups into sub-groups. + + This class helps improve load balancing when using pytest-xdist with --dist=loadgroup + by breaking up large pre-allocation groups (e.g., 1000+ tests) into smaller virtual + sub-groups while maintaining the constraint that tests from the same pre-allocation + group must run on the same worker. + """ + + def __init__(self, max_group_size: int = 100): + """Initialize the mapper with a maximum group size.""" + self.max_group_size = max_group_size + self.group_sizes: Dict[str, int] = {} + self.test_to_subgroup: Dict[str, int] = {} + self._built = False + + def build_mapping(self, test_cases: TestCases) -> None: + """ + Build the mapping of test cases to sub-groups. + + This analyzes all test cases and determines which pre-allocation groups + need to be split into sub-groups based on the max_group_size. + """ + if self._built: + return + + # Count tests per pre-allocation group + for test_case in test_cases: + if hasattr(test_case, "pre_hash") and test_case.pre_hash: + pre_hash = test_case.pre_hash + self.group_sizes[pre_hash] = self.group_sizes.get(pre_hash, 0) + 1 + + # Assign sub-groups for large groups + group_counters: Dict[str, int] = {} + for test_case in test_cases: + if hasattr(test_case, "pre_hash") and test_case.pre_hash: + pre_hash = test_case.pre_hash + group_size = self.group_sizes[pre_hash] + + if group_size <= self.max_group_size: + # Small group, no sub-group needed + self.test_to_subgroup[test_case.id] = 0 + else: + # Large group, assign to sub-group using round-robin + counter = group_counters.get(pre_hash, 0) + sub_group = counter // self.max_group_size + self.test_to_subgroup[test_case.id] = sub_group + group_counters[pre_hash] = counter + 1 + + self._built = True + + # Log summary of large groups + large_groups = [ + (pre_hash, size) + for pre_hash, size in self.group_sizes.items() + if size > self.max_group_size + ] + if large_groups: + logger.info( + f"Found {len(large_groups)} pre-allocation groups larger than " + f"{self.max_group_size} tests that will be split for better load balancing" + ) + + def get_xdist_group_name(self, test_case) -> str: + """ + Get the xdist group name for a test case. + + For small groups, returns the pre_hash as-is. + For large groups, returns "{pre_hash}:{sub_group_index}". + """ + if not hasattr(test_case, "pre_hash") or not test_case.pre_hash: + # No pre_hash, use test ID as fallback + return test_case.id + + pre_hash = test_case.pre_hash + group_size = self.group_sizes.get(pre_hash, 0) + + if group_size <= self.max_group_size: + # Small group, use pre_hash as-is + return pre_hash + + # Large group, include sub-group index + sub_group = self.test_to_subgroup.get(test_case.id, 0) + return f"{pre_hash}:{sub_group}" + + def get_split_statistics(self) -> Dict[str, Dict[str, int]]: + """ + Get statistics about how groups were split. 
+ + Returns a dict with information about each pre-allocation group + and how many sub-groups it was split into. + """ + stats = {} + for pre_hash, size in self.group_sizes.items(): + if size > self.max_group_size: + num_subgroups = (size + self.max_group_size - 1) // self.max_group_size + stats[pre_hash] = { + "total_tests": size, + "num_subgroups": num_subgroups, + "average_tests_per_subgroup": size // num_subgroups, + } + return stats + + def default_input() -> str: """ Directory (default) to consume generated test fixtures from. Defined as a @@ -419,6 +534,28 @@ def pytest_configure(config): # noqa: D103 index = IndexFile.model_validate_json(index_file.read_text()) config.test_cases = index.test_cases + # Create XDistGroupMapper for enginex simulator if enginex options are present + try: + max_group_size = config.getoption("--enginex-max-group-size", None) + if max_group_size is not None: + config.xdist_group_mapper = XDistGroupMapper(max_group_size) + config.xdist_group_mapper.build_mapping(config.test_cases) + + split_stats = config.xdist_group_mapper.get_split_statistics() + if split_stats and config.option.verbose >= 1: + rich.print("[bold yellow]Pre-allocation group splitting for load balancing:[/]") + for pre_hash, stats in split_stats.items(): + rich.print( + f" Group {pre_hash[:8]}: {stats['total_tests']} tests → " + f"{stats['num_subgroups']} sub-groups " + f"(~{stats['average_tests_per_subgroup']} tests each)" + ) + rich.print(f" Max group size: {max_group_size}") + else: + config.xdist_group_mapper = None + except ValueError: + config.xdist_group_mapper = None + for fixture_format in BaseFixture.formats.values(): config.addinivalue_line( "markers", @@ -485,22 +622,70 @@ def pytest_generate_tests(metafunc): """ Generate test cases for every test fixture in all the JSON fixture files within the specified fixtures directory, or read from stdin if the directory is 'stdin'. + + This function only applies to the test_blockchain_via_engine test function + to avoid conflicts with other consume simulators. 
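+
+    For Engine X fixtures, each test case with a pre-allocation hash additionally
+    receives an `xdist_group` marker (large groups are split into sub-groups) and
+    a test ID suffixed with that group for easier identification, e.g.
+    `...[0x479393:1]` (hash shortened for illustration).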
""" if "cache" in sys.argv: return + # Only apply to functions that have a 'test_case' parameter (consume test functions) + if "test_case" not in metafunc.fixturenames: + return + test_cases = metafunc.config.test_cases + xdist_group_mapper = getattr(metafunc.config, "xdist_group_mapper", None) param_list = [] + + # Check if this is an enginex simulator (has enginex-specific enhancements) + is_enginex_function = ( + hasattr(metafunc.config, "_supported_fixture_formats") + and "blockchain_test_engine_x" in metafunc.config._supported_fixture_formats + ) for test_case in test_cases: - if test_case.format.format_name not in metafunc.config._supported_fixture_formats: + # Check if _supported_fixture_formats is set, if not allow all formats + supported_formats = getattr(metafunc.config, "_supported_fixture_formats", None) + if supported_formats and test_case.format.format_name not in supported_formats: continue + fork_markers = get_relative_fork_markers(test_case.fork, strict_mode=False) - param = pytest.param( - test_case, - id=test_case.id, - marks=[getattr(pytest.mark, m) for m in fork_markers] - + [getattr(pytest.mark, test_case.format.format_name)], - ) + + # Basic test ID and markers (used by all consume tests) + test_id = test_case.id + markers = [getattr(pytest.mark, m) for m in fork_markers] + [ + getattr(pytest.mark, test_case.format.format_name) + ] + + # Apply enginex-specific enhancements only for enginex functions + if is_enginex_function: + # Determine xdist group name for enginex load balancing + if xdist_group_mapper and hasattr(test_case, "pre_hash") and test_case.pre_hash: + # Use the mapper to get potentially split group name + xdist_group_name = xdist_group_mapper.get_xdist_group_name(test_case) + elif hasattr(test_case, "pre_hash") and test_case.pre_hash: + # No mapper or not enginex, use pre_hash directly + xdist_group_name = test_case.pre_hash + else: + # No pre_hash, use test ID + xdist_group_name = test_case.id + + # Create enhanced test ID showing the xdist group name for easier identification + if hasattr(test_case, "pre_hash") and test_case.pre_hash: + # Show first 8 chars of xdist group name (includes sub-group if split) + group_display = ( + xdist_group_name[:8] if len(xdist_group_name) > 8 else xdist_group_name + ) + # If it's a split group (contains ':'), show that clearly + if ":" in xdist_group_name: + # Extract sub-group number for display + pre_hash_part, sub_group = xdist_group_name.split(":", 1) + group_display = f"{pre_hash_part[:8]}:{sub_group}" + test_id = f"{test_case.id}[{group_display}]" + + # Add xdist group marker for load balancing + markers.append(pytest.mark.xdist_group(name=xdist_group_name)) + + param = pytest.param(test_case, id=test_id, marks=markers) param_list.append(param) metafunc.parametrize("test_case", param_list) diff --git a/src/pytest_plugins/consume/hive_engine_test/__init__.py b/src/pytest_plugins/consume/hive_engine_test/__init__.py deleted file mode 100644 index 2d1322a5332..00000000000 --- a/src/pytest_plugins/consume/hive_engine_test/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Hive engine test consumer plugin.""" diff --git a/src/pytest_plugins/consume/hive_simulators_reorg/__init__.py b/src/pytest_plugins/consume/hive_simulators_reorg/__init__.py deleted file mode 100644 index 59ca949d150..00000000000 --- a/src/pytest_plugins/consume/hive_simulators_reorg/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Hive simulators reorganization consumer plugin.""" diff --git a/src/pytest_plugins/consume/simulators/engine/__init__.py 
b/src/pytest_plugins/consume/simulators/engine/__init__.py index 2cb194bb7d7..d0b290cdd44 100644 --- a/src/pytest_plugins/consume/simulators/engine/__init__.py +++ b/src/pytest_plugins/consume/simulators/engine/__init__.py @@ -1 +1 @@ -"""Consume Engine test functions.""" +"""The consume engine simulator.""" diff --git a/src/pytest_plugins/consume/simulators/engine/conftest.py b/src/pytest_plugins/consume/simulators/engine/conftest.py index e7cda8ffa7d..e01a0fbdc65 100644 --- a/src/pytest_plugins/consume/simulators/engine/conftest.py +++ b/src/pytest_plugins/consume/simulators/engine/conftest.py @@ -1,8 +1,4 @@ -""" -Pytest fixtures for the `consume engine` simulator. - -Configures the hive back-end & EL clients for each individual test execution. -""" +"""Pytest plugin for the `consume engine` simulator.""" import io from typing import Mapping @@ -29,6 +25,18 @@ def pytest_configure(config): config._supported_fixture_formats = [BlockchainEngineFixture.format_name] +@pytest.fixture(scope="module") +def test_suite_name() -> str: + """The name of the hive test suite used in this simulator.""" + return "eest/consume-engine" + + +@pytest.fixture(scope="module") +def test_suite_description() -> str: + """The description of the hive test suite used in this simulator.""" + return "Execute blockchain tests against clients using the Engine API." + + @pytest.fixture(scope="function") def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None) -> EngineRPC: """Initialize engine RPC client for the execution client under test.""" @@ -42,18 +50,6 @@ def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None) return EngineRPC(f"http://{client.ip}:8551") -@pytest.fixture(scope="module") -def test_suite_name() -> str: - """The name of the hive test suite used in this simulator.""" - return "eest/consume-engine" - - -@pytest.fixture(scope="module") -def test_suite_description() -> str: - """The description of the hive test suite used in this simulator.""" - return "Execute blockchain tests against clients using the Engine API." - - @pytest.fixture(scope="function") def client_files(buffered_genesis: io.BufferedReader) -> Mapping[str, io.BufferedReader]: """Define the files that hive will start the client with.""" diff --git a/src/pytest_plugins/consume/simulators/enginex/__init__.py b/src/pytest_plugins/consume/simulators/enginex/__init__.py new file mode 100644 index 00000000000..4ffa1ec0af5 --- /dev/null +++ b/src/pytest_plugins/consume/simulators/enginex/__init__.py @@ -0,0 +1 @@ +"""The consume enginex simulator.""" diff --git a/src/pytest_plugins/consume/simulators/enginex/conftest.py b/src/pytest_plugins/consume/simulators/enginex/conftest.py new file mode 100644 index 00000000000..5d29a312d0f --- /dev/null +++ b/src/pytest_plugins/consume/simulators/enginex/conftest.py @@ -0,0 +1,173 @@ +""" +Pytest fixtures for the `consume enginex` simulator. + +Configures the hive back-end & EL clients for test execution with BlockchainEngineXFixtures. 
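+
+Typical invocation (flags illustrative): `consume enginex --input=fixtures -n 4`.
+When run in parallel, the CLI adds `--dist=loadgroup` so that tests sharing a
+pre-allocation group are scheduled on the same xdist worker.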
+""" + +import logging + +import pytest +from hive.client import Client + +from ethereum_test_exceptions import ExceptionMapper +from ethereum_test_fixtures import BlockchainEngineXFixture +from ethereum_test_rpc import EngineRPC + +logger = logging.getLogger(__name__) + +pytest_plugins = ( + "pytest_plugins.pytest_hive.pytest_hive", + "pytest_plugins.consume.simulators.base", + "pytest_plugins.consume.simulators.multi_test_client", + "pytest_plugins.consume.simulators.test_case_description", + "pytest_plugins.consume.simulators.timing_data", + "pytest_plugins.consume.simulators.exceptions", + "pytest_plugins.consume.simulators.helpers.test_tracker", +) + + +def pytest_addoption(parser): + """Add enginex-specific command line options.""" + enginex_group = parser.getgroup("enginex", "EngineX simulator options") + enginex_group.addoption( + "--enginex-fcu-frequency", + dest="enginex_fcu_frequency", + action="store", + type=int, + default=0, + help=( + "Control forkchoice update frequency for enginex simulator. " + "0=disable FCUs (default), 1=FCU every test, N=FCU every Nth test per " + "pre-allocation group." + ), + ) + enginex_group.addoption( + "--enginex-max-group-size", + dest="enginex_max_group_size", + action="store", + type=int, + default=100, + help=( + "Maximum number of tests per xdist group. Large pre-allocation groups will be " + "split into virtual sub-groups to improve load balancing. Default: 100." + ), + ) + + +def pytest_configure(config): + """Set the supported fixture formats and store enginex configuration.""" + config._supported_fixture_formats = [BlockchainEngineXFixture.format_name] + + +@pytest.fixture(scope="module") +def test_suite_name() -> str: + """The name of the hive test suite used in this simulator.""" + return "eest/consume-enginex" + + +@pytest.fixture(scope="module") +def test_suite_description() -> str: + """The description of the hive test suite used in this simulator.""" + return ( + "Execute blockchain tests against clients using the Engine API with " + "pre-allocation group optimization using Engine X fixtures." + ) + + +def pytest_collection_modifyitems(session, config, items): + """ + Build group test counts during collection phase. + + This hook analyzes all collected test items to determine how many tests + belong to each group (pre-allocation groups or xdist subgroups), enabling + automatic client cleanup when all tests in a group are complete. 
+ """ + # Only process items for enginex simulator + if not hasattr(config, "_supported_fixture_formats"): + return + + if BlockchainEngineXFixture.format_name not in config._supported_fixture_formats: + return + + # Get the test tracker from session if available + test_tracker = getattr(session, "_pre_alloc_group_test_tracker", None) + if test_tracker is None: + # Tracker will be created later by the fixture, store counts on session for now + group_counts = {} + for item in items: + if hasattr(item, "callspec") and "test_case" in item.callspec.params: + test_case = item.callspec.params["test_case"] + if hasattr(test_case, "pre_hash"): + # Get group identifier from xdist marker if available + group_identifier = None + for marker in item.iter_markers("xdist_group"): + if hasattr(marker, "kwargs") and "name" in marker.kwargs: + group_identifier = marker.kwargs["name"] + break + + # Fallback to pre_hash if no xdist marker (sequential execution) + if group_identifier is None: + group_identifier = test_case.pre_hash + + group_counts[group_identifier] = group_counts.get(group_identifier, 0) + 1 + + # Store on session for later retrieval by test_tracker fixture + session._pre_alloc_group_counts = group_counts + logger.info(f"Collected {len(group_counts)} groups with tests: {dict(group_counts)}") + else: + # Update tracker directly if it exists + group_counts = {} + for item in items: + if hasattr(item, "callspec") and "test_case" in item.callspec.params: + test_case = item.callspec.params["test_case"] + if hasattr(test_case, "pre_hash"): + # Get group identifier from xdist marker if available + group_identifier = None + for marker in item.iter_markers("xdist_group"): + if hasattr(marker, "kwargs") and "name" in marker.kwargs: + group_identifier = marker.kwargs["name"] + break + + # Fallback to pre_hash if no xdist marker (sequential execution) + if group_identifier is None: + group_identifier = test_case.pre_hash + + group_counts[group_identifier] = group_counts.get(group_identifier, 0) + 1 + + for group_identifier, count in group_counts.items(): + test_tracker.set_group_test_count(group_identifier, count) + + logger.info(f"Updated test tracker with {len(group_counts)} groups") + + +@pytest.fixture(scope="function") +def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None) -> EngineRPC: + """Initialize engine RPC client for the execution client under test.""" + if client_exception_mapper: + return EngineRPC( + f"http://{client.ip}:8551", + response_validation_context={ + "exception_mapper": client_exception_mapper, + }, + ) + return EngineRPC(f"http://{client.ip}:8551") + + +@pytest.fixture(scope="session") +def fcu_frequency_tracker(request): + """ + Session-scoped FCU frequency tracker for enginex simulator. + + This fixture is imported from test_tracker module and configured + with the --enginex-fcu-frequency command line option. 
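+
+    For example (illustrative), with a frequency of 3 an FCU is performed on the
+    1st, 4th, 7th, ... test of each group; a frequency of 0 disables per-test
+    forkchoice updates entirely.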
+ """ + # Import here to avoid circular imports + from ..helpers.test_tracker import FCUFrequencyTracker + + # Get FCU frequency from pytest config (set by command line argument) + fcu_frequency = getattr(request.config, "enginex_fcu_frequency", 1) + + tracker = FCUFrequencyTracker(fcu_frequency=fcu_frequency) + logger.info(f"FCU frequency tracker initialized with frequency: {fcu_frequency}") + + return tracker diff --git a/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py b/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py new file mode 100644 index 00000000000..ee0674379d6 --- /dev/null +++ b/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py @@ -0,0 +1,573 @@ +"""Client wrapper classes for managing client lifecycle in engine simulators.""" + +import io +import json +import logging +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Dict, Optional, cast + +from hive.client import Client, ClientType + +from ethereum_test_base_types import Number, to_json +from ethereum_test_fixtures import BlockchainFixtureCommon +from ethereum_test_fixtures.pre_alloc_groups import PreAllocGroup +from ethereum_test_forks import Fork +from pytest_plugins.consume.simulators.helpers.ruleset import ruleset + +from .test_tracker import PreAllocGroupTestTracker + +logger = logging.getLogger(__name__) + + +def get_group_identifier_from_request(request, pre_hash: str) -> str: + """ + Determine the appropriate group identifier for client tracking. + + For xdist execution: Uses xdist group name (includes subgroup suffix if split) + For sequential execution: Uses pre_hash directly + + Args: + request: The pytest request object containing test metadata + pre_hash: The pre-allocation group hash + + Returns: + Group identifier string to use for client tracking + + """ + # Check if this test has an xdist_group marker (indicates xdist execution) + xdist_group_marker = None + iter_markers = getattr(request.node, "iter_markers", None) + if iter_markers is None: + return pre_hash + + for marker in iter_markers("xdist_group"): + xdist_group_marker = marker + break + + if ( + xdist_group_marker + and hasattr(xdist_group_marker, "kwargs") + and "name" in xdist_group_marker.kwargs + ): + group_identifier = xdist_group_marker.kwargs["name"] + logger.debug(f"Using xdist group identifier: {group_identifier}") + return group_identifier + + # Fallback to pre_hash for sequential execution or when no xdist marker is found + logger.debug(f"Using pre_hash as group identifier: {pre_hash}") + return pre_hash + + +def extract_pre_hash_from_group_identifier(group_identifier: str) -> str: + """ + Extract the pre_hash from a group identifier. + + For xdist subgroups: Removes the subgroup suffix (e.g., "0x123:0" -> "0x123") + For sequential: Returns as-is (group_identifier == pre_hash) + + Args: + group_identifier: The group identifier string + + Returns: + The pre_hash without any subgroup suffix + + """ + if ":" in group_identifier: + # Split subgroup format: "pre_hash:subgroup_index" + pre_hash = group_identifier.split(":", 1)[0] + logger.debug(f"Extracted pre_hash {pre_hash} from group identifier {group_identifier}") + return pre_hash + + # No subgroup suffix, return as-is + return group_identifier + + +class ClientWrapper(ABC): + """ + Abstract base class for managing client instances in engine simulators. + + This class encapsulates the common logic for generating genesis configurations, + environment variables, and client files needed to start a client. 
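+
+    Concrete subclasses: RestartClient (a fresh client per test) and
+    MultiTestClient (a single client reused across a pre-allocation group).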
+ """ + + def __init__(self, client_type: ClientType): + """ + Initialize the client wrapper. + + Args: + client_type: The type of client to manage + + """ + self.client_type = client_type + self.client: Optional[Client] = None + self._is_started = False + self.test_count = 0 + + @abstractmethod + def _get_fork(self) -> Fork: + """Get the fork for this client.""" + pass + + @abstractmethod + def _get_chain_id(self) -> int: + """Get the chain ID for this client.""" + pass + + @abstractmethod + def _get_pre_alloc(self) -> dict: + """Get the pre-allocation for this client.""" + pass + + @abstractmethod + def _get_genesis_header(self) -> dict: + """Get the genesis header for this client.""" + pass + + def get_genesis_config(self) -> dict: + """ + Get the genesis configuration for this client. + + Returns: + Genesis configuration dict + + """ + # Convert genesis header to JSON format + genesis = self._get_genesis_header() + + # Convert pre-allocation to JSON format + alloc = self._get_pre_alloc() + + # NOTE: nethermind requires account keys without '0x' prefix + genesis["alloc"] = {k.replace("0x", ""): v for k, v in alloc.items()} + + return genesis + + def get_environment(self) -> dict: + """ + Get the environment variables for this client. + + Returns: + Environment variables dict + + """ + fork = self._get_fork() + chain_id = self._get_chain_id() + + assert fork in ruleset, f"fork '{fork}' missing in hive ruleset" + + # Set check live port for engine simulator + check_live_port = 8551 # Engine API port + + return { + "HIVE_CHAIN_ID": str(Number(chain_id)), + "HIVE_FORK_DAO_VOTE": "1", + "HIVE_NODETYPE": "full", + "HIVE_CHECK_LIVE_PORT": str(check_live_port), + **{k: f"{v:d}" for k, v in ruleset[fork].items()}, + } + + def get_client_files(self) -> dict: + """ + Get the client files dict needed for start_client(). + + Returns: + Dict with genesis.json file + + """ + # Create buffered genesis file + genesis_config = self.get_genesis_config() + genesis_json = json.dumps(genesis_config) + genesis_bytes = genesis_json.encode("utf-8") + buffered_genesis = io.BufferedReader(cast(io.RawIOBase, io.BytesIO(genesis_bytes))) + + return {"/genesis.json": buffered_genesis} + + def set_client(self, client: Client) -> None: + """ + Set the client instance after it has been started. + + Args: + client: The started client instance + + """ + if self._is_started: + raise RuntimeError(f"Client {self.client_type.name} is already set") + + self.client = client + self._is_started = True + logger.info(f"Client ({self.client_type.name}) registered") + + def increment_test_count(self) -> None: + """Increment the count of tests that have used this client.""" + self.test_count += 1 + logger.debug(f"Test count for {self.client_type.name}: {self.test_count}") + + def stop(self) -> None: + """Mark the client as stopped.""" + if self._is_started: + logger.info( + f"Marking client ({self.client_type.name}) as stopped after {self.test_count} " + "tests." + ) + self.client = None + self._is_started = False + + @property + def is_running(self) -> bool: + """Check if the client is currently running.""" + return self._is_started and self.client is not None + + +class RestartClient(ClientWrapper): + """ + Client wrapper for the restart simulator where clients restart for each test. + + This class manages clients that are started and stopped for each individual test, + providing complete isolation between test executions. 
+ """ + + def __init__(self, client_type: ClientType, fixture: BlockchainFixtureCommon): + """ + Initialize a restart client wrapper. + + Args: + client_type: The type of client to manage + fixture: The blockchain fixture for this test + + """ + super().__init__(client_type) + self.fixture = fixture + + def _get_fork(self) -> Fork: + """Get the fork from the fixture.""" + return self.fixture.fork + + def _get_chain_id(self) -> int: + """Get the chain ID from the fixture config.""" + return self.fixture.config.chain_id + + def _get_pre_alloc(self) -> dict: + """Get the pre-allocation from the fixture.""" + return to_json(self.fixture.pre) + + def _get_genesis_header(self) -> dict: + """Get the genesis header from the fixture.""" + return to_json(self.fixture.genesis) + + +class MultiTestClient(ClientWrapper): + """ + Client wrapper for multi-test execution where clients are used across tests. + + This class manages clients that are reused across multiple tests in the same + pre-allocation group. + """ + + def __init__( + self, + pre_hash: str, + client_type: ClientType, + pre_alloc_group: PreAllocGroup, + ): + """ + Initialize a multi-test client wrapper. + + Args: + pre_hash: The hash identifying the pre-allocation group + client_type: The type of client to manage + pre_alloc_group: The pre-allocation group data for this group + + """ + super().__init__(client_type) + self.pre_hash = pre_hash + self.pre_alloc_group = pre_alloc_group + + def _get_fork(self) -> Fork: + """Get the fork from the pre-allocation group.""" + return self.pre_alloc_group.fork + + def _get_chain_id(self) -> int: + """Get the chain ID from the pre-allocation group environment.""" + # TODO: Environment doesn't have chain_id field - see work_in_progress.md + return 1 + + def _get_pre_alloc(self) -> dict: + """Get the pre-allocation from the pre-allocation group.""" + return to_json(self.pre_alloc_group.pre) + + def _get_genesis_header(self) -> dict: + """Get the genesis header from the pre-allocation group.""" + return self.pre_alloc_group.genesis.model_dump(by_alias=True) + + def set_client(self, client: Client) -> None: + """Override to log with pre_hash information.""" + if self._is_started: + raise RuntimeError(f"Client for pre-allocation group {self.pre_hash} is already set") + + self.client = client + self._is_started = True + logger.info( + f"Multi-test client ({self.client_type.name}) registered for pre-allocation group " + f"{self.pre_hash}" + ) + + def stop(self) -> None: + """Override to log with pre_hash information and actually stop the client.""" + if self._is_started: + logger.info( + f"Stopping multi-test client ({self.client_type.name}) for pre-allocation group " + f"{self.pre_hash} after {self.test_count} tests" + ) + # Actually stop the Hive client + if self.client is not None: + try: + self.client.stop() + logger.debug(f"Hive client stopped for pre-allocation group {self.pre_hash}") + except Exception as e: + logger.error( + f"Error stopping Hive client for pre-allocation group {self.pre_hash}: {e}" + ) + + self.client = None + self._is_started = False + + +class MultiTestClientManager: + """ + Singleton manager for coordinating multi-test clients across test execution. + + This class tracks all multi-test clients by their group identifier and ensures proper + lifecycle management including cleanup at session end. Group identifiers can be + either pre_hash (for sequential execution) or xdist group names (for parallel execution). 
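+
+    The manager is a singleton, e.g. (illustrative):
+
+        manager = MultiTestClientManager()
+        assert manager is MultiTestClientManager()  # same instance is returned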
+ """ + + _instance: Optional["MultiTestClientManager"] = None + _initialized: bool + + def __new__(cls) -> "MultiTestClientManager": + """Ensure only one instance of MultiTestClientManager exists.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self) -> None: + """Initialize the manager if not already initialized.""" + if hasattr(self, "_initialized") and self._initialized: + return + + self.multi_test_clients: Dict[str, MultiTestClient] = {} # group_identifier -> client + self.pre_alloc_path: Optional[Path] = None + self.test_tracker: Optional["PreAllocGroupTestTracker"] = None + self._initialized = True + logger.info("MultiTestClientManager initialized") + + def set_pre_alloc_path(self, path: Path) -> None: + """ + Set the path to the pre-allocation files directory. + + Args: + path: Path to the directory containing pre-allocation JSON files + + """ + self.pre_alloc_path = path + logger.debug(f"Pre-alloc path set to: {path}") + + def set_test_tracker(self, test_tracker: "PreAllocGroupTestTracker") -> None: + """ + Set the test tracker for automatic client cleanup. + + Args: + test_tracker: The test tracker instance + + """ + self.test_tracker = test_tracker + logger.debug("Test tracker set for automatic client cleanup") + + def load_pre_alloc_group(self, group_identifier: str) -> PreAllocGroup: + """ + Load the pre-allocation group for a given group identifier. + + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + + Returns: + The loaded PreAllocGroup + + Raises: + RuntimeError: If pre-alloc path is not set + FileNotFoundError: If pre-allocation file is not found + + """ + if self.pre_alloc_path is None: + raise RuntimeError("Pre-alloc path not set in MultiTestClientManager") + + # Extract pre_hash from group identifier (handles subgroups) + pre_hash = extract_pre_hash_from_group_identifier(group_identifier) + pre_alloc_file = self.pre_alloc_path / f"{pre_hash}.json" + if not pre_alloc_file.exists(): + raise FileNotFoundError(f"Pre-allocation file not found: {pre_alloc_file}") + + return PreAllocGroup.model_validate_json(pre_alloc_file.read_text()) + + def get_or_create_multi_test_client( + self, + group_identifier: str, + client_type: ClientType, + ) -> MultiTestClient: + """ + Get an existing MultiTestClient or create a new one for the given group identifier. + + This method doesn't start the actual client - that's done by HiveTestSuite. + It just manages the MultiTestClient wrapper objects. 
+ + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + client_type: The type of client that will be started + + Returns: + The MultiTestClient wrapper instance + + """ + # Check if we already have a MultiTestClient for this group identifier + if group_identifier in self.multi_test_clients: + multi_test_client = self.multi_test_clients[group_identifier] + if multi_test_client.is_running: + logger.debug(f"Found existing MultiTestClient for group {group_identifier}") + return multi_test_client + else: + # MultiTestClient exists but isn't running, remove it + logger.warning( + f"Found stopped MultiTestClient for group {group_identifier}, removing" + ) + del self.multi_test_clients[group_identifier] + + # Load the pre-allocation group for this group + pre_alloc_group = self.load_pre_alloc_group(group_identifier) + + # Extract pre_hash for the MultiTestClient constructor + pre_hash = extract_pre_hash_from_group_identifier(group_identifier) + + # Create new MultiTestClient wrapper + multi_test_client = MultiTestClient( + pre_hash=pre_hash, + client_type=client_type, + pre_alloc_group=pre_alloc_group, + ) + + # Track the MultiTestClient by group identifier + self.multi_test_clients[group_identifier] = multi_test_client + + logger.info( + f"Created new MultiTestClient wrapper for group {group_identifier} " + f"(pre_hash: {pre_hash}, total tracked clients: {len(self.multi_test_clients)})" + ) + + return multi_test_client + + def get_client_for_test( + self, group_identifier: str, test_id: Optional[str] = None + ) -> Optional[Client]: + """ + Get the actual client instance for a test with the given group identifier. + + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + test_id: Optional test ID for completion tracking + + Returns: + The client instance if available, None otherwise + + """ + if group_identifier in self.multi_test_clients: + multi_test_client = self.multi_test_clients[group_identifier] + if multi_test_client.is_running: + multi_test_client.increment_test_count() + return multi_test_client.client + return None + + def mark_test_completed(self, group_identifier: str, test_id: str) -> None: + """ + Mark a test as completed and trigger automatic client cleanup if appropriate. + + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + test_id: The unique test identifier + + """ + if self.test_tracker is None: + logger.debug("No test tracker available, skipping completion tracking") + return + + # Mark test as completed in tracker + is_group_complete = self.test_tracker.mark_test_completed(group_identifier, test_id) + + if is_group_complete: + # All tests in this group are complete + self._auto_stop_client_if_complete(group_identifier) + + def _auto_stop_client_if_complete(self, group_identifier: str) -> None: + """ + Automatically stop the client for a group if all tests are complete. 
+ + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + + """ + if group_identifier not in self.multi_test_clients: + logger.debug(f"No client found for group {group_identifier}") + return + + multi_test_client = self.multi_test_clients[group_identifier] + if not multi_test_client.is_running: + logger.debug(f"Client for group {group_identifier} is already stopped") + return + + # Stop the client and remove from tracking + logger.info( + f"Auto-stopping client for group {group_identifier} - " + f"all tests completed ({multi_test_client.test_count} tests executed)" + ) + + try: + multi_test_client.stop() + except Exception as e: + logger.error(f"Error auto-stopping client for group {group_identifier}: {e}") + finally: + # Remove from tracking to free memory + del self.multi_test_clients[group_identifier] + logger.debug(f"Removed completed client from tracking: {group_identifier}") + + def stop_all_clients(self) -> None: + """Mark all multi-test clients as stopped.""" + logger.info(f"Marking all {len(self.multi_test_clients)} multi-test clients as stopped") + + for group_identifier, multi_test_client in list(self.multi_test_clients.items()): + try: + multi_test_client.stop() + except Exception as e: + logger.error(f"Error stopping MultiTestClient for group {group_identifier}: {e}") + finally: + del self.multi_test_clients[group_identifier] + + logger.info("All MultiTestClient wrappers cleared") + + def get_client_count(self) -> int: + """Get the number of tracked multi-test clients.""" + return len(self.multi_test_clients) + + def get_test_counts(self) -> Dict[str, int]: + """Get test counts for each multi-test client.""" + return { + group_identifier: client.test_count + for group_identifier, client in self.multi_test_clients.items() + } + + def reset(self) -> None: + """Reset the manager, clearing all state.""" + self.stop_all_clients() + self.multi_test_clients.clear() + self.pre_alloc_path = None + self.test_tracker = None + logger.info("MultiTestClientManager reset") diff --git a/src/pytest_plugins/consume/simulators/helpers/test_tracker.py b/src/pytest_plugins/consume/simulators/helpers/test_tracker.py new file mode 100644 index 00000000000..aff97f77a85 --- /dev/null +++ b/src/pytest_plugins/consume/simulators/helpers/test_tracker.py @@ -0,0 +1,280 @@ +"""Test tracking utilities for pre-allocation group lifecycle management.""" + +import logging +from dataclasses import dataclass, field +from typing import Dict, Set + +import pytest + +logger = logging.getLogger(__name__) + + +@dataclass +class PreAllocGroupTestTracker: + """ + Tracks test execution progress per test group. + + This class enables automatic client cleanup by monitoring when all tests + in a group have completed execution. Groups can be either pre-allocation + groups (sequential execution) or xdist subgroups (parallel execution). + """ + + group_test_counts: Dict[str, int] = field(default_factory=dict) + """Total number of tests per group (group_identifier -> count).""" + + group_completed_tests: Dict[str, Set[str]] = field(default_factory=dict) + """Completed test IDs per group (group_identifier -> {test_ids}).""" + + def set_group_test_count(self, group_identifier: str, total_tests: int) -> None: + """ + Set the total number of tests for a group. 
+ + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + total_tests: Total number of tests in this group + + """ + if group_identifier in self.group_test_counts: + existing_count = self.group_test_counts[group_identifier] + if existing_count != total_tests: + logger.warning( + f"Group {group_identifier} test count mismatch: " + f"existing={existing_count}, new={total_tests}" + ) + + self.group_test_counts[group_identifier] = total_tests + if group_identifier not in self.group_completed_tests: + self.group_completed_tests[group_identifier] = set() + + logger.debug(f"Set test count for group {group_identifier}: {total_tests}") + + def mark_test_completed(self, group_identifier: str, test_id: str) -> bool: + """ + Mark a test as completed for the given group. + + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + test_id: The unique test identifier + + Returns: + True if all tests in the group are now complete + + """ + if group_identifier not in self.group_completed_tests: + self.group_completed_tests[group_identifier] = set() + + # Avoid double-counting the same test + if test_id in self.group_completed_tests[group_identifier]: + logger.debug( + f"Test {test_id} already marked as completed for group {group_identifier}" + ) + return self.is_group_complete(group_identifier) + + self.group_completed_tests[group_identifier].add(test_id) + completed_count = len(self.group_completed_tests[group_identifier]) + total_count = self.group_test_counts.get(group_identifier, 0) + + logger.debug( + f"Test {test_id} completed for group {group_identifier} " + f"({completed_count}/{total_count})" + ) + + is_complete = self.is_group_complete(group_identifier) + if is_complete: + logger.info( + f"All tests completed for group {group_identifier} " + f"({completed_count}/{total_count}) - ready for client cleanup" + ) + + return is_complete + + def is_group_complete(self, group_identifier: str) -> bool: + """ + Check if all tests in a group have completed. + + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + + Returns: + True if all tests in the group are complete + + """ + if group_identifier not in self.group_test_counts: + logger.warning(f"No test count found for group {group_identifier}") + return False + + total_count = self.group_test_counts[group_identifier] + completed_count = len(self.group_completed_tests.get(group_identifier, set())) + + return completed_count >= total_count + + def get_completion_status(self, group_identifier: str) -> tuple[int, int]: + """ + Get completion status for a group. + + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + + Returns: + Tuple of (completed_count, total_count) + + """ + total_count = self.group_test_counts.get(group_identifier, 0) + completed_count = len(self.group_completed_tests.get(group_identifier, set())) + return completed_count, total_count + + def get_all_completion_status(self) -> Dict[str, tuple[int, int]]: + """ + Get completion status for all tracked groups. + + Returns: + Dict mapping group_identifier to (completed_count, total_count) + + """ + return { + group_identifier: self.get_completion_status(group_identifier) + for group_identifier in self.group_test_counts + } + + def reset_group(self, group_identifier: str) -> None: + """ + Reset tracking data for a specific group. 
+ + Args: + group_identifier: The group identifier to reset + + """ + if group_identifier in self.group_test_counts: + del self.group_test_counts[group_identifier] + if group_identifier in self.group_completed_tests: + del self.group_completed_tests[group_identifier] + logger.debug(f"Reset tracking data for group {group_identifier}") + + def reset_all(self) -> None: + """Reset all tracking data.""" + self.group_test_counts.clear() + self.group_completed_tests.clear() + logger.debug("Reset all test tracking data") + + +@pytest.fixture(scope="session") +def pre_alloc_group_test_tracker(request) -> PreAllocGroupTestTracker: + """ + Session-scoped test tracker for pre-allocation group lifecycle management. + + This fixture provides a centralized way to track test completion across + all pre-allocation groups during a pytest session. + """ + tracker = PreAllocGroupTestTracker() + + # Store tracker on session for access by collection hooks + request.session._pre_alloc_group_test_tracker = tracker + + # Load pre-collected group counts if available + if hasattr(request.session, "_pre_alloc_group_counts"): + group_counts = request.session._pre_alloc_group_counts + for group_identifier, count in group_counts.items(): + tracker.set_group_test_count(group_identifier, count) + logger.info(f"Loaded test counts for {len(group_counts)} groups") + + logger.info("Pre-allocation group test tracker initialized") + return tracker + + +@dataclass +class FCUFrequencyTracker: + """ + Tracks forkchoice update frequency per group. + + This class enables controlling how often forkchoice updates are performed + during test execution on a per-group basis (supporting both pre-allocation + groups and xdist subgroups). + """ + + fcu_frequency: int + """Frequency of FCU operations (0=disabled, 1=every test, N=every Nth test).""" + + group_test_counters: Dict[str, int] = field(default_factory=dict) + """Test counters per group (group_identifier -> count).""" + + def should_perform_fcu(self, group_identifier: str) -> bool: + """ + Check if forkchoice update should be performed for this test. + + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + + Returns: + True if FCU should be performed for this test + + """ + if self.fcu_frequency == 0: + logger.debug(f"FCU disabled for group {group_identifier} (frequency=0)") + return False + + current_count = self.group_test_counters.get(group_identifier, 0) + should_perform = (current_count % self.fcu_frequency) == 0 + + logger.debug( + f"FCU decision for group {group_identifier}: " + f"perform={should_perform} (test_count={current_count}, " + f"frequency={self.fcu_frequency})" + ) + + return should_perform + + def increment_test_count(self, group_identifier: str) -> None: + """ + Increment test counter for group. + + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + + """ + current_count = self.group_test_counters.get(group_identifier, 0) + new_count = current_count + 1 + self.group_test_counters[group_identifier] = new_count + + logger.debug( + f"Incremented test count for group {group_identifier}: {current_count} -> {new_count}" + ) + + def get_test_count(self, group_identifier: str) -> int: + """ + Get current test count for group. 
+ + Args: + group_identifier: The group identifier (pre_hash or xdist group name) + + Returns: + Current test count for the group + + """ + return self.group_test_counters.get(group_identifier, 0) + + def get_all_test_counts(self) -> Dict[str, int]: + """ + Get test counts for all tracked groups. + + Returns: + Dict mapping group_identifier to test count + + """ + return dict(self.group_test_counters) + + def reset_group(self, group_identifier: str) -> None: + """ + Reset test counter for a specific group. + + Args: + group_identifier: The group identifier to reset + + """ + if group_identifier in self.group_test_counters: + del self.group_test_counters[group_identifier] + logger.debug(f"Reset test counter for group {group_identifier}") + + def reset_all(self) -> None: + """Reset all test counters.""" + self.group_test_counters.clear() + logger.debug("Reset all FCU frequency test counters") diff --git a/src/pytest_plugins/consume/simulators/multi_test_client.py b/src/pytest_plugins/consume/simulators/multi_test_client.py new file mode 100644 index 00000000000..51eecc9505c --- /dev/null +++ b/src/pytest_plugins/consume/simulators/multi_test_client.py @@ -0,0 +1,185 @@ +"""Common pytest fixtures for simulators with multi-test client architecture.""" + +import io +import json +import logging +from typing import Dict, Generator, Mapping, cast + +import pytest +from hive.client import Client, ClientType +from hive.testing import HiveTest, HiveTestSuite + +from ethereum_test_base_types import to_json +from ethereum_test_fixtures import BlockchainEngineXFixture +from ethereum_test_fixtures.blockchain import FixtureHeader +from ethereum_test_fixtures.pre_alloc_groups import PreAllocGroup +from pytest_plugins.consume.consume import FixturesSource +from pytest_plugins.consume.simulators.helpers.ruleset import ( + ruleset, # TODO: generate dynamically +) +from pytest_plugins.filler.fixture_output import FixtureOutput + +from .helpers.client_wrapper import ( + MultiTestClientManager, + get_group_identifier_from_request, +) +from .helpers.timing import TimingData + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope="session") +def pre_alloc_group_cache() -> Dict[str, PreAllocGroup]: + """Cache for pre-allocation groups to avoid reloading from disk.""" + return {} + + +@pytest.fixture(scope="function") +def pre_alloc_group( + fixture: BlockchainEngineXFixture, + fixtures_source: FixturesSource, + pre_alloc_group_cache: Dict[str, PreAllocGroup], +) -> PreAllocGroup: + """Load the pre-allocation group for the current test case.""" + pre_hash = fixture.pre_hash + + # Check cache first + if pre_hash in pre_alloc_group_cache: + return pre_alloc_group_cache[pre_hash] + + # Load from disk + if fixtures_source.is_stdin: + raise ValueError("Pre-allocation groups require file-based fixture input.") + + # Look for pre-allocation group file using FixtureOutput path structure + fixture_output = FixtureOutput(output_path=fixtures_source.path) + pre_alloc_path = fixture_output.pre_alloc_groups_folder_path / f"{pre_hash}.json" + if not pre_alloc_path.exists(): + raise FileNotFoundError(f"Pre-allocation group file not found: {pre_alloc_path}") + + # Load and cache + with open(pre_alloc_path) as f: + pre_alloc_group_obj = PreAllocGroup.model_validate_json(f.read()) + + pre_alloc_group_cache[pre_hash] = pre_alloc_group_obj + return pre_alloc_group_obj + + +def create_environment(pre_alloc_group: PreAllocGroup, check_live_port: int) -> dict: + """Define environment using PreAllocGroup data.""" + fork = 
pre_alloc_group.fork + assert fork in ruleset, f"fork '{fork}' missing in hive ruleset" + return { + "HIVE_CHAIN_ID": "1", # TODO: Environment doesn't have chain_id - see work_in_progress.md + "HIVE_FORK_DAO_VOTE": "1", + "HIVE_NODETYPE": "full", + "HIVE_CHECK_LIVE_PORT": str(check_live_port), + **{k: f"{v:d}" for k, v in ruleset[fork].items()}, + } + + +def client_files(pre_alloc_group: PreAllocGroup) -> Mapping[str, io.BufferedReader]: + """Define the files that hive will start the client with.""" + genesis = to_json(pre_alloc_group.genesis) # type: ignore + alloc = to_json(pre_alloc_group.pre) + + # NOTE: nethermind requires account keys without '0x' prefix + genesis["alloc"] = {k.replace("0x", ""): v for k, v in alloc.items()} + + genesis_json = json.dumps(genesis) + genesis_bytes = genesis_json.encode("utf-8") + buffered_genesis = io.BufferedReader(cast(io.RawIOBase, io.BytesIO(genesis_bytes))) + + files = {} + files["/genesis.json"] = buffered_genesis + return files + + +@pytest.fixture(scope="session") +def multi_test_client_manager() -> Generator[MultiTestClientManager, None, None]: + """Provide singleton MultiTestClientManager with session cleanup.""" + manager = MultiTestClientManager() + try: + yield manager + finally: + logger.info("Cleaning up multi-test clients at session end...") + manager.stop_all_clients() + + +@pytest.fixture(scope="function") +def genesis_header(pre_alloc_group: PreAllocGroup) -> FixtureHeader: + """Provide the genesis header from the pre-allocation group.""" + return pre_alloc_group.genesis # type: ignore + + +@pytest.fixture(scope="function") +def client( + test_suite: HiveTestSuite, + hive_test: HiveTest, + client_type: ClientType, + total_timing_data: TimingData, + fixture: BlockchainEngineXFixture, + pre_alloc_group: PreAllocGroup, + multi_test_client_manager: MultiTestClientManager, + fixtures_source: FixturesSource, + pre_alloc_group_test_tracker, + request, +) -> Generator[Client, None, None]: + """Initialize or reuse multi-test client for the test group.""" + logger.info("🔥 MULTI-TEST CLIENT FIXTURE CALLED - Using multi-test client architecture!") + pre_hash = fixture.pre_hash + test_id = request.node.nodeid + + # Determine the appropriate group identifier for this test + group_identifier = get_group_identifier_from_request(request, pre_hash) + logger.info(f"Using group identifier: {group_identifier} (pre_hash: {pre_hash})") + + # Set pre-alloc path in manager if not already set + if multi_test_client_manager.pre_alloc_path is None: + fixture_output = FixtureOutput(output_path=fixtures_source.path) + multi_test_client_manager.set_pre_alloc_path(fixture_output.pre_alloc_groups_folder_path) + + # Set test tracker in manager if not already set + if multi_test_client_manager.test_tracker is None: + multi_test_client_manager.set_test_tracker(pre_alloc_group_test_tracker) + + # Check for existing client + existing_client = multi_test_client_manager.get_client_for_test(group_identifier, test_id) + if existing_client is not None: + logger.info(f"Reusing multi-test client for group {group_identifier}") + hive_test.register_shared_client(existing_client) + try: + yield existing_client + finally: + # Mark test as completed when fixture teardown occurs + multi_test_client_manager.mark_test_completed(group_identifier, test_id) + return + + # Start new multi-test client + logger.info(f"Starting multi-test client for group {group_identifier}") + + with total_timing_data.time("Start multi-test client"): + hive_client = test_suite.start_client( + 
client_type=client_type, + environment=create_environment(pre_alloc_group, 8551), + files=client_files(pre_alloc_group), + ) + + assert hive_client is not None, ( + f"Failed to start multi-test client for group {group_identifier}" + ) + + # Register with manager + multi_test_client = multi_test_client_manager.get_or_create_multi_test_client( + group_identifier=group_identifier, + client_type=client_type, + ) + multi_test_client.set_client(hive_client) + hive_test.register_shared_client(hive_client) + + logger.info(f"Multi-test client ready for group {group_identifier}") + try: + yield hive_client + finally: + # Mark test as completed when fixture teardown occurs + multi_test_client_manager.mark_test_completed(group_identifier, test_id) diff --git a/src/pytest_plugins/consume/simulators/rlp/__init__.py b/src/pytest_plugins/consume/simulators/rlp/__init__.py index a76490fc945..f157db34172 100644 --- a/src/pytest_plugins/consume/simulators/rlp/__init__.py +++ b/src/pytest_plugins/consume/simulators/rlp/__init__.py @@ -1 +1 @@ -"""Consume RLP test functions.""" +"""The consume rlp simulator.""" diff --git a/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py b/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py index 26e95719c56..0c9bb840d7c 100644 --- a/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py +++ b/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py @@ -7,8 +7,11 @@ import time +import pytest + from ethereum_test_exceptions import UndefinedException -from ethereum_test_fixtures import BlockchainEngineFixture +from ethereum_test_fixtures import BlockchainEngineFixture, BlockchainEngineXFixture +from ethereum_test_fixtures.blockchain import FixtureHeader from ethereum_test_rpc import EngineRPC, EthRPC from ethereum_test_rpc.types import ForkchoiceState, JSONRPCError, PayloadStatusEnum @@ -28,59 +31,83 @@ def __init__(self, *args: object) -> None: logger.fail(str(self)) +@pytest.mark.usefixtures("hive_test") def test_blockchain_via_engine( timing_data: TimingData, eth_rpc: EthRPC, engine_rpc: EngineRPC, - fixture: BlockchainEngineFixture, + fixture: BlockchainEngineFixture | BlockchainEngineXFixture, + genesis_header: FixtureHeader, strict_exception_matching: bool, + fcu_frequency_tracker=None, # Optional for enginex simulator + request=None, # For accessing test info ): """ - 1. Check the client genesis block hash matches `fixture.genesis.block_hash`. + 1. Check the client genesis block hash matches `genesis.block_hash`. 2. Execute the test case fixture blocks against the client under test using the `engine_newPayloadVX` method from the Engine API. - 3. For valid payloads a forkchoice update is performed to finalize the chain. + 3. For valid payloads a forkchoice update is performed to finalize the chain + (controlled by FCU frequency for enginex simulator). 
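+
+    Note: for Engine X fixtures the initial forkchoice update to genesis is
+    skipped; the multi-test client is reused across its pre-allocation group.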
""" - # Send a initial forkchoice update - with timing_data.time("Initial forkchoice update"): - logger.info("Sending initial forkchoice update to genesis block...") - delay = 0.5 - for attempt in range(3): - forkchoice_response = engine_rpc.forkchoice_updated( - forkchoice_state=ForkchoiceState( - head_block_hash=fixture.genesis.block_hash, - ), - payload_attributes=None, - version=fixture.payloads[0].forkchoice_updated_version, - ) - status = forkchoice_response.payload_status.status - logger.info(f"Initial forkchoice update response attempt {attempt + 1}: {status}") - if status != PayloadStatusEnum.SYNCING: - break - if attempt < 2: - time.sleep(delay) - delay *= 2 - - if forkchoice_response.payload_status.status != PayloadStatusEnum.VALID: - logger.error( - f"Client failed to initialize properly after 3 attempts, " - f"final status: {forkchoice_response.payload_status.status}" - ) - raise LoggedError( - f"unexpected status on forkchoice updated to genesis: {forkchoice_response}" - ) - - with timing_data.time("Get genesis block"): - logger.info("Calling getBlockByNumber to get genesis block...") - genesis_block = eth_rpc.get_block_by_number(0) - if genesis_block["hash"] != str(fixture.genesis.block_hash): - expected = fixture.genesis.block_hash - got = genesis_block["hash"] - logger.fail(f"Genesis block hash mismatch. Expected: {expected}, Got: {got}") - raise GenesisBlockMismatchExceptionError( - expected_header=fixture.genesis, - got_genesis_block=genesis_block, - ) + # Determine if we should perform forkchoice updates based on frequency tracker + should_perform_fcus = True # Default behavior for engine simulator + group_identifier = None + + if fcu_frequency_tracker is not None and hasattr(fixture, "pre_hash"): + # EngineX simulator with forkchoice update frequency control + # Use group identifier for tracking (supports both sequential and xdist execution) + from ..helpers.client_wrapper import get_group_identifier_from_request + + group_identifier = get_group_identifier_from_request(request, fixture.pre_hash) + should_perform_fcus = fcu_frequency_tracker.should_perform_fcu(group_identifier) + + logger.info( + f"Forkchoice update frequency check for group {group_identifier}: " + f"perform_fcu={should_perform_fcus} " + f"(frequency={fcu_frequency_tracker.fcu_frequency}, " + f"test_count={fcu_frequency_tracker.get_test_count(group_identifier)})" + ) + + # Always increment the test counter at the start for proper tracking + if fcu_frequency_tracker is not None and group_identifier is not None: + fcu_frequency_tracker.increment_test_count(group_identifier) + + if not isinstance(fixture, BlockchainEngineXFixture): + # Skip the initial FCU update for enginex simulator + with timing_data.time("Initial forkchoice update"): + logger.info("Sending initial forkchoice update to genesis block...") + delay = 0.5 + for attempt in range(3): + forkchoice_response = engine_rpc.forkchoice_updated( + forkchoice_state=ForkchoiceState( + head_block_hash=genesis_header.block_hash, + ), + payload_attributes=None, + version=fixture.payloads[0].forkchoice_updated_version, + ) + status = forkchoice_response.payload_status.status + logger.info(f"Initial forkchoice update response attempt {attempt + 1}: {status}") + if status != PayloadStatusEnum.SYNCING: + break + if attempt < 2: + time.sleep(delay) + delay *= 2 + if forkchoice_response.payload_status.status != PayloadStatusEnum.VALID: + raise LoggedError( + f"unexpected status on forkchoice updated to genesis: {forkchoice_response}" + ) + + with 
timing_data.time("Get genesis block"): + logger.info("Calling getBlockByNumber to get genesis block...") + client_genesis_response = eth_rpc.get_block_by_number(0) + if client_genesis_response["hash"] != str(genesis_header.block_hash): + expected = genesis_header.block_hash + got = client_genesis_response["hash"] + logger.fail(f"Genesis block hash mismatch. Expected: {expected}, Got: {got}") + raise GenesisBlockMismatchExceptionError( + expected_header=genesis_header, + got_genesis_block=client_genesis_response, + ) with timing_data.time("Payloads execution") as total_payload_timing: logger.info(f"Starting execution of {len(fixture.payloads)} payloads...") @@ -150,7 +177,7 @@ def test_blockchain_via_engine( f"Unexpected error code: {e.code}, expected: {payload.error_code}" ) from e - if payload.valid(): + if payload.valid() and should_perform_fcus: with payload_timing.time( f"engine_forkchoiceUpdatedV{payload.forkchoice_updated_version}" ): @@ -171,4 +198,18 @@ def test_blockchain_via_engine( f"unexpected status: want {PayloadStatusEnum.VALID}," f" got {forkchoice_response.payload_status.status}" ) + elif payload.valid() and not should_perform_fcus: + logger.info( + f"Skipping forkchoice update for payload {i + 1} due to frequency setting " + f"(group: {group_identifier})" + ) logger.info("All payloads processed successfully.") + + # Log final FCU frequency statistics for enginex simulator + if fcu_frequency_tracker is not None and group_identifier is not None: + final_count = fcu_frequency_tracker.get_test_count(group_identifier) + logger.info( + f"Test completed for group {group_identifier}. " + f"Total tests in group: {final_count}, " + f"FCU frequency: {fcu_frequency_tracker.fcu_frequency}" + ) diff --git a/src/pytest_plugins/consume/simulators/single_test_client.py b/src/pytest_plugins/consume/simulators/single_test_client.py index 6836c94a6fe..e7967c33248 100644 --- a/src/pytest_plugins/consume/simulators/single_test_client.py +++ b/src/pytest_plugins/consume/simulators/single_test_client.py @@ -57,7 +57,7 @@ def buffered_genesis(client_genesis: dict) -> io.BufferedReader: @pytest.fixture(scope="function") def genesis_header(fixture: BlockchainFixtureCommon) -> FixtureHeader: - """Provide the genesis header from the shared pre-state group.""" + """Provide the genesis header from the pre-allocation group.""" return fixture.genesis # type: ignore diff --git a/src/pytest_plugins/consume/tests/test_group_identifier_container.py b/src/pytest_plugins/consume/tests/test_group_identifier_container.py new file mode 100644 index 00000000000..3285f4432aa --- /dev/null +++ b/src/pytest_plugins/consume/tests/test_group_identifier_container.py @@ -0,0 +1,319 @@ +""" +Unit tests for test group identifier container cleanup. + +This module tests the container cleanup to ensures proper client lifecycle +management for both sequential and xdist execution modes. + +The test specifically addresses a regression introduced when subgroup splitting was +added for load balancing. Previously, each subgroup would create separate containers +for the same pre-allocation group, leading to container count explosion +(e.g., 24-25 containers instead of the expected 8 with 8 workers). 
+""" + +from unittest.mock import Mock + +import pytest + +from pytest_plugins.consume.simulators.helpers.client_wrapper import ( + extract_pre_hash_from_group_identifier, + get_group_identifier_from_request, +) + + +class TestGroupIdentifierDetection: + """Test group identifier detection for different execution modes.""" + + def test_sequential_execution_no_xdist_marker(self): + """Test group identifier detection for sequential execution (no xdist marker).""" + # Setup: Mock request with no xdist markers + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[]) + + pre_hash = "0x479393be6619d67f" + + # Execute + group_id = get_group_identifier_from_request(request_mock, pre_hash) + + # Verify: Should use pre_hash directly for sequential execution + assert group_id == pre_hash + + def test_xdist_execution_with_subgroup(self): + """Test group identifier detection for xdist execution with subgroups.""" + # Setup: Mock request with xdist marker containing subgroup + xdist_marker = Mock() + xdist_marker.kwargs = {"name": "0x479393be6619d67f:2"} + + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[xdist_marker]) + + pre_hash = "0x479393be6619d67f" + + # Execute + group_id = get_group_identifier_from_request(request_mock, pre_hash) + + # Verify: Should use xdist group name (with subgroup suffix) + assert group_id == "0x479393be6619d67f:2" + + def test_xdist_execution_without_subgroup(self): + """Test group identifier detection for xdist execution without subgroups.""" + # Setup: Mock request with xdist marker without subgroup + xdist_marker = Mock() + xdist_marker.kwargs = {"name": "0x479393be6619d67f"} + + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[xdist_marker]) + + pre_hash = "0x479393be6619d67f" + + # Execute + group_id = get_group_identifier_from_request(request_mock, pre_hash) + + # Verify: Should use xdist group name (same as pre_hash) + assert group_id == pre_hash + + def test_missing_iter_markers_method(self): + """Test fallback when request.node doesn't have iter_markers method.""" + # Setup: Mock request without iter_markers method + request_mock = Mock() + del request_mock.node.iter_markers # Remove the method + + pre_hash = "0x479393be6619d67f" + + # Execute + group_id = get_group_identifier_from_request(request_mock, pre_hash) + + # Verify: Should fallback to pre_hash + assert group_id == pre_hash + + def test_xdist_marker_without_name_kwargs(self): + """Test handling of xdist marker without proper name kwargs.""" + # Setup: Mock request with malformed xdist marker + xdist_marker = Mock() + xdist_marker.kwargs = {} # No 'name' key + + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[xdist_marker]) + + pre_hash = "0x479393be6619d67f" + + # Execute + group_id = get_group_identifier_from_request(request_mock, pre_hash) + + # Verify: Should fallback to pre_hash + assert group_id == pre_hash + + +class TestPreHashExtraction: + """Test pre_hash extraction from group identifiers.""" + + def test_extract_from_non_subgroup_identifier(self): + """Test extraction from group identifier without subgroup.""" + group_id = "0x479393be6619d67f" + + extracted = extract_pre_hash_from_group_identifier(group_id) + + assert extracted == group_id + + def test_extract_from_subgroup_identifier(self): + """Test extraction from group identifier with subgroup.""" + group_id = "0x479393be6619d67f:2" + expected = "0x479393be6619d67f" + + extracted = extract_pre_hash_from_group_identifier(group_id) + + 
assert extracted == expected + + def test_extract_with_multiple_colons(self): + """Test extraction with multiple colons (edge case).""" + group_id = "0x479393be6619d67f:2:extra:data" + expected = "0x479393be6619d67f" + + extracted = extract_pre_hash_from_group_identifier(group_id) + + assert extracted == expected + + def test_extract_from_empty_string(self): + """Test extraction from empty string.""" + group_id = "" + + extracted = extract_pre_hash_from_group_identifier(group_id) + + assert extracted == "" + + def test_extract_with_colon_only(self): + """Test extraction with colon only.""" + group_id = ":" + expected = "" + + extracted = extract_pre_hash_from_group_identifier(group_id) + + assert extracted == expected + + +class TestContainerIsolationScenario: + """Test the key scenario that fixes the container cleanup regression.""" + + def test_subgroup_container_isolation(self): + """Test that subgroups get separate container tracking.""" + # Setup: Simulate large pre-allocation group split into subgroups + pre_hash = "0x479393be6619d67f" + subgroups = [f"{pre_hash}:{i}" for i in range(5)] + + # Simulate container creation using group identifiers + containers = {} + for subgroup in subgroups: + container_key = subgroup # Key change: use subgroup as container key + extracted_pre_hash = extract_pre_hash_from_group_identifier(subgroup) + + containers[container_key] = { + "group_identifier": subgroup, + "pre_hash": extracted_pre_hash, + "tests_completed": 0, + "total_tests": 400, + } + + # Verify: Each subgroup gets its own container tracking + assert len(containers) == 5 + + # Verify: All containers reference the same pre-allocation file + for container in containers.values(): + assert container["pre_hash"] == pre_hash + + # Verify: Each container has unique group identifier + group_identifiers = [c["group_identifier"] for c in containers.values()] + assert len(set(group_identifiers)) == 5 # All unique + + def test_subgroup_cleanup_isolation(self): + """Test that subgroup cleanup is isolated to completed groups only.""" + # Setup: Multiple subgroups with different completion states + pre_hash = "0x479393be6619d67f" + containers = { + f"{pre_hash}:0": {"tests_completed": 400, "total_tests": 400}, # Complete + f"{pre_hash}:1": {"tests_completed": 200, "total_tests": 400}, # Partial + f"{pre_hash}:2": {"tests_completed": 0, "total_tests": 400}, # Not started + } + + # Simulate cleanup detection + completed_containers = [ + k for k, v in containers.items() if v["tests_completed"] >= v["total_tests"] + ] + + # Verify: Only completed subgroup is marked for cleanup + assert len(completed_containers) == 1 + assert completed_containers[0] == f"{pre_hash}:0" + + def test_sequential_vs_xdist_behavior(self): + """Test that sequential and xdist modes result in different container strategies.""" + pre_hash = "0x479393be6619d67f" + + # Sequential execution: single container for entire pre-allocation group + sequential_containers = {pre_hash: {"total_tests": 2000}} + + # XDist execution: multiple containers for subgroups + xdist_containers = {f"{pre_hash}:{i}": {"total_tests": 400} for i in range(5)} + + # Verify: Different container strategies + assert len(sequential_containers) == 1 # Single container + assert len(xdist_containers) == 5 # Multiple containers + + # Verify: Same total test count + sequential_total = sum(c["total_tests"] for c in sequential_containers.values()) + xdist_total = sum(c["total_tests"] for c in xdist_containers.values()) + assert sequential_total == xdist_total == 2000 + + 
+class TestRegressionScenario: + """Test the specific regression scenario that was reported.""" + + def test_container_count_regression_fix(self): + """ + Test that the fix prevents the container count regression. + + Before fix: 8 workers × 3 subgroups = 24-25 containers + After fix: Max 8 containers (1 per worker, different subgroups) + """ + # Setup: Simulate 8 workers with subgroups distributed across them + pre_hash = "0x479393be6619d67f" + num_workers = 8 + + # Before fix: Each worker could create containers for different subgroups + # This would lead to multiple containers per pre_hash across workers + before_fix_containers = {} + for worker in range(num_workers): + for subgroup in range(3): # 3 subgroups + # Old key: pre_hash (same for all subgroups) + # This caused multiple containers for same pre_hash + old_key = pre_hash + container_id = f"worker_{worker}_subgroup_{subgroup}" + before_fix_containers[container_id] = {"key": old_key} + + # After fix: Each subgroup gets unique container key + after_fix_containers = {} + for worker in range(num_workers): + # Each worker handles one subgroup (distributed by xdist) + subgroup = worker % 3 # Distribute subgroups across workers + new_key = f"{pre_hash}:{subgroup}" + container_id = f"worker_{worker}" + after_fix_containers[container_id] = {"key": new_key} + + # Verify: Fix reduces container proliferation + # Before: 24 containers (8 workers × 3 subgroups) + assert len(before_fix_containers) == 24 + + # After: 8 containers (1 per worker) + assert len(after_fix_containers) == 8 + + # Verify: Unique container keys in fixed version + after_fix_keys = [c["key"] for c in after_fix_containers.values()] + unique_keys = set(after_fix_keys) + assert len(unique_keys) <= 3 # At most one container per subgroup + + +@pytest.mark.parametrize( + "execution_mode,expected_containers", + [ + ("sequential", 1), # Single container for entire pre-allocation group + ("xdist_small", 1), # Small group, no splitting needed + ("xdist_large", 5), # Large group, split into 5 subgroups + ], +) +def test_container_strategy_by_execution_mode(execution_mode, expected_containers): + """Test container strategy varies by execution mode and group size.""" + pre_hash = "0x479393be6619d67f" + + if execution_mode == "sequential": + # Sequential: Always use pre_hash as container key + container_keys = [pre_hash] + elif execution_mode == "xdist_small": + # Small xdist group: No subgroup splitting + container_keys = [pre_hash] + elif execution_mode == "xdist_large": + # Large xdist group: Split into subgroups + container_keys = [f"{pre_hash}:{i}" for i in range(5)] + + assert len(container_keys) == expected_containers + + +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_none_request_handling(self): + """Test handling of None request parameter.""" + with pytest.raises(AttributeError): + get_group_identifier_from_request(None, "0x123") + + def test_empty_pre_hash(self): + """Test handling of empty pre_hash.""" + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[]) + + group_id = get_group_identifier_from_request(request_mock, "") + assert group_id == "" + + def test_none_pre_hash(self): + """Test handling of None pre_hash.""" + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[]) + + group_id = get_group_identifier_from_request(request_mock, None) + assert group_id is None diff --git a/whitelist.txt b/whitelist.txt index 777f990dd82..9a74204dffc 100644 --- a/whitelist.txt +++ b/whitelist.txt @@ 
-1057,7 +1057,8 @@ Typecheck autoformat Typechecking groupstats -SharedPreStateGroup +enginex +loadgroup qube aspell codespell
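
Note: the new unit tests above pin down the expected behaviour of the two helpers they import, get_group_identifier_from_request and extract_pre_hash_from_group_identifier, but their implementation in pytest_plugins.consume.simulators.helpers.client_wrapper is not shown in this diff. The sketch below is inferred purely from those assertions and is not a copy of the real code; in particular, the marker name "xdist_group" (pytest-xdist's loadgroup marker) and the exact marker filtering are assumptions.

def extract_pre_hash_from_group_identifier(group_identifier):
    # Drop any ":<sub-group>" suffix: "0xabc:2" -> "0xabc", "0xabc" -> "0xabc",
    # ":" -> "", "" -> "" (matching TestPreHashExtraction above).
    return group_identifier.split(":", 1)[0]


def get_group_identifier_from_request(request, pre_hash):
    # Prefer the xdist group name assigned to the test, which may carry a
    # ":<sub-group>" suffix when a large pre-allocation group was split.
    # Fall back to the raw pre_hash for sequential runs, missing iter_markers,
    # or markers without a "name" kwarg (matching TestGroupIdentifierDetection).
    iter_markers = getattr(request.node, "iter_markers", None)
    if iter_markers is None:
        return pre_hash
    for marker in iter_markers("xdist_group"):  # assumed marker name
        name = marker.kwargs.get("name")
        if name:
            return name
    return pre_hash

With pytest-xdist's --dist=loadgroup, tests that share an xdist_group name stay on one worker, so a suffixed identifier keeps each sub-group of a split pre-allocation group on a single worker, while stripping the suffix still resolves every sub-group back to the same pre-allocation fixture file.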