19
19
//! will take the raw data received here and extract meaningful results from it.
20
20
21
21
use std:: cmp;
22
- use std:: collections:: HashMap ;
22
+ use std:: collections:: { HashMap , BTreeSet } ;
23
23
use std:: marker:: PhantomData ;
24
24
use std:: sync:: Arc ;
25
25
26
26
use ethcore:: executed:: { Executed , ExecutionError } ;
27
27
28
- use futures:: { Poll , Future } ;
29
- use futures:: sync:: oneshot:: { self , Receiver , Canceled } ;
28
+ use futures:: { Poll , Future , Async } ;
29
+ use futures:: sync:: oneshot:: { self , Receiver } ;
30
30
use network:: PeerId ;
31
31
use parking_lot:: { RwLock , Mutex } ;
32
32
use rand;
33
+ use std:: time:: { Duration , SystemTime } ;
33
34
34
35
use net:: {
35
36
self , Handler , PeerStatus , Status , Capabilities ,
@@ -49,7 +50,45 @@ pub mod request;
49
50
/// The result of execution
50
51
pub type ExecutionResult = Result < Executed , ExecutionError > ;
51
52
53
+ /// The default number of retries for OnDemand queries to send to the other nodes
54
+ pub const DEFAULT_RETRY_COUNT : usize = 10 ;
55
+
56
+ /// The default time limit in milliseconds for inactive (no new peer to connect to) OnDemand queries (0 for unlimited)
57
+ pub const DEFAULT_QUERY_TIME_LIMIT : Duration = Duration :: from_millis ( 10000 ) ;
58
+
59
+ const NULL_DURATION : Duration = Duration :: from_secs ( 0 ) ;
60
+
61
+ /// OnDemand related errors
62
+ pub mod error {
63
+ use futures:: sync:: oneshot:: Canceled ;
64
+
65
+ error_chain ! {
66
+
67
+ foreign_links {
68
+ ChannelCanceled ( Canceled ) #[ doc = "Canceled oneshot channel" ] ;
69
+ }
70
+
71
+ errors {
72
+ #[ doc = "Max number of on-demand query attempts reached without result." ]
73
+ MaxAttemptReach ( query_index: usize ) {
74
+ description( "On-demand query limit reached" )
75
+ display( "On-demand query limit reached on query #{}" , query_index)
76
+ }
77
+
78
+ #[ doc = "No reply with current peer set, time out occured while waiting for new peers for additional query attempt." ]
79
+ TimeoutOnNewPeers ( query_index: usize , remaining_attempts: usize ) {
80
+ description( "Timeout for On-demand query" )
81
+ display( "Timeout for On-demand query; {} query attempts remain for query #{}" , remaining_attempts, query_index)
82
+ }
83
+
84
+ }
85
+
86
+ }
87
+
88
+ }
89
+
52
90
// relevant peer info.
91
+ #[ derive( Debug , Clone , PartialEq , Eq ) ]
53
92
struct Peer {
54
93
status : Status ,
55
94
capabilities : Capabilities ,
@@ -74,13 +113,21 @@ impl Peer {
74
113
}
75
114
}
76
115
116
+
117
+ /// Either an array of responses or a single error.
118
+ type PendingResponse = self :: error:: Result < Vec < Response > > ;
119
+
77
120
// Attempted request info and sender to put received value.
78
121
struct Pending {
79
122
requests : basic_request:: Batch < CheckedRequest > ,
80
123
net_requests : basic_request:: Batch < NetworkRequest > ,
81
124
required_capabilities : Capabilities ,
82
125
responses : Vec < Response > ,
83
- sender : oneshot:: Sender < Vec < Response > > ,
126
+ sender : oneshot:: Sender < PendingResponse > ,
127
+ base_query_index : usize ,
128
+ remaining_query_count : usize ,
129
+ query_id_history : BTreeSet < PeerId > ,
130
+ inactive_time_limit : Option < SystemTime > ,
84
131
}
85
132
86
133
impl Pending {
@@ -142,7 +189,9 @@ impl Pending {
142
189
// if the requests are complete, send the result and consume self.
143
190
fn try_complete ( self ) -> Option < Self > {
144
191
if self . requests . is_complete ( ) {
145
- let _ = self . sender . send ( self . responses ) ;
192
+ if self . sender . send ( Ok ( self . responses ) ) . is_err ( ) {
193
+ debug ! ( target: "on_demand" , "Dropped oneshot channel receiver on complete request at query #{}" , self . query_id_history. len( ) ) ;
194
+ }
146
195
None
147
196
} else {
148
197
Some ( self )
@@ -177,6 +226,25 @@ impl Pending {
177
226
self . net_requests = builder. build ( ) ;
178
227
self . required_capabilities = capabilities;
179
228
}
229
+
230
+ // returning no reponse, it will result in an error.
231
+ // self is consumed on purpose.
232
+ fn no_response ( self ) {
233
+ trace ! ( target: "on_demand" , "Dropping a pending query (no reply) at query #{}" , self . query_id_history. len( ) ) ;
234
+ let err = self :: error:: ErrorKind :: MaxAttemptReach ( self . requests . num_answered ( ) ) ;
235
+ if self . sender . send ( Err ( err. into ( ) ) ) . is_err ( ) {
236
+ debug ! ( target: "on_demand" , "Dropped oneshot channel receiver on no response" ) ;
237
+ }
238
+ }
239
+
240
+ // returning a peer discovery timeout during query attempts
241
+ fn time_out ( self ) {
242
+ trace ! ( target: "on_demand" , "Dropping a pending query (no new peer time out) at query #{}" , self . query_id_history. len( ) ) ;
243
+ let err = self :: error:: ErrorKind :: TimeoutOnNewPeers ( self . requests . num_answered ( ) , self . query_id_history . len ( ) ) ;
244
+ if self . sender . send ( Err ( err. into ( ) ) ) . is_err ( ) {
245
+ debug ! ( target: "on_demand" , "Dropped oneshot channel receiver on time out" ) ;
246
+ }
247
+ }
180
248
}
181
249
182
250
// helper to guess capabilities required for a given batch of network requests.
@@ -230,16 +298,21 @@ fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities {
230
298
/// A future extracting the concrete output type of the generic adapter
231
299
/// from a vector of responses.
232
300
pub struct OnResponses < T : request:: RequestAdapter > {
233
- receiver : Receiver < Vec < Response > > ,
301
+ receiver : Receiver < PendingResponse > ,
234
302
_marker : PhantomData < T > ,
235
303
}
236
304
237
305
impl < T : request:: RequestAdapter > Future for OnResponses < T > {
238
306
type Item = T :: Out ;
239
- type Error = Canceled ;
307
+ type Error = self :: error :: Error ;
240
308
241
309
fn poll ( & mut self ) -> Poll < Self :: Item , Self :: Error > {
242
- self . receiver . poll ( ) . map ( |async| async . map ( T :: extract_from) )
310
+ match self . receiver . poll ( ) {
311
+ Ok ( Async :: Ready ( Ok ( v) ) ) => Ok ( Async :: Ready ( T :: extract_from ( v) ) ) ,
312
+ Ok ( Async :: Ready ( Err ( e) ) ) => Err ( e) ,
313
+ Ok ( Async :: NotReady ) => Ok ( Async :: NotReady ) ,
314
+ Err ( e) => Err ( e. into ( ) ) ,
315
+ }
243
316
}
244
317
}
245
318
@@ -253,9 +326,12 @@ pub struct OnDemand {
253
326
in_transit : RwLock < HashMap < ReqId , Pending > > ,
254
327
cache : Arc < Mutex < Cache > > ,
255
328
no_immediate_dispatch : bool ,
329
+ base_retry_count : usize ,
330
+ query_inactive_time_limit : Option < Duration > ,
256
331
}
257
332
258
333
impl OnDemand {
334
+
259
335
/// Create a new `OnDemand` service with the given cache.
260
336
pub fn new ( cache : Arc < Mutex < Cache > > ) -> Self {
261
337
OnDemand {
@@ -264,6 +340,8 @@ impl OnDemand {
264
340
in_transit : RwLock :: new ( HashMap :: new ( ) ) ,
265
341
cache,
266
342
no_immediate_dispatch : false ,
343
+ base_retry_count : DEFAULT_RETRY_COUNT ,
344
+ query_inactive_time_limit : Some ( DEFAULT_QUERY_TIME_LIMIT ) ,
267
345
}
268
346
}
269
347
@@ -282,11 +360,11 @@ impl OnDemand {
282
360
/// Fails if back-references are not coherent.
283
361
/// The returned vector of responses will correspond to the requests exactly.
284
362
pub fn request_raw ( & self , ctx : & BasicContext , requests : Vec < Request > )
285
- -> Result < Receiver < Vec < Response > > , basic_request:: NoSuchOutput >
363
+ -> Result < Receiver < PendingResponse > , basic_request:: NoSuchOutput >
286
364
{
287
365
let ( sender, receiver) = oneshot:: channel ( ) ;
288
366
if requests. is_empty ( ) {
289
- assert ! ( sender. send( Vec :: new( ) ) . is_ok( ) , "receiver still in scope; qed" ) ;
367
+ assert ! ( sender. send( Ok ( Vec :: new( ) ) ) . is_ok( ) , "receiver still in scope; qed" ) ;
290
368
return Ok ( receiver) ;
291
369
}
292
370
@@ -325,6 +403,10 @@ impl OnDemand {
325
403
required_capabilities : capabilities,
326
404
responses,
327
405
sender,
406
+ base_query_index : 0 ,
407
+ remaining_query_count : 0 ,
408
+ query_id_history : BTreeSet :: new ( ) ,
409
+ inactive_time_limit : None ,
328
410
} ) ;
329
411
330
412
Ok ( receiver)
@@ -363,30 +445,68 @@ impl OnDemand {
363
445
let peers = self . peers . read ( ) ;
364
446
* pending = :: std:: mem:: replace ( & mut * pending, Vec :: new ( ) ) . into_iter ( )
365
447
. filter ( |pending| !pending. sender . is_canceled ( ) )
366
- . filter_map ( |pending| {
448
+ . filter_map ( |mut pending| {
367
449
// the peer we dispatch to is chosen randomly
368
450
let num_peers = peers. len ( ) ;
369
- let rng = rand:: random :: < usize > ( ) % cmp:: max ( num_peers, 1 ) ;
370
- for ( peer_id, peer) in peers. iter ( ) . chain ( peers. iter ( ) ) . skip ( rng) . take ( num_peers) {
451
+ let history_len = pending. query_id_history . len ( ) ;
452
+ let offset = if history_len == 0 {
453
+ pending. remaining_query_count = self . base_retry_count ;
454
+ let rand = rand:: random :: < usize > ( ) ;
455
+ pending. base_query_index = rand;
456
+ rand
457
+ } else {
458
+ pending. base_query_index + history_len
459
+ } % cmp:: max ( num_peers, 1 ) ;
460
+ let init_remaining_query_count = pending. remaining_query_count ; // to fail in case of big reduction of nb of peers
461
+ for ( peer_id, peer) in peers. iter ( ) . chain ( peers. iter ( ) )
462
+ . skip ( offset) . take ( num_peers) {
371
463
// TODO: see which requests can be answered by the cache?
372
-
373
- if !peer. can_fulfill ( & pending. required_capabilities ) {
374
- continue
464
+ if pending. remaining_query_count == 0 {
465
+ break
375
466
}
376
467
377
- match ctx. request_from ( * peer_id, pending. net_requests . clone ( ) ) {
378
- Ok ( req_id) => {
379
- trace ! ( target: "on_demand" , "Dispatched request {} to peer {}" , req_id, peer_id) ;
380
- self . in_transit . write ( ) . insert ( req_id, pending) ;
381
- return None
468
+ if pending. query_id_history . insert ( peer_id. clone ( ) ) {
469
+
470
+ if !peer. can_fulfill ( & pending. required_capabilities ) {
471
+ trace ! ( target: "on_demand" , "Peer {} without required capabilities, skipping, {} remaining attempts" , peer_id, pending. remaining_query_count) ;
472
+ continue
473
+ }
474
+
475
+ pending. remaining_query_count -= 1 ;
476
+ pending. inactive_time_limit = None ;
477
+
478
+ match ctx. request_from ( * peer_id, pending. net_requests . clone ( ) ) {
479
+ Ok ( req_id) => {
480
+ trace ! ( target: "on_demand" , "Dispatched request {} to peer {}, {} remaining attempts" , req_id, peer_id, pending. remaining_query_count) ;
481
+ self . in_transit . write ( ) . insert ( req_id, pending) ;
482
+ return None
483
+ }
484
+ Err ( net:: Error :: NoCredits ) | Err ( net:: Error :: NotServer ) => { }
485
+ Err ( e) => debug ! ( target: "on_demand" , "Error dispatching request to peer: {}" , e) ,
382
486
}
383
- Err ( net:: Error :: NoCredits ) | Err ( net:: Error :: NotServer ) => { }
384
- Err ( e) => debug ! ( target: "on_demand" , "Error dispatching request to peer: {}" , e) ,
385
487
}
386
488
}
387
489
388
- // TODO: maximum number of failures _when we have peers_.
389
- Some ( pending)
490
+ if pending. remaining_query_count == 0 {
491
+ pending. no_response ( ) ;
492
+ None
493
+ } else if init_remaining_query_count == pending. remaining_query_count {
494
+ if let Some ( query_inactive_time_limit) = self . query_inactive_time_limit {
495
+ let now = SystemTime :: now ( ) ;
496
+ if let Some ( inactive_time_limit) = pending. inactive_time_limit {
497
+ if now > inactive_time_limit {
498
+ pending. time_out ( ) ;
499
+ return None
500
+ }
501
+ } else {
502
+ debug ! ( target: "on_demand" , "No more peers to query, waiting for {} seconds until dropping query" , query_inactive_time_limit. as_secs( ) ) ;
503
+ pending. inactive_time_limit = Some ( now + query_inactive_time_limit) ;
504
+ }
505
+ }
506
+ Some ( pending)
507
+ } else {
508
+ Some ( pending)
509
+ }
390
510
} )
391
511
. collect ( ) ; // `pending` now contains all requests we couldn't dispatch.
392
512
@@ -406,6 +526,21 @@ impl OnDemand {
406
526
self . attempt_dispatch ( ctx) ;
407
527
}
408
528
}
529
+
530
+ /// Set the retry count for a query.
531
+ pub fn default_retry_number ( & mut self , nb_retry : usize ) {
532
+ self . base_retry_count = nb_retry;
533
+ }
534
+
535
+ /// Set the time limit for a query.
536
+ pub fn query_inactive_time_limit ( & mut self , inactive_time_limit : Duration ) {
537
+ self . query_inactive_time_limit = if inactive_time_limit == NULL_DURATION {
538
+ None
539
+ } else {
540
+ Some ( inactive_time_limit)
541
+ } ;
542
+ }
543
+
409
544
}
410
545
411
546
impl Handler for OnDemand {
@@ -458,6 +593,16 @@ impl Handler for OnDemand {
458
593
None => return ,
459
594
} ;
460
595
596
+ if responses. is_empty ( ) {
597
+ if pending. remaining_query_count == 0 {
598
+ pending. no_response ( ) ;
599
+ return ;
600
+ }
601
+ } else {
602
+ // do not keep query counter for others elements of this batch
603
+ pending. query_id_history . clear ( ) ;
604
+ }
605
+
461
606
// for each incoming response
462
607
// 1. ensure verification data filled.
463
608
// 2. pending.requests.supply_response
0 commit comments