Skip to content

Commit 71543ef

Browse files
RomanHodulak authored and 07Vaishnavi-Singh committed
feat(era): Add history import (paradigmxyz#15737)
1 parent 55fb8e0 commit 71543ef

File tree

13 files changed

+415
-64
lines changed

13 files changed

+415
-64
lines changed

.github/assets/check_wasm.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ exclude_crates=(
7373
reth-testing-utils
7474
reth-optimism-txpool # reth-transaction-pool
7575
reth-era-downloader # tokio
76+
reth-era-import # tokio
7677
)
7778

7879
# Array to hold the results

Cargo.lock

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ members = [
3030
"crates/engine/util/",
3131
"crates/era",
3232
"crates/era-downloader",
33+
"crates/era-import",
3334
"crates/errors/",
3435
"crates/ethereum-forks/",
3536
"crates/ethereum/cli/",
@@ -346,6 +347,7 @@ reth-engine-util = { path = "crates/engine/util" }
346347
reth-errors = { path = "crates/errors" }
347348
reth-era = { path = "crates/era" }
348349
reth-era-downloader = { path = "crates/era-downloader" }
350+
reth-era-import = { path = "crates/era-import" }
349351
reth-eth-wire = { path = "crates/net/eth-wire" }
350352
reth-eth-wire-types = { path = "crates/net/eth-wire-types" }
351353
reth-ethereum-cli = { path = "crates/ethereum/cli" }

crates/era-downloader/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[package]
22
name = "reth-era-downloader"
3+
description = "An asynchronous stream interface for downloading ERA1 files"
34
version.workspace = true
45
edition.workspace = true
56
rust-version.workspace = true

crates/era-downloader/src/client.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@ use tokio::{
1111
/// Accesses the network over HTTP.
1212
pub trait HttpClient {
1313
/// Makes an HTTP GET request to `url`. Returns a stream of response body bytes.
14-
fn get<U: IntoUrl>(
14+
fn get<U: IntoUrl + Send + Sync>(
1515
&self,
1616
url: U,
17-
) -> impl Future<Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin>>;
17+
) -> impl Future<
18+
Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>,
19+
> + Send
20+
+ Sync;
1821
}
1922

2023
impl HttpClient for Client {
21-
async fn get<U: IntoUrl>(
24+
async fn get<U: IntoUrl + Send + Sync>(
2225
&self,
2326
url: U,
2427
) -> eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin> {

crates/era-downloader/src/stream.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,20 @@ impl EraStreamConfig {
4545
}
4646

4747
/// An asynchronous stream of ERA1 files.
48+
///
49+
/// # Examples
50+
/// ```
51+
/// use futures_util::StreamExt;
52+
/// use reth_era_downloader::{EraStream, HttpClient};
53+
///
54+
/// # async fn import(mut stream: EraStream<impl HttpClient + Clone + Send + Sync + 'static + Unpin>) -> eyre::Result<()> {
55+
/// while let Some(file) = stream.next().await {
56+
/// let file = file?;
57+
/// // Process `file: Box<Path>`
58+
/// }
59+
/// # Ok(())
60+
/// # }
61+
/// ```
4862
#[derive(Debug)]
4963
pub struct EraStream<Http> {
5064
download_stream: DownloadStream,
@@ -98,7 +112,8 @@ impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for EraStr
98112
}
99113
}
100114

101-
type DownloadFuture = Pin<Box<dyn Future<Output = eyre::Result<Box<Path>>>>>;
115+
type DownloadFuture =
116+
Pin<Box<dyn Future<Output = eyre::Result<Box<Path>>> + Send + Sync + 'static>>;
102117

103118
struct DownloadStream {
104119
downloads: FuturesOrdered<DownloadFuture>,
@@ -167,7 +182,7 @@ enum State {
167182
}
168183

169184
impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for StartingStream<Http> {
170-
type Item = Pin<Box<dyn Future<Output = eyre::Result<Box<Path>>>>>;
185+
type Item = DownloadFuture;
171186

172187
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
173188
if self.state == State::Initial {

crates/era-downloader/tests/it/main.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@ const MAINNET_1: &[u8] = include_bytes!("../res/mainnet-00001-a5364e9a.era1");
2323
struct StubClient;
2424

2525
impl HttpClient for StubClient {
26-
fn get<U: IntoUrl>(
26+
fn get<U: IntoUrl + Send + Sync>(
2727
&self,
2828
url: U,
29-
) -> impl Future<Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin>> {
29+
) -> impl Future<
30+
Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>,
31+
> + Send
32+
+ Sync {
3033
let url = url.into_url().unwrap();
3134

3235
async move {
@@ -35,37 +38,37 @@ impl HttpClient for StubClient {
3538
Ok(Box::new(futures::stream::once(Box::pin(async move {
3639
Ok(bytes::Bytes::from(NIMBUS))
3740
})))
38-
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Unpin>)
41+
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>)
3942
}
4043
"https://era1.ethportal.net/" => {
4144
Ok(Box::new(futures::stream::once(Box::pin(async move {
4245
Ok(bytes::Bytes::from(ETH_PORTAL))
4346
})))
44-
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Unpin>)
47+
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>)
4548
}
4649
"https://era1.ethportal.net/mainnet-00000-5ec1ffb8.era1" => {
4750
Ok(Box::new(futures::stream::once(Box::pin(async move {
4851
Ok(bytes::Bytes::from(MAINNET_0))
4952
})))
50-
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Unpin>)
53+
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>)
5154
}
5255
"https://mainnet.era1.nimbus.team/mainnet-00000-5ec1ffb8.era1" => {
5356
Ok(Box::new(futures::stream::once(Box::pin(async move {
5457
Ok(bytes::Bytes::from(MAINNET_0))
5558
})))
56-
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Unpin>)
59+
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>)
5760
}
5861
"https://era1.ethportal.net/mainnet-00001-a5364e9a.era1" => {
5962
Ok(Box::new(futures::stream::once(Box::pin(async move {
6063
Ok(bytes::Bytes::from(MAINNET_1))
6164
})))
62-
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Unpin>)
65+
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>)
6366
}
6467
"https://mainnet.era1.nimbus.team/mainnet-00001-a5364e9a.era1" => {
6568
Ok(Box::new(futures::stream::once(Box::pin(async move {
6669
Ok(bytes::Bytes::from(MAINNET_1))
6770
})))
68-
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Unpin>)
71+
as Box<dyn Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>)
6972
}
7073
v => unimplemented!("Unexpected URL \"{v}\""),
7174
}

crates/era-import/Cargo.toml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
[package]
2+
name = "reth-era-import"
3+
description = "Imports history from ERA files"
4+
version.workspace = true
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
license.workspace = true
8+
homepage.workspace = true
9+
repository.workspace = true
10+
exclude.workspace = true
11+
12+
[dependencies]
13+
# alloy
14+
alloy-primitives.workspace = true
15+
16+
# reth
17+
reth-db-api.workspace = true
18+
reth-era.workspace = true
19+
reth-era-downloader.workspace = true
20+
reth-etl.workspace = true
21+
reth-fs-util.workspace = true
22+
reth-provider.workspace = true
23+
reth-storage-api.workspace = true
24+
reth-primitives-traits.workspace = true
25+
26+
# async
27+
tokio.workspace = true
28+
tokio.features = ["fs", "io-util"]
29+
futures-util.workspace = true
30+
31+
# errors
32+
eyre.workspace = true
33+
tracing.workspace = true
34+
35+
[dev-dependencies]
36+
# reth
37+
reth-provider.workspace = true
38+
reth-provider.features = ["test-utils"]
39+
reth-db-common.workspace = true
40+
41+
# async
42+
tokio.workspace = true
43+
tokio.features = ["fs", "io-util", "macros"]
44+
45+
# http
46+
reqwest.workspace = true
47+
48+
# file system
49+
tempfile.workspace = true
50+
51+
[lints]
52+
workspace = true

crates/era-import/src/history.rs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use alloy_primitives::{BlockHash, BlockNumber};
2+
use futures_util::StreamExt;
3+
use reth_db_api::{
4+
cursor::{DbCursorRO, DbCursorRW},
5+
table::Value,
6+
tables,
7+
transaction::{DbTx, DbTxMut},
8+
RawKey, RawTable, RawValue,
9+
};
10+
use reth_era::{era1_file::Era1Reader, execution_types::DecodeCompressed};
11+
use reth_era_downloader::{EraStream, HttpClient};
12+
use reth_etl::Collector;
13+
use reth_fs_util as fs;
14+
use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
15+
use reth_provider::{
16+
BlockWriter, ProviderError, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
17+
};
18+
use reth_storage_api::{DBProvider, HeaderProvider, NodePrimitivesProvider, StorageLocation};
19+
use std::sync::mpsc;
20+
use tracing::info;
21+
22+
/// Imports blocks from `downloader` using `provider`.
23+
///
/// A background tokio task drains `downloader` and forwards every downloaded file path over a
/// standard-library channel. The calling thread then, for each file:
///
/// 1. opens it with [`Era1Reader`] and decodes the header and body of every block,
/// 2. skips block number `0`,
/// 3. appends the header, together with the accumulated total difficulty, to the `Headers`
///    static-file segment,
/// 4. appends the body through `provider`,
/// 5. records the `hash -> number` pair in `hash_collector`,
/// 6. removes the file once all of its blocks were processed.
///
/// After the stream ends, the collected pairs are written to the `HeaderNumbers` table.
///
/// This function blocks the calling thread on `rx.recv()` while waiting for the next file.
/// Because it calls `tokio::spawn`, it has to be invoked from within a Tokio runtime context.
///
/// # Errors
///
/// Returns an error as soon as a download, file-system operation, decode step, static-file write
/// or database write fails, or when the total difficulty of the starting header cannot be found.
///
24+
/// Returns current block height.
/// That is the number of the last header appended, or the highest header number already present
/// in static files when no block was imported.
25+
pub fn import<H, P, B, BB, BH>(
26+
mut downloader: EraStream<H>,
27+
provider: &P,
28+
mut hash_collector: Collector<BlockHash, BlockNumber>,
29+
) -> eyre::Result<BlockNumber>
30+
where
31+
B: Block<Header = BH, Body = BB>,
32+
BH: FullBlockHeader + Value,
33+
BB: FullBlockBody<
34+
Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
35+
OmmerHeader = BH,
36+
>,
37+
H: HttpClient + Clone + Send + Sync + 'static + Unpin,
38+
P: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + BlockWriter<Block = B>,
39+
<P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
40+
{
41+
let (tx, rx) = mpsc::channel();
42+
43+
// Handle IO-bound async download in a background tokio task.
// Every stream item is forwarded as `Some(..)`; a final `None` tells the receiving loop below
// that the stream is exhausted. A failed `send` (receiver gone) ends the task early.
44+
tokio::spawn(async move {
45+
while let Some(file) = downloader.next().await {
46+
tx.send(Some(file))?;
47+
}
48+
tx.send(None)
49+
});
50+
51+
let static_file_provider = provider.static_file_provider();
52+
53+
// Start from the highest block recorded in the Headers static-file segment, falling back to
54+
// the default block number when that lookup yields nothing.
// NOTE(review): the original comment here said the static-files-vs-DB consistency check is done
// in `provider::sync_gap` when `poll_execute_ready` is polled; neither is visible in this
// file — confirm that still applies to this code path.
55+
let mut last_header_number = static_file_provider
56+
.get_highest_static_file_block(StaticFileSegment::Headers)
57+
.unwrap_or_default();
58+
59+
// Find the latest total difficulty; it is the base that imported difficulties are added to.
60+
let mut td = static_file_provider
61+
.header_td_by_number(last_header_number)?
62+
.ok_or(ProviderError::TotalDifficultyNotFound(last_header_number))?;
63+
64+
// Writer used to append each imported header to the Headers static-file segment, in the
65+
// order in which the ERA1 reader yields the blocks.
66+
let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
67+
68+
// Blocks until the background task sends the next item; `None` terminates the loop.
while let Some(path) = rx.recv()? {
69+
// Propagate a failed download.
let path = path?;
70+
let file = fs::open(path.clone())?;
71+
let mut reader = Era1Reader::new(file);
72+
73+
for block in reader.iter() {
74+
let block = block?;
75+
let header: BH = block.header.decode()?;
76+
let body: BB = block.body.decode()?;
77+
let number = header.number();
78+
79+
// Skip the genesis block (presumably already stored before the import starts — confirm).
if number == 0 {
80+
continue;
81+
}
82+
83+
let hash = header.hash_slow();
84+
last_header_number = number;
85+
86+
// Increase total difficulty
87+
td += header.difficulty();
88+
89+
// Append to Headers segment
90+
writer.append_header(&header, td, &hash)?;
91+
92+
// Write bodies to database.
93+
provider.append_block_bodies(
94+
vec![(header.number(), Some(body))],
95+
// We are writing transactions directly to static files.
96+
StorageLocation::StaticFiles,
97+
)?;
98+
99+
// Remember the pair so the hash-to-number index can be written after all files are read.
hash_collector.insert(hash, number)?;
100+
}
101+
102+
info!(target: "era::history::import", "Processed {}", path.to_string_lossy());
103+
104+
// Delete the file once all of its blocks have been imported.
fs::remove_file(path)?;
105+
}
106+
107+
let total_headers = hash_collector.len();
108+
info!(target: "era::history::import", total = total_headers, "Writing headers hash index");
109+
110+
// Database cursor for hash to number index
111+
let mut cursor_header_numbers =
112+
provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
113+
let mut first_sync = false;
114+
115+
// If we only have the genesis block hash, then we are at first sync, and we can remove it,
116+
// add it to the collector and use tx.append on all hashes.
117+
if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 {
118+
if let Some((hash, block_number)) = cursor_header_numbers.last()? {
119+
if block_number.value()? == 0 {
120+
hash_collector.insert(hash.key()?, 0)?;
121+
cursor_header_numbers.delete_current()?;
122+
first_sync = true;
123+
}
124+
}
125+
}
126+
127+
// Progress is logged about every tenth of the entries (at least every entry), and only when
// more than 100 headers were collected.
let interval = (total_headers / 10).max(1);
128+
129+
// Build block hash to block number index
130+
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
131+
let (hash, number) = hash_to_number?;
132+
133+
if index > 0 && index % interval == 0 && total_headers > 100 {
134+
info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
135+
}
136+
137+
// The collector yields raw bytes; wrap them as a raw table key and value.
let hash = RawKey::<BlockHash>::from_vec(hash);
138+
let number = RawValue::<BlockNumber>::from_vec(number);
139+
140+
// On first sync the table was emptied above, so every entry is appended; otherwise existing
// entries may be present and each pair is upserted.
if first_sync {
141+
cursor_header_numbers.append(hash, &number)?;
142+
} else {
143+
cursor_header_numbers.upsert(hash, &number)?;
144+
}
145+
}
146+
147+
Ok(last_header_number)
148+
}

crates/era-import/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
//! Imports history from ERA files.
2+
//!
3+
//! ERA files are downloaded using [`reth_era_downloader`] and parsed using [`reth_era`].
4+
5+
mod history;
6+
7+
pub use history::import;

0 commit comments

Comments
 (0)