44 * License, v. 2.0. If a copy of the MPL was not distributed with this
55 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
66
7- extern crate aesm_client;
8- extern crate byteorder;
9- extern crate enclave_runner;
10- extern crate libc;
11- extern crate sgxs_loaders;
7+ use std:: io:: { Error , ErrorKind , Result as IoResult , Write } ;
8+ use std:: mem:: size_of;
9+ use std:: net:: Shutdown ;
10+ use std:: net:: { Ipv4Addr , Ipv6Addr , SocketAddr , TcpStream as SyncTcpStream } ;
11+ use std:: thread;
12+ use std:: future:: Future ;
13+ use std:: marker:: Unpin ;
14+ use std:: pin:: Pin ;
15+ use std:: task:: { Context , Poll } ;
16+
17+ use futures:: { FutureExt , Stream , StreamExt , TryStreamExt } ;
18+ use tokio:: io:: { self , AsyncRead , AsyncReadExt } ;
19+ use tokio:: net:: { TcpListener , TcpStream } ;
1220
1321use aesm_client:: AesmClient ;
14- use enclave_runner:: usercalls:: { SyncListener , SyncStream , UsercallExtension } ;
22+ use enclave_runner:: usercalls:: { AsyncListener , AsyncStream , UsercallExtension } ;
1523use enclave_runner:: EnclaveBuilder ;
1624use sgxs_loaders:: isgx:: Device as IsgxDevice ;
17- use std:: io;
18-
19- use byteorder:: { NetworkEndian , ReadBytesExt } ;
20- use std:: io:: { Error , ErrorKind , Read , Result as IoResult , Write } ;
21- use std:: mem:: size_of;
22- use std:: net:: Shutdown ;
23- use std:: net:: { Ipv4Addr , Ipv6Addr , SocketAddr , TcpListener , TcpStream } ;
24- use std:: thread;
2525
2626/// This example demonstrates use of usercall extensions for bind call.
2727/// User call extension allow the enclave code to "bind" to an external service via a customized enclave runner.
@@ -49,11 +49,11 @@ struct ProxyIpv4Addr {
4949}
5050
5151impl ProxyIpv4Addr {
52- fn from_reader ( rdr : & mut impl Read ) -> IoResult < Self > {
53- let src_addr = rdr. read_u32 :: < NetworkEndian > ( ) ?;
54- let dst_addr = rdr. read_u32 :: < NetworkEndian > ( ) ?;
55- let src_port = rdr. read_u16 :: < NetworkEndian > ( ) ?;
56- let dst_port = rdr. read_u16 :: < NetworkEndian > ( ) ?;
52+ async fn from_reader ( rdr : & mut ( impl AsyncRead + Unpin ) ) -> IoResult < Self > {
53+ let src_addr = rdr. read_u32 ( ) . await ?;
54+ let dst_addr = rdr. read_u32 ( ) . await ?;
55+ let src_port = rdr. read_u16 ( ) . await ?;
56+ let dst_port = rdr. read_u16 ( ) . await ?;
5757
5858 Ok ( ProxyIpv4Addr {
5959 src_addr,
@@ -67,8 +67,8 @@ impl From<ProxyIpv4Addr> for (SocketAddr, SocketAddr) {
6767 fn from ( proxy_addr : ProxyIpv4Addr ) -> ( SocketAddr , SocketAddr ) {
6868 let src_ip_addr: Ipv4Addr = proxy_addr. src_addr . into ( ) ;
6969 let dst_ip_addr: Ipv4Addr = proxy_addr. dst_addr . into ( ) ;
70- let local_addr = SocketAddr :: new ( src_ip_addr. into ( ) , proxy_addr. src_port ) ;
71- let peer_addr = SocketAddr :: new ( dst_ip_addr. into ( ) , proxy_addr. dst_port ) ;
70+ let peer_addr = SocketAddr :: new ( src_ip_addr. into ( ) , proxy_addr. src_port ) ;
71+ let local_addr = SocketAddr :: new ( dst_ip_addr. into ( ) , proxy_addr. dst_port ) ;
7272 ( local_addr, peer_addr)
7373 }
7474}
@@ -82,13 +82,13 @@ struct ProxyIpv6Addr {
8282}
8383
8484impl ProxyIpv6Addr {
85- fn from_reader ( rdr : & mut impl Read ) -> IoResult < Self > {
85+ async fn from_reader ( rdr : & mut ( impl AsyncRead + Unpin ) ) -> IoResult < Self > {
8686 let mut addr = ProxyIpv6Addr :: default ( ) ;
8787
88- let _ = rdr. read_exact ( & mut addr. src_addr [ ..] ) ?;
89- let _ = rdr. read_exact ( & mut addr. dst_addr [ ..] ) ?;
90- addr. src_port = rdr. read_u16 :: < NetworkEndian > ( ) ?;
91- addr. dst_port = rdr. read_u16 :: < NetworkEndian > ( ) ?;
88+ let _ = rdr. read_exact ( & mut addr. src_addr [ ..] ) . await ?;
89+ let _ = rdr. read_exact ( & mut addr. dst_addr [ ..] ) . await ?;
90+ addr. src_port = rdr. read_u16 ( ) . await ?;
91+ addr. dst_port = rdr. read_u16 ( ) . await ?;
9292
9393 Ok ( addr)
9494 }
@@ -98,8 +98,8 @@ impl From<ProxyIpv6Addr> for (SocketAddr, SocketAddr) {
9898 fn from ( proxy_addr : ProxyIpv6Addr ) -> ( SocketAddr , SocketAddr ) {
9999 let src_ip_addr: Ipv6Addr = proxy_addr. src_addr . into ( ) ;
100100 let dst_ip_addr: Ipv6Addr = proxy_addr. dst_addr . into ( ) ;
101- let local_addr = SocketAddr :: new ( src_ip_addr. into ( ) , proxy_addr. src_port ) ;
102- let peer_addr = SocketAddr :: new ( dst_ip_addr. into ( ) , proxy_addr. dst_port ) ;
101+ let peer_addr = SocketAddr :: new ( src_ip_addr. into ( ) , proxy_addr. src_port ) ;
102+ let local_addr = SocketAddr :: new ( dst_ip_addr. into ( ) , proxy_addr. dst_port ) ;
103103 ( local_addr, peer_addr)
104104 }
105105}
@@ -119,7 +119,7 @@ impl ProxyAddrReader {
119119 fn new ( ty : ProxyAddrType , len : u16 ) -> ProxyAddrReader {
120120 ProxyAddrReader { ty, len }
121121 }
122- fn read ( & self , rdr : & mut impl Read ) -> IoResult < Option < ( SocketAddr , SocketAddr ) > > {
122+ async fn read ( & self , rdr : & mut ( impl AsyncRead + Unpin ) ) -> IoResult < Option < ( SocketAddr , SocketAddr ) > > {
123123 match self . ty {
124124 ProxyAddrType :: V4 => {
125125 if self . len as usize != size_of :: < ProxyIpv4Addr > ( ) {
@@ -128,7 +128,7 @@ impl ProxyAddrReader {
128128 "Unexpected address length received" ,
129129 ) ) ?;
130130 }
131- let addr = ProxyIpv4Addr :: from_reader ( rdr) ?;
131+ let addr = ProxyIpv4Addr :: from_reader ( rdr) . await ?;
132132 let ( local, peer) = addr. into ( ) ;
133133 Ok ( Some ( ( local, peer) ) )
134134 }
@@ -139,12 +139,12 @@ impl ProxyAddrReader {
139139 "Unexpected address length received" ,
140140 ) ) ?;
141141 }
142- let addr = ProxyIpv6Addr :: from_reader ( rdr) ?;
142+ let addr = ProxyIpv6Addr :: from_reader ( rdr) . await ?;
143143 let ( local, peer) = addr. into ( ) ;
144144 Ok ( Some ( ( local, peer) ) )
145145 }
146146 ProxyAddrType :: Unspec => {
147- io:: copy ( & mut rdr. take ( self . len as _ ) , & mut io:: sink ( ) ) ?;
147+ io:: copy ( & mut rdr. take ( self . len as _ ) , & mut io:: sink ( ) ) . await ?;
148148 Ok ( None )
149149 }
150150 }
@@ -161,18 +161,18 @@ struct ProxyHdrV2 {
161161}
162162
163163impl ProxyHdrV2 {
164- fn from_reader ( rdr : & mut impl Read ) -> IoResult < Self > {
164+ async fn from_reader ( rdr : & mut ( impl AsyncRead + Unpin ) ) -> IoResult < Self > {
165165 let mut sig: [ u8 ; SIZE_HEADER_SIG ] = [ 0 ; SIZE_HEADER_SIG ] ;
166- let _ = rdr. read_exact ( & mut sig[ ..] ) ?;
166+ let _ = rdr. read_exact ( & mut sig[ ..] ) . await ?;
167167 if & sig[ ..] != HEADER_SIG {
168168 return Err ( Error :: new (
169169 ErrorKind :: InvalidData ,
170170 "Protocol header signature mismatch" ,
171171 ) ) ;
172172 }
173- let ver_cmd = rdr. read_u8 ( ) ?;
174- let fam = rdr. read_u8 ( ) ?;
175- let len = rdr. read_u16 :: < NetworkEndian > ( ) ?;
173+ let ver_cmd = rdr. read_u8 ( ) . await ?;
174+ let fam = rdr. read_u8 ( ) . await ?;
175+ let len = rdr. read_u16 ( ) . await ?;
176176 let addr_reader = match ( ver_cmd, fam) {
177177 ( V2CMD_LOCAL , FAMILY_UNSPEC ) => ProxyAddrReader :: new ( ProxyAddrType :: Unspec , len) ,
178178 ( V2CMD_PROXY , FAMILY_TCP4 ) => ProxyAddrReader :: new ( ProxyAddrType :: V4 , len) ,
@@ -192,49 +192,64 @@ impl ProxyHdrV2 {
192192 } )
193193 }
194194}
195- fn read_proxy_protocol_header (
195+ async fn read_proxy_protocol_header (
196196 stream : & mut TcpStream ,
197197) -> IoResult < Option < ( SocketAddr , SocketAddr ) > > {
198- let hdr = ProxyHdrV2 :: from_reader ( stream) ?;
199- hdr. addr_reader . read ( stream)
198+ let hdr = ProxyHdrV2 :: from_reader ( stream) . await ?;
199+ hdr. addr_reader . read ( stream) . await
200200}
201201
202202struct ProxyProtocol {
203- listener : TcpListener ,
203+ listen_stream : Pin < Box < dyn Send + Stream < Item = IoResult < ( TcpStream , Option < ( SocketAddr , SocketAddr ) > ) > > > >
204204}
205205
206206impl ProxyProtocol {
207- fn new ( addr : & str ) -> IoResult < ( Self , String ) > {
208- TcpListener :: bind ( addr) . map ( |listener| {
207+ async fn new ( addr : & str ) -> IoResult < ( Self , String ) > {
208+ TcpListener :: bind ( addr) . await . map ( |listener| {
209209 let local_address = match listener. local_addr ( ) {
210210 Ok ( local_address) => local_address. to_string ( ) ,
211211 Err ( _) => "error" . to_string ( ) ,
212212 } ;
213- ( ProxyProtocol { listener } , local_address)
213+ let listen_stream = listener. and_then ( |mut stream| { async {
214+ let proxied_addrs = read_proxy_protocol_header ( & mut stream) . await ?;
215+ Ok ( ( stream, proxied_addrs) )
216+ } } ) . boxed ( ) ;
217+ ( ProxyProtocol { listen_stream } , local_address)
214218 } )
215219 }
216220}
217221
218- impl SyncListener for ProxyProtocol {
219- fn accept ( & self , local_addr : Option < & mut String > , peer_addr : Option < & mut String > ) -> IoResult < Box < dyn SyncStream > > {
220- let ( mut stream, peer_address_tcp) = self . listener . accept ( ) ?;
221- let local_address_tcp = stream. local_addr ( ) ?;
222- eprintln ! (
223- "runner:: bind - local_address is {}, peer address is {}" ,
224- local_address_tcp, peer_address_tcp
225- ) ;
226- let ( local_address, peer_address) = read_proxy_protocol_header ( & mut stream) ?
227- . unwrap_or ( ( local_address_tcp, peer_address_tcp) ) ;
228-
229- if let Some ( local_addr) = local_addr {
230- * local_addr = local_address. to_string ( ) ;
231- }
222+ impl AsyncListener for ProxyProtocol {
223+ fn poll_accept (
224+ self : Pin < & mut Self > ,
225+ cx : & mut Context ,
226+ local_addr : Option < & mut String > ,
227+ peer_addr : Option < & mut String > ,
228+ ) -> Poll < IoResult < Option < Box < dyn AsyncStream > > > > {
229+ self . get_mut ( ) . listen_stream . as_mut ( ) . poll_next ( cx) . map ( |item| match item {
230+ Some ( Ok ( ( stream, proxied_addrs) ) ) => {
231+ let local_address_tcp = stream. local_addr ( ) ?;
232+ let peer_address_tcp = stream. peer_addr ( ) ?;
233+ eprintln ! (
234+ "runner:: bind - local_address is {}, peer address is {}" ,
235+ local_address_tcp, peer_address_tcp
236+ ) ;
237+ let ( local_address, peer_address) = proxied_addrs
238+ . unwrap_or ( ( local_address_tcp, peer_address_tcp) ) ;
239+
240+ if let Some ( local_addr) = local_addr {
241+ * local_addr = local_address. to_string ( ) ;
242+ }
232243
233- if let Some ( peer_addr) = peer_addr {
234- * peer_addr = peer_address. to_string ( ) ;
235- }
244+ if let Some ( peer_addr) = peer_addr {
245+ * peer_addr = peer_address. to_string ( ) ;
246+ }
236247
237- Ok ( Box :: new ( stream) )
248+ Ok ( Some ( Box :: new ( stream) as _ ) )
249+ } ,
250+ Some ( Err ( e) ) => Err ( e) ,
251+ None => Ok ( None ) ,
252+ } )
238253 }
239254}
240255
@@ -243,21 +258,23 @@ const HAPROXY_ADDRESS: &str = "localhost:6010";
243258#[ derive( Debug ) ]
244259struct HaproxyService ;
245260impl UsercallExtension for HaproxyService {
246- fn bind_stream (
247- & self ,
248- addr : & str ,
249- local_addr : Option < & mut String > ,
250- ) -> IoResult < Option < Box < dyn SyncListener > > > {
251- if addr == HAPROXY_ADDRESS {
252- let ( listener, local_address) = ProxyProtocol :: new ( addr) ?;
253- if let Some ( local_addr) = local_addr {
254- ( * local_addr) = local_address;
255- }
261+ fn bind_stream < ' future > (
262+ & ' future self ,
263+ addr : & ' future str ,
264+ local_addr : Option < & ' future mut String > ,
265+ ) -> Pin < Box < dyn Future < Output = IoResult < Option < Box < dyn AsyncListener > > > > + ' future > > {
266+ async move {
267+ if addr == HAPROXY_ADDRESS {
268+ let ( listener, local_address) = ProxyProtocol :: new ( addr) . await ?;
269+ if let Some ( local_addr) = local_addr {
270+ ( * local_addr) = local_address;
271+ }
256272
257- Ok ( Some ( Box :: new ( listener) ) )
258- } else {
259- Ok ( None )
260- }
273+ Ok ( Some ( Box :: new ( listener) as _ ) )
274+ } else {
275+ Ok ( None )
276+ }
277+ } . boxed_local ( )
261278 }
262279}
263280
@@ -298,7 +315,7 @@ impl SimulateHaProxyConfig {
298315 static TEST_DATA : & ' static str = "connection test data" ;
299316
300317 thread:: sleep ( std:: time:: Duration :: from_secs ( 2 ) ) ;
301- let mut stream = TcpStream :: connect ( HAPROXY_ADDRESS ) . unwrap ( ) ;
318+ let mut stream = SyncTcpStream :: connect ( HAPROXY_ADDRESS ) . unwrap ( ) ;
302319 stream. write_all ( header) . unwrap ( ) ;
303320 stream
304321 . write_all ( & format ! ( "{} {}\n " , TEST_DATA , profile_name) . as_bytes ( ) )
0 commit comments