Skip to content

Commit 467e979

Browse files
authored
Avoid boxing I/O types for optional TLS (#817)
Our tls client and server currently box the underlying I/O types so that TcpStreams and TlsStreams can be returned and used interchangeably. This change introduces a new `EitherIo` type that implements `AsyncRead`, `AsyncWrite`, and `PeerAddr` to replace uses of `BoxedIo` in the data path.
1 parent 68aacb8 commit 467e979

File tree

11 files changed

+151
-45
lines changed

11 files changed

+151
-45
lines changed

linkerd/app/core/src/admin/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub struct Admin<M> {
3636
pub struct Accept<M>(Admin<M>, hyper::server::conn::Http);
3737

3838
#[derive(Clone)]
39-
pub struct Serve<M: FmtMetrics>(tls::accept::Meta, Accept<M>);
39+
pub struct Serve<M>(tls::accept::Meta, Accept<M>);
4040

4141
pub type ResponseFuture =
4242
Pin<Box<dyn Future<Output = Result<Response<Body>, Never>> + Send + 'static>>;
@@ -199,15 +199,19 @@ impl<M: FmtMetrics> tower::Service<http::Request<Body>> for Admin<M> {
199199
}
200200
}
201201

202-
impl<M: FmtMetrics + Clone + Send + 'static> svc::NewService<tls::accept::Meta> for Accept<M> {
202+
impl<M: Clone> svc::NewService<tls::accept::Meta> for Accept<M> {
203203
type Service = Serve<M>;
204204

205205
fn new_service(&mut self, meta: tls::accept::Meta) -> Self::Service {
206206
Serve(meta, self.clone())
207207
}
208208
}
209209

210-
impl<M: FmtMetrics + Clone + Send + 'static> svc::Service<io::BoxedIo> for Serve<M> {
210+
impl<I, M> svc::Service<I> for Serve<M>
211+
where
212+
I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
213+
M: FmtMetrics + Clone + Send + 'static,
214+
{
211215
type Response = ();
212216
type Error = Error;
213217
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
@@ -216,7 +220,7 @@ impl<M: FmtMetrics + Clone + Send + 'static> svc::Service<io::BoxedIo> for Serve
216220
Poll::Ready(Ok(()))
217221
}
218222

219-
fn call(&mut self, io: io::BoxedIo) -> Self::Future {
223+
fn call(&mut self, io: I) -> Self::Future {
220224
let Self(ref meta, Accept(ref svc, ref server)) = self;
221225

222226
// Since the `/proxy-log-level` controls access based on the

linkerd/app/inbound/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ impl Config {
396396
> + Clone
397397
where
398398
D: svc::NewService<TcpAccept, Service = DSvc> + Clone + Send + 'static,
399-
DSvc: svc::Service<SensorIo<io::BoxedIo>, Response = ()> + Send + 'static,
399+
DSvc: svc::Service<SensorIo<tls::accept::Io>, Response = ()> + Send + 'static,
400400
DSvc::Error: Into<Error>,
401401
DSvc::Future: Send,
402402
F: svc::NewService<TcpEndpoint, Service = FSvc> + Clone + 'static,

linkerd/app/src/tap.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use linkerd2_app_core::{
55
drain,
66
proxy::{identity, tap},
77
serve,
8-
transport::{io, tls},
8+
transport::tls,
99
Error,
1010
};
1111
use std::{net::SocketAddr, pin::Pin};
@@ -55,7 +55,7 @@ impl Config {
5555
identity,
5656
move |meta: tls::accept::Meta| {
5757
let service = service.clone();
58-
service_fn(move |io: io::BoxedIo| {
58+
service_fn(move |io| {
5959
let fut = service.clone().oneshot((meta.clone(), io));
6060
Box::pin(async move {
6161
fut.err_into::<Error>().await?.err_into::<Error>().await

linkerd/io/src/either.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use super::{AsyncRead, AsyncWrite, IoSlice, PeerAddr, Poll, ReadBuf, Result};
2+
use pin_project::pin_project;
3+
use std::{pin::Pin, task::Context};
4+
5+
#[pin_project(project = EitherIoProj)]
6+
#[derive(Debug)]
7+
pub enum EitherIo<L, R> {
8+
Left(#[pin] L),
9+
Right(#[pin] R),
10+
}
11+
12+
impl<L: PeerAddr, R: PeerAddr> PeerAddr for EitherIo<L, R> {
13+
#[inline]
14+
fn peer_addr(&self) -> Result<std::net::SocketAddr> {
15+
match self {
16+
Self::Left(l) => l.peer_addr(),
17+
Self::Right(r) => r.peer_addr(),
18+
}
19+
}
20+
}
21+
22+
impl<L: AsyncRead, R: AsyncRead> AsyncRead for EitherIo<L, R> {
23+
#[inline]
24+
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf<'_>) -> Poll<()> {
25+
match self.project() {
26+
EitherIoProj::Left(l) => l.poll_read(cx, buf),
27+
EitherIoProj::Right(r) => r.poll_read(cx, buf),
28+
}
29+
}
30+
}
31+
32+
impl<L: AsyncWrite, R: AsyncWrite> AsyncWrite for EitherIo<L, R> {
33+
#[inline]
34+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
35+
match self.project() {
36+
EitherIoProj::Left(l) => l.poll_shutdown(cx),
37+
EitherIoProj::Right(r) => r.poll_shutdown(cx),
38+
}
39+
}
40+
41+
#[inline]
42+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
43+
match self.project() {
44+
EitherIoProj::Left(l) => l.poll_flush(cx),
45+
EitherIoProj::Right(r) => r.poll_flush(cx),
46+
}
47+
}
48+
49+
#[inline]
50+
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<usize> {
51+
match self.project() {
52+
EitherIoProj::Left(l) => l.poll_write(cx, buf),
53+
EitherIoProj::Right(r) => r.poll_write(cx, buf),
54+
}
55+
}
56+
57+
#[inline]
58+
fn poll_write_vectored(
59+
self: Pin<&mut Self>,
60+
cx: &mut Context,
61+
buf: &[IoSlice<'_>],
62+
) -> Poll<usize> {
63+
match self.project() {
64+
EitherIoProj::Left(l) => l.poll_write_vectored(cx, buf),
65+
EitherIoProj::Right(r) => r.poll_write_vectored(cx, buf),
66+
}
67+
}
68+
69+
#[inline]
70+
fn is_write_vectored(&self) -> bool {
71+
match self {
72+
EitherIo::Left(l) => l.is_write_vectored(),
73+
EitherIo::Right(r) => r.is_write_vectored(),
74+
}
75+
}
76+
}

linkerd/io/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
mod boxed;
2+
mod either;
23
mod prefixed;
34
mod sensor;
45

56
pub use self::{
67
boxed::BoxedIo,
8+
either::EitherIo,
79
prefixed::PrefixedIo,
810
sensor::{Sensor, SensorIo},
911
};

linkerd/io/src/prefixed.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ impl<S> From<S> for PrefixedIo<S> {
3333
}
3434

3535
impl<S: PeerAddr> PeerAddr for PrefixedIo<S> {
36+
#[inline]
3637
fn peer_addr(&self) -> Result<std::net::SocketAddr> {
3738
self.io.peer_addr()
3839
}
@@ -63,28 +64,34 @@ impl<S: AsyncRead> AsyncRead for PrefixedIo<S> {
6364
}
6465

6566
impl<S: io::Write> io::Write for PrefixedIo<S> {
67+
#[inline]
6668
fn write(&mut self, buf: &[u8]) -> Result<usize> {
6769
self.io.write(buf)
6870
}
6971

72+
#[inline]
7073
fn flush(&mut self) -> Result<()> {
7174
self.io.flush()
7275
}
7376
}
7477

7578
impl<S: AsyncWrite> AsyncWrite for PrefixedIo<S> {
79+
#[inline]
7680
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
7781
self.project().io.poll_shutdown(cx)
7882
}
7983

84+
#[inline]
8085
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
8186
self.project().io.poll_flush(cx)
8287
}
8388

89+
#[inline]
8490
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<usize> {
8591
self.project().io.poll_write(cx, buf)
8692
}
8793

94+
#[inline]
8895
fn poll_write_vectored(
8996
self: Pin<&mut Self>,
9097
cx: &mut Context<'_>,
@@ -93,6 +100,7 @@ impl<S: AsyncWrite> AsyncWrite for PrefixedIo<S> {
93100
self.project().io.poll_write_vectored(cx, bufs)
94101
}
95102

103+
#[inline]
96104
fn is_write_vectored(&self) -> bool {
97105
self.io.is_write_vectored()
98106
}

linkerd/proxy/tap/src/accept.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ use linkerd2_error::Error;
55
use linkerd2_identity as identity;
66
use linkerd2_proxy_api::tap::tap_server::{Tap, TapServer};
77
use linkerd2_proxy_http::{trace, HyperServerSvc};
8-
use linkerd2_proxy_transport::io::BoxedIo;
9-
use linkerd2_proxy_transport::tls::{accept::Connection, Conditional, ReasonForNoPeerName};
8+
use linkerd2_proxy_transport::{
9+
io,
10+
tls::{accept::Connection, Conditional, ReasonForNoPeerName},
11+
};
1012
use std::future::Future;
1113
use std::pin::Pin;
1214
use std::sync::Arc;
@@ -29,8 +31,9 @@ impl AcceptPermittedClients {
2931
}
3032
}
3133

32-
fn serve<T>(&self, io: BoxedIo, tap: T) -> ServeFuture
34+
fn serve<I, T>(&self, io: I, tap: T) -> ServeFuture
3335
where
36+
I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
3437
T: Tap + Send + 'static,
3538
T::ObserveStream: Send + 'static,
3639
{
@@ -45,11 +48,17 @@ impl AcceptPermittedClients {
4548
})
4649
}
4750

48-
fn serve_authenticated(&self, io: BoxedIo) -> ServeFuture {
51+
fn serve_authenticated<I>(&self, io: I) -> ServeFuture
52+
where
53+
I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
54+
{
4955
self.serve(io, self.server.clone())
5056
}
5157

52-
fn serve_unauthenticated(&self, io: BoxedIo, msg: impl Into<String>) -> ServeFuture {
58+
fn serve_unauthenticated<I>(&self, io: I, msg: impl Into<String>) -> ServeFuture
59+
where
60+
I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
61+
{
5362
self.serve(io, unauthenticated::new(msg))
5463
}
5564
}

linkerd/proxy/transport/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ pub mod tls;
1414
pub use self::{
1515
connect::ConnectTcp,
1616
detect::{Detect, DetectService, NewDetectService},
17-
io::BoxedIo,
1817
listen::{BindTcp, DefaultOrigDstAddr, NoOrigDstAddr, OrigDstAddr},
1918
};
2019

0 commit comments

Comments
 (0)