55/// queries and the number of bytes downloaded, the data ingress rate and the query rate.
66use std:: fs:: File ;
77use std:: {
8+ fmt:: { Display , Formatter } ,
89 io:: { self , BufRead } ,
910 path:: Path ,
11+ sync:: Arc ,
1012 time:: Duration ,
1113} ;
1214
1315use anyhow:: anyhow;
1416use ethportal_api:: {
1517 jsonrpsee:: http_client:: { HttpClient , HttpClientBuilder } ,
16- types:: { cli:: DEFAULT_WEB3_HTTP_ADDRESS , network:: Subnetwork , query_trace:: QueryTrace } ,
18+ types:: {
19+ distance:: XorMetric ,
20+ network:: Subnetwork ,
21+ portal_wire:: { Content , OfferTrace } ,
22+ } ,
1723 utils:: bytes:: hex_decode,
1824 BlockBodyKey , BlockReceiptsKey , ContentValue , HistoryContentKey , HistoryContentValue ,
25+ OverlayContentKey ,
1926} ;
20- use futures:: { channel :: oneshot , future:: join_all} ;
27+ use futures:: future:: join_all;
2128use portal_bridge:: census:: Census ;
22- use portalnet:: overlay:: { command:: OverlayCommand , config:: FindContentConfig } ;
23- use tokio:: sync:: mpsc:: UnboundedSender ;
24- use tracing:: { error, info, warn} ;
29+ use portalnet:: { constants:: DEFAULT_WEB3_HTTP_ADDRESS , overlay:: protocol:: OverlayProtocol } ;
30+ use ssz_types:: BitList ;
31+ use tracing:: { info, warn} ;
32+
33+ use crate :: { storage:: HistoryStorage , validation:: ChainHistoryValidator } ;
2534
2635/// The number of blocks to download in a single batch.
27- const BATCH_SIZE : usize = 3 ;
36+ const BATCH_SIZE : usize = 100 ;
37+ /// The max number of ENRs to send FindContent queries to.
38+ const CENSUS_ENR_LIMIT : usize = 4 ;
2839/// The path to the CSV file with block numbers and block hashes.
2940const CSV_PATH : & str = "ethereum_blocks_14000000_merge.csv" ;
3041
42+ enum ContentType {
43+ BlockBody ,
44+ BlockReceipts ,
45+ }
46+
47+ impl Display for ContentType {
48+ fn fmt ( & self , f : & mut Formatter < ' _ > ) -> std:: fmt:: Result {
49+ match self {
50+ ContentType :: BlockBody => write ! ( f, "BlockBody" ) ,
51+ ContentType :: BlockReceipts => write ! ( f, "BlockReceipts" ) ,
52+ }
53+ }
54+ }
55+
3156#[ derive( Clone ) ]
3257pub struct Downloader {
3358 pub census : Census ,
34- pub overlay_tx : UnboundedSender < OverlayCommand < HistoryContentKey > > ,
59+ pub overlay_arc :
60+ Arc < OverlayProtocol < HistoryContentKey , XorMetric , ChainHistoryValidator , HistoryStorage > > ,
3561}
3662
3763impl Downloader {
38- pub fn new ( overlay_tx : UnboundedSender < OverlayCommand < HistoryContentKey > > ) -> Self {
64+ pub fn new (
65+ overlay_arc : Arc <
66+ OverlayProtocol < HistoryContentKey , XorMetric , ChainHistoryValidator , HistoryStorage > ,
67+ > ,
68+ ) -> Self {
3969 let http_client: HttpClient = HttpClientBuilder :: default ( )
4070 // increase default timeout to allow for trace_gossip requests that can take a long
4171 // time
@@ -45,8 +75,11 @@ impl Downloader {
4575 . expect ( "Failed to build http client" ) ;
4676
4777 // BUild hhtp client binded to the current node web3rpc
48- let census = Census :: new ( http_client, 0 , vec ! [ ] ) ;
49- Self { overlay_tx, census }
78+ let census = Census :: new ( http_client, CENSUS_ENR_LIMIT , vec ! [ ] ) ;
79+ Self {
80+ overlay_arc,
81+ census,
82+ }
5083 }
5184
5285 pub async fn start ( mut self ) -> io:: Result < ( ) > {
@@ -89,17 +122,17 @@ impl Downloader {
89122
90123 for ( block_number, block_hash) in batch {
91124 let block_body_content_key = generate_block_body_content_key ( block_hash. clone ( ) ) ;
92- futures. push ( self . find_content ( block_body_content_key , block_number ) ) ;
93- info ! (
94- block_number = block_number ,
95- "Sent FindContent query for block body"
96- ) ;
125+ futures. push ( self . find_content (
126+ block_body_content_key ,
127+ block_number,
128+ ContentType :: BlockBody ,
129+ ) ) ;
97130 let block_receipts_content_key = generate_block_receipts_content_key ( block_hash) ;
98- futures. push ( self . find_content ( block_receipts_content_key , block_number ) ) ;
99- info ! (
100- block_number = block_number ,
101- "Sent FindContent query for block receipts"
102- ) ;
131+ futures. push ( self . find_content (
132+ block_receipts_content_key ,
133+ block_number,
134+ ContentType :: BlockReceipts ,
135+ ) ) ;
103136 }
104137 join_all ( futures) . await ;
105138 }
@@ -108,56 +141,116 @@ impl Downloader {
108141 & self ,
109142 content_key : HistoryContentKey ,
110143 block_number : u64 ,
144+ content_type : ContentType ,
111145 ) -> anyhow:: Result < ( ) > {
112- let ( tx, rx) = oneshot:: channel ( ) ;
113-
114- let overlay_command = OverlayCommand :: FindContentQuery {
115- target : content_key. clone ( ) ,
116- callback : tx,
117- config : FindContentConfig {
118- is_trace : true ,
119- ..Default :: default ( )
120- } ,
146+ // 1. Select interested peers from the census
147+ let enrs = self
148+ . census
149+ . select_peers ( Subnetwork :: History , & content_key. content_id ( ) )
150+ . expect ( "Failed to select peers" ) ;
151+ // 2. Send FindContent query to the interested peers
152+ if enrs. is_empty ( ) {
153+ warn ! (
154+ block_number = block_number,
155+ content_type = %content_type,
156+ "No peers found for block. Skipping"
157+ ) ;
158+ return Err ( anyhow ! ( "No peers found for block {block_number}" ) ) ;
121159 } ;
122160
123- if let Err ( err) = self . overlay_tx . send ( overlay_command) {
124- warn ! (
125- error = %err,
126- "Error submitting FindContent query to service"
161+ for ( index, enr) in enrs. iter ( ) . enumerate ( ) {
162+ info ! (
163+ block_number = block_number,
164+ content_type = %content_type,
165+ peer_index = index,
166+ "Sending FindContent query to peer"
127167 ) ;
128- }
129- match rx. await {
130- Ok ( result) => match result {
131- Ok ( result) => {
132- HistoryContentValue :: decode ( & content_key, & result. 0 ) ?;
133- let duration_ms = QueryTrace :: timestamp_millis_u64 (
134- result. 2 . expect ( "QueryTrace not found" ) . started_at_ms ,
135- ) ;
136- info ! (
168+
169+ let result = self
170+ . overlay_arc
171+ . send_find_content ( enr. clone ( ) , content_key. to_bytes ( ) )
172+ . await ?;
173+ let content = result. 0 ;
174+
175+ match content {
176+ Content :: ConnectionId ( _) => {
177+ // Should not return connection ID, should always return the content
178+ warn ! (
137179 block_number = block_number,
138- query_duration = duration_ms ,
139- "Downloaded content for block "
180+ content_type = %content_type ,
181+ "Received ConnectionId content "
140182 ) ;
141- Ok ( ( ) )
183+ self . census . record_offer_result (
184+ Subnetwork :: History ,
185+ enr. node_id ( ) ,
186+ 0 ,
187+ Duration :: from_secs ( 0 ) ,
188+ & OfferTrace :: Failed ,
189+ ) ;
190+ continue ;
142191 }
143- Err ( err) => {
144- error ! (
192+ Content :: Content ( content_bytes) => {
193+ let content = HistoryContentValue :: decode ( & content_key, & content_bytes) ;
194+
195+ match content {
196+ Ok ( _) => {
197+ info ! (
198+ block_number = block_number,
199+ content_type = %content_type,
200+ "Received content from peer"
201+ ) ;
202+ self . census . record_offer_result (
203+ Subnetwork :: History ,
204+ enr. node_id ( ) ,
205+ content_bytes. len ( ) ,
206+ Duration :: from_secs ( 0 ) ,
207+ & OfferTrace :: Success (
208+ BitList :: with_capacity ( 1 ) . expect ( "Failed to create bitlist" ) ,
209+ ) ,
210+ ) ;
211+ return Ok ( ( ) ) ;
212+ }
213+ Err ( _) => {
214+ warn ! (
215+ block_number = block_number,
216+ content_type = %content_type,
217+ "Failed to parse content from peer, invalid content"
218+ ) ;
219+ self . census . record_offer_result (
220+ Subnetwork :: History ,
221+ enr. node_id ( ) ,
222+ 0 ,
223+ Duration :: from_secs ( 0 ) ,
224+ & OfferTrace :: Failed ,
225+ ) ;
226+ continue ;
227+ }
228+ }
229+ }
230+ Content :: Enrs ( _) => {
231+ // Content not found
232+ warn ! (
145233 block_number = block_number,
146- error = %err,
147- "Error in FindContent query"
234+ content_type = %content_type,
235+ "Received Enrs content, content not found from peer"
236+ ) ;
237+ self . census . record_offer_result (
238+ Subnetwork :: History ,
239+ enr. node_id ( ) ,
240+ 0 ,
241+ Duration :: from_secs ( 0 ) ,
242+ & OfferTrace :: Failed ,
148243 ) ;
149- Err ( anyhow ! ( "Error in FindContent query: {:?}" , err ) )
244+ continue ;
150245 }
151- } ,
152- Err ( err) => {
153- error ! (
154- block_number = block_number,
155- error = %err,
156- "Error receiving FindContent query response"
157- ) ;
158- Err ( err. into ( ) )
159246 }
160247 }
248+ warn ! (
249+ block_number = block_number,
250+ content_type = %content_type,
251+ "Failed to find content for block"
252+ ) ;
253+ Err ( anyhow ! ( "Failed to find content for block" ) )
161254 }
162255}
163256
0 commit comments