@@ -15,12 +15,13 @@ use tracing::{info, warn};
1515
1616pub struct NetworkManager {
1717 network_magic : u64 ,
18+ security_param : u64 ,
1819 next_id : u64 ,
1920 peers : BTreeMap < PeerId , PeerConnection > ,
2021 preferred_upstream : Option < PeerId > ,
2122 blocks_to_fetch : VecDeque < Header > ,
2223 blocks : HashMap < BlockHash , BlockStatus > ,
23- head : Option < Point > ,
24+ chain_prefix : VecDeque < Point > ,
2425 rolled_back : bool ,
2526 events : mpsc:: Receiver < NetworkEvent > ,
2627 events_sender : mpsc:: Sender < NetworkEvent > ,
@@ -31,18 +32,20 @@ pub struct NetworkManager {
3132impl NetworkManager {
3233 pub fn new (
3334 network_magic : u64 ,
35+ security_param : u64 ,
3436 events : mpsc:: Receiver < NetworkEvent > ,
3537 events_sender : mpsc:: Sender < NetworkEvent > ,
3638 block_sink : BlockSink ,
3739 ) -> Self {
3840 Self {
3941 network_magic,
42+ security_param,
4043 next_id : 0 ,
4144 peers : BTreeMap :: new ( ) ,
4245 preferred_upstream : None ,
4346 blocks_to_fetch : VecDeque :: new ( ) ,
4447 blocks : HashMap :: new ( ) ,
45- head : None ,
48+ chain_prefix : VecDeque :: new ( ) ,
4649 rolled_back : false ,
4750 events,
4851 events_sender,
@@ -73,16 +76,17 @@ impl NetworkManager {
7376 id,
7477 } ;
7578 let conn = PeerConnection :: new ( address, self . network_magic , sender, delay) ;
76- if self . preferred_upstream . is_none ( ) {
77- self . peers . insert ( id, conn) ;
78- self . set_preferred_upstream ( id) ;
79- } else {
80- if let Some ( head) = self . head . clone ( )
81- && let Err ( error) = conn. find_intersect ( vec ! [ head] )
79+ if self . preferred_upstream . is_some ( ) {
80+ let points = self . choose_points_for_find_intersect ( ) ;
81+ if !points. is_empty ( )
82+ && let Err ( error) = conn. find_intersect ( points)
8283 {
83- warn ! ( "could not sync {}: {error}" , conn. address) ;
84+ warn ! ( "could not sync {}: {error:# }" , conn. address) ;
8485 }
8586 self . peers . insert ( id, conn) ;
87+ } else {
88+ self . peers . insert ( id, conn) ;
89+ self . set_preferred_upstream ( id) ;
8690 }
8791 }
8892
@@ -91,30 +95,35 @@ impl NetworkManager {
9195 let Some ( upstream) = self . preferred_upstream else {
9296 bail ! ( "no peers" ) ;
9397 } ;
94- let Some ( conn ) = self . peers . get ( & upstream) else {
98+ let Some ( peer ) = self . peers . get ( & upstream) else {
9599 bail ! ( "preferred upstream not found" ) ;
96100 } ;
97- match conn . find_tip ( ) . await {
101+ match peer . find_tip ( ) . await {
98102 Ok ( point) => {
99103 self . sync_to_point ( point) ;
100104 return Ok ( ( ) ) ;
101105 }
102106 Err ( e) => {
103- warn ! ( "could not fetch tip from {}: {e}" , conn . address) ;
107+ warn ! ( "could not fetch tip from {}: {e:# }" , peer . address) ;
104108 self . handle_disconnect ( upstream) ;
105109 }
106110 }
107111 }
108112 }
109113
110114 pub fn sync_to_point ( & mut self , point : Point ) {
111- for conn in self . peers . values ( ) {
112- if let Err ( error) = conn . find_intersect ( vec ! [ point. clone( ) ] ) {
113- warn ! ( "could not sync {}: {error}" , conn . address) ;
115+ for peer in self . peers . values ( ) {
116+ if let Err ( error) = peer . find_intersect ( vec ! [ point. clone( ) ] ) {
117+ warn ! ( "could not sync {}: {error:# }" , peer . address) ;
114118 }
115119 }
116120 }
117121
122+ // Implementation note: this method is deliberately synchronous/non-blocking.
123+ // The task which handles network events should only block when waiting for new messages,
124+ // or when publishing messages to other modules. This avoids deadlock; if our event queue
125+ // is full and this method is blocked on writing to it, the queue can never drain.
126+ // Returns true if we might have new events to publish downstream.
118127 fn handle_peer_update ( & mut self , peer : PeerId , event : PeerEvent ) -> Result < bool > {
119128 let is_preferred = self . preferred_upstream . is_some_and ( |id| id == peer) ;
120129 match event {
@@ -132,8 +141,7 @@ impl NetworkManager {
132141 let Some ( peer) = self . peers . get ( & announcer) else {
133142 continue ;
134143 } ;
135- if let Err ( e) = peer. request_block ( header. hash , header. slot )
136- {
144+ if let Err ( e) = peer. request_block ( header. hash , header. slot ) {
137145 warn ! ( "could not request block from {}: {e}" , peer. address) ;
138146 self . handle_disconnect ( announcer) ;
139147 }
@@ -153,20 +161,24 @@ impl NetworkManager {
153161 if is_preferred {
154162 match point {
155163 Point :: Origin => {
164+ self . rolled_back = !self . chain_prefix . is_empty ( ) ;
165+ self . chain_prefix . clear ( ) ;
156166 self . blocks_to_fetch . clear ( ) ;
157- self . rolled_back = true ;
158167 }
159168 Point :: Specific ( slot, _) => {
160- let mut already_sent = true ;
161- while let Some ( newest) = self . blocks_to_fetch . back ( ) {
162- if newest. slot == slot {
163- already_sent = false ;
164- break ;
165- } else {
166- self . blocks_to_fetch . pop_back ( ) ;
167- }
169+ // don't bother fetching any blocks from after the rollback point
170+ while self . blocks_to_fetch . back ( ) . is_some_and ( |b| b. slot > slot) {
171+ self . blocks_to_fetch . pop_back ( ) ;
168172 }
169- if already_sent {
173+
174+ // If we're rolling back to before a block which we've emitted events for,
175+ // set rolled_back to true so that we signal that in the next message.
176+ while self
177+ . chain_prefix
178+ . back ( )
179+ . is_some_and ( |point| is_point_after ( point, slot) )
180+ {
181+ self . chain_prefix . pop_back ( ) ;
170182 self . rolled_back = true ;
171183 }
172184 }
@@ -188,32 +200,38 @@ impl NetworkManager {
188200 }
189201 }
190202
191- fn handle_disconnect ( & mut self , peer : PeerId ) {
192- let Some ( conn ) = self . peers . remove ( & peer ) else {
203+ fn handle_disconnect ( & mut self , id : PeerId ) {
204+ let Some ( peer ) = self . peers . remove ( & id ) else {
193205 return ;
194206 } ;
195- warn ! ( "disconnected from {}" , conn . address) ;
196- let is_preferred = self . preferred_upstream . is_some_and ( |id| id == peer ) ;
197- if is_preferred && let Some ( new_preferred) = self . peers . keys ( ) . next ( ) . copied ( ) {
207+ warn ! ( "disconnected from {}" , peer . address) ;
208+ let was_preferred = self . preferred_upstream . is_some_and ( |i| i == id ) ;
209+ if was_preferred && let Some ( new_preferred) = self . peers . keys ( ) . next ( ) . copied ( ) {
198210 self . set_preferred_upstream ( new_preferred) ;
199211 }
200212 if self . peers . is_empty ( ) {
201213 warn ! ( "no upstream peers!" ) ;
202214 }
203- let address = conn . address . clone ( ) ;
204- drop ( conn ) ;
215+ let address = peer . address . clone ( ) ;
216+ drop ( peer ) ;
205217 self . handle_new_connection ( address, Duration :: from_secs ( 5 ) ) ;
206218 }
207219
208- fn set_preferred_upstream ( & mut self , peer : PeerId ) {
209- if let Some ( conn) = self . peers . get ( & peer) {
210- info ! ( "setting preferred upstream to {}" , conn. address) ;
211- } else {
212- warn ! ( "setting preferred upstream to unrecognized node {peer:?}" ) ;
213- }
214- self . preferred_upstream = Some ( peer) ;
215- if let Some ( head) = self . head . clone ( ) {
216- self . sync_to_point ( head) ;
220+ fn set_preferred_upstream ( & mut self , id : PeerId ) {
221+ let Some ( peer) = self . peers . get ( & id) else {
222+ warn ! ( "setting preferred upstream to unrecognized node {id:?}" ) ;
223+ return ;
224+ } ;
225+ info ! ( "setting preferred upstream to {}" , peer. address) ;
226+ self . preferred_upstream = Some ( id) ;
227+
228+ // If our preferred upstream changed, resync all connections.
229+ // That will trigger a rollback if needed.
230+ let points = self . choose_points_for_find_intersect ( ) ;
231+ for peer in self . peers . values ( ) {
232+ if let Err ( error) = peer. find_intersect ( points. clone ( ) ) {
233+ warn ! ( "could not sync {}: {error:#}" , peer. address)
234+ }
217235 }
218236 }
219237
@@ -227,12 +245,54 @@ impl NetworkManager {
227245 if self . published_blocks . is_multiple_of ( 100 ) {
228246 info ! ( "Published block {}" , header. number) ;
229247 }
230- self . head = Some ( Point :: Specific ( header. slot , header. hash . to_vec ( ) ) ) ;
248+ let point = Point :: Specific ( header. slot , header. hash . to_vec ( ) ) ;
249+ self . chain_prefix . push_back ( point) ;
250+ while self . chain_prefix . len ( ) > self . security_param as usize {
251+ self . chain_prefix . pop_front ( ) ;
252+ }
231253 self . rolled_back = false ;
232254 self . blocks_to_fetch . pop_front ( ) ;
233255 }
234256 Ok ( ( ) )
235257 }
258+
259+ fn choose_points_for_find_intersect ( & self ) -> Vec < Point > {
260+ let mut iterator = self . chain_prefix . iter ( ) . rev ( ) ;
261+ let mut result = vec ! [ ] ;
262+
263+ // send the 5 most recent points
264+ for _ in 0 ..5 {
265+ if let Some ( next) = iterator. next ( ) {
266+ result. push ( next. clone ( ) ) ;
267+ }
268+ }
269+
270+ // then 5 more points, spaced out by 10 block heights each
271+ let mut iterator = iterator. step_by ( 10 ) ;
272+ for _ in 0 ..5 {
273+ if let Some ( next) = iterator. next ( ) {
274+ result. push ( next. clone ( ) ) ;
275+ }
276+ }
277+
278+ // then 5 more points, spaced out by a total of 100 block heights each
279+ // (in case of an implausibly long rollback)
280+ let mut iterator = iterator. step_by ( 10 ) ;
281+ for _ in 0 ..5 {
282+ if let Some ( next) = iterator. next ( ) {
283+ result. push ( next. clone ( ) ) ;
284+ }
285+ }
286+
287+ result
288+ }
289+ }
290+
291+ const fn is_point_after ( point : & Point , slot : u64 ) -> bool {
292+ match point {
293+ Point :: Origin => false ,
294+ Point :: Specific ( s, _) => * s > slot,
295+ }
236296}
237297
238298pub enum NetworkEvent {
0 commit comments