Skip to content
206 changes: 132 additions & 74 deletions tests/benchmark/compute/instruction/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pytest
from execution_testing import (
Account,
Address,
Alloc,
BenchmarkTestFiller,
Block,
Expand Down Expand Up @@ -58,15 +59,16 @@ def test_xcall(
gas_benchmark_value: int,
) -> None:
"""Benchmark a system execution where a single opcode execution."""
# The attack gas limit is the gas limit which the target tx will use The
# test will scale the block gas limit to setup the contracts accordingly to
# be able to pay for the contract deposit. This has to take into account
# the 200 gas per byte, but also the quadratic memory expansion costs which
# have to be paid each time the memory is being setup
# The attack gas limit represents the transaction gas limit cap or
# the block gas limit. If eip-7825 is applied, the test will create
# multiple transactions for contract deployment. It should account
# for the 200 gas per byte cost and the quadratic memory-expansion
# costs, which must be paid each time memory is initialized.
attack_gas_limit = gas_benchmark_value
max_contract_size = fork.max_code_size()

gas_costs = fork.gas_costs()
tx_gas_limit_cap = fork.transaction_gas_limit_cap()

# Calculate the absolute minimum gas costs to deploy the contract This does
# not take into account setting up the actual memory (using KECCAK256 and
Expand All @@ -90,7 +92,7 @@ def test_xcall(
+ gas_costs.G_COLD_ACCOUNT_ACCESS # Opcode cost
+ 30 # ~Gluing opcodes
)
# Calculate the number of contracts to be targeted
# Calculate an upper bound of the number of contracts to be targeted
num_contracts = (
# Base available gas = GAS_LIMIT - intrinsic - (out of loop MSTOREs)
attack_gas_limit - intrinsic_gas_cost_calc() - gas_costs.G_VERY_LOW * 4
Expand All @@ -108,6 +110,129 @@ def test_xcall(
"during the setup phase of this test."
)

initcode, factory_address, factory_caller_address = (
_deploy_max_contract_factory(pre, fork)
)

# Deploy num_contracts via multiple txs (each capped by tx gas limit).
with TestPhaseManager.setup():
# Rough estimate (rounded down) of contracts per tx based on dominant
# cost factor only. E.g., 17M gas limit + 24KiB contracts = ~3 per tx.
# The goal is to involve the minimum amount of gas pricing to avoid
# complexity and potential brittleness.
# If this estimation is incorrect in the future (i.e. tx gas limit cap)
# is increased or cost per byte, the post-state check will detect it
# and can be adjusted with a more complex formula.
num_contracts_per_tx = (
tx_gas_limit_cap
// (gas_costs.G_CODE_DEPOSIT_BYTE * max_contract_size)
if tx_gas_limit_cap
else num_contracts
)
attack_txs = (
math.ceil(num_contracts / num_contracts_per_tx)
if num_contracts_per_tx
else 1
)

contracts_deployment_txs = []
for _ in range(attack_txs):
contracts_deployment_txs.append(
Transaction(
to=factory_caller_address,
gas_limit=tx_gas_limit_cap or env.gas_limit,
data=Hash(num_contracts_per_tx),
sender=pre.fund_eoa(),
)
)

post = {}
for i in range(num_contracts):
deployed_contract_address = compute_create2_address(
address=factory_address,
salt=i,
initcode=initcode,
)
post[deployed_contract_address] = Account(nonce=1)

attack_call = Bytecode()
if opcode == Op.EXTCODECOPY:
attack_call = Op.EXTCODECOPY(
address=Op.SHA3(32 - 20 - 1, 85), dest_offset=96, size=1000
)
else:
# For the rest of the opcodes, we can use the same generic attack call
# since all only minimally need the `address` of the target.
attack_call = Op.POP(opcode(address=Op.SHA3(32 - 20 - 1, 85)))
attack_code = (
# Setup memory for later CREATE2 address generation loop.
# 0xFF+[Address(20bytes)]+[seed(32bytes)]+[initcode keccak(32bytes)]
Op.MSTORE(0, factory_address)
+ Op.MSTORE8(32 - 20 - 1, 0xFF)
+ Op.MSTORE(32, Op.CALLDATALOAD(0))
+ Op.MSTORE(64, initcode.keccak256())
# Main loop
+ While(
body=attack_call + Op.MSTORE(32, Op.ADD(Op.MLOAD(32), 1)),
)
)
Comment on lines +167 to +178
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same attack code as before but with a twist. The previous version started with seed=0 and simply ran the loop until it ran out of gas, since we could do all in a singe tx.

Now we have to split the attack in multiple txs, thus I allow reading the seed from CALLDATALOAD(0) so we can create the attack txs in a way that makes sense.


attack_address = pre.deploy_contract(code=attack_code)

with TestPhaseManager.execution():
full_txs = (
attack_gas_limit // tx_gas_limit_cap if tx_gas_limit_cap else 1
)
remainder = (
attack_gas_limit % tx_gas_limit_cap if tx_gas_limit_cap else 0
)

num_targeted_contracts_per_full_tx = (
# Base available gas:
# TX_GAS_LIMIT - intrinsic - (out of loop MSTOREs)
(tx_gas_limit_cap or attack_gas_limit)
- intrinsic_gas_cost_calc()
- gas_costs.G_VERY_LOW * 4
) // loop_cost
contract_start_index = 0
opcode_txs = []
for _ in range(full_txs):
opcode_txs.append(
Transaction(
to=attack_address,
gas_limit=tx_gas_limit_cap or attack_gas_limit,
data=Hash(contract_start_index),
sender=pre.fund_eoa(),
)
)
contract_start_index += num_targeted_contracts_per_full_tx
if remainder > 0:
opcode_txs.append(
Transaction(
to=attack_address,
gas_limit=remainder,
data=Hash(contract_start_index),
sender=pre.fund_eoa(),
)
)

blockchain_test(
pre=pre,
post=post,
blocks=[
Block(txs=contracts_deployment_txs),
Block(txs=opcode_txs),
],
exclude_full_post_state_in_output=True,
)


def _deploy_max_contract_factory(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way the contract factory was created remains unchanged.

Note that the factory_code already used storage slot 0 as the internal seed. This was quite nice since it means that when we call the factory mutliple times now, it keeps creating the next bytecode in a coherent way from the previous tx.

pre: Alloc,
fork: Fork,
) -> tuple[Bytecode, Address, Address]:
max_contract_size = fork.max_code_size()

# The initcode will take its address as a starting point to the input to
# the keccak hash function. It will reuse the output of the hash function
# in a loop to create a large amount of seemingly random code, until it
Expand Down Expand Up @@ -177,74 +302,7 @@ def test_xcall(
)
factory_caller_address = pre.deploy_contract(code=factory_caller_code)

with TestPhaseManager.setup():
contracts_deployment_tx = Transaction(
to=factory_caller_address,
gas_limit=env.gas_limit,
gas_price=10**6,
data=Hash(num_contracts),
sender=pre.fund_eoa(),
)

post = {}
deployed_contract_addresses = []
for i in range(num_contracts):
deployed_contract_address = compute_create2_address(
address=factory_address,
salt=i,
initcode=initcode,
)
post[deployed_contract_address] = Account(nonce=1)
deployed_contract_addresses.append(deployed_contract_address)

attack_call = Bytecode()
if opcode == Op.EXTCODECOPY:
attack_call = Op.EXTCODECOPY(
address=Op.SHA3(32 - 20 - 1, 85), dest_offset=96, size=1000
)
else:
# For the rest of the opcodes, we can use the same generic attack call
# since all only minimally need the `address` of the target.
attack_call = Op.POP(opcode(address=Op.SHA3(32 - 20 - 1, 85)))
attack_code = (
# Setup memory for later CREATE2 address generation loop.
# 0xFF+[Address(20bytes)]+[seed(32bytes)]+[initcode keccak(32bytes)]
Op.MSTORE(0, factory_address)
+ Op.MSTORE8(32 - 20 - 1, 0xFF)
+ Op.MSTORE(32, 0)
+ Op.MSTORE(64, initcode.keccak256())
# Main loop
+ While(
body=attack_call + Op.MSTORE(32, Op.ADD(Op.MLOAD(32), 1)),
)
)

if len(attack_code) > max_contract_size:
# TODO: A workaround could be to split the opcode code into multiple
# contracts and call them in sequence.
raise ValueError(
f"Code size {len(attack_code)} exceeds maximum "
f"code size {max_contract_size}"
)
opcode_address = pre.deploy_contract(code=attack_code)

with TestPhaseManager.execution():
opcode_tx = Transaction(
to=opcode_address,
gas_limit=attack_gas_limit,
gas_price=10**9,
sender=pre.fund_eoa(),
)

blockchain_test(
pre=pre,
post=post,
blocks=[
Block(txs=[contracts_deployment_tx]),
Block(txs=[opcode_tx]),
],
exclude_full_post_state_in_output=True,
)
return initcode, factory_address, factory_caller_address


@pytest.mark.parametrize(
Expand Down
Loading