Skip to content

Commit d5e523c

Browse files
authored
internal: Decouple TCP forwarding from protocol dispatch (#389)
In preparation of adding discovery logic to outbound TCP forwarding, this change extracts the TCP forwarding logic as an Accept so that the outbound proxy will be able to provide an alternate implementation.
1 parent 0ca3bfe commit d5e523c

File tree

12 files changed

+189
-116
lines changed

12 files changed

+189
-116
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,7 @@ dependencies = [
563563
"linkerd2-proxy-identity 0.1.0",
564564
"linkerd2-proxy-resolve 0.1.0",
565565
"linkerd2-proxy-tap 0.1.0",
566+
"linkerd2-proxy-tcp 0.1.0",
566567
"linkerd2-proxy-transport 0.1.0",
567568
"linkerd2-reconnect 0.1.0",
568569
"linkerd2-request-filter 0.1.0",
@@ -951,6 +952,18 @@ dependencies = [
951952
"tracing-futures 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
952953
]
953954

955+
[[package]]
956+
name = "linkerd2-proxy-tcp"
957+
version = "0.1.0"
958+
dependencies = [
959+
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
960+
"linkerd2-duplex 0.1.0",
961+
"linkerd2-error 0.1.0",
962+
"tokio 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)",
963+
"tower 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
964+
"tower-load 0.1.0 (git+https://github.com/tower-rs/tower)",
965+
]
966+
954967
[[package]]
955968
name = "linkerd2-proxy-transport"
956969
version = "0.1.0"

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ members = [
2727
"linkerd/proxy/identity",
2828
"linkerd/proxy/resolve",
2929
"linkerd/proxy/tap",
30+
"linkerd/proxy/tcp",
3031
"linkerd/proxy/transport",
3132
"linkerd/request-filter",
3233
"linkerd/reconnect",

linkerd/app/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ linkerd2-proxy-identity = { path = "../../proxy/identity" }
3636
linkerd2-proxy-http = { path = "../../proxy/http" }
3737
linkerd2-proxy-resolve = { path = "../../proxy/resolve" }
3838
linkerd2-proxy-tap = { path = "../../proxy/tap" }
39+
linkerd2-proxy-tcp = { path = "../../proxy/tcp" }
3940
linkerd2-proxy-transport = { path = "../../proxy/transport" }
4041
linkerd2-reconnect = { path = "../../reconnect" }
4142
linkerd2-request-filter = { path = "../../request-filter" }

linkerd/app/core/src/proxy/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ pub use linkerd2_proxy_http::{self as http, grpc};
88
pub use linkerd2_proxy_identity as identity;
99
pub use linkerd2_proxy_resolve as resolve;
1010
pub use linkerd2_proxy_tap as tap;
11+
pub use linkerd2_proxy_tcp as tcp;
1112

1213
pub mod buffer;
1314
pub mod pending;
1415
pub mod server;
15-
mod tcp;
1616

1717
pub use self::server::Server;

linkerd/app/core/src/proxy/server.rs

Lines changed: 32 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,23 @@
1-
use crate::proxy::http::{
2-
glue::{HttpBody, HyperServerSvc},
3-
upgrade, Version as HttpVersion,
1+
use crate::{
2+
drain,
3+
proxy::{
4+
core::Accept,
5+
detect,
6+
http::{
7+
glue::{HttpBody, HyperServerSvc},
8+
h2::Settings as H2Settings,
9+
upgrade, Version as HttpVersion,
10+
},
11+
},
12+
svc::{MakeService, Service, ServiceExt},
13+
transport::{self, io::BoxedIo, labels::Key as TransportKey, metrics::TransportLabels, tls},
14+
Error, Never,
415
};
5-
use crate::proxy::{detect, tcp};
6-
use crate::svc::{MakeService, Service};
7-
use crate::transport::{
8-
self, io::BoxedIo, labels::Key as TransportKey, metrics::TransportLabels, tls,
9-
};
10-
use futures::future::{self, Either};
11-
use futures::{Future, Poll};
16+
use futures::{future::Either, Future, Poll};
1217
use http;
1318
use hyper;
1419
use indexmap::IndexSet;
15-
use linkerd2_drain as drain;
16-
use linkerd2_error::{Error, Never};
17-
use linkerd2_proxy_core::listen::Accept;
18-
use linkerd2_proxy_http::h2::Settings as H2Settings;
19-
use std::net::SocketAddr;
2020
use std::sync::Arc;
21-
use std::{error, fmt};
22-
use tokio::io::{AsyncRead, AsyncWrite};
2321
use tracing::{info_span, trace};
2422
use tracing_futures::Instrument;
2523

@@ -77,7 +75,7 @@ impl detect::Detect<tls::accept::Meta> for ProtocolDetect {
7775
///
7876
/// * Otherwise, an `H`-typed `Service` is used to build a service that
7977
/// can route HTTP requests for the `tls::accept::Meta`.
80-
pub struct Server<L, C, H, B>
78+
pub struct Server<L, F, H, B>
8179
where
8280
// Used when forwarding a TCP stream (e.g. with telemetry, timeouts).
8381
L: TransportLabels<Protocol, Labels = TransportKey>,
@@ -94,57 +92,12 @@ where
9492
h2_settings: H2Settings,
9593
transport_labels: L,
9694
transport_metrics: transport::MetricsRegistry,
97-
connect: ForwardConnect<C>,
95+
forward_tcp: F,
9896
make_http: H,
9997
drain: drain::Watch,
10098
}
10199

102-
/// Establishes connections for forwarded connections.
103-
///
104-
/// Fails to produce a `Connect` if this would connect to the listener that
105-
/// already accepted this.
106-
#[derive(Clone, Debug)]
107-
struct ForwardConnect<C>(C);
108-
109-
/// An error indicating an accepted socket did not have an SO_ORIGINAL_DST
110-
/// address and therefore could not be forwarded.
111-
#[derive(Clone, Debug)]
112-
pub struct NoForwardTarget;
113-
114-
impl<C> Service<tls::accept::Meta> for ForwardConnect<C>
115-
where
116-
C: Service<SocketAddr>,
117-
C::Error: Into<Error>,
118-
{
119-
type Response = C::Response;
120-
type Error = Error;
121-
type Future = future::Either<
122-
future::FutureResult<C::Response, Error>,
123-
future::MapErr<C::Future, fn(C::Error) -> Error>,
124-
>;
125-
126-
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
127-
self.0.poll_ready().map_err(Into::into)
128-
}
129-
130-
fn call(&mut self, meta: tls::accept::Meta) -> Self::Future {
131-
if meta.addrs.target_addr_is_local() {
132-
return future::Either::A(future::err(NoForwardTarget.into()));
133-
}
134-
135-
future::Either::B(self.0.call(meta.addrs.target_addr()).map_err(Into::into))
136-
}
137-
}
138-
139-
impl error::Error for NoForwardTarget {}
140-
141-
impl fmt::Display for NoForwardTarget {
142-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143-
write!(f, "Could not forward to a target address")
144-
}
145-
}
146-
147-
impl<L, C, H, B> Server<L, C, H, B>
100+
impl<L, F, H, B> Server<L, F, H, B>
148101
where
149102
L: TransportLabels<Protocol, Labels = TransportKey>,
150103
H: MakeService<
@@ -160,7 +113,7 @@ where
160113
pub fn new(
161114
transport_labels: L,
162115
transport_metrics: transport::MetricsRegistry,
163-
connect: C,
116+
forward_tcp: F,
164117
make_http: H,
165118
h2_settings: H2Settings,
166119
drain: drain::Watch,
@@ -173,21 +126,19 @@ where
173126
h2_settings,
174127
transport_labels,
175128
transport_metrics,
176-
connect: ForwardConnect(connect),
129+
forward_tcp,
177130
make_http,
178131
drain,
179132
},
180133
)
181134
}
182135
}
183136

