Skip to content

Commit 51cc0b1

Browse files
authored
Simplify proxy::Server as ServeHttp (#608)
The `proxy::Server` type used to handle all protocol detection, but now it really only handles initializing HTTP services. This change extracts server-transport instrumentation into a distinct stack layer to decouple this logic from the server implementation. The server also now requires that response bodies are boxed to eliminate the need for a PhantomData.
1 parent 9d351b6 commit 51cc0b1

File tree

6 files changed

+165
-181
lines changed

6 files changed

+165
-181
lines changed

linkerd/app/core/src/proxy/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ pub use linkerd2_proxy_tcp as tcp;
1616

1717
pub mod server;
1818

19-
pub use self::server::Server;
19+
pub use self::server::ServeHttp;
Lines changed: 78 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
use crate::proxy::http::{
2+
self,
23
glue::{Body, HyperServerSvc},
34
h2::Settings as H2Settings,
45
trace, upgrade, Version as HttpVersion,
56
};
67
use crate::transport::{
7-
self,
88
io::{self, BoxedIo, Peekable},
9-
labels::Key as TransportKey,
10-
metrics::TransportLabels,
119
tls,
1210
};
1311
use crate::{
@@ -18,7 +16,6 @@ use crate::{
1816
};
1917
use async_trait::async_trait;
2018
use futures::TryFutureExt;
21-
use http;
2219
use hyper;
2320
use indexmap::IndexSet;
2421
use std::future::Future;
@@ -29,133 +26,102 @@ use tracing::{info_span, trace};
2926
use tracing_futures::Instrument;
3027

3128
#[derive(Clone, Debug)]
32-
pub struct Protocol {
29+
pub struct Protocol<T> {
3330
pub http: Option<HttpVersion>,
34-
pub tls: tls::accept::Meta,
31+
pub target: T,
3532
}
3633

37-
pub type Connection = (Protocol, BoxedIo);
38-
3934
#[derive(Clone, Debug)]
40-
pub struct ProtocolDetect {
35+
pub struct DetectHttp {
4136
capacity: usize,
4237
skip_ports: Arc<IndexSet<u16>>,
4338
}
4439

45-
impl ProtocolDetect {
40+
impl DetectHttp {
4641
const PEEK_CAPACITY: usize = 8192;
4742

4843
pub fn new(skip_ports: Arc<IndexSet<u16>>) -> Self {
49-
ProtocolDetect {
44+
DetectHttp {
5045
skip_ports,
5146
capacity: Self::PEEK_CAPACITY,
5247
}
5348
}
5449
}
5550

5651
#[async_trait]
57-
impl detect::Detect<tls::accept::Meta, BoxedIo> for ProtocolDetect {
58-
type Target = Protocol;
52+
impl detect::Detect<tls::accept::Meta, BoxedIo> for DetectHttp {
53+
type Target = Protocol<tls::accept::Meta>;
5954
type Io = BoxedIo;
6055
type Error = io::Error;
6156

6257
async fn detect(
6358
&self,
64-
tls: tls::accept::Meta,
59+
target: tls::accept::Meta,
6560
io: BoxedIo,
6661
) -> Result<(Self::Target, BoxedIo), Self::Error> {
67-
let port = tls.addrs.target_addr().port();
62+
let port = target.addrs.target_addr().port();
6863

6964
// Skip detection if the port is in the configured set.
7065
if self.skip_ports.contains(&port) {
71-
let proto = Protocol { tls, http: None };
66+
let proto = Protocol { target, http: None };
7267
return Ok::<_, Self::Error>((proto, io));
7368
}
7469

7570
// Otherwise, attempt to peek the client connection to determine the protocol.
7671
// Currently, we only check for an HTTP prefix.
7772
let peek = io.peek(self.capacity).await?;
7873
let http = HttpVersion::from_prefix(peek.prefix().as_ref());
79-
let proto = Protocol { tls, http };
74+
let proto = Protocol { target, http };
8075
Ok((proto, BoxedIo::new(peek)))
8176
}
8277
}
8378

84-
/// A protocol-transparent Server!
85-
///
86-
/// As TCP streams are passed to `Server::serve`, the following occurs:
87-
///
88-
/// * A `Source` is created to describe the accepted connection.
79+
/// Accepts HTTP connections.
8980
///
90-
/// * If the original destination address's port is not specified in
91-
/// `disable_protocol_detection_ports`, then data received on the connection is
92-
/// buffered until the server can determine whether the streams begins with a
93-
/// HTTP/1 or HTTP/2 preamble.
81+
/// The server accepts TCP connections with their detected protocol. If the
82+
/// protocol is known to be HTTP, a server is built with a new HTTP service
83+
/// (built using the `H`-typed NewService).
9484
///
95-
/// * If the stream is not determined to be HTTP, then the original destination
96-
/// address is used to transparently forward the TCP stream. A `C`-typed
97-
/// `Connect` `Stack` is used to build a connection to the destination (i.e.,
98-
/// instrumented with telemetry, etc).
99-
///
100-
/// * Otherwise, an `H`-typed `Service` is used to build a service that
101-
/// can route HTTP requests for the `tls::accept::Meta`.
102-
pub struct Server<L, F, H, B>
103-
where
104-
H: NewService<tls::accept::Meta>,
105-
H::Service: Service<http::Request<Body>, Response = http::Response<B>>,
106-
{
85+
/// Otherwise, the `F` type forwarding service is used to handle the TCP
86+
/// connection.
87+
#[derive(Clone, Debug)]
88+
pub struct ServeHttp<F, H> {
10789
http: hyper::server::conn::Http<trace::Executor>,
108-
h2_settings: H2Settings,
109-
transport_labels: L,
110-
transport_metrics: transport::Metrics,
11190
forward_tcp: F,
11291
make_http: H,
11392
drain: drain::Watch,
11493
}
11594

116-
impl<L, F, H, B> Server<L, F, H, B>
117-
where
118-
L: TransportLabels<Protocol, Labels = TransportKey>,
119-
H: NewService<tls::accept::Meta>,
120-
H::Service: Service<http::Request<Body>, Response = http::Response<B>>,
121-
Self: Accept<Connection>,
122-
{
123-
/// Creates a new `Server`.
124-
pub fn new(
125-
transport_labels: L,
126-
transport_metrics: transport::Metrics,
127-
forward_tcp: F,
128-
make_http: H,
129-
h2_settings: H2Settings,
130-
drain: drain::Watch,
131-
) -> Self {
95+
impl<F, H> ServeHttp<F, H> {
96+
/// Creates a new `ServeHttp`.
97+
pub fn new(make_http: H, h2: H2Settings, forward_tcp: F, drain: drain::Watch) -> Self {
98+
let mut http = hyper::server::conn::Http::new().with_executor(trace::Executor::new());
99+
100+
http.http2_initial_stream_window_size(h2.initial_stream_window_size)
101+
.http2_initial_connection_window_size(h2.initial_connection_window_size);
102+
132103
Self {
133-
http: hyper::server::conn::Http::new().with_executor(trace::Executor::new()),
134-
h2_settings,
135-
transport_labels,
136-
transport_metrics,
104+
http,
137105
forward_tcp,
138106
make_http,
139107
drain,
140108
}
141109
}
142110
}
143111

144-
impl<L, F, H, B> Service<Connection> for Server<L, F, H, B>
112+
impl<T, I, F, H, S> Service<(Protocol<T>, I)> for ServeHttp<F, H>
145113
where
146-
L: TransportLabels<Protocol, Labels = TransportKey>,
147-
F: Accept<(tls::accept::Meta, transport::metrics::SensorIo<BoxedIo>)> + Clone + Send + 'static,
114+
T: Send + 'static,
115+
I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
116+
F: Accept<(T, I)> + Clone + Send + 'static,
148117
F::Future: Send + 'static,
149118
F::ConnectionFuture: Send + 'static,
150-
H: NewService<tls::accept::Meta> + Send + 'static,
151-
H::Service: Service<http::Request<Body>, Response = http::Response<B>, Error = Error>
119+
H: NewService<T, Service = S> + Clone + Send + 'static,
120+
S: Service<http::Request<Body>, Response = http::Response<http::boxed::Payload>, Error = Error>
152121
+ Unpin
153122
+ Send
154123
+ 'static,
155-
<H::Service as Service<http::Request<Body>>>::Future: Send + 'static,
156-
B: hyper::body::HttpBody + Default + Send + 'static,
157-
B::Error: Into<Error>,
158-
B::Data: Send + 'static,
124+
S::Future: Send + 'static,
159125
{
160126
type Response = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
161127
type Error = Error;
@@ -166,104 +132,69 @@ where
166132
Poll::Ready(Ok(().into()))
167133
}
168134

169-
/// Handle a new connection.
170-
///
171-
/// This will peek on the connection for the first bytes to determine
172-
/// what protocol the connection is speaking. From there, the connection
173-
/// will be mapped into respective services, and spawned into an
174-
/// executor.
175-
fn call(&mut self, (proto, io): Connection) -> Self::Future {
176-
// TODO move this into a distinct Accept?
177-
let io = {
178-
let labels = self.transport_labels.transport_labels(&proto);
179-
self.transport_metrics.wrap_server_transport(labels, io)
180-
};
181-
135+
fn call(&mut self, (protocol, io): (Protocol<T>, I)) -> Self::Future {
182136
let drain = self.drain.clone();
183-
let http_version = match proto.http {
184-
Some(http) => http,
185-
None => {
186-
trace!("did not detect protocol; forwarding TCP");
187-
188-
let accept = self
189-
.forward_tcp
190-
.clone()
191-
.into_service()
192-
.oneshot((proto.tls, io));
193-
let fwd = async move {
194-
let conn = accept.await.map_err(Into::into)?;
195-
Ok(Box::pin(
196-
drain
197-
.ignore_signal()
198-
.release_after(conn)
199-
.map_err(Into::into),
200-
) as Self::Response)
201-
};
137+
let forward_tcp = self.forward_tcp.clone();
138+
let make_http = self.make_http.clone();
139+
let mut http = self.http.clone();
202140

203-
return Box::pin(fwd);
204-
}
205-
};
206-
207-
let http_svc = self.make_http.new_service(proto.tls);
208-
209-
let mut builder = self.http.clone();
210-
let initial_stream_window_size = self.h2_settings.initial_stream_window_size;
211-
let initial_conn_window_size = self.h2_settings.initial_connection_window_size;
212141
Box::pin(async move {
213-
match http_version {
214-
HttpVersion::Http1 => {
142+
let rsp: Self::Response = match protocol.http {
143+
Some(HttpVersion::Http1) => {
144+
trace!("Handling as HTTP");
215145
// Enable support for HTTP upgrades (CONNECT and websockets).
216-
let svc = upgrade::Service::new(http_svc, drain.clone());
217-
let conn = builder
146+
let svc = upgrade::Service::new(
147+
make_http.new_service(protocol.target),
148+
drain.clone(),
149+
);
150+
let conn = http
218151
.http1_only(true)
219152
.serve_connection(io, HyperServerSvc::new(svc))
220153
.with_upgrades();
221154

222-
Ok(Box::pin(async move {
155+
Box::pin(async move {
223156
drain
224157
.watch(conn, |conn| Pin::new(conn).graceful_shutdown())
225158
.instrument(info_span!("h1"))
226159
.await?;
227160
Ok(())
228-
}) as Self::Response)
161+
})
229162
}
230163

231-
HttpVersion::H2 => {
232-
let conn = builder
233-
.http2_only(true)
234-
.http2_initial_stream_window_size(initial_stream_window_size)
235-
.http2_initial_connection_window_size(initial_conn_window_size)
236-
.serve_connection(io, HyperServerSvc::new(http_svc));
237-
Ok(Box::pin(async move {
164+
Some(HttpVersion::H2) => {
165+
trace!("Handling as H2");
166+
let conn = http.http2_only(true).serve_connection(
167+
io,
168+
HyperServerSvc::new(make_http.new_service(protocol.target)),
169+
);
170+
171+
Box::pin(async move {
238172
drain
239173
.watch(conn, |conn| Pin::new(conn).graceful_shutdown())
240174
.instrument(info_span!("h2"))
241175
.await?;
242176
Ok(())
243-
}) as Self::Response)
177+
})
244178
}
245-
}
246-
})
247-
}
248-
}
249179

250-
impl<L, F, H, B> Clone for Server<L, F, H, B>
251-
where
252-
L: TransportLabels<Protocol, Labels = TransportKey> + Clone,
253-
F: Clone,
254-
H: NewService<tls::accept::Meta> + Clone,
255-
H::Service: Service<http::Request<Body>, Response = http::Response<B>>,
256-
B: hyper::body::HttpBody,
257-
{
258-
fn clone(&self) -> Self {
259-
Self {
260-
http: self.http.clone(),
261-
h2_settings: self.h2_settings.clone(),
262-
transport_labels: self.transport_labels.clone(),
263-
transport_metrics: self.transport_metrics.clone(),
264-
forward_tcp: self.forward_tcp.clone(),
265-
make_http: self.make_http.clone(),
266-
drain: self.drain.clone(),
267-
}
180+
None => {
181+
trace!("Forwarding TCP");
182+
let duplex = forward_tcp
183+
.into_service()
184+
.oneshot((protocol.target, io))
185+
.await
186+
.map_err(Into::into)?;
187+
188+
Box::pin(
189+
drain
190+
.ignore_signal()
191+
.release_after(duplex)
192+
.map_err(Into::into),
193+
)
194+
}
195+
};
196+
197+
Ok(rsp)
198+
})
268199
}
269200
}

0 commit comments

Comments
 (0)