27 changes: 11 additions & 16 deletions .github/workflows/pr-main_l1.yaml
@@ -180,16 +180,15 @@ jobs:
       fail-fast: false
       matrix:
         include:
-          - name: "Rpc Compat tests"
-            simulation: ethereum/rpc-compat
-            # https://github.com/ethereum/execution-apis/pull/627 changed the simulation to use a pre-merge genesis block, so we need to pin to a commit before that
-            buildarg: "branch=d08382ae5c808680e976fce4b73f4ba91647199b"
-            artifact_prefix: rpc_compat
-          # muted until further notice
-          # - name: "Devp2p tests"
-          #   simulation: devp2p
-          #   limit: discv4|eth|snap/Ping|Amplification|Status|StorageRanges|ByteCodes|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|NewPooledTxs|GetBlockReceipts|LargeTxRequest|InvalidTxs|BlockRangeUpdate|AccountRange|GetTrieNodes|GetByteCodes|GetStorageRanges|Findnode|BlobViolations
-          #   artifact_prefix: devp2p
+          # - name: "Rpc Compat tests"
+          #   simulation: ethereum/rpc-compat
+          #   # https://github.com/ethereum/execution-apis/pull/627 changed the simulation to use a pre-merge genesis block, so we need to pin to a commit before that
+          #   buildarg: "branch=d08382ae5c808680e976fce4b73f4ba91647199b"
+          #   artifact_prefix: rpc_compat
+          - name: "Devp2p tests"
+            simulation: devp2p
+            limit: discv4|eth|snap/Ping|Amplification|Status|StorageRanges|ByteCodes|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|NewPooledTxs|GetBlockReceipts|LargeTxRequest|InvalidTxs|BlockRangeUpdate|AccountRange|GetTrieNodes|GetByteCodes|GetStorageRanges|Findnode|BlobViolations
+            artifact_prefix: devp2p
           - name: "Engine Auth and EC tests"
             simulation: ethereum/engine
             limit: engine-(auth|exchange-capabilities)/
@@ -240,16 +239,12 @@ jobs:
         shell: bash
         env:
           SIM_LIMIT: ${{ matrix.limit }}
-          SIM_BUILDARG: ${{ matrix.buildarg }}
         run: |
-          FLAGS='--sim.parallelism 4 --sim.loglevel 3'
+          FLAGS='--sim.parallelism 4 --sim.loglevel 3 --docker.output'
           if [[ -n "$SIM_LIMIT" ]]; then
             escaped_limit=${SIM_LIMIT//\'/\'\\\'\'}
             FLAGS+=" --sim.limit '$escaped_limit'"
           fi
-          if [[ -n "$SIM_BUILDARG" ]]; then
-            FLAGS+=" --sim.buildarg $SIM_BUILDARG"
-          fi
           echo "flags=$FLAGS" >> "$GITHUB_OUTPUT"

       - name: Run Hive Simulation
@@ -274,7 +269,7 @@
         uses: actions/upload-artifact@v4
         with:
           name: hive_failed_logs_${{ matrix.artifact_prefix }}
-          path: src/results/failed_logs
+          path: src/results
           if-no-files-found: warn

   # The purpose of this job is to add it as a required check in GitHub so that we don't have to add every individual job as a required check
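Side note on the `--sim.limit` quoting in the compose-flags step: `${SIM_LIMIT//\'/\'\\\'\'}` rewrites every single quote in the limit pattern as `'\''`, so the value survives being wrapped in single quotes inside `$FLAGS`. A minimal Rust sketch of the same transformation (the helper name and sample values are illustrative only, not part of this PR):

```rust
/// Escape a value for embedding inside single quotes on a POSIX shell
/// command line: each ' becomes '\'' (close quote, literal quote, reopen).
fn shell_single_quote_escape(value: &str) -> String {
    value.replace('\'', r"'\''")
}

fn main() {
    let limit = "engine-(auth|exchange-capabilities)/";
    let mut flags = String::from("--sim.parallelism 4 --sim.loglevel 3 --docker.output");
    flags.push_str(&format!(" --sim.limit '{}'", shell_single_quote_escape(limit)));
    println!("{flags}");
}
```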
21 changes: 19 additions & 2 deletions cmd/ethrex/cli.rs
@@ -2,6 +2,7 @@ use std::{
     fmt::Display,
     fs::{File, metadata, read_dir},
     io::{self, Write},
+    mem,
     path::{Path, PathBuf},
     str::FromStr,
     time::{Duration, Instant},
@@ -16,6 +17,7 @@ use ethrex_p2p::{
 };
 use ethrex_rlp::encode::RLPEncode;
 use ethrex_storage::error::StoreError;
+use tokio_util::sync::CancellationToken;
 use tracing::{Level, info, warn};

 use crate::{
@@ -511,6 +513,8 @@ pub async fn import_blocks(
     genesis: Genesis,
     blockchain_opts: BlockchainOptions,
 ) -> Result<(), ChainError> {
+    const IMPORT_BATCH_SIZE: usize = 1024;
+    const MIN_FULL_BLOCKS: usize = 128;
     let start_time = Instant::now();
     init_datadir(datadir);
     let store = init_store(datadir, genesis).await;
@@ -543,6 +547,7 @@

     let mut total_blocks_imported = 0;
     for blocks in chains {
+        let mut block_batch = vec![];
         let size = blocks.len();
         let mut numbers_and_hashes = blocks
             .iter()
@@ -558,7 +563,8 @@
         if last_progress_log.elapsed() >= Duration::from_secs(10) {
             let processed = index + 1;
             let percent = (((processed as f64 / size as f64) * 100.0) * 10.0).round() / 10.0;
-            info!(processed, total = size, percent, "Import progress");
+            let total_time = start_time.elapsed().as_secs();
+            info!(processed, total = size, percent, total_time, "Import progress");
             last_progress_log = Instant::now();
         }

@@ -574,13 +580,24 @@
                 continue;
             }

-            blockchain
+            if index + MIN_FULL_BLOCKS < size {
+                block_batch.push(block);
+                if block_batch.len() >= IMPORT_BATCH_SIZE || index + MIN_FULL_BLOCKS + 1 == size {
+                    blockchain
+                        .add_blocks_in_batch(mem::take(&mut block_batch), CancellationToken::new())
+                        .await
+                        .map_err(|(err, _)| err)?;
+                }
+            } else {
+                // We need to have the state of the latest 128 blocks
+                blockchain
                 .add_block_pipeline(block)
                 .inspect_err(|err| match err {
                     // Block number 1's parent not found, the chain must not belong to the same network as the genesis file
                     ChainError::ParentNotFound if number == 1 => warn!("The chain file is not compatible with the genesis file. Are you sure you selected the correct network?"),
                     _ => warn!("Failed to add block {number} with hash {hash:#x}"),
                 })?;
+            }
         }

         // Make head canonical and label all special blocks correctly.
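The new import path splits each chain in two: every block except the last `MIN_FULL_BLOCKS` (128) is executed in batches of up to `IMPORT_BATCH_SIZE` (1024) via `add_blocks_in_batch`, while the final 128 blocks still go through `add_block_pipeline` one by one, since the node needs the state of the latest 128 blocks. A self-contained sketch of that control flow (`execute_batch`/`execute_block` are stand-ins for the real blockchain calls, not ethrex APIs):

```rust
const IMPORT_BATCH_SIZE: usize = 1024;
const MIN_FULL_BLOCKS: usize = 128;

fn execute_batch(batch: Vec<u64>) {
    println!("executed batch of {} blocks", batch.len());
}

fn execute_block(block: u64) {
    println!("executed block {block} individually");
}

fn import(blocks: Vec<u64>) {
    let size = blocks.len();
    let mut block_batch: Vec<u64> = Vec::new();
    for (index, block) in blocks.into_iter().enumerate() {
        if index + MIN_FULL_BLOCKS < size {
            // Old blocks: accumulate, flushing when the batch reaches 1024
            // or when this is the last block before the final 128-block window.
            block_batch.push(block);
            if block_batch.len() >= IMPORT_BATCH_SIZE || index + MIN_FULL_BLOCKS + 1 == size {
                execute_batch(std::mem::take(&mut block_batch));
            }
        } else {
            // Recent blocks: run one at a time so their post-state is kept.
            execute_block(block);
        }
    }
}

fn main() {
    // 3000 blocks: 2872 arrive in three batches, the final 128 run singly.
    import((0u64..3000).collect());
}
```

The flush condition `index + MIN_FULL_BLOCKS + 1 == size` drains a partially filled batch exactly at the last batched block, so nothing is left behind when the per-block tail begins.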
2 changes: 1 addition & 1 deletion crates/blockchain/blockchain.rs
@@ -861,7 +861,7 @@ impl Blockchain {
                 stored
             }
         });
-        if self.options.perf_logs_enabled {
+        if /*self.options.perf_logs_enabled*/ true {
             Self::print_add_block_pipeline_logs(
                 gas_used,
                 gas_limit,
3 changes: 2 additions & 1 deletion crates/networking/p2p/rlpx/connection/server.rs
@@ -66,7 +66,7 @@ use tokio::{
 };
 use tokio_stream::StreamExt;
 use tokio_util::codec::Framed;
-use tracing::{debug, error, trace, warn};
+use tracing::{debug, error, info, trace, warn};

 const PING_INTERVAL: Duration = Duration::from_secs(10);
 const BLOCK_RANGE_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
@@ -623,6 +623,7 @@ async fn send_block_range_update(state: &mut Established) -> Result<(), PeerConnectionError> {
     trace!(peer=%state.node, "Sending BlockRangeUpdate");
     let update = BlockRangeUpdate::new(&state.storage).await?;
     let lastet_block = update.latest_block;
+    info!("Sending BlockRangeUpdate: {update:?}");
     send(state, Message::BlockRangeUpdate(update)).await?;
     state.last_block_range_update_block = lastet_block - (lastet_block % 32);
 }
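On the unchanged context line `state.last_block_range_update_block = lastet_block - (lastet_block % 32);` (the `lastet_block` spelling is pre-existing): this rounds the latest block down to a multiple of 32, presumably so another update is only recorded once the head crosses the next 32-block boundary. A quick illustration:

```rust
fn main() {
    // Round a block number down to the nearest multiple of 32.
    for latest_block in [0u64, 31, 32, 100, 4095] {
        let anchored = latest_block - (latest_block % 32);
        println!("{latest_block} -> {anchored}");
    }
    // Prints: 0 -> 0, 31 -> 0, 32 -> 32, 100 -> 96, 4095 -> 4064
}
```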
2 changes: 2 additions & 0 deletions crates/networking/p2p/rlpx/eth/update.rs
@@ -10,6 +10,7 @@ use ethrex_rlp::{
     structs::{Decoder, Encoder},
 };
 use ethrex_storage::Store;
+use tracing::info;

 #[derive(Debug, Clone)]
 pub struct BlockRangeUpdate {
@@ -38,6 +39,7 @@ impl BlockRangeUpdate {

     /// Validates an incoming BlockRangeUpdate from a peer
     pub fn validate(&self) -> Result<(), PeerConnectionError> {
+        info!("Validating BlockRangeUpdate: {self:?}");
         if self.earliest_block > self.latest_block || self.latest_block_hash.is_zero() {
             return Err(PeerConnectionError::InvalidBlockRangeUpdate);
         }
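`validate` enforces two invariants on an incoming update: the range must be ordered (`earliest_block <= latest_block`) and the head hash must be non-zero. A simplified, self-contained sketch of the same check (types are stubbed here; the real code uses ethrex's `H256` and `PeerConnectionError`):

```rust
#[derive(Debug)]
struct BlockRangeUpdate {
    earliest_block: u64,
    latest_block: u64,
    latest_block_hash: [u8; 32], // stand-in for H256
}

#[derive(Debug)]
struct InvalidBlockRangeUpdate;

impl BlockRangeUpdate {
    fn validate(&self) -> Result<(), InvalidBlockRangeUpdate> {
        // A zero hash cannot be a real block hash, and an inverted range
        // is nonsensical; reject both.
        let zero_hash = self.latest_block_hash.iter().all(|b| *b == 0);
        if self.earliest_block > self.latest_block || zero_hash {
            return Err(InvalidBlockRangeUpdate);
        }
        Ok(())
    }
}

fn main() {
    let ok = BlockRangeUpdate { earliest_block: 0, latest_block: 10, latest_block_hash: [1; 32] };
    let bad = BlockRangeUpdate { earliest_block: 10, latest_block: 5, latest_block_hash: [1; 32] };
    assert!(ok.validate().is_ok());
    assert!(bad.validate().is_err());
}
```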
6 changes: 5 additions & 1 deletion crates/storage/store.rs
@@ -17,7 +17,7 @@ use ethrex_rlp::decode::RLPDecode;
 use ethrex_rlp::encode::RLPEncode;
 use ethrex_trie::{Nibbles, NodeRLP, Trie, TrieLogger, TrieNode, TrieWitness};
 use sha3::{Digest as _, Keccak256};
-use std::{collections::hash_map::Entry, sync::Arc};
+use std::{collections::hash_map::Entry, sync::Arc, time::Instant};
 use std::{
     collections::{BTreeMap, HashMap},
     sync::Mutex,
@@ -80,6 +80,7 @@ impl Store {
     }

     pub fn new(path: impl AsRef<Path>, engine_type: EngineType) -> Result<Self, StoreError> {
+        let start = Instant::now();
         let path = path.as_ref();
         info!(engine = ?engine_type, ?path, "Opening storage engine");
         let store = match engine_type {
@@ -95,6 +96,7 @@
                 latest_block_header: Default::default(),
             },
         };
+        info!("Opened storage engine in {} secs", start.elapsed().as_secs());

         Ok(store)
     }
@@ -618,6 +620,7 @@
     }

     pub async fn add_initial_state(&mut self, genesis: Genesis) -> Result<(), StoreError> {
+        let start = Instant::now();
         debug!("Storing initial state from genesis");

         // Obtain genesis block
@@ -668,6 +671,7 @@
             .await?;
         self.forkchoice_update(None, genesis_block_number, genesis_hash, None, None)
             .await?;
+        info!("Added initial state in {} secs", start.elapsed().as_secs());
         Ok(())
     }

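The timing additions follow the standard `Instant` pattern: take a start timestamp, do the work, log `start.elapsed()`. One caveat: `as_secs()` truncates, so anything under a second logs as `0 secs`. A small sketch (the `as_secs_f64` variant is only a suggestion, not what this PR uses):

```rust
use std::{thread, time::{Duration, Instant}};

fn main() {
    let start = Instant::now();
    thread::sleep(Duration::from_millis(250));
    // Whole-second logging, as in this PR: sub-second work prints 0.
    println!("finished in {} secs", start.elapsed().as_secs());
    // A fractional reading, if more resolution ever matters:
    println!("finished in {:.3} secs", start.elapsed().as_secs_f64());
}
```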