184-
impl<L, C, H, B> Service<Connection> for Server<L, C, H, B>
137+
impl<L, F, H, B> Service<Connection> for Server<L, F, H, B>
185138
where
186139
L: TransportLabels<Protocol, Labels = TransportKey>,
187-
C: Service<SocketAddr> + Clone + Send + 'static,
188-
C::Response: AsyncRead + AsyncWrite + Send + 'static,
189-
C::Future: Send + 'static,
190-
C::Error: Into<Error>,
140+
F: Accept<(tls::accept::Meta, transport::metrics::Io<BoxedIo>)> + Clone + Send + 'static,
141+
F::Future: Send + 'static,
191142
H: MakeService<
192143
tls::accept::Meta,
193144
http::Request<HttpBody>,
@@ -228,8 +179,12 @@ where
228179
Some(http) => http,
229180
None => {
230181
trace!("did not detect protocol; forwarding TCP");
231-
let fwd = tcp::forward(io, self.connect.clone(), proto.tls);
232-
return Box::new(drain.watch(fwd, |_| {}));
182+
let fwd = self
183+
.forward_tcp
184+
.clone()
185+
.into_service()
186+
.oneshot((proto.tls, io));
187+
return Box::new(drain.watch(fwd.map_err(Into::into), |_| {}));
233188
}
234189
};
235190

@@ -279,10 +234,10 @@ where
279234
}
280235
}
281236

