Skip to content

Commit 827762d

Browse files
authored
outbound: Decouple the TCP connect stack from the target type (#951)
In order to make the server and logical stack more flexible, we need to make stacks generic over their input target type. This change modifies the innermost `tcp::connect` stack to not depend on a concrete target type, instead relying on `Param` constraints. A new `tcp::Connect` target type is introduced to be constructed by the TCP connect stack.
1 parent 1009645 commit 827762d

File tree

8 files changed

+140
-67
lines changed

8 files changed

+140
-67
lines changed

linkerd/app/gateway/src/lib.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,11 @@ pub fn stack<I, O, P, R>(
7979
+ Send
8080
where
8181
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + fmt::Debug + Send + Sync + Unpin + 'static,
82-
O: svc::Service<outbound::http::Endpoint, Error = io::Error>
83-
+ svc::Service<outbound::tcp::Endpoint, Error = io::Error>,
8482
O: Clone + Send + Sync + Unpin + 'static,
85-
<O as svc::Service<outbound::http::Endpoint>>::Response:
83+
O: svc::Service<outbound::tcp::Connect, Error = io::Error>,
84+
O::Response:
8685
io::AsyncRead + io::AsyncWrite + tls::HasNegotiatedProtocol + Send + Unpin + 'static,
87-
<O as svc::Service<outbound::http::Endpoint>>::Future: Send + Unpin + 'static,
88-
<O as svc::Service<outbound::tcp::Endpoint>>::Response:
89-
io::AsyncRead + io::AsyncWrite + tls::HasNegotiatedProtocol + Send + Unpin + 'static,
90-
<O as svc::Service<outbound::tcp::Endpoint>>::Future: Send + Unpin + 'static,
86+
O::Future: Send + Unpin + 'static,
9187
P: profiles::GetProfile<profiles::LogicalAddr> + Clone + Send + Sync + Unpin + 'static,
9288
P::Future: Send + 'static,
9389
P::Error: Send,

linkerd/app/outbound/src/http/endpoint.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,7 @@ use linkerd_app_core::{
88
use tokio::io;
99
use tracing::debug_span;
1010

11-
impl<C> Outbound<C>
12-
where
13-
C: svc::Service<Endpoint> + Clone + Send + Sync + Unpin + 'static,
14-
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin,
15-
C::Error: Into<Error>,
16-
C::Future: Send + Unpin,
17-
{
11+
impl<C> Outbound<C> {
1812
pub fn push_http_endpoint<B>(
1913
self,
2014
) -> Outbound<
@@ -31,6 +25,10 @@ where
3125
where
3226
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
3327
B::Data: Send + 'static,
28+
C: svc::Service<Endpoint> + Clone + Send + Sync + Unpin + 'static,
29+
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin,
30+
C::Error: Into<Error>,
31+
C::Future: Send + Unpin,
3432
{
3533
let Self {
3634
config,

linkerd/app/outbound/src/http/mod.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,6 @@ impl Param<Option<SessionProtocol>> for Endpoint {
135135
}
136136
}
137137

138-
impl Param<Option<AuthorityOverride>> for Endpoint {
139-
fn param(&self) -> Option<AuthorityOverride> {
140-
self.metadata
141-
.authority_override()
142-
.cloned()
143-
.map(AuthorityOverride)
144-
}
145-
}
146-
147138
impl tap::Inspect for Endpoint {
148139
fn src_addr<B>(&self, req: &Request<B>) -> Option<SocketAddr> {
149140
req.extensions().get::<ClientHandle>().map(|c| c.addr)

linkerd/app/outbound/src/lib.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,17 +101,11 @@ impl<S> Outbound<S> {
101101
Service = impl svc::Service<I, Response = (), Error = Error, Future = impl Send>,
102102
>
103103
where
104-
S: svc::Service<http::Endpoint, Error = io::Error>
105-
+ svc::Service<tcp::Endpoint, Error = io::Error>,
106104
S: Clone + Send + Sync + Unpin + 'static,
107-
<S as svc::Service<http::Endpoint>>::Response: tls::HasNegotiatedProtocol,
108-
<S as svc::Service<http::Endpoint>>::Response:
109-
tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin,
110-
<S as svc::Service<http::Endpoint>>::Future: Send + Unpin,
111-
<S as svc::Service<tcp::Endpoint>>::Response: tls::HasNegotiatedProtocol,
112-
<S as svc::Service<tcp::Endpoint>>::Response:
113-
tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin,
114-
<S as svc::Service<tcp::Endpoint>>::Future: Send,
105+
S: svc::Service<tcp::Connect, Error = io::Error>,
106+
S::Response: tls::HasNegotiatedProtocol,
107+
S::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin,
108+
S::Future: Send + Unpin,
115109
R: Clone + Send + 'static,
116110
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>,
117111
R::Resolution: Send,

linkerd/app/outbound/src/target.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use crate::tcp::opaque_transport;
12
use linkerd_app_core::{
23
metrics, profiles,
34
proxy::{
45
api_resolve::{ConcreteAddr, Metadata},
6+
http,
57
resolve::map_endpoint::MapEndpoint,
68
},
79
svc::{self, Param},
@@ -12,11 +14,6 @@ use linkerd_app_core::{
1214
use std::net::SocketAddr;
1315
use tracing::debug;
1416

15-
#[derive(Copy, Clone)]
16-
pub(crate) struct EndpointFromMetadata {
17-
pub identity_disabled: bool,
18-
}
19-
2017
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
2118
pub struct Accept<P> {
2219
pub orig_dst: OrigDstAddr,
@@ -45,6 +42,11 @@ pub struct Endpoint<P> {
4542
pub protocol: P,
4643
}
4744

45+
#[derive(Copy, Clone)]
46+
pub struct EndpointFromMetadata {
47+
pub identity_disabled: bool,
48+
}
49+
4850
// === impl Accept ===
4951

5052
impl<P> Param<transport::labels::Key> for Accept<P> {
@@ -218,6 +220,23 @@ impl<P> Param<tls::ConditionalClientTls> for Endpoint<P> {
218220
}
219221
}
220222

223+
impl<P> Param<Option<opaque_transport::PortOverride>> for Endpoint<P> {
224+
fn param(&self) -> Option<opaque_transport::PortOverride> {
225+
self.metadata
226+
.opaque_transport_port()
227+
.map(opaque_transport::PortOverride)
228+
}
229+
}
230+
231+
impl<P> Param<Option<http::AuthorityOverride>> for Endpoint<P> {
232+
fn param(&self) -> Option<http::AuthorityOverride> {
233+
self.metadata
234+
.authority_override()
235+
.cloned()
236+
.map(http::AuthorityOverride)
237+
}
238+
}
239+
221240
impl<P> Param<transport::labels::Key> for Endpoint<P> {
222241
fn param(&self) -> transport::labels::Key {
223242
transport::labels::Key::OutboundConnect(self.param())

linkerd/app/outbound/src/tcp/connect.rs

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
1-
use super::opaque_transport::OpaqueTransport;
2-
use crate::{target::Endpoint, Outbound};
1+
use super::opaque_transport::{self, OpaqueTransport};
2+
use crate::Outbound;
33
use futures::future;
44
use linkerd_app_core::{
5-
io, svc, tls,
6-
transport::{ConnectTcp, Remote, ServerAddr},
5+
io,
6+
proxy::http,
7+
svc, tls,
8+
transport::{self, ConnectTcp, Remote, ServerAddr},
79
transport_header::SessionProtocol,
810
Error,
911
};
1012
use std::task::{Context, Poll};
1113
use tracing::debug_span;
1214

15+
#[derive(Clone, Debug)]
16+
pub struct Connect {
17+
pub addr: Remote<ServerAddr>,
18+
pub tls: tls::ConditionalClientTls,
19+
}
20+
1321
/// Prevents outbound connections on the loopback interface, unless the
1422
/// `allow-loopback` feature is enabled.
1523
#[derive(Clone, Debug)]
@@ -25,19 +33,24 @@ impl Outbound<()> {
2533
}
2634

2735
impl<C> Outbound<C> {
28-
pub fn push_tcp_endpoint<P>(
36+
pub fn push_tcp_endpoint<T>(
2937
self,
3038
) -> Outbound<
3139
impl svc::Service<
32-
Endpoint<P>,
40+
T,
3341
Response = impl io::AsyncRead + io::AsyncWrite + Send + Unpin,
3442
Error = Error,
3543
Future = impl Send,
3644
> + Clone,
3745
>
3846
where
39-
Endpoint<P>: svc::Param<Option<SessionProtocol>>,
40-
C: svc::Service<Endpoint<P>, Error = io::Error> + Clone + Send + 'static,
47+
T: svc::Param<Remote<ServerAddr>>
48+
+ svc::Param<tls::ConditionalClientTls>
49+
+ svc::Param<Option<opaque_transport::PortOverride>>
50+
+ svc::Param<Option<http::AuthorityOverride>>
51+
+ svc::Param<Option<SessionProtocol>>
52+
+ svc::Param<transport::labels::Key>,
53+
C: svc::Service<Connect, Error = io::Error> + Clone + Send + 'static,
4154
C::Response: tls::HasNegotiatedProtocol,
4255
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
4356
C::Future: Send + 'static,
@@ -147,3 +160,17 @@ where
147160
future::Either::Left(self.0.call(ep))
148161
}
149162
}
163+
164+
// === impl Connect ===
165+
166+
impl svc::Param<Remote<ServerAddr>> for Connect {
167+
fn param(&self) -> Remote<ServerAddr> {
168+
self.addr
169+
}
170+
}
171+
172+
impl svc::Param<tls::ConditionalClientTls> for Connect {
173+
fn param(&self) -> tls::ConditionalClientTls {
174+
self.tls.clone()
175+
}
176+
}

linkerd/app/outbound/src/tcp/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod opaque_transport;
44
#[cfg(test)]
55
mod tests;
66

7+
pub use self::connect::Connect;
78
use crate::target;
89
pub use linkerd_app_core::proxy::tcp::Forward;
910
use linkerd_app_core::{

0 commit comments

Comments
 (0)