@@ -36,12 +36,10 @@ use tokio::{io, time};
3636use tokio:: sync:: mpsc;
3737use tokio:: io:: { AsyncReadExt , AsyncWrite , AsyncWriteExt } ;
3838
39- use lightning:: chain:: keysinterface:: NodeSigner ;
4039use lightning:: ln:: peer_handler;
4140use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
42- use lightning:: ln:: peer_handler:: CustomMessageHandler ;
43- use lightning:: ln:: msgs:: { ChannelMessageHandler , NetAddress , OnionMessageHandler , RoutingMessageHandler } ;
44- use lightning:: util:: logger:: Logger ;
41+ use lightning:: ln:: peer_handler:: APeerManager ;
42+ use lightning:: ln:: msgs:: NetAddress ;
4543
4644use std:: ops:: Deref ;
4745use std:: task;
@@ -80,53 +78,25 @@ struct Connection {
8078 id : u64 ,
8179}
8280impl Connection {
83- async fn poll_event_process < PM , CMH , RMH , OMH , L , UMH , NS > (
81+ async fn poll_event_process < PM : Deref + ' static + Send + Sync > (
8482 peer_manager : PM ,
8583 mut event_receiver : mpsc:: Receiver < ( ) > ,
86- ) where
87- PM : Deref < Target = peer_handler:: PeerManager < SocketDescriptor , CMH , RMH , OMH , L , UMH , NS > > + ' static + Send + Sync ,
88- CMH : Deref + ' static + Send + Sync ,
89- RMH : Deref + ' static + Send + Sync ,
90- OMH : Deref + ' static + Send + Sync ,
91- L : Deref + ' static + Send + Sync ,
92- UMH : Deref + ' static + Send + Sync ,
93- NS : Deref + ' static + Send + Sync ,
94- CMH :: Target : ChannelMessageHandler + Send + Sync ,
95- RMH :: Target : RoutingMessageHandler + Send + Sync ,
96- OMH :: Target : OnionMessageHandler + Send + Sync ,
97- L :: Target : Logger + Send + Sync ,
98- UMH :: Target : CustomMessageHandler + Send + Sync ,
99- NS :: Target : NodeSigner + Send + Sync ,
100- {
84+ ) where PM :: Target : APeerManager < Descriptor = SocketDescriptor > {
10185 loop {
10286 if event_receiver. recv ( ) . await . is_none ( ) {
10387 return ;
10488 }
105- peer_manager. process_events ( ) ;
89+ peer_manager. pm ( ) . process_events ( ) ;
10690 }
10791 }
10892
109- async fn schedule_read < PM , CMH , RMH , OMH , L , UMH , NS > (
93+ async fn schedule_read < PM : Deref + ' static + Send + Sync + Clone > (
11094 peer_manager : PM ,
11195 us : Arc < Mutex < Self > > ,
11296 mut reader : io:: ReadHalf < TcpStream > ,
11397 mut read_wake_receiver : mpsc:: Receiver < ( ) > ,
11498 mut write_avail_receiver : mpsc:: Receiver < ( ) > ,
115- ) where
116- PM : Deref < Target = peer_handler:: PeerManager < SocketDescriptor , CMH , RMH , OMH , L , UMH , NS > > + ' static + Send + Sync + Clone ,
117- CMH : Deref + ' static + Send + Sync ,
118- RMH : Deref + ' static + Send + Sync ,
119- OMH : Deref + ' static + Send + Sync ,
120- L : Deref + ' static + Send + Sync ,
121- UMH : Deref + ' static + Send + Sync ,
122- NS : Deref + ' static + Send + Sync ,
123- CMH :: Target : ChannelMessageHandler + ' static + Send + Sync ,
124- RMH :: Target : RoutingMessageHandler + ' static + Send + Sync ,
125- OMH :: Target : OnionMessageHandler + ' static + Send + Sync ,
126- L :: Target : Logger + ' static + Send + Sync ,
127- UMH :: Target : CustomMessageHandler + ' static + Send + Sync ,
128- NS :: Target : NodeSigner + ' static + Send + Sync ,
129- {
99+ ) where PM :: Target : APeerManager < Descriptor = SocketDescriptor > {
130100 // Create a waker to wake up poll_event_process, above
131101 let ( event_waker, event_receiver) = mpsc:: channel ( 1 ) ;
132102 tokio:: spawn ( Self :: poll_event_process ( peer_manager. clone ( ) , event_receiver) ) ;
@@ -160,15 +130,15 @@ impl Connection {
160130 tokio:: select! {
161131 v = write_avail_receiver. recv( ) => {
162132 assert!( v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
163- if peer_manager. write_buffer_space_avail( & mut our_descriptor) . is_err( ) {
133+ if peer_manager. pm ( ) . write_buffer_space_avail( & mut our_descriptor) . is_err( ) {
164134 break Disconnect :: CloseConnection ;
165135 }
166136 } ,
167137 _ = read_wake_receiver. recv( ) => { } ,
168138 read = reader. read( & mut buf) , if !read_paused => match read {
169139 Ok ( 0 ) => break Disconnect :: PeerDisconnected ,
170140 Ok ( len) => {
171- let read_res = peer_manager. read_event( & mut our_descriptor, & buf[ 0 ..len] ) ;
141+ let read_res = peer_manager. pm ( ) . read_event( & mut our_descriptor, & buf[ 0 ..len] ) ;
172142 let mut us_lock = us. lock( ) . unwrap( ) ;
173143 match read_res {
174144 Ok ( pause_read) => {
@@ -197,8 +167,8 @@ impl Connection {
197167 let _ = writer. shutdown ( ) . await ;
198168 }
199169 if let Disconnect :: PeerDisconnected = disconnect_type {
200- peer_manager. socket_disconnected ( & our_descriptor) ;
201- peer_manager. process_events ( ) ;
170+ peer_manager. pm ( ) . socket_disconnected ( & our_descriptor) ;
171+ peer_manager. pm ( ) . process_events ( ) ;
202172 }
203173 }
204174
@@ -245,30 +215,17 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
245215/// The returned future will complete when the peer is disconnected and associated handling
246216/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
247217/// not need to poll the provided future in order to make progress.
248- pub fn setup_inbound < PM , CMH , RMH , OMH , L , UMH , NS > (
218+ pub fn setup_inbound < PM : Deref + ' static + Send + Sync + Clone > (
249219 peer_manager : PM ,
250220 stream : StdTcpStream ,
251- ) -> impl std:: future:: Future < Output =( ) > where
252- PM : Deref < Target = peer_handler:: PeerManager < SocketDescriptor , CMH , RMH , OMH , L , UMH , NS > > + ' static + Send + Sync + Clone ,
253- CMH : Deref + ' static + Send + Sync ,
254- RMH : Deref + ' static + Send + Sync ,
255- OMH : Deref + ' static + Send + Sync ,
256- L : Deref + ' static + Send + Sync ,
257- UMH : Deref + ' static + Send + Sync ,
258- NS : Deref + ' static + Send + Sync ,
259- CMH :: Target : ChannelMessageHandler + Send + Sync ,
260- RMH :: Target : RoutingMessageHandler + Send + Sync ,
261- OMH :: Target : OnionMessageHandler + Send + Sync ,
262- L :: Target : Logger + Send + Sync ,
263- UMH :: Target : CustomMessageHandler + Send + Sync ,
264- NS :: Target : NodeSigner + Send + Sync ,
265- {
221+ ) -> impl std:: future:: Future < Output =( ) >
222+ where PM :: Target : APeerManager < Descriptor = SocketDescriptor > {
266223 let remote_addr = get_addr_from_stream ( & stream) ;
267224 let ( reader, write_receiver, read_receiver, us) = Connection :: new ( stream) ;
268225 #[ cfg( test) ]
269226 let last_us = Arc :: clone ( & us) ;
270227
271- let handle_opt = if peer_manager. new_inbound_connection ( SocketDescriptor :: new ( us. clone ( ) ) , remote_addr) . is_ok ( ) {
228+ let handle_opt = if peer_manager. pm ( ) . new_inbound_connection ( SocketDescriptor :: new ( us. clone ( ) ) , remote_addr) . is_ok ( ) {
272229 Some ( tokio:: spawn ( Connection :: schedule_read ( peer_manager, us, reader, read_receiver, write_receiver) ) )
273230 } else {
274231 // Note that we will skip socket_disconnected here, in accordance with the PeerManager
@@ -300,30 +257,17 @@ pub fn setup_inbound<PM, CMH, RMH, OMH, L, UMH, NS>(
300257/// The returned future will complete when the peer is disconnected and associated handling
301258/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
302259/// not need to poll the provided future in order to make progress.
303- pub fn setup_outbound < PM , CMH , RMH , OMH , L , UMH , NS > (
260+ pub fn setup_outbound < PM : Deref + ' static + Send + Sync + Clone > (
304261 peer_manager : PM ,
305262 their_node_id : PublicKey ,
306263 stream : StdTcpStream ,
307- ) -> impl std:: future:: Future < Output =( ) > where
308- PM : Deref < Target = peer_handler:: PeerManager < SocketDescriptor , CMH , RMH , OMH , L , UMH , NS > > + ' static + Send + Sync + Clone ,
309- CMH : Deref + ' static + Send + Sync ,
310- RMH : Deref + ' static + Send + Sync ,
311- OMH : Deref + ' static + Send + Sync ,
312- L : Deref + ' static + Send + Sync ,
313- UMH : Deref + ' static + Send + Sync ,
314- NS : Deref + ' static + Send + Sync ,
315- CMH :: Target : ChannelMessageHandler + Send + Sync ,
316- RMH :: Target : RoutingMessageHandler + Send + Sync ,
317- OMH :: Target : OnionMessageHandler + Send + Sync ,
318- L :: Target : Logger + Send + Sync ,
319- UMH :: Target : CustomMessageHandler + Send + Sync ,
320- NS :: Target : NodeSigner + Send + Sync ,
321- {
264+ ) -> impl std:: future:: Future < Output =( ) >
265+ where PM :: Target : APeerManager < Descriptor = SocketDescriptor > {
322266 let remote_addr = get_addr_from_stream ( & stream) ;
323267 let ( reader, mut write_receiver, read_receiver, us) = Connection :: new ( stream) ;
324268 #[ cfg( test) ]
325269 let last_us = Arc :: clone ( & us) ;
326- let handle_opt = if let Ok ( initial_send) = peer_manager. new_outbound_connection ( their_node_id, SocketDescriptor :: new ( us. clone ( ) ) , remote_addr) {
270+ let handle_opt = if let Ok ( initial_send) = peer_manager. pm ( ) . new_outbound_connection ( their_node_id, SocketDescriptor :: new ( us. clone ( ) ) , remote_addr) {
327271 Some ( tokio:: spawn ( async move {
328272 // We should essentially always have enough room in a TCP socket buffer to send the
329273 // initial 10s of bytes. However, tokio running in single-threaded mode will always
@@ -342,7 +286,7 @@ pub fn setup_outbound<PM, CMH, RMH, OMH, L, UMH, NS>(
342286 } ,
343287 _ => {
344288 eprintln ! ( "Failed to write first full message to socket!" ) ;
345- peer_manager. socket_disconnected ( & SocketDescriptor :: new ( Arc :: clone ( & us) ) ) ;
289+ peer_manager. pm ( ) . socket_disconnected ( & SocketDescriptor :: new ( Arc :: clone ( & us) ) ) ;
346290 break Err ( ( ) ) ;
347291 }
348292 }
@@ -385,25 +329,12 @@ pub fn setup_outbound<PM, CMH, RMH, OMH, L, UMH, NS>(
385329/// disconnected and associated handling futures are freed, though, because all processing in said
386330/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
387331/// make progress.
388- pub async fn connect_outbound < PM , CMH , RMH , OMH , L , UMH , NS > (
332+ pub async fn connect_outbound < PM : Deref + ' static + Send + Sync + Clone > (
389333 peer_manager : PM ,
390334 their_node_id : PublicKey ,
391335 addr : SocketAddr ,
392- ) -> Option < impl std:: future:: Future < Output =( ) > > where
393- PM : Deref < Target = peer_handler:: PeerManager < SocketDescriptor , CMH , RMH , OMH , L , UMH , NS > > + ' static + Send + Sync + Clone ,
394- CMH : Deref + ' static + Send + Sync ,
395- RMH : Deref + ' static + Send + Sync ,
396- OMH : Deref + ' static + Send + Sync ,
397- L : Deref + ' static + Send + Sync ,
398- UMH : Deref + ' static + Send + Sync ,
399- NS : Deref + ' static + Send + Sync ,
400- CMH :: Target : ChannelMessageHandler + Send + Sync ,
401- RMH :: Target : RoutingMessageHandler + Send + Sync ,
402- OMH :: Target : OnionMessageHandler + Send + Sync ,
403- L :: Target : Logger + Send + Sync ,
404- UMH :: Target : CustomMessageHandler + Send + Sync ,
405- NS :: Target : NodeSigner + Send + Sync ,
406- {
336+ ) -> Option < impl std:: future:: Future < Output =( ) > >
337+ where PM :: Target : APeerManager < Descriptor = SocketDescriptor > {
407338 if let Ok ( Ok ( stream) ) = time:: timeout ( Duration :: from_secs ( 10 ) , async { TcpStream :: connect ( & addr) . await . map ( |s| s. into_std ( ) . unwrap ( ) ) } ) . await {
408339 Some ( setup_outbound ( peer_manager, their_node_id, stream) )
409340 } else { None }
0 commit comments