@@ -82,7 +82,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
        Ok(dbsync)
    }

-    /// Reset this state machine, and get the final result
+    /// Reset this state machine, and get the StackerDBSyncResult with newly-obtained chunk data
+    /// and newly-learned information about broken and dead peers.
    pub fn reset(
        &mut self,
        network: Option<&PeerNetwork>,
@@ -155,6 +156,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
    /// Given the downloaded set of chunk inventories, identify:
    /// * which chunks we need to fetch, because they're newer than ours.
    /// * what order to fetch chunks in, in rarest-first order
+    /// Returns a list of (chunk requests, list of neighbors that can service them), which is
+    /// ordered from rarest chunk to most-common chunk.
    pub fn make_chunk_request_schedule(
        &self,
        network: &PeerNetwork,
@@ -246,6 +249,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
            .collect();

        schedule.sort_by(|item_1, item_2| item_1.1.len().cmp(&item_2.1.len()));
+        schedule.reverse();
+
        test_debug!(
            "{:?}: Will request up to {} chunks for {}",
            network.get_local_peer(),
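The hunk above sorts the schedule ascending by how many neighbors can service each entry and then reverses it, leaving the list in descending order of serviceable-neighbor count. A minimal, runnable sketch of that ordering, with hypothetical slot IDs and neighbor names standing in for the real chunk requests and NeighborAddress lists:

    // Hypothetical schedule entries: (slot_id, neighbors that can service it).
    fn main() {
        let mut schedule: Vec<(u32, Vec<&str>)> = vec![
            (0, vec!["a", "b", "c"]),
            (1, vec!["a"]),
            (2, vec!["a", "b"]),
        ];

        // Same shape as the diff: stable ascending sort by neighbor count...
        schedule.sort_by(|item_1, item_2| item_1.1.len().cmp(&item_2.1.len()));
        // ...then reverse, so entries with the most serviceable neighbors come first.
        schedule.reverse();

        assert_eq!(schedule[0].0, 0); // serviceable by 3 neighbors
        assert_eq!(schedule[2].0, 1); // serviceable by 1 neighbor
    }

A stable sort with the comparator flipped (item_2.1.len().cmp(&item_1.1.len())) would give the same descending order in a single call, though entries with equal counts would then keep their original relative order instead of being reversed.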
@@ -268,7 +273,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
        let mut need_chunks: HashMap<usize, (StackerDBPushChunkData, Vec<NeighborAddress>)> =
            HashMap::new();

-        // who has data we need?
+        // who needs data we can serve?
        for (i, local_version) in local_slot_versions.iter().enumerate() {
            let mut local_chunk = None;
            for (naddr, chunk_inv) in self.chunk_invs.iter() {
@@ -301,7 +306,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
                    });
                }

-                let our_chunk = if let Some(chunk) = local_chunk.take() {
+                let our_chunk = if let Some(chunk) = local_chunk.as_ref() {
                    chunk
                } else {
                    // we don't have this chunk
@@ -316,7 +321,6 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
                };

                if !do_replicate {
-                    local_chunk = Some(our_chunk);
                    continue;
                }

@@ -328,7 +332,6 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
                    // Add a record for it.
                    need_chunks.insert(i, (our_chunk.clone(), vec![naddr.clone()]));
                };
-                local_chunk = Some(our_chunk);
            }
        }

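The switch from local_chunk.take() to local_chunk.as_ref() in the hunks above is what lets the two removed local_chunk = Some(our_chunk) lines go away: take() moves the chunk out of the Option and leaves None behind, so the old code had to put it back for later iterations, while as_ref() only borrows it. A minimal sketch of the difference, using a hypothetical String in place of the real chunk data:

    fn main() {
        let mut local_chunk: Option<String> = Some("chunk".to_string());

        // Old shape: take() empties the Option, so keeping the value around for
        // the next iteration requires re-inserting it.
        if let Some(chunk) = local_chunk.take() {
            local_chunk = Some(chunk);
        }

        // New shape: as_ref() yields an Option<&String>; local_chunk is untouched
        // and cloning through the borrow still works.
        if let Some(chunk) = local_chunk.as_ref() {
            let _copy: String = chunk.clone();
        }
        assert!(local_chunk.is_some());
    }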
@@ -340,6 +343,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
            .collect();

        schedule.sort_by(|item_1, item_2| item_1.1.len().cmp(&item_2.1.len()));
+        schedule.reverse();
        test_debug!(
            "{:?}: Will push up to {} chunks for {}",
            network.get_local_peer(),
@@ -383,29 +387,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
            self.downloaded_chunks.insert(naddr.clone(), vec![data]);
        }

-        // yes, this is a linear scan. But because the number of chunks in the DB is a small O(1)
-        // enforced by the protocol, this isn't a big deal.
-        // This loop is only expected to run once, but is in place for defensive purposes.
-        loop {
-            let mut remove_idx = None;
-            for (i, (chunk, ..)) in self.chunk_fetch_priorities.iter().enumerate() {
-                if chunk.slot_id == slot_id {
-                    remove_idx = Some(i);
-                    break;
-                }
-            }
-            if let Some(remove_idx) = remove_idx {
-                test_debug!(
-                    "Downloaded chunk {}.{} from {:?}",
-                    slot_id,
-                    _slot_version,
-                    &naddr
-                );
-                self.chunk_fetch_priorities.remove(remove_idx);
-            } else {
-                break;
-            }
-        }
+        self.chunk_fetch_priorities.retain(|(chunk, ..)| chunk.slot_id != slot_id);
+
        if self.chunk_fetch_priorities.len() > 0 {
            let next_chunk_fetch_priority =
                self.next_chunk_fetch_priority % self.chunk_fetch_priorities.len();
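A minimal sketch of the Vec::retain pattern that replaces the scan-and-remove loop above, using a hypothetical Chunk type in place of the real fetch-priority entries: retain keeps only the elements for which the predicate returns true, drops every match in one pass, and preserves the relative order of what remains.

    fn main() {
        // Hypothetical stand-in for the (chunk, neighbors) fetch-priority entries.
        struct Chunk {
            slot_id: u32,
        }

        let mut priorities: Vec<(Chunk, Vec<&str>)> = vec![
            (Chunk { slot_id: 1 }, vec!["a"]),
            (Chunk { slot_id: 2 }, vec!["b"]),
            (Chunk { slot_id: 1 }, vec!["c"]),
        ];

        // Drop every entry for the downloaded slot in a single pass.
        let slot_id = 1;
        priorities.retain(|(chunk, ..)| chunk.slot_id != slot_id);

        assert_eq!(priorities.len(), 1);
        assert_eq!(priorities[0].0.slot_id, 2);
    }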
@@ -422,33 +405,11 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
        naddr: NeighborAddress,
        new_inv: StackerDBChunkInvData,
        slot_id: u32,
-        slot_version: u32,
    ) {
        self.chunk_invs.insert(naddr.clone(), new_inv);

-        // yes, this is a linear scan. But because the number of chunks in the DB is a small O(1)
-        // enforced by the protocol, this isn't a big deal.
-        // This loop is only expected to run once, but is in place for defensive purposes.
-        loop {
-            let mut remove_idx = None;
-            for (i, (chunk, ..)) in self.chunk_push_priorities.iter().enumerate() {
-                if chunk.chunk_data.slot_id == slot_id {
-                    remove_idx = Some(i);
-                    break;
-                }
-            }
-            if let Some(remove_idx) = remove_idx {
-                test_debug!(
-                    "Pushed chunk {}.{} from {:?}",
-                    slot_id,
-                    slot_version,
-                    &naddr
-                );
-                self.chunk_push_priorities.remove(remove_idx);
-            } else {
-                break;
-            }
-        }
+        self.chunk_push_priorities.retain(|(chunk, ..)| chunk.chunk_data.slot_id != slot_id);
+
        if self.chunk_push_priorities.len() > 0 {
            let next_chunk_push_priority =
                self.next_chunk_push_priority % self.chunk_push_priorities.len();
@@ -996,8 +957,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
                    &naddr
                );

-                if let Some((slot_id, slot_version)) = self.chunk_push_receipts.get(&naddr) {
-                    self.add_pushed_chunk(naddr, new_chunk_inv, *slot_id, *slot_version);
+                if let Some((slot_id, _)) = self.chunk_push_receipts.get(&naddr) {
+                    self.add_pushed_chunk(naddr, new_chunk_inv, *slot_id);
                }
            }
