Skip to content

Commit 268b3be

Browse files
authored
inbound: Reorganize server into smaller stacks (#1156)
The inbound server stack is fairly large. While sketching out changes to support inbound policy, it's helpful to split this larger stack into a few smaller components (using the recently-introduced `map_stack` helper). This change: * Modifies `Inbound::push_http_server` to return a boxed service. This service was being boxed by `Inbound::into_server`. This change helps reduce boilerplate in our test utilities, too. (In fact, this change requires that we eliminate some unnecessary type constraints). * Changes `Inbound::to_tcp_connect` to `Inbound::into_tcp_connect`. The internal cloning is not necessary. * Changes `Inbound::into_server` to `Inbound::push_server`. Internally, this function now uses several calls to `map_stack` to build the inbound server stack.
1 parent 700ca59 commit 268b3be

File tree

4 files changed

+117
-136
lines changed

4 files changed

+117
-136
lines changed

linkerd/app/inbound/src/http/mod.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,7 @@ use tracing::debug_span;
2525
impl<H> Inbound<H> {
2626
pub fn push_http_server<T, I, HSvc>(
2727
self,
28-
) -> Inbound<
29-
svc::BoxNewService<
30-
T,
31-
impl svc::Service<I, Response = (), Error = Error, Future = impl Send> + Clone,
32-
>,
33-
>
28+
) -> Inbound<svc::BoxNewService<T, svc::BoxService<I, (), Error>>>
3429
where
3530
T: Param<Version>
3631
+ Param<http::normalize_uri::DefaultAuthority>
@@ -84,6 +79,7 @@ impl<H> Inbound<H> {
8479
.check_new_service::<T, http::Request<_>>()
8580
.instrument(|t: &T| debug_span!("http", v=%Param::<Version>::param(t)))
8681
.push(http::NewServeHttp::layer(h2_settings, rt.drain.clone()))
82+
.push_on_response(svc::BoxService::layer())
8783
.push(svc::BoxNewService::layer())
8884
})
8985
}
@@ -355,16 +351,7 @@ pub mod fuzz_logic {
355351
rt: ProxyRuntime,
356352
profiles: resolver::Profiles,
357353
connect: Connect<Remote<ServerAddr>>,
358-
) -> impl svc::NewService<
359-
HttpAccept,
360-
Service = impl tower::Service<
361-
I,
362-
Response = (),
363-
Error = impl Into<linkerd_app_core::Error>,
364-
Future = impl Send + 'static,
365-
> + Send
366-
+ Clone,
367-
> + Clone
354+
) -> svc::BoxNewService<HttpAccept, svc::BoxService<I, (), linkerd_app_core::Error>>
368355
where
369356
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static,
370357
{

linkerd/app/inbound/src/http/tests.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use linkerd_app_core::{
1313
svc::{self, NewService, Param},
1414
tls,
1515
transport::{ClientAddr, Remote, ServerAddr},
16-
Conditional, NameAddr, ProxyRuntime,
16+
Conditional, Error, NameAddr, ProxyRuntime,
1717
};
1818
use linkerd_app_test::connect::ConnectFuture;
1919
use linkerd_tracing::test::trace_init;
@@ -24,16 +24,7 @@ fn build_server<I>(
2424
rt: ProxyRuntime,
2525
profiles: resolver::Profiles,
2626
connect: Connect<Remote<ServerAddr>>,
27-
) -> svc::BoxNewService<
28-
HttpAccept,
29-
impl tower::Service<
30-
I,
31-
Response = (),
32-
Error = impl Into<linkerd_app_core::Error>,
33-
Future = impl Send + 'static,
34-
> + Send
35-
+ Clone,
36-
>
27+
) -> svc::BoxNewService<HttpAccept, svc::BoxService<I, (), Error>>
3728
where
3829
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static,
3930
{

linkerd/app/inbound/src/lib.rs

Lines changed: 105 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -97,27 +97,32 @@ impl Inbound<()> {
9797

9898
/// Readies the inbound stack to make TCP connections (for both TCP
9999
// forwarding and HTTP proxying).
100-
pub fn to_tcp_connect<T: svc::Param<u16> + 'static>(
101-
&self,
100+
pub fn into_tcp_connect<T>(
101+
self,
102102
) -> Inbound<
103103
impl svc::Service<
104104
T,
105105
Response = impl io::AsyncRead + io::AsyncWrite + Send,
106106
Error = Error,
107107
Future = impl Send,
108108
> + Clone,
109-
> {
110-
self.clone().map_stack(|config, _, _| {
109+
>
110+
where
111+
T: svc::Param<u16> + 'static,
112+
{
113+
self.map_stack(|config, _, _| {
111114
// Establishes connections to remote peers (for both TCP
112115
// forwarding and HTTP proxying).
113116
let ConnectConfig {
114-
keepalive, timeout, ..
115-
} = config.proxy.connect.clone();
117+
ref keepalive,
118+
ref timeout,
119+
..
120+
} = config.proxy.connect;
116121

117-
svc::stack(transport::ConnectTcp::new(keepalive))
122+
svc::stack(transport::ConnectTcp::new(*keepalive))
118123
.push_map_target(|t: T| Remote(ServerAddr(([127, 0, 0, 1], t.param()).into())))
119124
// Limits the time we wait for a connection to be established.
120-
.push_connect_timeout(timeout)
125+
.push_connect_timeout(*timeout)
121126
.push(svc::stack::BoxFuture::layer())
122127
})
123128
}
@@ -142,19 +147,20 @@ impl Inbound<()> {
142147
P::Error: Send,
143148
P::Future: Send,
144149
{
145-
let (listen_addr, listen) = bind
150+
let (Local(ServerAddr(la)), listen) = bind
146151
.bind(&self.config.proxy.server)
147152
.expect("Failed to bind inbound listener");
148153

149154
let serve = async move {
150-
let stack =
151-
self.to_tcp_connect()
152-
.into_server(listen_addr.as_ref().port(), profiles, gateway);
153-
let shutdown = self.runtime.drain.signaled();
155+
let shutdown = self.runtime.drain.clone().signaled();
156+
let stack = self
157+
.into_tcp_connect()
158+
.push_server(la.port(), profiles, gateway)
159+
.into_inner();
154160
serve::serve(listen, stack, shutdown).await
155161
};
156162

157-
(listen_addr, serve)
163+
(Local(ServerAddr(la)), serve)
158164
}
159165
}
160166

@@ -199,12 +205,12 @@ where
199205
})
200206
}
201207

202-
pub fn into_server<T, I, G, GSvc, P>(
208+
pub fn push_server<T, I, G, GSvc, P>(
203209
self,
204210
server_port: u16,
205211
profiles: P,
206212
gateway: G,
207-
) -> svc::BoxNewService<T, svc::BoxService<I, (), Error>>
213+
) -> Inbound<svc::BoxNewService<T, svc::BoxService<I, (), Error>>>
208214
where
209215
T: svc::Param<Remote<ClientAddr>> + svc::Param<OrigDstAddr>,
210216
T: Clone + Send + 'static,
@@ -219,86 +225,93 @@ where
219225
P::Error: Send,
220226
P::Future: Send,
221227
{
222-
let Self {
223-
config:
224-
Config {
225-
proxy: config,
226-
require_identity_for_inbound_ports: require_id,
227-
disable_protocol_detection_for_ports: disable_detect,
228-
..
229-
},
230-
runtime: rt,
231-
stack: _,
232-
} = self.clone();
228+
// Handles inbound connections that target an opaque port.
229+
let opaque = self
230+
.clone()
231+
.push_tcp_forward(server_port)
232+
.map_stack(|_, rt, tcp| {
233+
tcp.push_map_target(TcpEndpoint::from)
234+
.push(rt.metrics.transport.layer_accept())
235+
.check_new_service::<TcpAccept, _>()
236+
})
237+
.into_stack();
238+
239+
// Handles inbound connections that could not be detected as HTTP.
240+
let tcp = self.clone().push_tcp_forward(server_port);
233241

234-
self.clone()
235-
.push_http_router(profiles)
242+
// Handles connections targeting the inbound proxy port--either by acting as a gateway to
243+
// the outbound stack or by forwarding connections locally (for opauque transport).
244+
let direct = tcp
245+
.clone()
246+
.push_direct(gateway)
247+
.into_stack()
248+
.instrument(|_: &_| debug_span!("direct"));
249+
250+
self.push_http_router(profiles)
236251
.push_http_server()
237-
.stack
238-
.push_map_target(HttpAccept::from)
239-
.push(svc::UnwrapOr::layer(
240-
// When HTTP detection fails, forward the connection to the
241-
// application as an opaque TCP stream.
242-
self.clone()
243-
.push_tcp_forward(server_port)
244-
.into_stack()
245-
.push_map_target(TcpEndpoint::from)
246-
.push_on_response(svc::BoxService::layer())
247-
.into_inner(),
248-
))
249-
.push_on_response(svc::BoxService::layer())
250-
.push_map_target(detect::allow_timeout)
251-
.push(svc::BoxNewService::layer())
252-
.push(detect::NewDetectService::layer(
253-
config.detect_protocol_timeout,
254-
http::DetectHttp::default(),
255-
))
256-
.push_request_filter(require_id)
257-
.push(rt.metrics.transport.layer_accept())
258-
.push_request_filter(TcpAccept::try_from)
259-
.push(svc::BoxNewService::layer())
260-
.push(tls::NewDetectTls::layer(
261-
rt.identity.clone(),
262-
config.detect_protocol_timeout,
263-
))
264-
.instrument(|_: &_| debug_span!("proxy"))
265-
.push_switch(
266-
move |t: T| {
267-
let OrigDstAddr(addr) = t.param();
268-
if !disable_detect.contains(&addr.port()) {
269-
Ok::<_, Never>(svc::Either::A(t))
270-
} else {
271-
Ok(svc::Either::B(TcpAccept::port_skipped(t)))
272-
}
273-
},
274-
self.clone()
275-
.push_tcp_forward(server_port)
276-
.stack
277-
.push_map_target(TcpEndpoint::from)
252+
.map_stack(|cfg, rt, http| {
253+
let detect_timeout = cfg.proxy.detect_protocol_timeout;
254+
let require_id = cfg.require_identity_for_inbound_ports.clone();
255+
256+
http.push_map_target(HttpAccept::from)
257+
.push(svc::UnwrapOr::layer(
258+
// When HTTP detection fails, forward the connection to the application as
259+
// an opaque TCP stream.
260+
tcp.into_stack()
261+
.push_map_target(TcpEndpoint::from)
262+
.push_on_response(svc::BoxService::layer())
263+
.into_inner(),
264+
))
265+
.push_map_target(detect::allow_timeout)
266+
.push(svc::BoxNewService::layer())
267+
.push(detect::NewDetectService::layer(
268+
detect_timeout,
269+
http::DetectHttp::default(),
270+
))
271+
.push_request_filter(require_id)
278272
.push(rt.metrics.transport.layer_accept())
279-
.check_new_service::<TcpAccept, _>()
280-
.instrument(|_: &TcpAccept| debug_span!("forward"))
281-
.into_inner(),
282-
)
283-
.check_new_service::<T, I>()
284-
.push_on_response(svc::BoxService::layer())
285-
.push(svc::BoxNewService::layer())
286-
.push_switch(
287-
PreventLoop::from(server_port).to_switch(),
288-
self.push_tcp_forward(server_port)
289-
.push_direct(gateway)
290-
.stack
291-
.instrument(|_: &_| debug_span!("direct"))
292-
.into_inner(),
293-
)
294-
.instrument(|a: &T| {
295-
let OrigDstAddr(target_addr) = a.param();
296-
info_span!("server", port = target_addr.port())
273+
.push_request_filter(TcpAccept::try_from)
274+
.push(svc::BoxNewService::layer())
275+
.push(tls::NewDetectTls::layer(
276+
rt.identity.clone(),
277+
detect_timeout,
278+
))
279+
})
280+
.map_stack(|cfg, _, detect| {
281+
let disable_detect = cfg.disable_protocol_detection_for_ports.clone();
282+
detect
283+
.instrument(|_: &_| debug_span!("proxy"))
284+
.push_switch(
285+
move |t: T| {
286+
let OrigDstAddr(addr) = t.param();
287+
if !disable_detect.contains(&addr.port()) {
288+
Ok::<_, Never>(svc::Either::A(t))
289+
} else {
290+
Ok(svc::Either::B(TcpAccept::port_skipped(t)))
291+
}
292+
},
293+
opaque
294+
.instrument(|_: &TcpAccept| debug_span!("forward"))
295+
.into_inner(),
296+
)
297+
.check_new_service::<T, I>()
298+
.push_on_response(svc::BoxService::layer())
299+
.push(svc::BoxNewService::layer())
300+
})
301+
.map_stack(|_, rt, accept| {
302+
accept
303+
.push_switch(
304+
PreventLoop::from(server_port).to_switch(),
305+
direct.into_inner(),
306+
)
307+
.instrument(|a: &T| {
308+
let OrigDstAddr(target_addr) = a.param();
309+
info_span!("server", port = target_addr.port())
310+
})
311+
.push(rt.metrics.tcp_accept_errors.layer())
312+
.push_on_response(svc::BoxService::layer())
313+
.push(svc::BoxNewService::layer())
297314
})
298-
.push(rt.metrics.tcp_accept_errors.layer())
299-
.push_on_response(svc::BoxService::layer())
300-
.push(svc::BoxNewService::layer())
301-
.into_inner()
302315
}
303316
}
304317

linkerd/app/test/src/http_util.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use hyper::{
77
client::conn::{Builder as ClientBuilder, SendRequest},
88
Body, Request, Response,
99
};
10+
use linkerd_app_core::svc::BoxService;
1011
use std::{
1112
future::Future,
1213
sync::{Arc, Mutex},
@@ -22,6 +23,8 @@ pub struct Server {
2223

2324
type HandleFuture = Box<dyn (FnMut(Request<Body>) -> Result<Response<Body>, Error>) + Send>;
2425

26+
type BoxServer = BoxService<io::DuplexStream, (), Error>;
27+
2528
impl Default for Server {
2629
fn default() -> Self {
2730
Self {
@@ -36,18 +39,11 @@ impl Default for Server {
3639
}
3740
}
3841

39-
pub async fn run_proxy<S>(mut server: S) -> (io::DuplexStream, JoinHandle<Result<(), Error>>)
40-
where
41-
S: tower::Service<io::DuplexStream> + Send + Sync + 'static,
42-
S::Error: Into<Error>,
43-
S::Response: std::fmt::Debug + Send + Sync + 'static,
44-
S::Future: Send,
45-
{
42+
pub async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle<Result<(), Error>>) {
4643
let (client_io, server_io) = io::duplex(4096);
4744
let f = server
4845
.ready()
4946
.await
50-
.map_err(Into::into)
5147
.expect("proxy server failed to become ready")
5248
.call(server_io);
5349

@@ -79,16 +75,10 @@ pub async fn connect_client(
7975
(client, tokio::spawn(client_bg))
8076
}
8177

82-
pub async fn connect_and_accept<S>(
78+
pub async fn connect_and_accept(
8379
client_settings: &mut ClientBuilder,
84-
server: S,
85-
) -> (SendRequest<Body>, impl Future<Output = Result<(), Error>>)
86-
where
87-
S: tower::Service<io::DuplexStream> + Send + Sync + 'static,
88-
S::Error: Into<Error>,
89-
S::Response: std::fmt::Debug + Send + Sync + 'static,
90-
S::Future: Send,
91-
{
80+
server: BoxServer,
81+
) -> (SendRequest<Body>, impl Future<Output = Result<(), Error>>) {
9282
tracing::info!(settings = ?client_settings, "connecting client with");
9383
let (client_io, proxy) = run_proxy(server).await;
9484
let (client, client_bg) = connect_client(client_settings, client_io).await;

0 commit comments

Comments
 (0)