1
1
/// Downloader struct that loads a CSV data file from disk with block numbers and block hashes
2
2
/// and does FindContent queries in batches to download all the content from the CSV file.
3
3
/// We don't save the content to disk, we just download it and drop
4
- /// it. But we need to measure the time it takes to download all the content, the number of
5
- /// queries and the number of bytes downloaded, the data ingress rate and the query rate.
4
+ /// it.
6
5
use std:: fs:: File ;
7
6
use std:: {
7
+ fmt:: { Display , Formatter } ,
8
8
io:: { self , BufRead } ,
9
9
path:: Path ,
10
+ sync:: Arc ,
10
11
time:: Duration ,
11
12
} ;
12
13
13
14
use anyhow:: anyhow;
14
15
use ethportal_api:: {
15
16
jsonrpsee:: http_client:: { HttpClient , HttpClientBuilder } ,
16
- types:: { cli:: DEFAULT_WEB3_HTTP_ADDRESS , network:: Subnetwork , query_trace:: QueryTrace } ,
17
+ types:: {
18
+ distance:: XorMetric ,
19
+ network:: Subnetwork ,
20
+ portal_wire:: { Content , OfferTrace } ,
21
+ } ,
17
22
utils:: bytes:: hex_decode,
18
23
BlockBodyKey , BlockReceiptsKey , ContentValue , HistoryContentKey , HistoryContentValue ,
24
+ OverlayContentKey ,
19
25
} ;
20
- use futures:: { channel :: oneshot , future:: join_all} ;
26
+ use futures:: future:: join_all;
21
27
use portal_bridge:: census:: Census ;
22
- use portalnet:: overlay:: { command:: OverlayCommand , config:: FindContentConfig } ;
23
- use tokio:: sync:: mpsc:: UnboundedSender ;
24
- use tracing:: { error, info, warn} ;
28
+ use portalnet:: { constants:: DEFAULT_WEB3_HTTP_ADDRESS , overlay:: protocol:: OverlayProtocol } ;
29
+ use ssz_types:: BitList ;
30
+ use tracing:: { info, warn} ;
31
+
32
+ use crate :: { storage:: HistoryStorage , validation:: ChainHistoryValidator } ;
25
33
26
34
/// The number of blocks to download in a single batch.
27
- const BATCH_SIZE : usize = 3 ;
35
+ const BATCH_SIZE : usize = 100 ;
36
+ /// The max number of ENRs to send FindContent queries to.
37
+ const CENSUS_ENR_LIMIT : usize = 4 ;
28
38
/// The path to the CSV file with block numbers and block hashes.
29
39
const CSV_PATH : & str = "ethereum_blocks_14000000_merge.csv" ;
30
40
41
/// The kind of history content requested for a block.
///
/// Passed alongside each FindContent query so that log output can tell
/// block-body requests apart from block-receipts requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ContentType {
    /// The query targets a block body content key.
    BlockBody,
    /// The query targets a block receipts content key.
    BlockReceipts,
}
45
+
46
+ impl Display for ContentType {
47
+ fn fmt ( & self , f : & mut Formatter < ' _ > ) -> std:: fmt:: Result {
48
+ match self {
49
+ ContentType :: BlockBody => write ! ( f, "BlockBody" ) ,
50
+ ContentType :: BlockReceipts => write ! ( f, "BlockReceipts" ) ,
51
+ }
52
+ }
53
+ }
54
+
31
55
#[ derive( Clone ) ]
32
56
pub struct Downloader {
33
57
pub census : Census ,
34
- pub overlay_tx : UnboundedSender < OverlayCommand < HistoryContentKey > > ,
58
+ pub overlay_arc :
59
+ Arc < OverlayProtocol < HistoryContentKey , XorMetric , ChainHistoryValidator , HistoryStorage > > ,
35
60
}
36
61
37
62
impl Downloader {
38
- pub fn new ( overlay_tx : UnboundedSender < OverlayCommand < HistoryContentKey > > ) -> Self {
63
+ pub fn new (
64
+ overlay_arc : Arc <
65
+ OverlayProtocol < HistoryContentKey , XorMetric , ChainHistoryValidator , HistoryStorage > ,
66
+ > ,
67
+ ) -> Self {
68
+ // Build HTTP client bound to the current node's web3 RPC
39
69
let http_client: HttpClient = HttpClientBuilder :: default ( )
40
70
// increase default timeout to allow for trace_gossip requests that can take a long
41
71
// time
@@ -44,9 +74,11 @@ impl Downloader {
44
74
. map_err ( |e| e. to_string ( ) )
45
75
. expect ( "Failed to build http client" ) ;
46
76
47
- // BUild hhtp client binded to the current node web3rpc
48
- let census = Census :: new ( http_client, 0 , vec ! [ ] ) ;
49
- Self { overlay_tx, census }
77
+ let census = Census :: new ( http_client, CENSUS_ENR_LIMIT , vec ! [ ] ) ;
78
+ Self {
79
+ overlay_arc,
80
+ census,
81
+ }
50
82
}
51
83
52
84
pub async fn start ( mut self ) -> io:: Result < ( ) > {
@@ -57,7 +89,6 @@ impl Downloader {
57
89
let reader = io:: BufReader :: new ( file) ;
58
90
info ! ( "Reading CSV file" ) ;
59
91
let lines: Vec < _ > = reader. lines ( ) . collect :: < Result < _ , _ > > ( ) ?;
60
- // Create a hash table in memory with all the block hashes and block numbers
61
92
info ! ( "Parsing CSV file" ) ;
62
93
// skip the header of the csv file
63
94
let lines = & lines[ 1 ..] ;
@@ -89,17 +120,17 @@ impl Downloader {
89
120
90
121
for ( block_number, block_hash) in batch {
91
122
let block_body_content_key = generate_block_body_content_key ( block_hash. clone ( ) ) ;
92
- futures. push ( self . find_content ( block_body_content_key , block_number ) ) ;
93
- info ! (
94
- block_number = block_number ,
95
- "Sent FindContent query for block body"
96
- ) ;
123
+ futures. push ( self . find_content (
124
+ block_body_content_key ,
125
+ block_number,
126
+ ContentType :: BlockBody ,
127
+ ) ) ;
97
128
let block_receipts_content_key = generate_block_receipts_content_key ( block_hash) ;
98
- futures. push ( self . find_content ( block_receipts_content_key , block_number ) ) ;
99
- info ! (
100
- block_number = block_number ,
101
- "Sent FindContent query for block receipts"
102
- ) ;
129
+ futures. push ( self . find_content (
130
+ block_receipts_content_key ,
131
+ block_number,
132
+ ContentType :: BlockReceipts ,
133
+ ) ) ;
103
134
}
104
135
join_all ( futures) . await ;
105
136
}
@@ -108,56 +139,116 @@ impl Downloader {
108
139
& self ,
109
140
content_key : HistoryContentKey ,
110
141
block_number : u64 ,
142
+ content_type : ContentType ,
111
143
) -> anyhow:: Result < ( ) > {
112
- let ( tx, rx) = oneshot:: channel ( ) ;
113
-
114
- let overlay_command = OverlayCommand :: FindContentQuery {
115
- target : content_key. clone ( ) ,
116
- callback : tx,
117
- config : FindContentConfig {
118
- is_trace : true ,
119
- ..Default :: default ( )
120
- } ,
144
+ // Select interested peers from the census
145
+ let enrs = self
146
+ . census
147
+ . select_peers ( Subnetwork :: History , & content_key. content_id ( ) )
148
+ . expect ( "Failed to select peers" ) ;
149
+ // Send FindContent query to the interested peers
150
+ if enrs. is_empty ( ) {
151
+ warn ! (
152
+ block_number = block_number,
153
+ content_type = %content_type,
154
+ "No peers found for block. Skipping"
155
+ ) ;
156
+ return Err ( anyhow ! ( "No peers found for block {block_number}" ) ) ;
121
157
} ;
122
158
123
- if let Err ( err) = self . overlay_tx . send ( overlay_command) {
124
- warn ! (
125
- error = %err,
126
- "Error submitting FindContent query to service"
159
+ for ( index, enr) in enrs. iter ( ) . enumerate ( ) {
160
+ info ! (
161
+ block_number = block_number,
162
+ content_type = %content_type,
163
+ peer_index = index,
164
+ "Sending FindContent query to peer"
127
165
) ;
128
- }
129
- match rx. await {
130
- Ok ( result) => match result {
131
- Ok ( result) => {
132
- HistoryContentValue :: decode ( & content_key, & result. 0 ) ?;
133
- let duration_ms = QueryTrace :: timestamp_millis_u64 (
134
- result. 2 . expect ( "QueryTrace not found" ) . started_at_ms ,
135
- ) ;
136
- info ! (
166
+
167
+ let result = self
168
+ . overlay_arc
169
+ . send_find_content ( enr. clone ( ) , content_key. to_bytes ( ) )
170
+ . await ?;
171
+ let content = result. 0 ;
172
+
173
+ match content {
174
+ Content :: ConnectionId ( _) => {
175
+ // Should not return connection ID, should always return the content
176
+ warn ! (
137
177
block_number = block_number,
138
- query_duration = duration_ms ,
139
- "Downloaded content for block "
178
+ content_type = %content_type ,
179
+ "Received ConnectionId content "
140
180
) ;
141
- Ok ( ( ) )
181
+ self . census . record_offer_result (
182
+ Subnetwork :: History ,
183
+ enr. node_id ( ) ,
184
+ 0 ,
185
+ Duration :: from_secs ( 0 ) ,
186
+ & OfferTrace :: Failed ,
187
+ ) ;
188
+ continue ;
142
189
}
143
- Err ( err) => {
144
- error ! (
190
+ Content :: Content ( content_bytes) => {
191
+ let content = HistoryContentValue :: decode ( & content_key, & content_bytes) ;
192
+
193
+ match content {
194
+ Ok ( _) => {
195
+ info ! (
196
+ block_number = block_number,
197
+ content_type = %content_type,
198
+ "Received content from peer"
199
+ ) ;
200
+ self . census . record_offer_result (
201
+ Subnetwork :: History ,
202
+ enr. node_id ( ) ,
203
+ content_bytes. len ( ) ,
204
+ Duration :: from_secs ( 0 ) ,
205
+ & OfferTrace :: Success (
206
+ BitList :: with_capacity ( 1 ) . expect ( "Failed to create bitlist" ) ,
207
+ ) ,
208
+ ) ;
209
+ return Ok ( ( ) ) ;
210
+ }
211
+ Err ( _) => {
212
+ warn ! (
213
+ block_number = block_number,
214
+ content_type = %content_type,
215
+ "Failed to parse content from peer, invalid content"
216
+ ) ;
217
+ self . census . record_offer_result (
218
+ Subnetwork :: History ,
219
+ enr. node_id ( ) ,
220
+ 0 ,
221
+ Duration :: from_secs ( 0 ) ,
222
+ & OfferTrace :: Failed ,
223
+ ) ;
224
+ continue ;
225
+ }
226
+ }
227
+ }
228
+ Content :: Enrs ( _) => {
229
+ // Content not found
230
+ warn ! (
145
231
block_number = block_number,
146
- error = %err,
147
- "Error in FindContent query"
232
+ content_type = %content_type,
233
+ "Received Enrs content, content not found from peer"
234
+ ) ;
235
+ self . census . record_offer_result (
236
+ Subnetwork :: History ,
237
+ enr. node_id ( ) ,
238
+ 0 ,
239
+ Duration :: from_secs ( 0 ) ,
240
+ & OfferTrace :: Failed ,
148
241
) ;
149
- Err ( anyhow ! ( "Error in FindContent query: {:?}" , err ) )
242
+ continue ;
150
243
}
151
- } ,
152
- Err ( err) => {
153
- error ! (
154
- block_number = block_number,
155
- error = %err,
156
- "Error receiving FindContent query response"
157
- ) ;
158
- Err ( err. into ( ) )
159
244
}
160
245
}
246
+ warn ! (
247
+ block_number = block_number,
248
+ content_type = %content_type,
249
+ "Failed to find content for block"
250
+ ) ;
251
+ Err ( anyhow ! ( "Failed to find content for block" ) )
161
252
}
162
253
}
163
254
0 commit comments