Skip to content

Commit 69dbf63

Browse files
authored
app: Defer stack construction until the proxy starts (#1200)
We currently eagerly build the inbound and outbound stacks. In an upcoming change, however, we'll want to ensure that we've obtained inbound port policies before initializing the inbound stack. This change modifies app initialization to move stack initialization into the proxy's start task. In order to do this, we eagerly bind the inbound/outbound listeners and provide the stream of incoming sockets to the stacks at construction-time. This doesn't actually change any behavior since stack construction was moved into a task that was run lazily. The change is purely internal, restructuring how/when this task is built.
1 parent 676b213 commit 69dbf63

File tree

3 files changed

+111
-100
lines changed

3 files changed

+111
-100
lines changed

linkerd/app/inbound/src/server.rs

Lines changed: 44 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use crate::{direct, Inbound};
2+
use futures::Stream;
23
use linkerd_app_core::{
3-
config::ServerConfig,
44
io, profiles, serve, svc,
5-
transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
5+
transport::{self, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
66
Error,
77
};
8-
use std::{fmt::Debug, future::Future};
8+
use std::fmt::Debug;
99
use tracing::debug_span;
1010

1111
#[derive(Copy, Clone, Debug)]
@@ -16,72 +16,63 @@ struct TcpEndpoint {
1616
// === impl Inbound ===
1717

1818
impl Inbound<()> {
19-
pub fn serve<B, G, GSvc, P>(
19+
pub async fn serve<A, I, G, GSvc, P>(
2020
self,
21-
bind: B,
21+
addr: Local<ServerAddr>,
22+
listen: impl Stream<Item = io::Result<(A, I)>> + Send + Sync + 'static,
2223
profiles: P,
2324
gateway: G,
24-
) -> (Local<ServerAddr>, impl Future<Output = ()> + Send)
25-
where
26-
B: Bind<ServerConfig>,
27-
B::Addrs: svc::Param<Remote<ClientAddr>>
28-
+ svc::Param<Local<ServerAddr>>
29-
+ svc::Param<OrigDstAddr>,
25+
) where
26+
A: svc::Param<Remote<ClientAddr>> + svc::Param<OrigDstAddr> + Clone + Send + Sync + 'static,
27+
I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
28+
I: Debug + Unpin + Send + Sync + 'static,
3029
G: svc::NewService<direct::GatewayConnection, Service = GSvc>,
3130
G: Clone + Send + Sync + Unpin + 'static,
32-
GSvc: svc::Service<direct::GatewayIo<io::ScopedIo<B::Io>>, Response = ()> + Send + 'static,
31+
GSvc: svc::Service<direct::GatewayIo<io::ScopedIo<I>>, Response = ()> + Send + 'static,
3332
GSvc::Error: Into<Error>,
3433
GSvc::Future: Send,
3534
P: profiles::GetProfile<profiles::LookupAddr> + Clone + Send + Sync + Unpin + 'static,
3635
P::Error: Send,
3736
P::Future: Send,
3837
{
39-
let (Local(ServerAddr(la)), listen) = bind
40-
.bind(&self.config.proxy.server)
41-
.expect("Failed to bind inbound listener");
38+
let shutdown = self.runtime.drain.clone().signaled();
4239

43-
let serve = async move {
44-
let shutdown = self.runtime.drain.clone().signaled();
40+
// Handles connections to ports that can't be determined to be HTTP.
41+
let forward = self
42+
.clone()
43+
.into_tcp_connect(addr.port())
44+
.push_tcp_forward()
45+
.into_stack()
46+
.push_map_target(TcpEndpoint::from_param)
47+
.instrument(|_: &_| debug_span!("tcp"))
48+
.into_inner();
4549

46-
// Handles connections to ports that can't be determined to be HTTP.
47-
let forward = self
48-
.clone()
49-
.into_tcp_connect(la.port())
50-
.push_tcp_forward()
51-
.into_stack()
52-
.push_map_target(TcpEndpoint::from_param)
53-
.instrument(|_: &_| debug_span!("tcp"))
54-
.into_inner();
50+
// Handles connections that target the inbound proxy port.
51+
let direct = self
52+
.clone()
53+
.into_tcp_connect(addr.port())
54+
.push_tcp_forward()
55+
.map_stack(|_, _, s| s.push_map_target(TcpEndpoint::from_param))
56+
.push_direct(gateway)
57+
.into_stack()
58+
.instrument(|_: &_| debug_span!("direct"))
59+
.into_inner();
5560

56-
// Handles connections that target the inbound proxy port.
57-
let direct = self
58-
.clone()
59-
.into_tcp_connect(la.port())
60-
.push_tcp_forward()
61-
.map_stack(|_, _, s| s.push_map_target(TcpEndpoint::from_param))
62-
.push_direct(gateway)
63-
.into_stack()
64-
.instrument(|_: &_| debug_span!("direct"))
65-
.into_inner();
61+
// Handles HTTP connections.
62+
let http = self
63+
.into_tcp_connect(addr.port())
64+
.push_http_router(profiles)
65+
.push_http_server();
6666

67-
// Handles HTTP connections.
68-
let http = self
69-
.into_tcp_connect(la.port())
70-
.push_http_router(profiles)
71-
.push_http_server();
67+
// Determines how to handle an inbound connection, dispatching it to the appropriate
68+
// stack.
69+
let server = http
70+
.push_detect_http(forward.clone())
71+
.push_detect_tls(forward)
72+
.push_accept(addr.port(), direct)
73+
.into_inner();
7274

73-
// Determines how to handle an inbound connection, dispatching it to the appropriate
74-
// stack.
75-
let server = http
76-
.push_detect_http(forward.clone())
77-
.push_detect_tls(forward)
78-
.push_accept(la.port(), direct)
79-
.into_inner();
80-
81-
serve::serve(listen, server, shutdown).await
82-
};
83-
84-
(Local(ServerAddr(la)), serve)
75+
serve::serve(listen, server, shutdown).await;
8576
}
8677
}
8778

linkerd/app/outbound/src/lib.rs

Lines changed: 30 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,21 @@ pub mod tcp;
1616
#[cfg(test)]
1717
pub(crate) mod test_util;
1818

19+
use futures::Stream;
1920
use linkerd_app_core::{
20-
config::{ProxyConfig, ServerConfig},
21-
metrics, profiles,
21+
config::ProxyConfig,
22+
io, metrics, profiles,
2223
proxy::{
2324
api_resolve::{ConcreteAddr, Metadata},
2425
core::Resolve,
2526
},
2627
serve,
2728
svc::{self, stack::Param},
2829
tls,
29-
transport::{self, addrs::*, listen::Bind},
30+
transport::{self, addrs::*},
3031
AddrMatch, Conditional, Error, ProxyRuntime,
3132
};
32-
use std::{collections::HashMap, future::Future, time::Duration};
33+
use std::{collections::HashMap, fmt::Debug, time::Duration};
3334
use tracing::info;
3435

3536
const EWMA_DEFAULT_RTT: Duration = Duration::from_millis(30);
@@ -119,15 +120,15 @@ impl<S> Outbound<S> {
119120
}
120121

121122
impl Outbound<()> {
122-
pub fn serve<B, P, R>(
123+
pub async fn serve<A, I, P, R>(
123124
self,
124-
bind: B,
125+
listen: impl Stream<Item = io::Result<(A, I)>> + Send + Sync + 'static,
125126
profiles: P,
126127
resolve: R,
127-
) -> (Local<ServerAddr>, impl Future<Output = ()>)
128-
where
129-
B: Bind<ServerConfig>,
130-
B::Addrs: Param<Remote<ClientAddr>> + Param<OrigDstAddr>,
128+
) where
129+
A: Param<Remote<ClientAddr>> + Param<OrigDstAddr> + Clone + Send + Sync + 'static,
130+
I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
131+
I: Debug + Unpin + Send + Sync + 'static,
131132
R: Clone + Send + Sync + Unpin + 'static,
132133
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>,
133134
R::Resolution: Send,
@@ -136,33 +137,25 @@ impl Outbound<()> {
136137
P::Future: Send,
137138
P::Error: Send,
138139
{
139-
let (listen_addr, listen) = bind
140-
.bind(&self.config.proxy.server)
141-
.expect("Failed to bind outbound listener");
142-
143-
let serve = async move {
144-
if self.config.ingress_mode {
145-
info!("Outbound routing in ingress-mode");
146-
let stack = self
147-
.to_tcp_connect()
148-
.push_tcp_endpoint()
149-
.push_http_endpoint()
150-
.into_ingress(profiles, resolve);
151-
let shutdown = self.runtime.drain.signaled();
152-
serve::serve(listen, stack, shutdown).await;
153-
} else {
154-
let logical = self.to_tcp_connect().push_logical(resolve);
155-
let endpoint = self.to_tcp_connect().push_endpoint();
156-
let server = endpoint
157-
.push_switch_logical(logical.into_inner())
158-
.push_discover(profiles)
159-
.into_inner();
160-
let shutdown = self.runtime.drain.signaled();
161-
serve::serve(listen, server, shutdown).await;
162-
}
163-
};
164-
165-
(listen_addr, serve)
140+
if self.config.ingress_mode {
141+
info!("Outbound routing in ingress-mode");
142+
let stack = self
143+
.to_tcp_connect()
144+
.push_tcp_endpoint()
145+
.push_http_endpoint()
146+
.into_ingress(profiles, resolve);
147+
let shutdown = self.runtime.drain.signaled();
148+
serve::serve(listen, stack, shutdown).await;
149+
} else {
150+
let logical = self.to_tcp_connect().push_logical(resolve);
151+
let endpoint = self.to_tcp_connect().push_endpoint();
152+
let server = endpoint
153+
.push_switch_logical(logical.into_inner())
154+
.push_discover(profiles)
155+
.into_inner();
156+
let shutdown = self.runtime.drain.signaled();
157+
serve::serve(listen, server, shutdown).await;
158+
}
166159
}
167160
}
168161

linkerd/app/src/lib.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ use tokio::{
2929
sync::mpsc,
3030
time::{self, Duration},
3131
};
32-
use tracing::instrument::Instrument;
33-
use tracing::{debug, info, info_span};
32+
use tracing::{debug, info, info_span, Instrument};
3433

3534
/// Spawns a sidecar proxy.
3635
///
@@ -115,8 +114,10 @@ impl Config {
115114

116115
let dns = dns.build();
117116

117+
// Ensure that we've obtained a valid identity before binding any servers.
118118
let identity = info_span!("identity")
119119
.in_scope(|| identity.build(dns.resolver.clone(), metrics.control.clone()))?;
120+
120121
let report = identity.metrics().and_then(report);
121122

122123
let (drain_tx, drain_rx) = drain::channel();
@@ -190,14 +191,40 @@ impl Config {
190191
dst.resolve.clone(),
191192
);
192193

193-
let (inbound_addr, inbound_serve) =
194-
inbound.serve(bind_in, dst.profiles.clone(), gateway_stack);
195-
let (outbound_addr, outbound_serve) = outbound.serve(bind_out, dst.profiles, dst.resolve);
196-
197-
let start_proxy = Box::pin(async move {
198-
tokio::spawn(outbound_serve.instrument(info_span!("outbound")));
199-
tokio::spawn(inbound_serve.instrument(info_span!("inbound")));
200-
});
194+
// Bind the proxy sockets eagerly (so they're reserved and known) but defer building the
195+
// stacks until the proxy starts running.
196+
let (inbound_addr, inbound_listen) = bind_in
197+
.bind(&inbound.config().proxy.server)
198+
.expect("Failed to bind inbound listener");
199+
200+
let (outbound_addr, outbound_listen) = bind_out
201+
.bind(&outbound.config().proxy.server)
202+
.expect("Failed to bind outbound listener");
203+
204+
// Build a task that initializes and runs the proxy stacks.
205+
let start_proxy = {
206+
let inbound_addr = inbound_addr;
207+
let profiles = dst.profiles;
208+
let resolve = dst.resolve;
209+
210+
Box::pin(async move {
211+
// TODO(ver): Block on identity.
212+
213+
tokio::spawn(
214+
outbound
215+
.serve(outbound_listen, profiles.clone(), resolve)
216+
.instrument(info_span!("outbound")),
217+
);
218+
219+
// TODO(ver): Block on policy.
220+
221+
tokio::spawn(
222+
inbound
223+
.serve(inbound_addr, inbound_listen, profiles, gateway_stack)
224+
.instrument(info_span!("inbound")),
225+
);
226+
})
227+
};
201228

202229
Ok(App {
203230
admin,

0 commit comments

Comments
 (0)