11use crate :: { policy, Inbound } ;
22use linkerd_app_core:: {
3- io ,
4- proxy :: identity :: LocalCrtKey ,
3+ identity :: LocalCrtKey ,
4+ io , rustls ,
55 svc:: { self , ExtractParam , InsertParam , Param } ,
66 tls,
77 transport:: { self , metrics:: SensorIo , ClientAddr , OrigDstAddr , Remote , ServerAddr } ,
88 transport_header:: { self , NewTransportHeaderServer , SessionProtocol , TransportHeader } ,
99 Conditional , Error , NameAddr , Result ,
1010} ;
11- use std:: { convert:: TryFrom , fmt:: Debug } ;
11+ use std:: { convert:: TryFrom , fmt:: Debug , task } ;
1212use thiserror:: Error ;
1313use tracing:: { debug_span, info_span} ;
1414
@@ -52,8 +52,9 @@ pub struct ClientInfo {
5252 pub local_addr : OrigDstAddr ,
5353}
5454
55- type FwdIo < I > = SensorIo < io:: PrefixedIo < tls:: server:: Io < I > > > ;
56- pub type GatewayIo < I > = io:: EitherIo < FwdIo < I > , SensorIo < tls:: server:: Io < I > > > ;
55+ type TlsIo < I > = tls:: server:: Io < rustls:: ServerIo < tls:: server:: DetectIo < I > > , I > ;
56+ type FwdIo < I > = SensorIo < io:: PrefixedIo < TlsIo < I > > > ;
57+ pub type GatewayIo < I > = io:: EitherIo < FwdIo < I > , SensorIo < TlsIo < I > > > ;
5758
5859#[ derive( Clone ) ]
5960struct TlsParams {
@@ -102,7 +103,6 @@ impl<N> Inbound<N> {
102103 rt. metrics . proxy . transport . clone ( ) ,
103104 ) )
104105 . instrument ( |_: & _ | debug_span ! ( "opaque" ) )
105- . check_new_service :: < Local , _ > ( )
106106 // When the transport header is present, it may be used for either local TCP
107107 // forwarding, or we may be processing an HTTP gateway connection. HTTP gateway
108108 // connections that have a transport header must provide a target name as a part of
@@ -129,8 +129,13 @@ impl<N> Inbound<N> {
129129 negotiated_protocol : client. alpn ,
130130 } ,
131131 ) ;
132- let permit = allow. check_authorized ( client. client_addr , & tls) ?;
133- Ok ( svc:: Either :: A ( Local { addr : Remote ( ServerAddr ( addr) ) , permit, client_id : client. client_id , } ) )
132+ let permit =
133+ allow. check_authorized ( client. client_addr , & tls) ?;
134+ Ok ( svc:: Either :: A ( Local {
135+ addr : Remote ( ServerAddr ( addr) ) ,
136+ permit,
137+ client_id : client. client_id ,
138+ } ) )
134139 }
135140 TransportHeader {
136141 port,
@@ -167,30 +172,27 @@ impl<N> Inbound<N> {
167172 . instrument (
168173 |g : & GatewayTransportHeader | info_span ! ( "gateway" , dst = %g. target) ,
169174 )
170- . check_new_service :: < GatewayTransportHeader , io:: PrefixedIo < tls:: server:: Io < I > > > ( )
171175 . into_inner ( ) ,
172176 )
173177 // Use ALPN to determine whether a transport header should be read.
174178 . push ( NewTransportHeaderServer :: layer ( detect_timeout) )
175- . push_request_filter (
176- |client : ClientInfo | -> Result < _ > {
177- if client. header_negotiated ( ) {
178- Ok ( client)
179- } else {
180- Err ( RefusedNoTarget . into ( ) )
181- }
182- } ,
183- )
184- . check_new_service :: < ClientInfo , tls:: server:: Io < I > > ( )
179+ . push_request_filter ( |client : ClientInfo | -> Result < _ > {
180+ if client. header_negotiated ( ) {
181+ Ok ( client)
182+ } else {
183+ Err ( RefusedNoTarget . into ( ) )
184+ }
185+ } )
185186 // Build a ClientInfo target for each accepted connection. Refuse the
186187 // connection if it doesn't include an mTLS identity.
187188 . push_request_filter ( ClientInfo :: try_from)
188189 . push ( svc:: ArcNewService :: layer ( ) )
189- . push ( tls:: NewDetectTls :: < WithTransportHeaderAlpn , _ , _ > :: layer ( TlsParams {
190- timeout : tls:: server:: Timeout ( detect_timeout) ,
191- identity : WithTransportHeaderAlpn ( rt. identity . clone ( ) ) ,
192- } ) )
193- . check_new_service :: < T , I > ( )
190+ . push ( tls:: NewDetectTls :: < WithTransportHeaderAlpn , _ , _ > :: layer (
191+ TlsParams {
192+ timeout : tls:: server:: Timeout ( detect_timeout) ,
193+ identity : WithTransportHeaderAlpn ( rt. identity . clone ( ) ) ,
194+ } ,
195+ ) )
194196 . push_on_service ( svc:: BoxService :: layer ( ) )
195197 . push ( svc:: ArcNewService :: layer ( ) )
196198 } )
@@ -293,8 +295,20 @@ impl Param<tls::ConditionalServerTls> for GatewayTransportHeader {
293295
294296// === impl WithTransportHeaderAlpn ===
295297
296- impl svc:: Param < tls:: server:: Config > for WithTransportHeaderAlpn {
297- fn param ( & self ) -> tls:: server:: Config {
298+ impl < I > svc:: Service < I > for WithTransportHeaderAlpn
299+ where
300+ I : io:: AsyncRead + io:: AsyncWrite + Send + Unpin ,
301+ {
302+ type Response = ( tls:: ServerTls , rustls:: ServerIo < I > ) ;
303+ type Error = io:: Error ;
304+ type Future = rustls:: TerminateFuture < I > ;
305+
306+ #[ inline]
307+ fn poll_ready ( & mut self , _: & mut task:: Context < ' _ > ) -> task:: Poll < Result < ( ) , io:: Error > > {
308+ task:: Poll :: Ready ( Ok ( ( ) ) )
309+ }
310+
311+ fn call ( & mut self , io : I ) -> Self :: Future {
298312 // Copy the underlying TLS config and set an ALPN value.
299313 //
300314 // TODO: Avoid cloning the server config for every connection. It would
@@ -304,7 +318,7 @@ impl svc::Param<tls::server::Config> for WithTransportHeaderAlpn {
304318 config
305319 . alpn_protocols
306320 . push ( transport_header:: PROTOCOL . into ( ) ) ;
307- config. into ( )
321+ rustls :: terminate ( config. into ( ) , io )
308322 }
309323}
310324
0 commit comments