@@ -79,6 +79,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
79
79
num_attempted_connections : 0 ,
80
80
rounds : 0 ,
81
81
push_round : 0 ,
82
+ last_eviction_time : get_epoch_time_secs ( ) ,
82
83
} ;
83
84
dbsync. reset ( None , config) ;
84
85
dbsync
@@ -217,10 +218,32 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
217
218
self . expected_versions . clear ( ) ;
218
219
self . downloaded_chunks . clear ( ) ;
219
220
220
- // reset comms, but keep all connected replicas pinned
221
+ // reset comms, but keep all connected replicas pinned.
222
+ // Randomly evict one every so often.
221
223
self . comms . reset ( ) ;
222
224
if let Some ( network) = network {
223
- for naddr in self . replicas . iter ( ) {
225
+ let mut eviction_index = None ;
226
+ if self . last_eviction_time + 60 < get_epoch_time_secs ( ) {
227
+ self . last_eviction_time = get_epoch_time_secs ( ) ;
228
+ if self . replicas . len ( ) > 0 {
229
+ eviction_index = Some ( thread_rng ( ) . gen :: < usize > ( ) % self . replicas . len ( ) ) ;
230
+ }
231
+ }
232
+
233
+ let mut remove_naddr = None ;
234
+ for ( i, naddr) in self . replicas . iter ( ) . enumerate ( ) {
235
+ if let Some ( eviction_index) = eviction_index. as_ref ( ) {
236
+ if * eviction_index == i {
237
+ debug ! (
238
+ "{:?}: {}: don't reuse connection for replica {:?}" ,
239
+ network. get_local_peer( ) ,
240
+ & self . smart_contract_id,
241
+ & naddr,
242
+ ) ;
243
+ remove_naddr = Some ( naddr. clone ( ) ) ;
244
+ continue ;
245
+ }
246
+ }
224
247
if let Some ( event_id) = network. get_event_id ( & naddr. to_neighbor_key ( network) ) {
225
248
self . comms . pin_connection ( event_id) ;
226
249
debug ! (
@@ -232,6 +255,9 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
232
255
) ;
233
256
}
234
257
}
258
+ if let Some ( naddr) = remove_naddr. take ( ) {
259
+ self . replicas . remove ( & naddr) ;
260
+ }
235
261
}
236
262
237
263
// reload from config
@@ -668,7 +694,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
668
694
/// We might not be connected to any yet.
669
695
/// Clears self.replicas, and fills in self.connected_replicas with already-connected neighbors
670
696
/// Returns Ok(true) if we can proceed to sync
671
- /// Returns Ok(false) if we have no known peers
697
+ /// Returns Ok(false) if we should try this again
698
+ /// Returns Err(NoSuchNeighbor) if we don't have anyone to talk to
672
699
/// Returns Err(..) on DB query error
673
700
pub fn connect_begin ( & mut self , network : & mut PeerNetwork ) -> Result < bool , net_error > {
674
701
if self . replicas . len ( ) == 0 {
@@ -686,7 +713,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
686
713
) ;
687
714
if self . replicas . len ( ) == 0 {
688
715
// nothing to do
689
- return Ok ( false ) ;
716
+ return Err ( net_error :: NoSuchNeighbor ) ;
690
717
}
691
718
692
719
let naddrs = mem:: replace ( & mut self . replicas , HashSet :: new ( ) ) ;
@@ -729,11 +756,12 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
729
756
) ;
730
757
self . num_attempted_connections += 1 ;
731
758
self . num_connections += 1 ;
759
+ self . connected_replicas . insert ( naddr) ;
732
760
}
733
761
Ok ( false ) => {
734
762
// need to retry
735
- self . replicas . insert ( naddr) ;
736
763
self . num_attempted_connections += 1 ;
764
+ self . replicas . insert ( naddr) ;
737
765
}
738
766
Err ( _e) => {
739
767
debug ! (
@@ -746,7 +774,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
746
774
}
747
775
}
748
776
}
749
- Ok ( self . replicas . len ( ) == 0 )
777
+ Ok ( self . connected_replicas . len ( ) > 0 )
750
778
}
751
779
752
780
/// Finish up connecting to our replicas.
@@ -1154,7 +1182,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1154
1182
) ;
1155
1183
1156
1184
// fill up our comms with $capacity requests
1157
- for _i in 0 ..self . request_capacity {
1185
+ let mut num_sent = 0 ;
1186
+ for _i in 0 ..self . chunk_push_priorities . len ( ) {
1158
1187
if self . comms . count_inflight ( ) >= self . request_capacity {
1159
1188
break ;
1160
1189
}
@@ -1173,6 +1202,9 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1173
1202
chunk_push. chunk_data. slot_id,
1174
1203
chunk_push. chunk_data. slot_version,
1175
1204
) ;
1205
+
1206
+ // next-prioritized chunk
1207
+ cur_priority = ( cur_priority + 1 ) % self . chunk_push_priorities . len ( ) ;
1176
1208
continue ;
1177
1209
} ;
1178
1210
@@ -1213,6 +1245,11 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1213
1245
1214
1246
// next-prioritized chunk
1215
1247
cur_priority = ( cur_priority + 1 ) % self . chunk_push_priorities . len ( ) ;
1248
+
1249
+ num_sent += 1 ;
1250
+ if num_sent > self . request_capacity {
1251
+ break ;
1252
+ }
1216
1253
}
1217
1254
self . next_chunk_push_priority = cur_priority;
1218
1255
Ok ( self
@@ -1370,14 +1407,22 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1370
1407
let mut blocked = true ;
1371
1408
match self . state {
1372
1409
StackerDBSyncState :: ConnectBegin => {
1373
- let done = self . connect_begin ( network) ?;
1410
+ let done = match self . connect_begin ( network) {
1411
+ Ok ( done) => done,
1412
+ Err ( net_error:: NoSuchNeighbor ) => {
1413
+ // nothing to do
1414
+ self . state = StackerDBSyncState :: Finished ;
1415
+ blocked = false ;
1416
+ false
1417
+ }
1418
+ Err ( e) => {
1419
+ return Err ( e) ;
1420
+ }
1421
+ } ;
1374
1422
if done {
1375
1423
self . state = StackerDBSyncState :: ConnectFinish ;
1376
- } else {
1377
- // no replicas; try again
1378
- self . state = StackerDBSyncState :: Finished ;
1424
+ blocked = false ;
1379
1425
}
1380
- blocked = false ;
1381
1426
}
1382
1427
StackerDBSyncState :: ConnectFinish => {
1383
1428
let done = self . connect_try_finish ( network) ?;
0 commit comments