@@ -79,6 +79,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
79
79
num_attempted_connections : 0 ,
80
80
rounds : 0 ,
81
81
push_round : 0 ,
82
+ last_eviction_time : get_epoch_time_secs ( ) ,
82
83
} ;
83
84
dbsync. reset ( None , config) ;
84
85
dbsync
@@ -217,9 +218,36 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
217
218
self . expected_versions . clear ( ) ;
218
219
self . downloaded_chunks . clear ( ) ;
219
220
220
- // reset comms, but keep all connected replicas pinned
221
+ // reset comms, but keep all connected replicas pinned.
222
+ // Randomly evict one every so often.
221
223
self . comms . reset ( ) ;
222
224
if let Some ( network) = network {
225
+ let mut eviction_index = None ;
226
+ if self . last_eviction_time + 60 < get_epoch_time_secs ( ) {
227
+ self . last_eviction_time = get_epoch_time_secs ( ) ;
228
+ if self . replicas . len ( ) > 0 {
229
+ eviction_index = Some ( thread_rng ( ) . gen_range ( 0 ..self . replicas . len ( ) ) ) ;
230
+ }
231
+ }
232
+
233
+ let remove_naddr = eviction_index. and_then ( |idx| {
234
+ let removed = self . replicas . iter ( ) . nth ( idx) . cloned ( ) ;
235
+ if let Some ( naddr) = removed. as_ref ( ) {
236
+ debug ! (
237
+ "{:?}: {}: don't reuse connection for replica {:?}" ,
238
+ network. get_local_peer( ) ,
239
+ & self . smart_contract_id,
240
+ & naddr,
241
+ ) ;
242
+ }
243
+ removed
244
+ } ) ;
245
+
246
+ if let Some ( naddr) = remove_naddr {
247
+ self . replicas . remove ( & naddr) ;
248
+ }
249
+
250
+ // retain the remaining replica connections
223
251
for naddr in self . replicas . iter ( ) {
224
252
if let Some ( event_id) = network. get_event_id ( & naddr. to_neighbor_key ( network) ) {
225
253
self . comms . pin_connection ( event_id) ;
@@ -668,7 +696,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
668
696
/// We might not be connected to any yet.
669
697
/// Drains self.replicas (re-inserting any replica that needs a connection retry), and fills in self.connected_replicas with already-connected neighbors
670
698
/// Returns Ok(true) if we can proceed to sync
671
- /// Returns Ok(false) if we have no known peers
699
+ /// Returns Ok(false) if we should try this again
700
+ /// Returns Err(NoSuchNeighbor) if we don't have anyone to talk to
672
701
/// Returns Err(..) on DB query error
673
702
pub fn connect_begin ( & mut self , network : & mut PeerNetwork ) -> Result < bool , net_error > {
674
703
if self . replicas . len ( ) == 0 {
@@ -686,7 +715,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
686
715
) ;
687
716
if self . replicas . len ( ) == 0 {
688
717
// nothing to do
689
- return Ok ( false ) ;
718
+ return Err ( net_error :: NoSuchNeighbor ) ;
690
719
}
691
720
692
721
let naddrs = mem:: replace ( & mut self . replicas , HashSet :: new ( ) ) ;
@@ -729,11 +758,12 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
729
758
) ;
730
759
self . num_attempted_connections += 1 ;
731
760
self . num_connections += 1 ;
761
+ self . connected_replicas . insert ( naddr) ;
732
762
}
733
763
Ok ( false ) => {
734
764
// need to retry
735
- self . replicas . insert ( naddr) ;
736
765
self . num_attempted_connections += 1 ;
766
+ self . replicas . insert ( naddr) ;
737
767
}
738
768
Err ( _e) => {
739
769
debug ! (
@@ -746,7 +776,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
746
776
}
747
777
}
748
778
}
749
- Ok ( self . replicas . len ( ) == 0 )
779
+ Ok ( self . connected_replicas . len ( ) > 0 )
750
780
}
751
781
752
782
/// Finish up connecting to our replicas.
@@ -1154,7 +1184,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1154
1184
) ;
1155
1185
1156
1186
// fill up our comms with $capacity requests
1157
- for _i in 0 ..self . request_capacity {
1187
+ let mut num_sent = 0 ;
1188
+ for _i in 0 ..self . chunk_push_priorities . len ( ) {
1158
1189
if self . comms . count_inflight ( ) >= self . request_capacity {
1159
1190
break ;
1160
1191
}
@@ -1173,6 +1204,9 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1173
1204
chunk_push. chunk_data. slot_id,
1174
1205
chunk_push. chunk_data. slot_version,
1175
1206
) ;
1207
+
1208
+ // next-prioritized chunk
1209
+ cur_priority = ( cur_priority + 1 ) % self . chunk_push_priorities . len ( ) ;
1176
1210
continue ;
1177
1211
} ;
1178
1212
@@ -1213,6 +1247,11 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1213
1247
1214
1248
// next-prioritized chunk
1215
1249
cur_priority = ( cur_priority + 1 ) % self . chunk_push_priorities . len ( ) ;
1250
+
1251
+ num_sent += 1 ;
1252
+ if num_sent >= self . request_capacity { // cap at request_capacity; `>` would permit one extra pass, unlike the `>=` inflight check above
1253
+ break ;
1254
+ }
1216
1255
}
1217
1256
self . next_chunk_push_priority = cur_priority;
1218
1257
Ok ( self
@@ -1370,14 +1409,22 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1370
1409
let mut blocked = true ;
1371
1410
match self . state {
1372
1411
StackerDBSyncState :: ConnectBegin => {
1373
- let done = self . connect_begin ( network) ?;
1412
+ let done = match self . connect_begin ( network) {
1413
+ Ok ( done) => done,
1414
+ Err ( net_error:: NoSuchNeighbor ) => {
1415
+ // nothing to do
1416
+ self . state = StackerDBSyncState :: Finished ;
1417
+ blocked = false ;
1418
+ false
1419
+ }
1420
+ Err ( e) => {
1421
+ return Err ( e) ;
1422
+ }
1423
+ } ;
1374
1424
if done {
1375
1425
self . state = StackerDBSyncState :: ConnectFinish ;
1376
- } else {
1377
- // no replicas; try again
1378
- self . state = StackerDBSyncState :: Finished ;
1426
+ blocked = false ;
1379
1427
}
1380
- blocked = false ;
1381
1428
}
1382
1429
StackerDBSyncState :: ConnectFinish => {
1383
1430
let done = self . connect_try_finish ( network) ?;
0 commit comments