11/// Downloader struct that load a data CSV file from disk with block number and block hashes
22/// and do FIndContent queries in batches to download all the content from the csv file.
33/// We don't save the content to disk, we just download it and drop
4- /// it. But we need to measure the time it takes to download all the content, the number of
5- /// queries and the number of bytes downloaded, the data ingress rate and the query rate.
4+ /// it.
65use std:: fs:: File ;
76use std:: {
7+ fmt:: { Display , Formatter } ,
88 io:: { self , BufRead } ,
99 path:: Path ,
10+ sync:: Arc ,
1011 time:: Duration ,
1112} ;
1213
1314use anyhow:: anyhow;
1415use ethportal_api:: {
1516 jsonrpsee:: http_client:: { HttpClient , HttpClientBuilder } ,
16- types:: { cli:: DEFAULT_WEB3_HTTP_ADDRESS , network:: Subnetwork , query_trace:: QueryTrace } ,
17+ types:: {
18+ distance:: XorMetric ,
19+ network:: Subnetwork ,
20+ portal_wire:: { Content , OfferTrace } ,
21+ } ,
1722 utils:: bytes:: hex_decode,
1823 BlockBodyKey , BlockReceiptsKey , ContentValue , HistoryContentKey , HistoryContentValue ,
24+ OverlayContentKey ,
1925} ;
20- use futures:: { channel :: oneshot , future:: join_all} ;
26+ use futures:: future:: join_all;
2127use portal_bridge:: census:: Census ;
22- use portalnet:: overlay:: { command:: OverlayCommand , config:: FindContentConfig } ;
23- use tokio:: sync:: mpsc:: UnboundedSender ;
24- use tracing:: { error, info, warn} ;
28+ use portalnet:: { constants:: DEFAULT_WEB3_HTTP_ADDRESS , overlay:: protocol:: OverlayProtocol } ;
29+ use ssz_types:: BitList ;
30+ use tracing:: { info, warn} ;
31+
32+ use crate :: { storage:: HistoryStorage , validation:: ChainHistoryValidator } ;
2533
2634/// The number of blocks to download in a single batch.
27- const BATCH_SIZE : usize = 3 ;
35+ const BATCH_SIZE : usize = 100 ;
36+ /// The max number of ENRs to send FindContent queries to.
37+ const CENSUS_ENR_LIMIT : usize = 4 ;
2838/// The path to the CSV file with block numbers and block hashes.
2939const CSV_PATH : & str = "ethereum_blocks_14000000_merge.csv" ;
3040
41+ enum ContentType {
42+ BlockBody ,
43+ BlockReceipts ,
44+ }
45+
46+ impl Display for ContentType {
47+ fn fmt ( & self , f : & mut Formatter < ' _ > ) -> std:: fmt:: Result {
48+ match self {
49+ ContentType :: BlockBody => write ! ( f, "BlockBody" ) ,
50+ ContentType :: BlockReceipts => write ! ( f, "BlockReceipts" ) ,
51+ }
52+ }
53+ }
54+
3155#[ derive( Clone ) ]
3256pub struct Downloader {
3357 pub census : Census ,
34- pub overlay_tx : UnboundedSender < OverlayCommand < HistoryContentKey > > ,
58+ pub overlay_arc :
59+ Arc < OverlayProtocol < HistoryContentKey , XorMetric , ChainHistoryValidator , HistoryStorage > > ,
3560}
3661
3762impl Downloader {
38- pub fn new ( overlay_tx : UnboundedSender < OverlayCommand < HistoryContentKey > > ) -> Self {
63+ pub fn new (
64+ overlay_arc : Arc <
65+ OverlayProtocol < HistoryContentKey , XorMetric , ChainHistoryValidator , HistoryStorage > ,
66+ > ,
67+ ) -> Self {
68+ // Build hhtp client bound to the current node web3rpc
3969 let http_client: HttpClient = HttpClientBuilder :: default ( )
4070 // increase default timeout to allow for trace_gossip requests that can take a long
4171 // time
@@ -44,9 +74,11 @@ impl Downloader {
4474 . map_err ( |e| e. to_string ( ) )
4575 . expect ( "Failed to build http client" ) ;
4676
47- // BUild hhtp client binded to the current node web3rpc
48- let census = Census :: new ( http_client, 0 , vec ! [ ] ) ;
49- Self { overlay_tx, census }
77+ let census = Census :: new ( http_client, CENSUS_ENR_LIMIT , vec ! [ ] ) ;
78+ Self {
79+ overlay_arc,
80+ census,
81+ }
5082 }
5183
5284 pub async fn start ( mut self ) -> io:: Result < ( ) > {
@@ -57,7 +89,6 @@ impl Downloader {
5789 let reader = io:: BufReader :: new ( file) ;
5890 info ! ( "Reading CSV file" ) ;
5991 let lines: Vec < _ > = reader. lines ( ) . collect :: < Result < _ , _ > > ( ) ?;
60- // Create a hash table in memory with all the block hashes and block numbers
6192 info ! ( "Parsing CSV file" ) ;
6293 // skip the header of the csv file
6394 let lines = & lines[ 1 ..] ;
@@ -89,17 +120,17 @@ impl Downloader {
89120
90121 for ( block_number, block_hash) in batch {
91122 let block_body_content_key = generate_block_body_content_key ( block_hash. clone ( ) ) ;
92- futures. push ( self . find_content ( block_body_content_key , block_number ) ) ;
93- info ! (
94- block_number = block_number ,
95- "Sent FindContent query for block body"
96- ) ;
123+ futures. push ( self . find_content (
124+ block_body_content_key ,
125+ block_number,
126+ ContentType :: BlockBody ,
127+ ) ) ;
97128 let block_receipts_content_key = generate_block_receipts_content_key ( block_hash) ;
98- futures. push ( self . find_content ( block_receipts_content_key , block_number ) ) ;
99- info ! (
100- block_number = block_number ,
101- "Sent FindContent query for block receipts"
102- ) ;
129+ futures. push ( self . find_content (
130+ block_receipts_content_key ,
131+ block_number,
132+ ContentType :: BlockReceipts ,
133+ ) ) ;
103134 }
104135 join_all ( futures) . await ;
105136 }
@@ -108,56 +139,116 @@ impl Downloader {
108139 & self ,
109140 content_key : HistoryContentKey ,
110141 block_number : u64 ,
142+ content_type : ContentType ,
111143 ) -> anyhow:: Result < ( ) > {
112- let ( tx, rx) = oneshot:: channel ( ) ;
113-
114- let overlay_command = OverlayCommand :: FindContentQuery {
115- target : content_key. clone ( ) ,
116- callback : tx,
117- config : FindContentConfig {
118- is_trace : true ,
119- ..Default :: default ( )
120- } ,
144+ // Select interested peers from the census
145+ let enrs = self
146+ . census
147+ . select_peers ( Subnetwork :: History , & content_key. content_id ( ) )
148+ . expect ( "Failed to select peers" ) ;
149+ // Send FindContent query to the interested peers
150+ if enrs. is_empty ( ) {
151+ warn ! (
152+ block_number = block_number,
153+ content_type = %content_type,
154+ "No peers found for block. Skipping"
155+ ) ;
156+ return Err ( anyhow ! ( "No peers found for block {block_number}" ) ) ;
121157 } ;
122158
123- if let Err ( err) = self . overlay_tx . send ( overlay_command) {
124- warn ! (
125- error = %err,
126- "Error submitting FindContent query to service"
159+ for ( index, enr) in enrs. iter ( ) . enumerate ( ) {
160+ info ! (
161+ block_number = block_number,
162+ content_type = %content_type,
163+ peer_index = index,
164+ "Sending FindContent query to peer"
127165 ) ;
128- }
129- match rx. await {
130- Ok ( result) => match result {
131- Ok ( result) => {
132- HistoryContentValue :: decode ( & content_key, & result. 0 ) ?;
133- let duration_ms = QueryTrace :: timestamp_millis_u64 (
134- result. 2 . expect ( "QueryTrace not found" ) . started_at_ms ,
135- ) ;
136- info ! (
166+
167+ let result = self
168+ . overlay_arc
169+ . send_find_content ( enr. clone ( ) , content_key. to_bytes ( ) )
170+ . await ?;
171+ let content = result. 0 ;
172+
173+ match content {
174+ Content :: ConnectionId ( _) => {
175+ // Should not return connection ID, should always return the content
176+ warn ! (
137177 block_number = block_number,
138- query_duration = duration_ms ,
139- "Downloaded content for block "
178+ content_type = %content_type ,
179+ "Received ConnectionId content "
140180 ) ;
141- Ok ( ( ) )
181+ self . census . record_offer_result (
182+ Subnetwork :: History ,
183+ enr. node_id ( ) ,
184+ 0 ,
185+ Duration :: from_secs ( 0 ) ,
186+ & OfferTrace :: Failed ,
187+ ) ;
188+ continue ;
142189 }
143- Err ( err) => {
144- error ! (
190+ Content :: Content ( content_bytes) => {
191+ let content = HistoryContentValue :: decode ( & content_key, & content_bytes) ;
192+
193+ match content {
194+ Ok ( _) => {
195+ info ! (
196+ block_number = block_number,
197+ content_type = %content_type,
198+ "Received content from peer"
199+ ) ;
200+ self . census . record_offer_result (
201+ Subnetwork :: History ,
202+ enr. node_id ( ) ,
203+ content_bytes. len ( ) ,
204+ Duration :: from_secs ( 0 ) ,
205+ & OfferTrace :: Success (
206+ BitList :: with_capacity ( 1 ) . expect ( "Failed to create bitlist" ) ,
207+ ) ,
208+ ) ;
209+ return Ok ( ( ) ) ;
210+ }
211+ Err ( _) => {
212+ warn ! (
213+ block_number = block_number,
214+ content_type = %content_type,
215+ "Failed to parse content from peer, invalid content"
216+ ) ;
217+ self . census . record_offer_result (
218+ Subnetwork :: History ,
219+ enr. node_id ( ) ,
220+ 0 ,
221+ Duration :: from_secs ( 0 ) ,
222+ & OfferTrace :: Failed ,
223+ ) ;
224+ continue ;
225+ }
226+ }
227+ }
228+ Content :: Enrs ( _) => {
229+ // Content not found
230+ warn ! (
145231 block_number = block_number,
146- error = %err,
147- "Error in FindContent query"
232+ content_type = %content_type,
233+ "Received Enrs content, content not found from peer"
234+ ) ;
235+ self . census . record_offer_result (
236+ Subnetwork :: History ,
237+ enr. node_id ( ) ,
238+ 0 ,
239+ Duration :: from_secs ( 0 ) ,
240+ & OfferTrace :: Failed ,
148241 ) ;
149- Err ( anyhow ! ( "Error in FindContent query: {:?}" , err ) )
242+ continue ;
150243 }
151- } ,
152- Err ( err) => {
153- error ! (
154- block_number = block_number,
155- error = %err,
156- "Error receiving FindContent query response"
157- ) ;
158- Err ( err. into ( ) )
159244 }
160245 }
246+ warn ! (
247+ block_number = block_number,
248+ content_type = %content_type,
249+ "Failed to find content for block"
250+ ) ;
251+ Err ( anyhow ! ( "Failed to find content for block" ) )
161252 }
162253}
163254
0 commit comments