Skip to content

Commit 2310a4d

Browse files
committed
feat: switch between find content with census and recursive find content queries
1 parent 81fe75e commit 2310a4d

File tree

1 file changed

+106
-22
lines changed

1 file changed

+106
-22
lines changed

trin-history/src/downloader.rs

Lines changed: 106 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::{
1111
time::Duration,
1212
};
1313

14-
use anyhow::anyhow;
14+
use anyhow::{anyhow, Error};
1515
use ethportal_api::{
1616
jsonrpsee::http_client::{HttpClient, HttpClientBuilder},
1717
types::{
@@ -23,17 +23,23 @@ use ethportal_api::{
2323
BlockBodyKey, BlockReceiptsKey, ContentValue, HistoryContentKey, HistoryContentValue,
2424
OverlayContentKey,
2525
};
26-
use futures::future::join_all;
26+
use futures::{channel::oneshot, future::join_all};
2727
use portal_bridge::census::Census;
28-
use portalnet::{constants::DEFAULT_WEB3_HTTP_ADDRESS, overlay::protocol::OverlayProtocol};
28+
use portalnet::{
29+
constants::DEFAULT_WEB3_HTTP_ADDRESS,
30+
overlay::{command::OverlayCommand, protocol::OverlayProtocol},
31+
};
2932
use ssz_types::BitList;
30-
use tracing::{info, warn};
33+
use tracing::{error, info, warn};
3134
use trin_metrics::downloader::DownloaderMetricsReporter;
3235

3336
use crate::{storage::HistoryStorage, validation::ChainHistoryValidator};
3437

3538
/// The number of blocks to download in a single batch.
36-
const BATCH_SIZE: usize = 100;
39+
const BATCH_SIZE: usize = 30;
40+
/// Enable census with full view of the network and peer scoring to find peers to download content
41+
/// from.
42+
const CENSUS: bool = true;
3743
/// The max number of ENRs to send FindContent queries to.
3844
const CENSUS_ENR_LIMIT: usize = 4;
3945
/// The path to the CSV file with block numbers and block hashes.
@@ -55,7 +61,7 @@ impl Display for ContentType {
5561

5662
#[derive(Clone)]
5763
pub struct Downloader {
58-
pub census: Census,
64+
pub census: Option<Census>,
5965
pub overlay_arc:
6066
Arc<OverlayProtocol<HistoryContentKey, XorMetric, ChainHistoryValidator, HistoryStorage>>,
6167
pub metrics: DownloaderMetricsReporter,
@@ -78,15 +84,23 @@ impl Downloader {
7884

7985
let metrics = DownloaderMetricsReporter::new();
8086

81-
let census = Census::new(http_client, CENSUS_ENR_LIMIT, vec![]);
87+
let mut census = None;
88+
89+
if CENSUS {
90+
info!("Census enabled");
91+
census = Some(Census::new(http_client, CENSUS_ENR_LIMIT, vec![]));
92+
} else {
93+
info!("Census disabled");
94+
}
95+
8296
Self {
8397
overlay_arc,
8498
census,
8599
metrics,
86100
}
87101
}
88102

89-
pub async fn start(mut self) -> io::Result<()> {
103+
pub async fn start(self) -> io::Result<()> {
90104
// set the csv path to a file in the root trin-history directory
91105
info!("Opening CSV file");
92106
let csv_path = Path::new(CSV_PATH);
@@ -98,13 +112,15 @@ impl Downloader {
98112
// skip the header of the csv file
99113
let lines = &lines[1..];
100114
let blocks: Vec<(u64, String)> = lines.iter().map(|line| parse_line(line)).collect();
101-
// Initialize the census with the history subnetwork
102-
let _ = Some(
103-
self.census
104-
.init([Subnetwork::History])
105-
.await
106-
.expect("Failed to initialize Census"),
107-
);
115+
// Initialize the census with the history subnetwork if enabled
116+
if let Some(mut census) = self.census.clone() {
117+
let _ = Some(
118+
census
119+
.init([Subnetwork::History])
120+
.await
121+
.expect("Failed to initialize Census"),
122+
);
123+
}
108124

109125
info!("Processing blocks");
110126
let batches = blocks.chunks(BATCH_SIZE);
@@ -147,9 +163,25 @@ impl Downloader {
147163
block_number: u64,
148164
content_type: ContentType,
149165
) -> anyhow::Result<()> {
166+
if CENSUS {
167+
self.find_content_census(&content_key, block_number, content_type)
168+
.await
169+
} else {
170+
self.recursive_find_content(content_key, block_number, content_type)
171+
.await
172+
}
173+
}
174+
175+
/// Send FindContent queries to the interested peers in the census, includes peers scoring
176+
async fn find_content_census(
177+
&self,
178+
content_key: &HistoryContentKey,
179+
block_number: u64,
180+
content_type: ContentType,
181+
) -> Result<(), Error> {
182+
let census = self.census.clone().expect("census should be enabled");
150183
// Select interested peers from the census
151-
let enrs = self
152-
.census
184+
let enrs = census
153185
.select_peers(Subnetwork::History, &content_key.content_id())
154186
.expect("Failed to select peers");
155187
// Send FindContent query to the interested peers
@@ -184,7 +216,7 @@ impl Downloader {
184216
content_type = %content_type,
185217
"Received ConnectionId content"
186218
);
187-
self.census.record_offer_result(
219+
census.record_offer_result(
188220
Subnetwork::History,
189221
enr.node_id(),
190222
0,
@@ -194,7 +226,7 @@ impl Downloader {
194226
continue;
195227
}
196228
Content::Content(content_bytes) => {
197-
let content = HistoryContentValue::decode(&content_key, &content_bytes);
229+
let content = HistoryContentValue::decode(content_key, &content_bytes);
198230

199231
match content {
200232
Ok(_) => {
@@ -203,7 +235,7 @@ impl Downloader {
203235
content_type = %content_type,
204236
"Received content from peer"
205237
);
206-
self.census.record_offer_result(
238+
census.record_offer_result(
207239
Subnetwork::History,
208240
enr.node_id(),
209241
content_bytes.len(),
@@ -220,7 +252,7 @@ impl Downloader {
220252
content_type = %content_type,
221253
"Failed to parse content from peer, invalid content"
222254
);
223-
self.census.record_offer_result(
255+
census.record_offer_result(
224256
Subnetwork::History,
225257
enr.node_id(),
226258
0,
@@ -238,7 +270,7 @@ impl Downloader {
238270
content_type = %content_type,
239271
"Received Enrs content, content not found from peer"
240272
);
241-
self.census.record_offer_result(
273+
census.record_offer_result(
242274
Subnetwork::History,
243275
enr.node_id(),
244276
0,
@@ -256,6 +288,58 @@ impl Downloader {
256288
);
257289
Err(anyhow!("Failed to find content for block"))
258290
}
291+
292+
/// Send recursive FindContent queries to the overlay service
293+
async fn recursive_find_content(
294+
&self,
295+
content_key: HistoryContentKey,
296+
block_number: u64,
297+
content_type: ContentType,
298+
) -> anyhow::Result<()> {
299+
let (tx, rx) = oneshot::channel();
300+
301+
let overlay_command = OverlayCommand::FindContentQuery {
302+
target: content_key.clone(),
303+
callback: tx,
304+
config: Default::default(),
305+
};
306+
307+
if let Err(err) = self.overlay_arc.command_tx.send(overlay_command) {
308+
warn!(
309+
error = %err,
310+
block_number = block_number,
311+
content_type = %content_type,
312+
"Error submitting FindContent query to service"
313+
);
314+
}
315+
match rx.await {
316+
Ok(result) => match result {
317+
Ok(result) => {
318+
HistoryContentValue::decode(&content_key, &result.0)?;
319+
info!(block_number = block_number, "Downloaded content for block");
320+
Ok(())
321+
}
322+
Err(err) => {
323+
error!(
324+
block_number = block_number,
325+
content_type = %content_type,
326+
error = %err,
327+
"Error in FindContent query"
328+
);
329+
Err(anyhow!("Error in FindContent query: {:?}", err))
330+
}
331+
},
332+
Err(err) => {
333+
error!(
334+
block_number = block_number,
335+
content_type = %content_type,
336+
error = %err,
337+
"Error receiving FindContent query response"
338+
);
339+
Err(err.into())
340+
}
341+
}
342+
}
259343
}
260344

261345
fn parse_line(line: &str) -> (u64, String) {

0 commit comments

Comments
 (0)