Skip to content

Commit 3d3e2b6

Browse files
ilitteriCopilot
andauthored
fix(l2): use checkpoints to persist previous batch state (#5037)
**Motivation** L2s with batches with more than 131 blocks (with at least one tx in the batch) stopped working correctly after the path-based feature was introduced. **Description** Extends `ethrex-storage` API with a `create_snapshot` function, which creates a checkpoint of the DB at a provided path, and it is used in the following manner: - The node initializes with a checkpoint of the current state of the store and heals with the additional blocks. - On each batch seal, a new checkpoint is created containing the state of the latest block from the sealed batch. - When it's time to prepare a batch to commit, the previously created checkpoint is used for both state diff computation and witness generation. - Checkpoints are erased once the batch they served is verified on the L1. **Future work** Remove checkpoints after the batch they served is verified in the L1. --------- Co-authored-by: Copilot <[email protected]>
1 parent 23c14ec commit 3d3e2b6

File tree

11 files changed

+674
-98
lines changed

11 files changed

+674
-98
lines changed

cmd/ethrex/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ c-kzg = [
8383
"ethrex-crypto/c-kzg",
8484
]
8585
metrics = ["ethrex-blockchain/metrics", "ethrex-l2?/metrics"]
86-
rocksdb = ["ethrex-storage/rocksdb", "ethrex-p2p/rocksdb"]
86+
rocksdb = ["ethrex-storage/rocksdb", "ethrex-p2p/rocksdb", "ethrex-l2?/rocksdb"]
8787
jemalloc = ["dep:tikv-jemallocator"]
8888
jemalloc_profiling = [
8989
"jemalloc",

cmd/ethrex/l2/initializers.rs

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use crate::l2::{L2Options, SequencerOptions};
77
use crate::utils::{
88
NodeConfigFile, get_client_version, init_datadir, read_jwtsecret_file, store_node_config_file,
99
};
10-
use ethrex_blockchain::{Blockchain, BlockchainType, L2Config};
10+
use ethrex_blockchain::{Blockchain, BlockchainOptions, BlockchainType, L2Config};
11+
use ethrex_common::fd_limit::raise_fd_limit;
12+
use ethrex_common::types::Genesis;
1113
use ethrex_common::types::fee_config::{FeeConfig, L1FeeConfig, OperatorFeeConfig};
1214
use ethrex_common::{Address, types::DEFAULT_BUILDER_GAS_CEIL};
1315
use ethrex_l2::SequencerConfig;
@@ -18,6 +20,8 @@ use ethrex_p2p::{
1820
sync_manager::SyncManager,
1921
types::{Node, NodeRecord},
2022
};
23+
#[cfg(feature = "rocksdb")]
24+
use ethrex_storage::EngineType;
2125
use ethrex_storage::Store;
2226
use ethrex_storage_rollup::{EngineTypeRollup, StoreRollup};
2327
use secp256k1::SecretKey;
@@ -146,14 +150,19 @@ pub async fn init_l2(
146150
opts: L2Options,
147151
log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
148152
) -> eyre::Result<()> {
153+
raise_fd_limit()?;
154+
149155
let datadir = opts.node_opts.datadir.clone();
150156
init_datadir(&opts.node_opts.datadir);
151157
let rollup_store_dir = datadir.join("rollup_store");
152158

159+
// Checkpoints are stored in the main datadir
160+
let checkpoints_dir = datadir.clone();
161+
153162
let network = get_network(&opts.node_opts);
154163

155164
let genesis = network.get_genesis()?;
156-
let store = init_store(&datadir, genesis).await;
165+
let store = init_store(&datadir, genesis.clone()).await;
157166
let rollup_store = init_rollup_store(&rollup_store_dir).await;
158167

159168
let operator_fee_config = get_operator_fee_config(&opts.sequencer_opts).await?;
@@ -180,10 +189,18 @@ pub async fn init_l2(
180189
perf_logs_enabled: true,
181190
};
182191

183-
let blockchain = init_blockchain(store.clone(), blockchain_opts);
192+
let blockchain = init_blockchain(store.clone(), blockchain_opts.clone());
184193

185194
regenerate_head_state(&store, &blockchain).await?;
186195

196+
let (initial_checkpoint_store, initial_checkpoint_blockchain) = initialize_checkpoint(
197+
&store,
198+
&checkpoints_dir.join("initial_checkpoint"),
199+
genesis.clone(),
200+
blockchain_opts,
201+
)
202+
.await?;
203+
187204
let signer = get_signer(&datadir);
188205

189206
let local_p2p_node = get_local_p2p_node(&opts.node_opts, &signer);
@@ -277,6 +294,10 @@ pub async fn init_l2(
277294
"http://{}:{}",
278295
opts.node_opts.http_addr, opts.node_opts.http_port
279296
),
297+
initial_checkpoint_store,
298+
initial_checkpoint_blockchain,
299+
genesis,
300+
checkpoints_dir,
280301
)
281302
.into_future();
282303

@@ -344,3 +365,92 @@ pub async fn get_operator_fee_config(
344365
};
345366
Ok(operator_fee_config)
346367
}
368+
369+
/// Initializes a checkpoint of the given store at the specified path.
370+
///
371+
/// If there's no previous checkpoint, it creates one from the current store state.
372+
///
373+
/// This function performs the following steps:
374+
/// 1. Creates a checkpoint of the provided store at the specified path.
375+
/// 2. Initializes a new store and blockchain for the checkpoint.
376+
/// 3. Regenerates the head state in the checkpoint store.
377+
/// 4. Validates that the checkpoint store's head block number and latest block match those of the original store.
378+
async fn initialize_checkpoint(
379+
store: &Store,
380+
path: &Path,
381+
genesis: Genesis,
382+
blockchain_opts: BlockchainOptions,
383+
) -> eyre::Result<(Store, Arc<Blockchain>)> {
384+
// If the checkpoint is not present, create it
385+
if !path.exists() {
386+
store.create_checkpoint(path).await?;
387+
}
388+
389+
// We now load the checkpoint, validate it, and regenerate its state.
390+
391+
#[cfg(feature = "rocksdb")]
392+
let engine_type = EngineType::RocksDB;
393+
#[cfg(not(feature = "rocksdb"))]
394+
let engine_type = EngineType::InMemory;
395+
396+
let checkpoint_store = {
397+
let checkpoint_store_inner = Store::new(path, engine_type)?;
398+
399+
checkpoint_store_inner
400+
.add_initial_state(genesis.clone())
401+
.await?;
402+
403+
checkpoint_store_inner
404+
};
405+
406+
let checkpoint_blockchain =
407+
Arc::new(Blockchain::new(checkpoint_store.clone(), blockchain_opts));
408+
409+
let checkpoint_head_block_number = checkpoint_store.get_latest_block_number().await?;
410+
411+
let db_head_block_number = store.get_latest_block_number().await?;
412+
413+
if checkpoint_head_block_number != db_head_block_number {
414+
return Err(eyre::eyre!(
415+
"checkpoint store head block number does not match main store head block number before regeneration"
416+
));
417+
}
418+
419+
regenerate_head_state(&checkpoint_store, &checkpoint_blockchain).await?;
420+
421+
let checkpoint_latest_block_number = checkpoint_store.get_latest_block_number().await?;
422+
423+
let db_latest_block_number = store.get_latest_block_number().await?;
424+
425+
let checkpoint_latest_block_header = checkpoint_store
426+
.get_block_header(checkpoint_latest_block_number)?
427+
.ok_or(eyre::eyre!(
428+
"latest block header ({checkpoint_latest_block_number}) not found in checkpoint store"
429+
))?;
430+
431+
let db_latest_block_header = store
432+
.get_block_header(db_latest_block_number)?
433+
.ok_or(eyre::eyre!("latest block not found in main store"))?;
434+
435+
// Final sanity check
436+
437+
if !checkpoint_store.has_state_root(checkpoint_latest_block_header.state_root)? {
438+
return Err(eyre::eyre!(
439+
"checkpoint store state is not regenerated properly"
440+
));
441+
}
442+
443+
if checkpoint_latest_block_number != db_head_block_number {
444+
return Err(eyre::eyre!(
445+
"checkpoint store latest block number does not match main store head block number after regeneration"
446+
));
447+
}
448+
449+
if checkpoint_latest_block_header.state_root != db_latest_block_header.state_root {
450+
return Err(eyre::eyre!(
451+
"checkpoint store latest block hash does not match main store latest block hash after regeneration"
452+
));
453+
}
454+
455+
Ok((checkpoint_store, checkpoint_blockchain))
456+
}

crates/blockchain/blockchain.rs

Lines changed: 91 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,25 @@ impl Blockchain {
201201
blocks: &[Block],
202202
fee_configs: Option<&[FeeConfig]>,
203203
) -> Result<ExecutionWitness, ChainError> {
204-
let first_block_header = blocks
204+
let first_block_header = &blocks
205205
.first()
206206
.ok_or(ChainError::WitnessGeneration(
207207
"Empty block batch".to_string(),
208208
))?
209-
.header
210-
.clone();
209+
.header;
210+
211+
// Later on, we need to access block hashes by number. To avoid needing
212+
// to apply fork choice for each block, we cache them here.
213+
let mut block_hashes_map: BTreeMap<u64, H256> = blocks
214+
.iter()
215+
.cloned()
216+
.map(|block| (block.header.number, block.hash()))
217+
.collect();
218+
219+
block_hashes_map.insert(
220+
first_block_header.number.saturating_sub(1),
221+
first_block_header.parent_hash,
222+
);
211223

212224
// Get state at previous block
213225
let trie = self
@@ -216,7 +228,17 @@ impl Blockchain {
216228
.map_err(|_| ChainError::ParentStateNotFound)?
217229
.ok_or(ChainError::ParentStateNotFound)?;
218230

219-
let (state_trie_witness, mut trie) = TrieLogger::open_trie(trie);
231+
let (mut current_trie_witness, mut trie) = TrieLogger::open_trie(trie);
232+
233+
// For each block, a new TrieLogger will be opened, each containing the
234+
// witness accessed during the block execution. We need to accumulate
235+
// all the nodes accessed during the entire batch execution.
236+
let mut accumulated_state_trie_witness = current_trie_witness
237+
.lock()
238+
.map_err(|_| {
239+
ChainError::WitnessGeneration("Failed to lock state trie witness".to_string())
240+
})?
241+
.clone();
220242

221243
let mut touched_account_storage_slots = BTreeMap::new();
222244
// This will become the state trie + storage trie
@@ -232,10 +254,18 @@ impl Blockchain {
232254

233255
for (i, block) in blocks.iter().enumerate() {
234256
let parent_hash = block.header.parent_hash;
257+
258+
// This assumes that the user has the necessary state stored already,
259+
// so if the user only has the state previous to the first block, it
260+
// will fail in the second iteration of this for loop. To ensure this,
261+
// doesn't fail, later in this function we store the new state after
262+
// re-execution.
235263
let vm_db: DynVmDatabase =
236264
Box::new(StoreVmDatabase::new(self.storage.clone(), parent_hash));
265+
237266
let logger = Arc::new(DatabaseLogger::new(Arc::new(Mutex::new(Box::new(vm_db)))));
238-
let mut vm = match &self.options.r#type {
267+
268+
let mut vm = match self.options.r#type {
239269
BlockchainType::L1 => Evm::new_from_db_for_l1(logger.clone()),
240270
BlockchainType::L2(_) => {
241271
let l2_config = match fee_configs {
@@ -253,7 +283,8 @@ impl Blockchain {
253283
};
254284

255285
// Re-execute block with logger
256-
vm.execute_block(block)?;
286+
let execution_result = vm.execute_block(block)?;
287+
257288
// Gather account updates
258289
let account_updates = vm.get_state_transitions()?;
259290

@@ -276,7 +307,9 @@ impl Blockchain {
276307
ChainError::WitnessGeneration("Failed to get block hashes".to_string())
277308
})?
278309
.clone();
310+
279311
block_hashes.extend(logger_block_hashes);
312+
280313
// Access all the accounts needed for withdrawals
281314
if let Some(withdrawals) = block.body.withdrawals.as_ref() {
282315
for withdrawal in withdrawals {
@@ -320,6 +353,7 @@ impl Blockchain {
320353
used_storage_tries.insert(*account, (storage_trie_witness, storage_trie));
321354
}
322355
}
356+
323357
// Store all the accessed evm bytecodes
324358
for code_hash in logger
325359
.code_accessed
@@ -342,14 +376,21 @@ impl Blockchain {
342376
}
343377

344378
// Apply account updates to the trie recording all the necessary nodes to do so
345-
let (updated_trie, storage_tries_after_update) = self
379+
let (storage_tries_after_update, account_updates_list) = self
346380
.storage
347381
.apply_account_updates_from_trie_with_witness(
348382
trie,
349383
&account_updates,
350384
used_storage_tries,
351385
)
352386
.await?;
387+
388+
// We cannot ensure that the users of this function have the necessary
389+
// state stored, so in order for it to not assume anything, we update
390+
// the storage with the new state after re-execution
391+
self.store_block(block.clone(), account_updates_list, execution_result)
392+
.await?;
393+
353394
for (address, (witness, _storage_trie)) in storage_tries_after_update {
354395
let mut witness = witness.lock().map_err(|_| {
355396
ChainError::WitnessGeneration("Failed to lock storage trie witness".to_string())
@@ -359,15 +400,40 @@ impl Blockchain {
359400
used_trie_nodes.extend_from_slice(&witness);
360401
touched_account_storage_slots.entry(address).or_default();
361402
}
403+
404+
let (new_state_trie_witness, updated_trie) = TrieLogger::open_trie(
405+
self.storage
406+
.state_trie(
407+
block_hashes_map
408+
.get(&block.header.number)
409+
.ok_or(ChainError::WitnessGeneration(
410+
"Block hash not found for witness generation".to_string(),
411+
))?
412+
.to_owned(),
413+
)
414+
.map_err(|_| ChainError::ParentStateNotFound)?
415+
.ok_or(ChainError::ParentStateNotFound)?,
416+
);
417+
418+
// Use the updated state trie for the next block
362419
trie = updated_trie;
420+
421+
for state_trie_witness in current_trie_witness
422+
.lock()
423+
.map_err(|_| {
424+
ChainError::WitnessGeneration("Failed to lock state trie witness".to_string())
425+
})?
426+
.iter()
427+
{
428+
accumulated_state_trie_witness.insert(state_trie_witness.clone());
429+
}
430+
431+
current_trie_witness = new_state_trie_witness;
363432
}
364433

365-
// Get the witness for the state trie
366-
let mut state_trie_witness = state_trie_witness.lock().map_err(|_| {
367-
ChainError::WitnessGeneration("Failed to lock state trie witness".to_string())
368-
})?;
369-
let state_trie_witness = std::mem::take(&mut *state_trie_witness);
370-
used_trie_nodes.extend_from_slice(&Vec::from_iter(state_trie_witness.into_iter()));
434+
used_trie_nodes
435+
.extend_from_slice(&Vec::from_iter(accumulated_state_trie_witness.into_iter()));
436+
371437
// If the witness is empty at least try to store the root
372438
if used_trie_nodes.is_empty()
373439
&& let Some(root) = root_node
@@ -376,6 +442,7 @@ impl Blockchain {
376442
}
377443

378444
let mut needed_block_numbers = block_hashes.keys().collect::<Vec<_>>();
445+
379446
needed_block_numbers.sort();
380447

381448
// Last needed block header for the witness is the parent of the last block we need to execute
@@ -385,17 +452,26 @@ impl Blockchain {
385452
.header
386453
.number
387454
.saturating_sub(1);
455+
388456
// The first block number we need is either the parent of the first block number or the earliest block number used by BLOCKHASH
389457
let mut first_needed_block_number = first_block_header.number.saturating_sub(1);
458+
390459
if let Some(block_number_from_logger) = needed_block_numbers.first()
391460
&& **block_number_from_logger < first_needed_block_number
392461
{
393462
first_needed_block_number = **block_number_from_logger;
394463
}
464+
395465
let mut block_headers_bytes = Vec::new();
466+
396467
for block_number in first_needed_block_number..=last_needed_block_number {
397-
let header = self.storage.get_block_header(block_number)?.ok_or(
398-
ChainError::WitnessGeneration("Failed to get block header".to_string()),
468+
let hash = block_hashes_map
469+
.get(&block_number)
470+
.ok_or(ChainError::WitnessGeneration(format!(
471+
"Failed to get block {block_number} hash"
472+
)))?;
473+
let header = self.storage.get_block_header_by_hash(*hash)?.ok_or(
474+
ChainError::WitnessGeneration(format!("Failed to get block {block_number} header")),
399475
)?;
400476
block_headers_bytes.push(header.encode_to_vec());
401477
}

crates/l2/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,5 +80,6 @@ panic = "deny"
8080

8181
[features]
8282
default = ["l2"]
83+
rocksdb = []
8384
metrics = ["ethrex-blockchain/metrics"]
8485
l2 = ["guest_program/l2"]

0 commit comments

Comments
 (0)