@@ -11,7 +11,7 @@ use std::{
11
11
time:: Duration ,
12
12
} ;
13
13
14
- use anyhow:: anyhow;
14
+ use anyhow:: { anyhow, Error } ;
15
15
use ethportal_api:: {
16
16
jsonrpsee:: http_client:: { HttpClient , HttpClientBuilder } ,
17
17
types:: {
@@ -23,17 +23,23 @@ use ethportal_api::{
23
23
BlockBodyKey , BlockReceiptsKey , ContentValue , HistoryContentKey , HistoryContentValue ,
24
24
OverlayContentKey ,
25
25
} ;
26
- use futures:: future:: join_all;
26
+ use futures:: { channel :: oneshot , future:: join_all} ;
27
27
use portal_bridge:: census:: Census ;
28
- use portalnet:: { constants:: DEFAULT_WEB3_HTTP_ADDRESS , overlay:: protocol:: OverlayProtocol } ;
28
+ use portalnet:: {
29
+ constants:: DEFAULT_WEB3_HTTP_ADDRESS ,
30
+ overlay:: { command:: OverlayCommand , protocol:: OverlayProtocol } ,
31
+ } ;
29
32
use ssz_types:: BitList ;
30
- use tracing:: { info, warn} ;
33
+ use tracing:: { error , info, warn} ;
31
34
use trin_metrics:: downloader:: DownloaderMetricsReporter ;
32
35
33
36
use crate :: { storage:: HistoryStorage , validation:: ChainHistoryValidator } ;
34
37
35
38
/// The number of blocks to download in a single batch.
36
- const BATCH_SIZE : usize = 100 ;
39
+ const BATCH_SIZE : usize = 30 ;
40
+ /// Enable census with full view of the network and peer scoring to find peers to download content
41
+ /// from.
42
+ const CENSUS : bool = true ;
37
43
/// The max number of ENRs to send FindContent queries to.
38
44
const CENSUS_ENR_LIMIT : usize = 4 ;
39
45
/// The path to the CSV file with block numbers and block hashes.
@@ -55,7 +61,7 @@ impl Display for ContentType {
55
61
56
62
#[ derive( Clone ) ]
57
63
pub struct Downloader {
58
- pub census : Census ,
64
+ pub census : Option < Census > ,
59
65
pub overlay_arc :
60
66
Arc < OverlayProtocol < HistoryContentKey , XorMetric , ChainHistoryValidator , HistoryStorage > > ,
61
67
pub metrics : DownloaderMetricsReporter ,
@@ -78,15 +84,23 @@ impl Downloader {
78
84
79
85
let metrics = DownloaderMetricsReporter :: new ( ) ;
80
86
81
- let census = Census :: new ( http_client, CENSUS_ENR_LIMIT , vec ! [ ] ) ;
87
+ let mut census = None ;
88
+
89
+ if CENSUS {
90
+ info ! ( "Census enabled" ) ;
91
+ census = Some ( Census :: new ( http_client, CENSUS_ENR_LIMIT , vec ! [ ] ) ) ;
92
+ } else {
93
+ info ! ( "Census disabled" ) ;
94
+ }
95
+
82
96
Self {
83
97
overlay_arc,
84
98
census,
85
99
metrics,
86
100
}
87
101
}
88
102
89
- pub async fn start ( mut self ) -> io:: Result < ( ) > {
103
+ pub async fn start ( self ) -> io:: Result < ( ) > {
90
104
// set the csv path to a file in the root trin-history directory
91
105
info ! ( "Opening CSV file" ) ;
92
106
let csv_path = Path :: new ( CSV_PATH ) ;
@@ -98,13 +112,15 @@ impl Downloader {
98
112
// skip the header of the csv file
99
113
let lines = & lines[ 1 ..] ;
100
114
let blocks: Vec < ( u64 , String ) > = lines. iter ( ) . map ( |line| parse_line ( line) ) . collect ( ) ;
101
- // Initialize the census with the history subnetwork
102
- let _ = Some (
103
- self . census
104
- . init ( [ Subnetwork :: History ] )
105
- . await
106
- . expect ( "Failed to initialize Census" ) ,
107
- ) ;
115
+ // Initialize the census with the history subnetwork if enabled
116
+ if let Some ( mut census) = self . census . clone ( ) {
117
+ let _ = Some (
118
+ census
119
+ . init ( [ Subnetwork :: History ] )
120
+ . await
121
+ . expect ( "Failed to initialize Census" ) ,
122
+ ) ;
123
+ }
108
124
109
125
info ! ( "Processing blocks" ) ;
110
126
let batches = blocks. chunks ( BATCH_SIZE ) ;
@@ -147,9 +163,25 @@ impl Downloader {
147
163
block_number : u64 ,
148
164
content_type : ContentType ,
149
165
) -> anyhow:: Result < ( ) > {
166
+ if CENSUS {
167
+ self . find_content_census ( & content_key, block_number, content_type)
168
+ . await
169
+ } else {
170
+ self . recursive_find_content ( content_key, block_number, content_type)
171
+ . await
172
+ }
173
+ }
174
+
175
+ /// Send FindContent queries to the interested peers in the census, includes peers scoring
176
+ async fn find_content_census (
177
+ & self ,
178
+ content_key : & HistoryContentKey ,
179
+ block_number : u64 ,
180
+ content_type : ContentType ,
181
+ ) -> Result < ( ) , Error > {
182
+ let census = self . census . clone ( ) . expect ( "census should be enabled" ) ;
150
183
// Select interested peers from the census
151
- let enrs = self
152
- . census
184
+ let enrs = census
153
185
. select_peers ( Subnetwork :: History , & content_key. content_id ( ) )
154
186
. expect ( "Failed to select peers" ) ;
155
187
// Send FindContent query to the interested peers
@@ -184,7 +216,7 @@ impl Downloader {
184
216
content_type = %content_type,
185
217
"Received ConnectionId content"
186
218
) ;
187
- self . census . record_offer_result (
219
+ census. record_offer_result (
188
220
Subnetwork :: History ,
189
221
enr. node_id ( ) ,
190
222
0 ,
@@ -194,7 +226,7 @@ impl Downloader {
194
226
continue ;
195
227
}
196
228
Content :: Content ( content_bytes) => {
197
- let content = HistoryContentValue :: decode ( & content_key, & content_bytes) ;
229
+ let content = HistoryContentValue :: decode ( content_key, & content_bytes) ;
198
230
199
231
match content {
200
232
Ok ( _) => {
@@ -203,7 +235,7 @@ impl Downloader {
203
235
content_type = %content_type,
204
236
"Received content from peer"
205
237
) ;
206
- self . census . record_offer_result (
238
+ census. record_offer_result (
207
239
Subnetwork :: History ,
208
240
enr. node_id ( ) ,
209
241
content_bytes. len ( ) ,
@@ -220,7 +252,7 @@ impl Downloader {
220
252
content_type = %content_type,
221
253
"Failed to parse content from peer, invalid content"
222
254
) ;
223
- self . census . record_offer_result (
255
+ census. record_offer_result (
224
256
Subnetwork :: History ,
225
257
enr. node_id ( ) ,
226
258
0 ,
@@ -238,7 +270,7 @@ impl Downloader {
238
270
content_type = %content_type,
239
271
"Received Enrs content, content not found from peer"
240
272
) ;
241
- self . census . record_offer_result (
273
+ census. record_offer_result (
242
274
Subnetwork :: History ,
243
275
enr. node_id ( ) ,
244
276
0 ,
@@ -256,6 +288,58 @@ impl Downloader {
256
288
) ;
257
289
Err ( anyhow ! ( "Failed to find content for block" ) )
258
290
}
291
+
292
+ /// Send recursive FindContent queries to the overlay service
293
+ async fn recursive_find_content (
294
+ & self ,
295
+ content_key : HistoryContentKey ,
296
+ block_number : u64 ,
297
+ content_type : ContentType ,
298
+ ) -> anyhow:: Result < ( ) > {
299
+ let ( tx, rx) = oneshot:: channel ( ) ;
300
+
301
+ let overlay_command = OverlayCommand :: FindContentQuery {
302
+ target : content_key. clone ( ) ,
303
+ callback : tx,
304
+ config : Default :: default ( ) ,
305
+ } ;
306
+
307
+ if let Err ( err) = self . overlay_arc . command_tx . send ( overlay_command) {
308
+ warn ! (
309
+ error = %err,
310
+ block_number = block_number,
311
+ content_type = %content_type,
312
+ "Error submitting FindContent query to service"
313
+ ) ;
314
+ }
315
+ match rx. await {
316
+ Ok ( result) => match result {
317
+ Ok ( result) => {
318
+ HistoryContentValue :: decode ( & content_key, & result. 0 ) ?;
319
+ info ! ( block_number = block_number, "Downloaded content for block" ) ;
320
+ Ok ( ( ) )
321
+ }
322
+ Err ( err) => {
323
+ error ! (
324
+ block_number = block_number,
325
+ content_type = %content_type,
326
+ error = %err,
327
+ "Error in FindContent query"
328
+ ) ;
329
+ Err ( anyhow ! ( "Error in FindContent query: {:?}" , err) )
330
+ }
331
+ } ,
332
+ Err ( err) => {
333
+ error ! (
334
+ block_number = block_number,
335
+ content_type = %content_type,
336
+ error = %err,
337
+ "Error receiving FindContent query response"
338
+ ) ;
339
+ Err ( err. into ( ) )
340
+ }
341
+ }
342
+ }
259
343
}
260
344
261
345
fn parse_line ( line : & str ) -> ( u64 , String ) {
0 commit comments