
Commit 9adab49

committed
feat: add content downloader to query data from the network
1 parent f3e2e80 commit 9adab49

File tree

4 files changed: 167 additions & 1 deletion


Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

trin-history/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ tree_hash.workspace = true
trin-storage.workspace = true
trin-validation.workspace = true
utp-rs.workspace = true
+futures = "0.3.31"

[dev-dependencies]
env_logger.workspace = true

trin-history/src/downloader.rs

Lines changed: 154 additions & 0 deletions
@@ -0,0 +1,154 @@
/// Downloader struct that loads a CSV file of block numbers and block hashes from disk
/// and performs FindContent queries in batches to download all the content listed in the file.
/// The content is not saved to disk; it is downloaded and then dropped. The goal is to measure
/// the time it takes to download all the content, the number of queries, the number of bytes
/// downloaded, the data ingress rate, and the query rate.
use std::fs::File;
use std::{
    io::{self, BufRead},
    path::Path,
};

use anyhow::anyhow;
use ethportal_api::{
    utils::bytes::hex_decode, BlockBodyKey, BlockReceiptsKey, ContentValue, HistoryContentKey,
    HistoryContentValue,
};
use futures::{channel::oneshot, future::join_all};
use portalnet::overlay::command::OverlayCommand;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, info, warn};

/// The number of blocks to download in a single batch.
const BATCH_SIZE: usize = 40;
/// The path to the CSV file with block numbers and block hashes.
const CSV_PATH: &str = "ethereum_blocks_14000000_merge.csv";

#[derive(Clone)]
pub struct Downloader {
    pub overlay_tx: UnboundedSender<OverlayCommand<HistoryContentKey>>,
}

impl Downloader {
    pub fn new(overlay_tx: UnboundedSender<OverlayCommand<HistoryContentKey>>) -> Self {
        Self { overlay_tx }
    }

    pub async fn start(self) -> io::Result<()> {
        // The CSV path points to a file in the root trin-history directory
        info!("Opening CSV file");
        let csv_path = Path::new(CSV_PATH);
        let file = File::open(csv_path)?;
        let reader = io::BufReader::new(file);
        info!("Reading CSV file");
        let lines: Vec<_> = reader.lines().collect::<Result<_, _>>()?;
        // Collect all block numbers and block hashes in memory
        info!("Parsing CSV file");
        // Skip the header of the CSV file
        let lines = &lines[1..];
        let blocks: Vec<(u64, String)> = lines.iter().map(|line| parse_line(line)).collect();
        info!("Processing blocks");
        let batches = blocks.chunks(BATCH_SIZE);

        for batch in batches {
            self.clone().process_batches(batch.to_vec()).await;
        }

        tokio::signal::ctrl_c()
            .await
            .expect("failed to pause until ctrl-c");

        Ok(())
    }

    async fn process_batches(self, batch: Vec<(u64, String)>) {
        let mut futures = Vec::new();

        for (block_number, block_hash) in batch {
            let block_body_content_key = generate_block_body_content_key(block_hash.clone());
            futures.push(self.find_content(block_body_content_key, block_number));
            info!(
                block_number = block_number,
                "Sent FindContent query for block body"
            );
            let block_receipts_content_key = generate_block_receipts_content_key(block_hash);
            futures.push(self.find_content(block_receipts_content_key, block_number));
            info!(
                block_number = block_number,
                "Sent FindContent query for block receipts"
            );
        }
        join_all(futures).await;
    }

    async fn find_content(
        &self,
        content_key: HistoryContentKey,
        block_number: u64,
    ) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();

        let overlay_command = OverlayCommand::FindContentQuery {
            target: content_key.clone(),
            callback: tx,
            config: Default::default(),
        };

        if let Err(err) = self.overlay_tx.send(overlay_command) {
            warn!(
                error = %err,
                "Error submitting FindContent query to service"
            );
        }
        match rx.await {
            Ok(result) => match result {
                Ok(result) => {
                    HistoryContentValue::decode(&content_key, &result.0)?;
                    info!(block_number = block_number, "Downloaded content for block");
                    Ok(())
                }
                Err(err) => {
                    error!(
                        block_number = block_number,
                        error = %err,
                        "Error in FindContent query"
                    );
                    Err(anyhow!("Error in FindContent query: {:?}", err))
                }
            },
            Err(err) => {
                error!(
                    block_number = block_number,
                    error = %err,
                    "Error receiving FindContent query response"
                );
                Err(err.into())
            }
        }
    }
}

fn parse_line(line: &str) -> (u64, String) {
    let parts: Vec<&str> = line.split(',').collect();
    let block_number = parts[0].parse().expect("Failed to parse block number");
    let block_hash = parts[1].to_string();
    (block_number, block_hash)
}

fn generate_block_body_content_key(block_hash: String) -> HistoryContentKey {
    HistoryContentKey::BlockBody(BlockBodyKey {
        block_hash: <[u8; 32]>::try_from(
            hex_decode(&block_hash).expect("Failed to decode block hash"),
        )
        .expect("Failed to convert block hash to byte array"),
    })
}

fn generate_block_receipts_content_key(block_hash: String) -> HistoryContentKey {
    HistoryContentKey::BlockReceipts(BlockReceiptsKey {
        block_hash: <[u8; 32]>::try_from(
            hex_decode(&block_hash).expect("Failed to decode block hash"),
        )
        .expect("Failed to convert block hash to byte array"),
    })
}
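
Note on the measurement goal: the module comment above says the run should measure total download time, query count, bytes downloaded, data ingress rate, and query rate, but this commit does not record any of those figures yet. The sketch below shows one way such counters could be kept; the DownloadMetrics type and its methods are hypothetical and not part of this commit.

// Hypothetical metrics helper (not part of this commit): a minimal sketch of the
// counters the module comment calls for, assuming one record_query() call per
// FindContent request and one record_bytes() call per successfully decoded payload.
use std::{
    sync::atomic::{AtomicU64, Ordering},
    time::Instant,
};

pub struct DownloadMetrics {
    started: Instant,
    queries: AtomicU64,
    bytes: AtomicU64,
}

impl DownloadMetrics {
    pub fn new() -> Self {
        Self {
            started: Instant::now(),
            queries: AtomicU64::new(0),
            bytes: AtomicU64::new(0),
        }
    }

    /// Count one FindContent query.
    pub fn record_query(&self) {
        self.queries.fetch_add(1, Ordering::Relaxed);
    }

    /// Add the size of a downloaded payload in bytes.
    pub fn record_bytes(&self, len: usize) {
        self.bytes.fetch_add(len as u64, Ordering::Relaxed);
    }

    /// (queries per second, bytes per second) since the downloader started.
    pub fn rates(&self) -> (f64, f64) {
        let elapsed = self.started.elapsed().as_secs_f64();
        (
            self.queries.load(Ordering::Relaxed) as f64 / elapsed,
            self.bytes.load(Ordering::Relaxed) as f64 / elapsed,
        )
    }
}

If wired in, find_content could call record_query before sending the OverlayCommand and record_bytes with the length of result.0 after a successful decode, with the struct shared across the batched futures via an Arc.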

trin-history/src/lib.rs

Lines changed: 11 additions & 1 deletion
@@ -1,6 +1,7 @@
#![warn(clippy::unwrap_used)]
#![warn(clippy::uninlined_format_args)]

+mod downloader;
pub mod events;
mod jsonrpc;
pub mod network;

@@ -9,6 +10,7 @@ pub mod validation;

use std::sync::Arc;

+use downloader::Downloader;
use ethportal_api::types::jsonrpc::request::HistoryJsonRpcRequest;
use network::HistoryNetwork;
use portalnet::{

@@ -21,7 +23,7 @@ use tokio::{
    task::JoinHandle,
    time::{interval, Duration},
};
-use tracing::info;
+use tracing::{error, info};
use trin_storage::PortalStorageConfig;
use trin_validation::oracle::HeaderOracle;
use utp_rs::socket::UtpSocket;

@@ -99,6 +101,14 @@ pub fn spawn_history_network(
    // hacky test: make sure we establish a session with the boot node
    network.overlay.ping_bootnodes().await;

+    let overlay_tx = network.overlay.command_tx.clone();
+    let downloader = Downloader::new(overlay_tx);
+    tokio::spawn(async move {
+        if let Err(e) = downloader.start().await {
+            error!("Downloader error: {:?}", e);
+        }
+    });
+
    tokio::signal::ctrl_c()
        .await
        .expect("failed to pause until ctrl-c");
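
Because the downloader is spawned unconditionally when the history network starts, ethereum_blocks_14000000_merge.csv must exist in the working directory of the trin process, or Downloader::start exits with an I/O error. The snippet below is a hypothetical helper for producing a minimal input file for a local run; the header text and the all-zero placeholder hash are assumptions, since Downloader::start only skips the first line and parse_line reads the first two comma-separated columns (block number, then a block hash that hex_decode accepts as 0x-prefixed hex).

// Hypothetical helper (not part of this commit) that writes a minimal CSV in the
// layout the downloader appears to expect: a header line that gets skipped, then
// one "block_number,block_hash" row per block. Replace the all-zero placeholder
// hash with a real block hash before querying the network.
use std::{fs::File, io::Write};

fn write_sample_csv(path: &str) -> std::io::Result<()> {
    let mut file = File::create(path)?;
    writeln!(file, "block_number,block_hash")?;
    writeln!(
        file,
        "14000000,0x{}",
        "00".repeat(32) // placeholder 32-byte hash; not a real block hash
    )?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    // The path matches CSV_PATH in downloader.rs and is resolved relative to the
    // working directory of the trin process.
    write_sample_csv("ethereum_blocks_14000000_merge.csv")
}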
