Skip to content

Commit d7784bb

Browse files
authored
app: Decouple stacks from listeners (#663)
The inbound and outbound configs bind an accept stack to the listener via serve::serve; but this makes it infeasible to test the accept stacks without actually binding a socket. This change moves the server binding into the main app initialization so that the inbound and outbound stacks can be built without a listener.
1 parent e06982b commit d7784bb

File tree

3 files changed

+79
-44
lines changed

3 files changed

+79
-44
lines changed

linkerd/app/inbound/src/lib.rs

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
pub use self::endpoint::{HttpEndpoint, ProfileTarget, RequestTarget, Target, TcpEndpoint};
99
use self::prevent_loop::PreventLoop;
1010
use self::require_identity_for_ports::RequireIdentityForPorts;
11-
use futures::{future, prelude::*};
11+
use futures::future;
1212
use linkerd2_app_core::{
1313
admit, classify,
1414
config::{ProxyConfig, ServerConfig},
@@ -19,15 +19,15 @@ use linkerd2_app_core::{
1919
http::{self, orig_proto, strip_header, DetectHttp},
2020
identity, tap, tcp,
2121
},
22-
reconnect, router, serve,
22+
reconnect, router,
2323
spans::SpanConverter,
2424
svc::{self, NewService},
2525
transport::{self, io::BoxedIo, listen, tls},
2626
Error, ProxyMetrics, TraceContextLayer, DST_OVERRIDE_HEADER,
2727
};
2828
use std::collections::HashMap;
2929
use tokio::sync::mpsc;
30-
use tracing::{debug_span, info};
30+
use tracing::debug_span;
3131

3232
pub mod endpoint;
3333
mod prevent_loop;
@@ -40,18 +40,29 @@ pub struct Config {
4040
}
4141

4242
impl Config {
43-
pub async fn build<L, S, P>(
43+
pub fn build<L, S, P>(
4444
self,
4545
listen_addr: std::net::SocketAddr,
46-
listen: impl Stream<Item = std::io::Result<listen::Connection>> + Send + 'static,
4746
local_identity: tls::Conditional<identity::Local>,
4847
http_loopback: L,
4948
profiles_client: P,
5049
tap_layer: tap::Layer,
5150
metrics: ProxyMetrics,
5251
span_sink: Option<mpsc::Sender<oc::Span>>,
5352
drain: drain::Watch,
54-
) -> Result<(), Error>
53+
) -> impl tower::Service<
54+
listen::Addrs,
55+
Error = impl Into<Error>,
56+
Future = impl Send + 'static,
57+
Response = impl tower::Service<
58+
tokio::net::TcpStream,
59+
Response = (),
60+
Error = impl Into<Error>,
61+
Future = impl Send + 'static,
62+
> + Send
63+
+ 'static,
64+
> + Send
65+
+ 'static
5566
where
5667
L: tower::Service<Target, Response = S> + Unpin + Send + Clone + 'static,
5768
L::Error: Into<Error>,
@@ -80,16 +91,13 @@ impl Config {
8091
span_sink.clone(),
8192
);
8293
self.build_server(
83-
listen_addr,
84-
listen,
8594
tcp_connect,
8695
http_router,
8796
local_identity,
8897
metrics,
8998
span_sink,
9099
drain,
91100
)
92-
.await
93101
}
94102

95103
pub fn build_tcp_connect(
@@ -276,17 +284,27 @@ impl Config {
276284
.into_inner()
277285
}
278286

279-
pub async fn build_server<C, H, S>(
287+
pub fn build_server<C, H, S>(
280288
self,
281-
listen_addr: std::net::SocketAddr,
282-
listen: impl Stream<Item = std::io::Result<listen::Connection>> + Send + 'static,
283289
tcp_connect: C,
284290
http_router: H,
285291
local_identity: tls::Conditional<identity::Local>,
286292
metrics: ProxyMetrics,
287293
span_sink: Option<mpsc::Sender<oc::Span>>,
288294
drain: drain::Watch,
289-
) -> Result<(), Error>
295+
) -> impl tower::Service<
296+
listen::Addrs,
297+
Error = impl Into<Error>,
298+
Future = impl Send + 'static,
299+
Response = impl tower::Service<
300+
tokio::net::TcpStream,
301+
Response = (),
302+
Error = impl Into<Error>,
303+
Future = impl Send + 'static,
304+
> + Send
305+
+ 'static,
306+
> + Send
307+
+ 'static
290308
where
291309
C: tower::Service<TcpEndpoint> + Unpin + Clone + Send + Sync + 'static,
292310
C::Error: Into<Error>,
@@ -387,10 +405,7 @@ impl Config {
387405
.push_map_target(TcpEndpoint::from)
388406
.push(metrics.transport.layer_accept(TransportLabels))
389407
.into_inner();
390-
let accept = svc::stack::MakeSwitch::new(skip_detect, tls, accept_fwd);
391-
392-
info!(addr = %listen_addr, "Serving");
393-
serve::serve(listen, accept, drain.signal()).await
408+
svc::stack::MakeSwitch::new(skip_detect, tls, accept_fwd)
394409
}
395410
}
396411

linkerd/app/outbound/src/lib.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#![deny(warnings, rust_2018_idioms)]
77

88
pub use self::endpoint::{HttpConcrete, HttpEndpoint, HttpLogical, LogicalPerRequest, TcpEndpoint};
9-
use futures::{future, prelude::*};
9+
use futures::future;
1010
use linkerd2_app_core::{
1111
admit, classify,
1212
config::{ProxyConfig, ServerConfig},
@@ -16,7 +16,7 @@ use linkerd2_app_core::{
1616
proxy::{
1717
self, core::resolve::Resolve, discover, http, identity, resolve::map_endpoint, tap, tcp,
1818
},
19-
reconnect, retry, router, serve,
19+
reconnect, retry, router,
2020
spans::SpanConverter,
2121
svc::{self, NewService},
2222
transport::{self, listen, tls},
@@ -29,7 +29,7 @@ use std::{
2929
time::Duration,
3030
};
3131
use tokio::sync::mpsc;
32-
use tracing::{debug_span, info, info_span};
32+
use tracing::{debug_span, info_span};
3333

3434
pub mod endpoint;
3535
mod prevent_loop;
@@ -426,19 +426,30 @@ impl Config {
426426
.into_inner()
427427
}
428428

429-
pub async fn build_server<E, R, C, H, S>(
429+
pub fn build_server<E, R, C, H, S, I>(
430430
self,
431-
listen_addr: std::net::SocketAddr,
432-
listen: impl Stream<Item = std::io::Result<listen::Connection>> + Send + 'static,
433431
refine: R,
434432
resolve: E,
435433
tcp_connect: C,
436434
http_router: H,
437435
metrics: ProxyMetrics,
438436
span_sink: Option<mpsc::Sender<oc::Span>>,
439437
drain: drain::Watch,
440-
) -> Result<(), Error>
438+
) -> impl tower::Service<
439+
listen::Addrs,
440+
Error = impl Into<Error>,
441+
Future = impl Send + 'static,
442+
Response = impl tower::Service<
443+
I,
444+
Response = (),
445+
Error = impl Into<Error>,
446+
Future = impl Send + 'static,
447+
> + Send
448+
+ 'static,
449+
> + Send
450+
+ 'static
441451
where
452+
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::fmt::Debug + Unpin + Send + 'static,
442453
E: Resolve<Addr, Endpoint = proxy::api_resolve::Metadata> + Unpin + Clone + Send + 'static,
443454
E::Future: Unpin + Send,
444455
E::Resolution: Unpin + Send,
@@ -535,15 +546,13 @@ impl Config {
535546
drain.clone(),
536547
);
537548

538-
let accept = svc::stack(svc::stack::MakeSwitch::new(
549+
svc::stack(svc::stack::MakeSwitch::new(
539550
skip_detect.clone(),
540551
http,
541552
tcp_forward.push_map_target(TcpEndpoint::from),
542553
))
543-
.push(metrics.transport.layer_accept(TransportLabels));
544-
545-
info!(addr = %listen_addr, "Serving");
546-
serve::serve(listen, accept, drain.signal()).await
554+
.push(metrics.transport.layer_accept(TransportLabels))
555+
.into_inner()
547556
}
548557
}
549558

linkerd/app/src/lib.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub mod tap;
1212
pub use self::metrics::Metrics;
1313
use futures::{future, FutureExt, TryFutureExt};
1414
pub use linkerd2_app_core::{self as core, metrics, trace};
15-
use linkerd2_app_core::{control::ControlAddr, dns, drain, svc, Error};
15+
use linkerd2_app_core::{control::ControlAddr, dns, drain, serve, svc, Error};
1616
use linkerd2_app_gateway as gateway;
1717
use linkerd2_app_inbound as inbound;
1818
use linkerd2_app_outbound as outbound;
@@ -149,11 +149,13 @@ impl Config {
149149
outbound_metrics.clone(),
150150
);
151151

152+
let span = info_span!("outbound");
153+
let _enter = span.enter();
154+
info!(listen.addr = %outbound_addr);
152155
tokio::spawn(
153-
outbound
154-
.build_server(
155-
outbound_addr,
156-
outbound_listen,
156+
serve::serve(
157+
outbound_listen,
158+
outbound.build_server(
157159
svc::stack(refine.clone())
158160
.push_map_response(|(n, _)| n)
159161
.into_inner(),
@@ -163,22 +165,28 @@ impl Config {
163165
outbound_metrics,
164166
oc_span_sink.clone(),
165167
drain_rx.clone(),
166-
)
167-
.map_err(|e| panic!("outbound failed: {}", e))
168-
.instrument(info_span!("outbound")),
168+
),
169+
drain_rx.clone().signal(),
170+
)
171+
.map_err(|e| panic!("outbound failed: {}", e))
172+
.instrument(span.clone()),
169173
);
174+
drop(_enter);
170175

171176
let http_gateway = gateway.build(
172177
refine,
173178
outbound_http,
174179
local_identity.as_ref().map(|l| l.name().clone()),
175180
);
176181

182+
let span = info_span!("inbound");
183+
let _enter = span.enter();
184+
info!(listen.addr = %inbound_addr);
177185
tokio::spawn(
178-
inbound
179-
.build(
186+
serve::serve(
187+
inbound_listen,
188+
inbound.build(
180189
inbound_addr,
181-
inbound_listen,
182190
local_identity,
183191
svc::stack(http_gateway)
184192
.push_on_response(svc::layers().box_http_request())
@@ -187,11 +195,14 @@ impl Config {
187195
tap_layer,
188196
inbound_metrics,
189197
oc_span_sink,
190-
drain_rx,
191-
)
192-
.map_err(|e| panic!("inbound failed: {}", e))
193-
.instrument(info_span!("inbound")),
198+
drain_rx.clone(),
199+
),
200+
drain_rx.signal(),
201+
)
202+
.map_err(|e| panic!("inbound failed: {}", e))
203+
.instrument(span.clone()),
194204
);
205+
drop(_enter);
195206
});
196207

197208
Ok(App {

0 commit comments

Comments
 (0)