diff --git a/.github/workflows/pr-main_l1.yaml b/.github/workflows/pr-main_l1.yaml index 4927a1ec24..6b811b9e4a 100644 --- a/.github/workflows/pr-main_l1.yaml +++ b/.github/workflows/pr-main_l1.yaml @@ -180,16 +180,15 @@ jobs: fail-fast: false matrix: include: - - name: "Rpc Compat tests" - simulation: ethereum/rpc-compat - # https://github.com/ethereum/execution-apis/pull/627 changed the simulation to use a pre-merge genesis block, so we need to pin to a commit before that - buildarg: "branch=d08382ae5c808680e976fce4b73f4ba91647199b" - artifact_prefix: rpc_compat - # muted until further notice - # - name: "Devp2p tests" - # simulation: devp2p - # limit: discv4|eth|snap/Ping|Amplification|Status|StorageRanges|ByteCodes|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|NewPooledTxs|GetBlockReceipts|LargeTxRequest|InvalidTxs|BlockRangeUpdate|AccountRange|GetTrieNodes|GetByteCodes|GetStorageRanges|Findnode|BlobViolations - # artifact_prefix: devp2p + # - name: "Rpc Compat tests" + # simulation: ethereum/rpc-compat + # # https://github.com/ethereum/execution-apis/pull/627 changed the simulation to use a pre-merge genesis block, so we need to pin to a commit before that + # buildarg: "branch=d08382ae5c808680e976fce4b73f4ba91647199b" + # artifact_prefix: rpc_compat + - name: "Devp2p tests" + simulation: devp2p + limit: discv4|eth|snap/Ping|Amplification|Status|StorageRanges|ByteCodes|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|NewPooledTxs|GetBlockReceipts|LargeTxRequest|InvalidTxs|BlockRangeUpdate|AccountRange|GetTrieNodes|GetByteCodes|GetStorageRanges|Findnode|BlobViolations + artifact_prefix: devp2p - name: "Engine Auth and EC tests" simulation: ethereum/engine limit: engine-(auth|exchange-capabilities)/ @@ -240,16 +239,12 @@ jobs: shell: bash env: SIM_LIMIT: ${{ matrix.limit }} - SIM_BUILDARG: ${{ 
matrix.buildarg }} run: | - FLAGS='--sim.parallelism 4 --sim.loglevel 3' + FLAGS='--sim.parallelism 4 --sim.loglevel 3 --docker.output' if [[ -n "$SIM_LIMIT" ]]; then escaped_limit=${SIM_LIMIT//\'/\'\\\'\'} FLAGS+=" --sim.limit '$escaped_limit'" fi - if [[ -n "$SIM_BUILDARG" ]]; then - FLAGS+=" --sim.buildarg $SIM_BUILDARG" - fi echo "flags=$FLAGS" >> "$GITHUB_OUTPUT" - name: Run Hive Simulation @@ -274,7 +269,7 @@ jobs: uses: actions/upload-artifact@v4 with: name: hive_failed_logs_${{ matrix.artifact_prefix }} - path: src/results/failed_logs + path: src/results if-no-files-found: warn # The purpose of this job is to add it as a required check in GitHub so that we don't have to add every individual job as a required check diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index 002f261c63..7da48fc10c 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -2,6 +2,7 @@ use std::{ fmt::Display, fs::{File, metadata, read_dir}, io::{self, Write}, + mem, path::{Path, PathBuf}, str::FromStr, time::{Duration, Instant}, @@ -16,6 +17,7 @@ use ethrex_p2p::{ }; use ethrex_rlp::encode::RLPEncode; use ethrex_storage::error::StoreError; +use tokio_util::sync::CancellationToken; use tracing::{Level, info, warn}; use crate::{ @@ -511,6 +513,8 @@ pub async fn import_blocks( genesis: Genesis, blockchain_opts: BlockchainOptions, ) -> Result<(), ChainError> { + const IMPORT_BATCH_SIZE: usize = 1024; + const MIN_FULL_BLOCKS: usize = 128; let start_time = Instant::now(); init_datadir(datadir); let store = init_store(datadir, genesis).await; @@ -543,6 +547,7 @@ pub async fn import_blocks( let mut total_blocks_imported = 0; for blocks in chains { + let mut block_batch = vec![]; let size = blocks.len(); let mut numbers_and_hashes = blocks .iter() @@ -558,7 +563,8 @@ pub async fn import_blocks( if last_progress_log.elapsed() >= Duration::from_secs(10) { let processed = index + 1; let percent = (((processed as f64 / size as f64) * 100.0) * 10.0).round() / 10.0; - info!(processed, total = 
size, percent, "Import progress"); + let total_time = start_time.elapsed().as_secs(); + info!(processed, total = size, percent, total_time, "Import progress"); last_progress_log = Instant::now(); } @@ -574,13 +580,24 @@ continue; } - blockchain + if index + MIN_FULL_BLOCKS < size { + block_batch.push(block); + if block_batch.len() >= IMPORT_BATCH_SIZE || index + MIN_FULL_BLOCKS + 1 == size { + blockchain + .add_blocks_in_batch(mem::take(&mut block_batch), CancellationToken::new()) + .await + .map_err(|(err, _)| err)?; + } + } else { + if !block_batch.is_empty() { blockchain.add_blocks_in_batch(mem::take(&mut block_batch), CancellationToken::new()).await.map_err(|(err, _)| err)?; } // Flush any pending batch before the pipeline phase: the latest MIN_FULL_BLOCKS blocks need their parents' state present, and the `continue` above can skip the index that would otherwise trigger the flush. + blockchain .add_block_pipeline(block) .inspect_err(|err| match err { // Block number 1's parent not found, the chain must not belong to the same network as the genesis file ChainError::ParentNotFound if number == 1 => warn!("The chain file is not compatible with the genesis file. Are you sure you selected the correct network?"), _ => warn!("Failed to add block {number} with hash {hash:#x}"), })?; + } } // Make head canonical and label all special blocks correctly. 
diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index a526b3b86e..ee21b42eb1 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -861,7 +861,7 @@ impl Blockchain { stored } }); - if self.options.perf_logs_enabled { + if self.options.perf_logs_enabled { Self::print_add_block_pipeline_logs( gas_used, gas_limit, diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 65c128904a..89a3b110eb 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -66,7 +66,7 @@ use tokio::{ }; use tokio_stream::StreamExt; use tokio_util::codec::Framed; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, trace, warn}; const PING_INTERVAL: Duration = Duration::from_secs(10); const BLOCK_RANGE_UPDATE_INTERVAL: Duration = Duration::from_secs(60); @@ -623,6 +623,7 @@ async fn send_block_range_update(state: &mut Established) -> Result<(), PeerConn trace!(peer=%state.node, "Sending BlockRangeUpdate"); let update = BlockRangeUpdate::new(&state.storage).await?; let lastet_block = update.latest_block; + debug!("Sending BlockRangeUpdate: {update:?}"); send(state, Message::BlockRangeUpdate(update)).await?; state.last_block_range_update_block = lastet_block - (lastet_block % 32); } diff --git a/crates/networking/p2p/rlpx/eth/update.rs b/crates/networking/p2p/rlpx/eth/update.rs index 7e9ea31c87..8ff21dbde2 100644 --- a/crates/networking/p2p/rlpx/eth/update.rs +++ b/crates/networking/p2p/rlpx/eth/update.rs @@ -10,6 +10,7 @@ use ethrex_rlp::{ structs::{Decoder, Encoder}, }; use ethrex_storage::Store; +use tracing::debug; #[derive(Debug, Clone)] pub struct BlockRangeUpdate { @@ -38,6 +39,7 @@ impl BlockRangeUpdate { /// Validates an incoming BlockRangeUpdate from a peer pub fn validate(&self) -> Result<(), PeerConnectionError> { + debug!("Validating BlockRangeUpdate: {self:?}"); if 
self.earliest_block > self.latest_block || self.latest_block_hash.is_zero() { return Err(PeerConnectionError::InvalidBlockRangeUpdate); } diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 75c97ce107..c8dc334c00 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -17,7 +17,7 @@ use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; use ethrex_trie::{Nibbles, NodeRLP, Trie, TrieLogger, TrieNode, TrieWitness}; use sha3::{Digest as _, Keccak256}; -use std::{collections::hash_map::Entry, sync::Arc}; +use std::{collections::hash_map::Entry, sync::Arc, time::Instant}; use std::{ collections::{BTreeMap, HashMap}, sync::Mutex, @@ -80,6 +80,7 @@ impl Store { } pub fn new(path: impl AsRef, engine_type: EngineType) -> Result { + let start = Instant::now(); let path = path.as_ref(); info!(engine = ?engine_type, ?path, "Opening storage engine"); let store = match engine_type { @@ -95,6 +96,7 @@ impl Store { latest_block_header: Default::default(), }, }; + info!("Opened storage engine in {} secs", start.elapsed().as_secs()); Ok(store) } @@ -618,6 +620,7 @@ impl Store { } pub async fn add_initial_state(&mut self, genesis: Genesis) -> Result<(), StoreError> { + let start = Instant::now(); debug!("Storing initial state from genesis"); // Obtain genesis block @@ -668,6 +671,7 @@ impl Store { .await?; self.forkchoice_update(None, genesis_block_number, genesis_hash, None, None) .await?; + info!("Added initial state in {} secs", start.elapsed().as_secs()); Ok(()) }