@@ -74,6 +74,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
74
74
stale_neighbors : HashSet :: new ( ) ,
75
75
num_connections : 0 ,
76
76
num_attempted_connections : 0 ,
77
+ rounds : 0 ,
78
+ push_round : 0 ,
77
79
} ;
78
80
dbsync. reset ( None , config) ;
79
81
dbsync
@@ -215,6 +217,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
215
217
self . state = StackerDBSyncState :: ConnectBegin ;
216
218
self . num_connections = 0 ;
217
219
self . num_attempted_connections = 0 ;
220
+ self . rounds += 1 ;
218
221
result
219
222
}
220
223
@@ -407,6 +410,16 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
407
410
thread_rng ( ) . gen :: < u32 > ( ) % chunk_inv. num_outbound_replicas == 0
408
411
} ;
409
412
413
+ debug ! (
414
+ "{:?}: Can push chunk StackerDBChunk(db={},id={},ver={}) to {}. Replicate? {}" ,
415
+ & network. get_local_peer( ) ,
416
+ & self . smart_contract_id,
417
+ our_chunk. chunk_data. slot_id,
418
+ our_chunk. chunk_data. slot_version,
419
+ & naddr,
420
+ do_replicate
421
+ ) ;
422
+
410
423
if !do_replicate {
411
424
continue ;
412
425
}
@@ -1000,9 +1013,11 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1000
1013
/// Returns true if there are no more chunks to push.
1001
1014
/// Returns false if there are
1002
1015
pub fn pushchunks_begin ( & mut self , network : & mut PeerNetwork ) -> Result < bool , net_error > {
1003
- if self . chunk_push_priorities . len ( ) == 0 {
1016
+ if self . chunk_push_priorities . len ( ) == 0 && self . push_round != self . rounds {
1017
+ // only do this once per round
1004
1018
let priorities = self . make_chunk_push_schedule ( & network) ?;
1005
1019
self . chunk_push_priorities = priorities;
1020
+ self . push_round = self . rounds ;
1006
1021
}
1007
1022
if self . chunk_push_priorities . len ( ) == 0 {
1008
1023
// done
@@ -1017,8 +1032,6 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1017
1032
self . chunk_push_priorities. len( )
1018
1033
) ;
1019
1034
1020
- let mut pushed = 0 ;
1021
-
1022
1035
// fill up our comms with $capacity requests
1023
1036
for _i in 0 ..self . request_capacity {
1024
1037
if self . comms . count_inflight ( ) >= self . request_capacity {
@@ -1030,7 +1043,8 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1030
1043
. 1
1031
1044
. iter ( )
1032
1045
. enumerate ( )
1033
- . find ( |( _i, naddr) | !self . comms . has_inflight ( naddr) ) ;
1046
+ // .find(|(_i, naddr)| !self.comms.has_inflight(naddr));
1047
+ . find ( |( _i, _naddr) | true ) ;
1034
1048
1035
1049
let ( idx, selected_neighbor) = if let Some ( x) = selected_neighbor_opt {
1036
1050
x
@@ -1072,8 +1086,6 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1072
1086
continue ;
1073
1087
}
1074
1088
1075
- pushed += 1 ;
1076
-
1077
1089
// record what we just sent
1078
1090
self . chunk_push_receipts
1079
1091
. insert ( selected_neighbor. clone ( ) , ( slot_id, slot_version) ) ;
@@ -1088,7 +1100,13 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1088
1100
return Err ( net_error:: PeerNotConnected ) ;
1089
1101
}
1090
1102
self . next_chunk_push_priority = cur_priority;
1091
- Ok ( self . chunk_push_priorities . len ( ) == 0 )
1103
+ Ok ( self
1104
+ . chunk_push_priorities
1105
+ . iter ( )
1106
+ . fold ( 0usize , |acc, ( _chunk, num_naddrs) | {
1107
+ acc. saturating_add ( num_naddrs. len ( ) )
1108
+ } )
1109
+ == 0 )
1092
1110
}
1093
1111
1094
1112
/// Collect push-chunk replies from neighbors.
@@ -1138,7 +1156,14 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1138
1156
}
1139
1157
}
1140
1158
1141
- self . comms . count_inflight ( ) == 0
1159
+ let inflight = self . comms . count_inflight ( ) ;
1160
+ debug ! (
1161
+ "{:?}: inflight messages for {}: {:?}" ,
1162
+ network. get_local_peer( ) ,
1163
+ & self . smart_contract_id,
1164
+ inflight
1165
+ ) ;
1166
+ inflight == 0
1142
1167
}
1143
1168
1144
1169
/// Recalculate the download schedule based on chunkinvs received on push
@@ -1189,8 +1214,9 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
1189
1214
1190
1215
loop {
1191
1216
debug ! (
1192
- "{:?}: stacker DB sync state is {:?}" ,
1217
+ "{:?}: stacker DB sync state for {} is {:?}" ,
1193
1218
network. get_local_peer( ) ,
1219
+ & self . smart_contract_id,
1194
1220
& self . state
1195
1221
) ;
1196
1222
let mut blocked = true ;
0 commit comments