@@ -42,16 +42,79 @@ use lightning::ln::peer_handler::APeerManager;
4242use lightning:: ln:: msgs:: NetAddress ;
4343
4444use std:: ops:: Deref ;
45- use std:: task;
45+ use std:: task:: { self , Poll } ;
46+ use std:: future:: Future ;
4647use std:: net:: SocketAddr ;
4748use std:: net:: TcpStream as StdTcpStream ;
4849use std:: sync:: { Arc , Mutex } ;
4950use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
5051use std:: time:: Duration ;
52+ use std:: pin:: Pin ;
5153use std:: hash:: Hash ;
5254
5355static ID_COUNTER : AtomicU64 = AtomicU64 :: new ( 0 ) ;
5456
57+ // We only need to select over multiple futures in one place, and taking on the full `tokio/macros`
58+ // dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we
59+ // define a trivial two- and three- select macro with the specific types we need and just use that.
60+
61+ pub ( crate ) enum SelectorOutput {
62+ A ( Option < ( ) > ) , B ( Option < ( ) > ) , C ( tokio:: io:: Result < usize > ) ,
63+ }
64+
65+ pub ( crate ) struct TwoSelector <
66+ A : Future < Output =Option < ( ) > > + Unpin , B : Future < Output =Option < ( ) > > + Unpin
67+ > {
68+ pub a : A ,
69+ pub b : B ,
70+ }
71+
72+ impl <
73+ A : Future < Output =Option < ( ) > > + Unpin , B : Future < Output =Option < ( ) > > + Unpin
74+ > Future for TwoSelector < A , B > {
75+ type Output = SelectorOutput ;
76+ fn poll ( mut self : Pin < & mut Self > , ctx : & mut task:: Context < ' _ > ) -> Poll < SelectorOutput > {
77+ match Pin :: new ( & mut self . a ) . poll ( ctx) {
78+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: A ( res) ) ; } ,
79+ Poll :: Pending => { } ,
80+ }
81+ match Pin :: new ( & mut self . b ) . poll ( ctx) {
82+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: B ( res) ) ; } ,
83+ Poll :: Pending => { } ,
84+ }
85+ Poll :: Pending
86+ }
87+ }
88+
89+ pub ( crate ) struct ThreeSelector <
90+ A : Future < Output =Option < ( ) > > + Unpin , B : Future < Output =Option < ( ) > > + Unpin , C : Future < Output =tokio:: io:: Result < usize > > + Unpin
91+ > {
92+ pub a : A ,
93+ pub b : B ,
94+ pub c : C ,
95+ }
96+
97+ impl <
98+ A : Future < Output =Option < ( ) > > + Unpin , B : Future < Output =Option < ( ) > > + Unpin , C : Future < Output =tokio:: io:: Result < usize > > + Unpin
99+ > Future for ThreeSelector < A , B , C > {
100+ type Output = SelectorOutput ;
101+ fn poll ( mut self : Pin < & mut Self > , ctx : & mut task:: Context < ' _ > ) -> Poll < SelectorOutput > {
102+ match Pin :: new ( & mut self . a ) . poll ( ctx) {
103+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: A ( res) ) ; } ,
104+ Poll :: Pending => { } ,
105+ }
106+ match Pin :: new ( & mut self . b ) . poll ( ctx) {
107+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: B ( res) ) ; } ,
108+ Poll :: Pending => { } ,
109+ }
110+ match Pin :: new ( & mut self . c ) . poll ( ctx) {
111+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: C ( res) ) ; } ,
112+ Poll :: Pending => { } ,
113+ }
114+ Poll :: Pending
115+ }
116+ }
117+
55118/// Connection contains all our internal state for a connection - we hold a reference to the
56119/// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
57120/// read future (which is returned by schedule_read).
@@ -127,29 +190,44 @@ impl Connection {
127190 }
128191 us_lock. read_paused
129192 } ;
130- tokio:: select! {
131- v = write_avail_receiver. recv( ) => {
193+ // TODO: Drop the Box'ing of the futures once Rust has pin-on-stack support.
194+ let select_result = if read_paused {
195+ TwoSelector {
196+ a : Box :: pin ( write_avail_receiver. recv ( ) ) ,
197+ b : Box :: pin ( read_wake_receiver. recv ( ) ) ,
198+ } . await
199+ } else {
200+ ThreeSelector {
201+ a : Box :: pin ( write_avail_receiver. recv ( ) ) ,
202+ b : Box :: pin ( read_wake_receiver. recv ( ) ) ,
203+ c : Box :: pin ( reader. read ( & mut buf) ) ,
204+ } . await
205+ } ;
206+ match select_result {
207+ SelectorOutput :: A ( v) => {
132208 assert ! ( v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
133209 if peer_manager. as_ref ( ) . write_buffer_space_avail ( & mut our_descriptor) . is_err ( ) {
134210 break Disconnect :: CloseConnection ;
135211 }
136212 } ,
137- _ = read_wake_receiver. recv( ) => { } ,
138- read = reader. read( & mut buf) , if !read_paused => match read {
139- Ok ( 0 ) => break Disconnect :: PeerDisconnected ,
140- Ok ( len) => {
141- let read_res = peer_manager. as_ref( ) . read_event( & mut our_descriptor, & buf[ 0 ..len] ) ;
142- let mut us_lock = us. lock( ) . unwrap( ) ;
143- match read_res {
144- Ok ( pause_read) => {
145- if pause_read {
146- us_lock. read_paused = true ;
147- }
148- } ,
149- Err ( _) => break Disconnect :: CloseConnection ,
150- }
151- } ,
152- Err ( _) => break Disconnect :: PeerDisconnected ,
213+ SelectorOutput :: B ( _) => { } ,
214+ SelectorOutput :: C ( read) => {
215+ match read {
216+ Ok ( 0 ) => break Disconnect :: PeerDisconnected ,
217+ Ok ( len) => {
218+ let read_res = peer_manager. as_ref ( ) . read_event ( & mut our_descriptor, & buf[ 0 ..len] ) ;
219+ let mut us_lock = us. lock ( ) . unwrap ( ) ;
220+ match read_res {
221+ Ok ( pause_read) => {
222+ if pause_read {
223+ us_lock. read_paused = true ;
224+ }
225+ } ,
226+ Err ( _) => break Disconnect :: CloseConnection ,
227+ }
228+ } ,
229+ Err ( _) => break Disconnect :: PeerDisconnected ,
230+ }
153231 } ,
154232 }
155233 let _ = event_waker. try_send ( ( ) ) ;
0 commit comments