feat(consume): add consume enginex simulator #1765
Conversation
Just some small comments (mostly optional suggestions)! Will continue reviewing tomorrow :)
Still amazed at the speed-up!!
# Store on session for later retrieval by test_tracker fixture
session._pre_alloc_group_counts = group_counts
logger.info(f"Collected {len(group_counts)} groups with tests: {dict(group_counts)}")
We could consider moving {dict(group_counts)} to the debug log level while keeping the group count at the info log level. I only say this as on my first run this looked quite sporadic :D
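For illustration, one way to split the two log levels could look like this (a standalone sketch; the group_counts values here are made up):

import logging

logger = logging.getLogger(__name__)

# Sketch of the suggested split: summary at INFO, full mapping at DEBUG.
group_counts = {"pre-alloc-group-0": 12, "pre-alloc-group-1": 7}  # illustrative values
logger.info(f"Collected {len(group_counts)} groups with tests")
logger.debug(f"Group counts: {dict(group_counts)}")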
fcu_frequency = getattr(request.config, "enginex_fcu_frequency", 1)

tracker = FCUFrequencyTracker(fcu_frequency=fcu_frequency)
logger.info(f"FCU frequency tracker initialized with frequency: {fcu_frequency}")
Maybe only log if not 0 (default), assuming the above, i.e. only log if the flag is provided.
Or debug level logs.
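Combining both suggestions, a rough sketch (assuming the frequency defaults to 0 when the flag isn't passed):

import logging

logger = logging.getLogger(__name__)

# Sketch: only emit the message when the flag was provided (non-zero), and at DEBUG level.
fcu_frequency = 0  # stands in for getattr(request.config, "enginex_fcu_frequency", 0)
if fcu_frequency:
    logger.debug(f"FCU frequency tracker initialized with frequency: {fcu_frequency}")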
def __init__(self, max_group_size: int = 400):
    """Initialize the mapper with a maximum group size."""
    self.max_group_size = max_group_size
It could be nice to add another check here for max group size, re division by zero. Even though the flag checks this.
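For example, a small guard along these lines could work (a sketch; the class body is trimmed to the relevant part):

class XDistGroupMapper:
    """Maps pre-allocation groups to xdist sub-groups of bounded size."""

    def __init__(self, max_group_size: int = 400):
        """Initialize the mapper with a maximum group size."""
        # Guard against zero/negative values to avoid a later division by zero,
        # even if the CLI flag already validates this.
        if max_group_size <= 0:
            raise ValueError(f"max_group_size must be positive, got {max_group_size}")
        self.max_group_size = max_group_size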
CACHED_DOWNLOADS_DIRECTORY = (
    Path(platformdirs.user_cache_dir("ethereum-execution-spec-tests")) / "cached_downloads"
)


class XDistGroupMapper:
I'm very happy with this here already, but might be nice to consider moving it to its own file in helpers!
group_counts = {}
for item in items:
    if hasattr(item, "callspec") and "test_case" in item.callspec.params:
        test_case = item.callspec.params["test_case"]
        if hasattr(test_case, "pre_hash"):
            # Get group identifier from xdist marker if available
            group_identifier = None
            for marker in item.iter_markers("xdist_group"):
                if hasattr(marker, "kwargs") and "name" in marker.kwargs:
                    group_identifier = marker.kwargs["name"]
                    break

            # Fallback to pre_hash if no xdist marker (sequential execution)
            if group_identifier is None:
                group_identifier = test_case.pre_hash

            group_counts[group_identifier] = group_counts.get(group_identifier, 0) + 1
We are duplicating this logic, I believe, in both the if and the else. Maybe we can shift it above the if/else?
Args:
    request: The pytest request object containing test metadata
    pre_hash: The pre-allocation group hash

Returns:
    Group identifier string to use for client tracking
Personal nit again, as I'm not a huge fan of these in the docstrings. There are some more like this within this file. Won't push hard though, just personal preference.
I gave this a quick look and I have a couple of comments; we can chat about it when you have time.
def __new__(cls) -> "MultiTestClientManager":
    """Ensure only one instance of MultiTestClientManager exists."""
    if cls._instance is None:
        cls._instance = super().__new__(cls)
        cls._instance._initialized = False
    return cls._instance
Is the purpose of this to have a single client instance across all xdist workers? If so, I don't think it'll work because multi_test_client_manager(), even though marked with scope="session", will have one instance per worker. If we want two different multi_test_client_manager instances from two different workers to access the same client, we need to have inter-process comms.
Here's an example:
execution-spec-tests/src/pytest_plugins/execute/rpc/hive.py, lines 384 to 446 in 724131d:
@pytest.fixture(autouse=True, scope="session")
def client(
    base_hive_test: HiveTest,
    client_files: dict,
    environment: dict,
    client_type: ClientType,
    session_temp_folder: Path,
) -> Generator[Client, None, None]:
    """Initialize the client with the appropriate files and environment variables."""
    base_name = "hive_client"
    base_file = session_temp_folder / base_name
    base_error_file = session_temp_folder / f"{base_name}.err"
    base_lock_file = session_temp_folder / f"{base_name}.lock"
    client: Client | None = None
    with FileLock(base_lock_file):
        if not base_error_file.exists():
            if base_file.exists():
                with open(base_file, "r") as f:
                    client = Client(**json.load(f))
            else:
                base_error_file.touch()  # Assume error
                client = base_hive_test.start_client(
                    client_type=client_type, environment=environment, files=client_files
                )
                if client is not None:
                    base_error_file.unlink()  # Success
                    with open(base_file, "w") as f:
                        json.dump(
                            asdict(replace(client, config=None)),  # type: ignore
                            f,
                        )
    error_message = (
        f"Unable to connect to the client container ({client_type.name}) via Hive during test "
        "setup. Check the client or Hive server logs for more information."
    )
    assert client is not None, error_message

    users_file_name = f"{base_name}_users"
    users_file = session_temp_folder / users_file_name
    users_lock_file = session_temp_folder / f"{users_file_name}.lock"
    with FileLock(users_lock_file):
        if users_file.exists():
            with open(users_file, "r") as f:
                users = json.load(f)
        else:
            users = 0
        users += 1
        with open(users_file, "w") as f:
            json.dump(users, f)

    yield client

    with FileLock(users_lock_file):
        with open(users_file, "r") as f:
            users = json.load(f)
        users -= 1
        with open(users_file, "w") as f:
            json.dump(users, f)
        if users == 0:
            client.stop()
            base_file.unlink()
            users_file.unlink()
This refers to only creating one instance of the client manager. It doesn't impose any constraints on how those clients are used by the manager.
It's a "multi-test client manager" in the sense that it manages a client on which multiple tests can be executed. However, what clearly needs better documentation/explanation is that this client will only ever execute payloads from tests from exactly one pre-alloc group and never from a different pre-alloc group. There's a 1-to-1 mapping between (sub-)groups and clients; "sub-" is explained below.
This behavior is enabled by the xdist flag --dist=loadgroup, which distributes tests by groups, and we define the groups as pre-alloc groups. Unless --enginex-max-group-size=N is used; then sub-groups of maximum size N are created from the pre-alloc group, and each sub-group then maps 1-to-1 to a client instance.
The label is applied here (comment should be improved):
execution-spec-tests/src/pytest_plugins/consume/consume.py, lines 614 to 615 in 743e6f7:

# Add xdist group marker for load balancing
markers.append(pytest.mark.xdist_group(name=xdist_group_name))
--dist=loadgroup docs with examples: https://pytest-xdist.readthedocs.io/en/latest/distribution.html#running-tests-across-multiple-cpus
This means that we only ever send payloads from tests from a specific (sub-)group to any one worker (and a group corresponds to a client instance). So there is never any cross-thread interaction with regards to clients. Each worker is responsible for one client at a time, and on that single client, the tests from a (sub-)group are executed sequentially. Therefore, there's no need to lock clients. It's quite the trick, but it appears to work adequately.
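For reference, a minimal standalone example of how --dist=loadgroup behaves (test and group names here are illustrative, not from this PR):

# Standalone test file; run with: pytest -n 2 --dist=loadgroup
# pytest-xdist sends all tests that share an xdist_group name to the same worker,
# so a per-group client instance is only ever driven by one worker.
import pytest


@pytest.mark.xdist_group(name="pre-alloc-group-0")  # illustrative group name
def test_payload_one():
    assert True


@pytest.mark.xdist_group(name="pre-alloc-group-0")  # same group => same worker
def test_payload_two():
    assert True


@pytest.mark.xdist_group(name="pre-alloc-group-1")  # may land on a different worker
def test_payload_three():
    assert True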
Btw, I need to add --dist=loadgroup to enginex's pytest_configure() by default. This is a small change that's on the todo list.
Do you think there's an argument to allow multiple workers to send payloads to the same client instance?
About --enginex-max-group-size=N: if you have 8 groups and 8 workers, but say group-0 has 1000 tests and all other groups have 100 tests, then we'll clearly be limited by the worker that gets group-0. If we set this flag to 100, then we'll get 19 groups of equal size across 8 workers. Not optimal, but much better.
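To illustrate the splitting idea, a rough sketch (the helper name and naming scheme are hypothetical, not the PR's actual implementation):

# Sketch: split a pre-alloc group's tests into sub-groups of at most max_group_size,
# so that --dist=loadgroup can balance them across workers.
def split_into_subgroups(pre_hash: str, test_count: int, max_group_size: int) -> list[str]:
    """Return one xdist group name per sub-group of at most max_group_size tests."""
    if max_group_size <= 0:
        raise ValueError("max_group_size must be positive")
    num_subgroups = -(-test_count // max_group_size)  # ceiling division
    if num_subgroups <= 1:
        return [pre_hash]
    return [f"{pre_hash}-{i}" for i in range(num_subgroups)]


# Example from the comment above: a group of 1000 tests with --enginex-max-group-size=100
# yields 10 sub-groups, each mapping 1-to-1 to its own client instance.
print(len(split_into_subgroups("group-0", 1000, 100)))  # 10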
Setting the default to --enginex-max-group-size=100 seems reasonable imo!
@dataclass
class FCUFrequencyTracker:
This one will also need inter-process comms; then we might need to lock the client when we send an FCU to a specific client.
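For context, a minimal sketch of what such a per-group frequency tracker might look like (the class and method names here are assumptions for illustration, not the PR's actual API):

from dataclasses import dataclass, field


@dataclass
class SimpleFCUFrequencyTracker:
    """Illustrative tracker: perform an FCU only every fcu_frequency-th payload per group."""

    fcu_frequency: int = 1
    payload_counts: dict[str, int] = field(default_factory=dict)

    def should_perform_fcu(self, group_identifier: str) -> bool:
        """Count a payload for the group and decide whether to send an FCU."""
        if self.fcu_frequency <= 0:
            return False  # frequency 0 disables FCUs entirely
        count = self.payload_counts.get(group_identifier, 0) + 1
        self.payload_counts[group_identifier] = count
        return count % self.fcu_frequency == 0


# Example: with frequency 3, only every third payload for a group triggers an FCU.
tracker = SimpleFCUFrequencyTracker(fcu_frequency=3)
print([tracker.should_perform_fcu("group-0") for _ in range(6)])
# [False, False, True, False, False, True]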
@@ -150,7 +168,7 @@ def test_blockchain_via_engine(
                     f"Unexpected error code: {e.code}, expected: {payload.error_code}"
                 ) from e

-            if payload.valid():
+            if payload.valid() and should_perform_fcus:
                 with payload_timing.time(
A couple of questions:
- Should we lock the client here?
- Should we FCU back to genesis after the FCU is completed?
- If we don't FCU back to genesis, what happens when:
  - On test A, we FCU to block A, where G <- A; now the canonical head is A
  - On test B, we new-payload to block B, where G <- B
  - On test …
Just some additional ideas (which we spoke about already) for enginex, either in this PR or in follow-ups:
Also removes a file that was unintentionally added in #1718.
This creates pre_hash subgroups with a max size for better xdist balancing.
- Replace the function name check with fixture format detection to properly distinguish between the engine and enginex simulators. Both use the same test function name but process different fixture formats:
  - Engine simulator: "blockchain_test_engine" → standard parametrization
  - EngineX simulator: "blockchain_test_engine_x" → enhanced parametrization with xdist group splitting and load balancing
- Fixes unit test failures by checking for 'test_case' parameter presence instead of maintaining an allowlist of function names.
🗒️ Description
Requires refactor(consume): create explicit pytest plugin structure #1801.
Requires refactor(fill,fixtures): rename "engine reorg" to "engine x"; improve pre-allocation group terminology #1760.
Adds a simulator, consume enginex, that runs BlockchainEngineXFixture against clients and has the potential to speed up consensus test execution by 10-50x.
Running The Simulator & Initial Results
I've been testing locally with a subset of 1846 tests that create 29 groups from Cancun:
Then consume against a hive dev server:
Results for 1846 tests with 29 groups (i.e., 29 client initializations):
- reth: 85.37s (0:01:25).
- besu: 361.71s (0:06:01).
(Client versions at the Pectra fork.)
FCU Behavior
Xdist Behavior
Tests get distributed to xdist workers by pre-allocation group using loadgroup:
🔗 Related Issues
BlockchainEngineReorgFixture #1706
✅ Checklist
- I have run mkdocs serve locally and verified the auto-generated docs for new tests in the Test Case Reference are correctly formatted.