282-
impl<L, C, H, B> Clone for Server<L, C, H, B>
237+
impl<L, F, H, B> Clone for Server<L, F, H, B>
283238
where
284239
L: TransportLabels<Protocol, Labels = TransportKey> + Clone,
285-
C: Clone,
240+
F: Clone,
286241
H: MakeService<
287242
tls::accept::Meta,
288243
http::Request<HttpBody>,
@@ -297,7 +252,7 @@ where
297252
h2_settings: self.h2_settings.clone(),
298253
transport_labels: self.transport_labels.clone(),
299254
transport_metrics: self.transport_metrics.clone(),
300-
connect: self.connect.clone(),
255+
forward_tcp: self.forward_tcp.clone(),
301256
make_http: self.make_http.clone(),
302257
drain: self.drain.clone(),
303258
}

linkerd/app/core/src/proxy/tcp.rs

Lines changed: 0 additions & 28 deletions
This file was deleted.

linkerd/app/inbound/src/lib.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use linkerd2_app_core::{
2222
},
2323
identity,
2424
server::{Protocol as ServerProtocol, Server},
25-
tap,
25+
tap, tcp,
2626
},
2727
reconnect, serve,
2828
spans::SpanConverter,
@@ -269,12 +269,18 @@ impl<A: OrigDstAddr> Config<A> {
269269
.push(metrics.http_handle_time.layer())
270270
.serves::<tls::accept::Meta>();
271271

272+
let forward_tcp = tcp::Forward::new(
273+
svc::stack(connect_stack)
274+
.push(svc::map_target::layer(|meta: tls::accept::Meta| {
275+
Endpoint::from(meta.addrs.target_addr())
276+
}))
277+
.into_inner(),
278+
);
279+
272280
let server = Server::new(
273281
TransportLabels,
274282
metrics.transport,
275-
svc::stack(connect_stack)
276-
.push(svc::map_target::layer(Endpoint::from))
277-
.into_inner(),
283+
forward_tcp,
278284
source_stack,
279285
h2_settings,
280286
drain.clone(),

linkerd/app/outbound/src/lib.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use linkerd2_app_core::{
2525
},
2626
identity,
2727
resolve::map_endpoint,
28-
tap, Server,
28+
tap, tcp, Server,
2929
},
3030
reconnect, serve,
3131
spans::SpanConverter,
@@ -329,12 +329,18 @@ impl<A: OrigDstAddr> Config<A> {
329329
))
330330
.push(metrics.http_handle_time.layer());
331331

332+
let forward_tcp = tcp::Forward::new(
333+
svc::stack(connect_stack)
334+
.push(svc::map_target::layer(|meta: tls::accept::Meta| {
335+
Endpoint::from(meta.addrs.target_addr())
336+
}))
337+
.into_inner(),
338+
);
339+
332340
let proxy = Server::new(
333341
TransportLabels,
334342
metrics.transport,
335-
svc::stack(connect_stack)
336-
.push(svc::map_target::layer(Endpoint::from))
337-
.into_inner(),
343+
forward_tcp,
338344
server_stack,
339345
h2_settings,
340346
drain.clone(),

linkerd/proxy/core/src/listen.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,18 @@ pub trait Accept<C> {
4242
fn poll_ready(&mut self) -> Poll<(), Self::Error>;
4343

4444
fn accept(&mut self, connection: C) -> Self::Future;
45+
46+
fn into_service(self) -> AcceptService<Self>
47+
where
48+
Self: Sized,
49+
{
50+
AcceptService(self)
51+
}
4552
}
4653

54+
#[derive(Clone, Debug)]
55+
pub struct AcceptService<S>(S);
56+
4757
impl<C, S> Accept<C> for S
4858
where
4959
S: Service<C, Response = ()>,
@@ -52,17 +62,29 @@ where
5262
type Error = S::Error;
5363
type Future = S::Future;
5464

55-
#[inline]
5665
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
5766
Service::poll_ready(self)
5867
}
5968

60-
#[inline]
6169
fn accept(&mut self, connection: C) -> Self::Future {
6270
Service::call(self, connection)
6371
}
6472
}
6573

74+
impl<C, S: Accept<C>> Service<C> for AcceptService<S> {
75+
type Response = ();
76+
type Error = S::Error;
77+
type Future = S::Future;
78+
79+
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
80+
self.0.poll_ready()
81+
}
82+
83+
fn call(&mut self, connection: C) -> Self::Future {
84+
self.0.accept(connection)
85+
}
86+
}
87+
6688
pub struct Serve<L, A, E> {
6789
listen: L,
6890
accept: A,

linkerd/proxy/tcp/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "linkerd2-proxy-tcp"
3+
version = "0.1.0"
4+
authors = ["Linkerd Developers <[email protected]>"]
5+
edition = "2018"
6+
publish = false
7+
8+
9+
[dependencies]
10+
futures = "0.1"
11+
linkerd2-duplex = { path = "../../duplex" }
12+
linkerd2-error = { path = "../../error" }
13+
tokio = "0.1.14"
14+
tower = "0.1"
15+
tower-load = { git = "https://github.com/tower-rs/tower" }

0 commit comments

Comments
 (0)