@@ -6,15 +6,48 @@ use std::{
66use crate :: {
77 BlockSink , chain_state:: ChainState , connection:: { PeerChainSyncEvent , PeerConnection , PeerEvent }
88} ;
9+ use acropolis_common:: BlockHash ;
910use anyhow:: { Context as _, Result , bail} ;
1011use pallas:: network:: miniprotocols:: Point ;
1112use tokio:: sync:: mpsc;
1213use tracing:: { info, warn} ;
1314
15+ struct PeerData {
16+ conn : PeerConnection ,
17+ reqs : Vec < ( BlockHash , u64 ) > ,
18+ }
19+ impl PeerData {
20+ fn new ( conn : PeerConnection ) -> Self {
21+ Self {
22+ conn,
23+ reqs : vec ! [ ]
24+ }
25+ }
26+
27+ fn find_intersect ( & self , points : Vec < Point > ) {
28+ if let Err ( error) = self . conn . find_intersect ( points) {
29+ warn ! ( "could not sync {}: {error:#}" , self . conn. address) ;
30+ }
31+ }
32+
33+ fn request_block ( & mut self , hash : BlockHash , slot : u64 ) -> bool {
34+ if let Err ( error) = self . conn . request_block ( hash, slot) {
35+ warn ! ( "could not request block from {}: {error:#}" , self . conn. address) ;
36+ return false ;
37+ }
38+ self . reqs . push ( ( hash, slot) ) ;
39+ true
40+ }
41+
42+ fn ack_block ( & mut self , hash : BlockHash ) {
43+ self . reqs . retain ( |( h, _) | * h != hash) ;
44+ }
45+ }
46+
1447pub struct NetworkManager {
1548 network_magic : u64 ,
1649 next_id : u64 ,
17- peers : BTreeMap < PeerId , PeerConnection > ,
50+ peers : BTreeMap < PeerId , PeerData > ,
1851 chain : ChainState ,
1952 rolled_back : bool ,
2053 events : mpsc:: Receiver < NetworkEvent > ,
@@ -65,16 +98,15 @@ impl NetworkManager {
6598 id,
6699 } ;
67100 let conn = PeerConnection :: new ( address, self . network_magic , sender, delay) ;
101+ let peer = PeerData :: new ( conn) ;
68102 if self . chain . preferred_upstream . is_some ( ) {
69103 let points = self . chain . choose_points_for_find_intersect ( ) ;
70- if !points. is_empty ( )
71- && let Err ( error) = conn. find_intersect ( points)
72- {
73- warn ! ( "could not sync {}: {error:#}" , conn. address) ;
104+ if !points. is_empty ( ) {
105+ peer. find_intersect ( points) ;
74106 }
75- self . peers . insert ( id, conn ) ;
107+ self . peers . insert ( id, peer ) ;
76108 } else {
77- self . peers . insert ( id, conn ) ;
109+ self . peers . insert ( id, peer ) ;
78110 self . set_preferred_upstream ( id) ;
79111 }
80112 }
@@ -87,13 +119,13 @@ impl NetworkManager {
87119 let Some ( peer) = self . peers . get ( & upstream) else {
88120 bail ! ( "preferred upstream not found" ) ;
89121 } ;
90- match peer. find_tip ( ) . await {
122+ match peer. conn . find_tip ( ) . await {
91123 Ok ( point) => {
92124 self . sync_to_point ( point) ;
93125 return Ok ( ( ) ) ;
94126 }
95127 Err ( e) => {
96- warn ! ( "could not fetch tip from {}: {e:#}" , peer. address) ;
128+ warn ! ( "could not fetch tip from {}: {e:#}" , peer. conn . address) ;
97129 self . handle_disconnect ( upstream) ;
98130 }
99131 }
@@ -102,17 +134,14 @@ impl NetworkManager {
102134
103135 pub fn sync_to_point ( & mut self , point : Point ) {
104136 for peer in self . peers . values ( ) {
105- if let Err ( error) = peer. find_intersect ( vec ! [ point. clone( ) ] ) {
106- warn ! ( "could not sync {}: {error:#}" , peer. address) ;
107- }
137+ peer. find_intersect ( vec ! [ point. clone( ) ] ) ;
108138 }
109139 }
110140
111141 // Implementation note: this method is deliberately synchronous/non-blocking.
112142 // The task which handles network events should only block when waiting for new messages,
113143 // or when publishing messages to other modules. This avoids deadlock; if our event queue
114144 // is full and this method is blocked on writing to it, the queue can never drain.
115- // Returns true if we might have new events to publish downstream.
116145 fn handle_peer_update ( & mut self , peer : PeerId , event : PeerEvent ) {
117146 match event {
118147 PeerEvent :: ChainSync ( PeerChainSyncEvent :: RollForward ( header) ) => {
@@ -121,16 +150,7 @@ impl NetworkManager {
121150 let request_body_from = self . chain . handle_roll_forward ( peer, header) ;
122151 if !request_body_from. is_empty ( ) {
123152 // Request the block from the first peer which announced it
124- for announcer in request_body_from {
125- let Some ( peer) = self . peers . get ( & announcer) else {
126- continue ;
127- } ;
128- if let Err ( e) = peer. request_block ( hash, slot) {
129- warn ! ( "could not request block from {}: {e}" , peer. address) ;
130- self . handle_disconnect ( announcer) ;
131- }
132- break ; // only fetch from one
133- }
153+ self . request_block ( slot, hash, request_body_from) ;
134154 }
135155 }
136156 PeerEvent :: ChainSync ( PeerChainSyncEvent :: RollBackward ( point) ) => {
@@ -140,6 +160,9 @@ impl NetworkManager {
140160 }
141161 }
142162 PeerEvent :: BlockFetched ( fetched) => {
163+ for peer in self . peers . values_mut ( ) {
164+ peer. ack_block ( fetched. hash ) ;
165+ }
143166 self . chain . handle_body_fetched ( fetched. slot , fetched. hash , fetched. body ) ;
144167 }
145168 PeerEvent :: Disconnected => {
@@ -152,42 +175,57 @@ impl NetworkManager {
152175 let Some ( peer) = self . peers . remove ( & id) else {
153176 return ;
154177 } ;
155- warn ! ( "disconnected from {}" , peer. address) ;
178+ warn ! ( "disconnected from {}" , peer. conn . address) ;
156179 let was_preferred = self . chain . preferred_upstream . is_some_and ( |i| i == id) ;
157180 if was_preferred && let Some ( new_preferred) = self . peers . keys ( ) . next ( ) . copied ( ) {
158181 self . set_preferred_upstream ( new_preferred) ;
159182 }
160183 if self . peers . is_empty ( ) {
161184 warn ! ( "no upstream peers!" ) ;
162185 }
163- let address = peer. address . clone ( ) ;
164- drop ( peer) ;
186+ for ( requested_hash, requested_slot) in peer. reqs {
187+ let announcers = self . chain . block_announcers ( requested_slot, requested_hash) ;
188+ self . request_block ( requested_slot, requested_hash, announcers) ;
189+ }
190+
191+ let address = peer. conn . address . clone ( ) ;
165192 self . handle_new_connection ( address, Duration :: from_secs ( 5 ) ) ;
166193 }
167194
195+ fn request_block ( & mut self , slot : u64 , hash : BlockHash , announcers : Vec < PeerId > ) {
196+ for announcer in announcers {
197+ let Some ( peer) = self . peers . get_mut ( & announcer) else {
198+ continue ;
199+ } ;
200+ if peer. request_block ( hash, slot) {
201+ break ; // only fetch from one
202+ } else {
203+ self . handle_disconnect ( announcer) ;
204+ }
205+ }
206+ }
207+
168208 fn set_preferred_upstream ( & mut self , id : PeerId ) {
169209 let Some ( peer) = self . peers . get ( & id) else {
170210 warn ! ( "setting preferred upstream to unrecognized node {id:?}" ) ;
171211 return ;
172212 } ;
173- info ! ( "setting preferred upstream to {}" , peer. address) ;
213+ info ! ( "setting preferred upstream to {}" , peer. conn . address) ;
174214 self . chain . handle_new_preferred_upstream ( id) ;
175215
176216 // If our preferred upstream changed, resync all connections.
177217 // That will trigger a rollback if needed.
178218 let points = self . chain . choose_points_for_find_intersect ( ) ;
179219 for peer in self . peers . values ( ) {
180- if let Err ( error) = peer. find_intersect ( points. clone ( ) ) {
181- warn ! ( "could not sync {}: {error:#}" , peer. address)
182- }
220+ peer. find_intersect ( points. clone ( ) ) ;
183221 }
184222 }
185223
186224 async fn publish_blocks ( & mut self ) -> Result < ( ) > {
187225 while let Some ( ( header, body, rolled_back) ) = self . chain . next_unpublished_block ( ) {
188226 self . block_sink . announce ( header, body, rolled_back) . await ?;
189227 self . published_blocks += 1 ;
190- if self . published_blocks . is_multiple_of ( 1 ) {
228+ if self . published_blocks . is_multiple_of ( 100 ) {
191229 info ! ( "Published block {}" , header. number) ;
192230 }
193231 self . chain . handle_block_published ( ) ;
0 commit comments