Skip to content

Commit 700ca59

Browse files
authored
Handle profile endpoint in Gateway outbound stack (#1157)
In linkerd2/PR#6090, we noticed that when testing the StatefulSet changes with a service that sends a request from the source cluster and expects to be connected directly to a pod in the target cluster, the outbound side of the Gateway in the target cluster throws a 'BadDomain' error. When establishing a direct connection to a pod, the profile returned from the look-up will contain an endpoint and it will not contain a logical address. The 'BadDomain' error stems from how the Gateway handles outbound connections. If the profile does not include a logical address (i.e an authorityOverride), then the connection is failed. This change adds functionality to the Gateway to also consider endpoints received in the profile on the outbound side. Support for endpoint handling on the outbound side will unblock PR#6090. To test, I have added a unit test that will check the endpoint stack is considered when dealing with an endpoint in the profile. There are also manual tests done in conjunction with the changes from PR#6090. Signed-off-by: Matei David <[email protected]>
1 parent 853e959 commit 700ca59

File tree

5 files changed

+114
-33
lines changed

5 files changed

+114
-33
lines changed

linkerd/app/gateway/src/gateway.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ impl<O> NewGateway<O> {
4949

5050
impl<O> svc::NewService<Target> for NewGateway<O>
5151
where
52-
O: svc::NewService<outbound::http::Logical> + Send + Clone + 'static,
52+
O: svc::NewService<svc::Either<outbound::http::Logical, outbound::http::Endpoint>>
53+
+ Send
54+
+ Clone
55+
+ 'static,
5356
{
5457
type Service = Gateway<O::Service>;
5558

@@ -63,6 +66,23 @@ where
6366
None => return Gateway::BadDomain(http.target.name().clone()),
6467
};
6568

69+
// Create an outbound target using the endpoint from the profile.
70+
if let Some((addr, metadata)) = profile.endpoint() {
71+
debug!("Creating outbound endpoint");
72+
let svc = self
73+
.outbound
74+
.new_service(svc::Either::B(outbound::http::Endpoint::from((
75+
http.version,
76+
outbound::tcp::Endpoint::from_metadata(
77+
addr,
78+
metadata,
79+
tls::NoClientTls::NotProvidedByServiceDiscovery,
80+
profile.is_opaque_protocol(),
81+
),
82+
))));
83+
return Gateway::new(svc, http.target, local_id);
84+
}
85+
6686
let logical_addr = match profile.logical_addr() {
6787
Some(addr) => addr,
6888
None => return Gateway::BadDomain(http.target.name().clone()),
@@ -72,11 +92,13 @@ where
7292
// including the original port. We don't know the IP of the target, so
7393
// we use an unroutable one.
7494
debug!("Creating outbound service");
75-
let svc = self.outbound.new_service(outbound::http::Logical {
76-
profile,
77-
protocol: http.version,
78-
logical_addr,
79-
});
95+
let svc = self
96+
.outbound
97+
.new_service(svc::Either::A(outbound::http::Logical {
98+
profile,
99+
protocol: http.version,
100+
logical_addr,
101+
}));
80102

81103
Gateway::new(svc, http.target, local_id)
82104
}

linkerd/app/gateway/src/lib.rs

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -103,33 +103,51 @@ where
103103
// For each gatewayed connection that is *not* HTTP, use the target from the
104104
// transport header to lookup a service profile. If the profile includes a
105105
// resolvable service name, then continue with TCP endpoint resolution,
106-
// balancing, and forwarding. An invalid original destination address is
107-
// used so that service discovery is *required* to provide a valid endpoint.
106+
// balancing, and forwarding. If the profile includes an endpoint instead
107+
// of a logical address, then connect to endpoint directly and avoid
108+
// balancing.
108109
//
109110
// TODO: We should use another target type that actually reflects
110111
// reality. But the outbound stack is currently pretty tightly
111112
// coupled to its target types.
112-
let tcp = outbound
113+
let logical = outbound
113114
.clone()
114115
.push_tcp_endpoint()
115-
.push_tcp_logical(resolve.clone())
116-
.into_stack()
117-
.push_request_filter(
118-
|(p, _): (Option<profiles::Receiver>, _)| -> Result<_, Error> {
119-
let profile = p.ok_or_else(|| {
116+
.push_tcp_logical(resolve.clone());
117+
let endpoint = outbound
118+
.clone()
119+
.push_tcp_endpoint()
120+
.push_tcp_forward()
121+
.into_stack();
122+
let tcp = endpoint
123+
.push_switch(
124+
move |(profile, _): (Option<profiles::Receiver>, _)| -> Result<_, Error> {
125+
let profile = profile.ok_or_else(|| {
120126
DiscoveryRejected::new("no profile discovered for gateway target")
121127
})?;
128+
129+
if let Some((addr, metadata)) = profile.endpoint() {
130+
return Ok(svc::Either::A(outbound::tcp::Endpoint::from_metadata(
131+
addr,
132+
metadata,
133+
tls::NoClientTls::NotProvidedByServiceDiscovery,
134+
profile.is_opaque_protocol(),
135+
)));
136+
}
137+
122138
let logical_addr = profile.logical_addr().ok_or_else(|| {
123139
DiscoveryRejected::new(
124-
"profile for gateway target does not have a logical address",
140+
"profiles must have either an endpoint or a logical address",
125141
)
126142
})?;
127-
Ok(outbound::tcp::Logical {
143+
144+
Ok(svc::Either::B(outbound::tcp::Logical {
128145
profile,
129146
protocol: (),
130147
logical_addr,
131-
})
148+
}))
132149
},
150+
logical.into_inner(),
133151
)
134152
.push(profiles::discover::layer(profiles.clone(), {
135153
let allow = allow_discovery.clone();
@@ -162,11 +180,12 @@ where
162180
// The client's ID is set as a request extension, as required by the
163181
// gateway. This permits gateway services (and profile resolutions) to be
164182
// cached per target, shared across clients.
165-
let http = outbound
166-
.push_tcp_endpoint()
167-
.push_http_endpoint()
183+
let endpoint = outbound.push_tcp_endpoint().push_http_endpoint();
184+
let http = endpoint
185+
.clone()
168186
.push_http_logical(resolve)
169187
.into_stack()
188+
.push_switch(Ok::<_, Never>, endpoint.into_stack())
170189
.push(NewGateway::layer(local_id))
171190
.push(profiles::discover::layer(profiles, move |t: HttpTarget| {
172191
if allow_discovery.matches(t.target.name()) {

linkerd/app/gateway/src/tests.rs

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,31 @@ use tower_test::mock;
1111
#[tokio::test]
1212
async fn gateway() {
1313
assert_eq!(
14-
Test::default().run().await.unwrap().status(),
14+
Test::default()
15+
.with_default_profile()
16+
.run()
17+
.await
18+
.unwrap()
19+
.status(),
20+
http::StatusCode::NO_CONTENT
21+
);
22+
}
23+
24+
#[tokio::test]
25+
async fn gateway_endpoint() {
26+
let addr = std::net::SocketAddr::new([192, 0, 2, 10].into(), 777);
27+
let profile = support::profile::only(profiles::Profile {
28+
endpoint: Some((addr, Metadata::default())),
29+
..profiles::Profile::default()
30+
});
31+
32+
assert_eq!(
33+
Test::default()
34+
.with_profile(profile)
35+
.run()
36+
.await
37+
.unwrap()
38+
.status(),
1539
http::StatusCode::NO_CONTENT
1640
);
1741
}
@@ -23,6 +47,7 @@ async fn bad_domain() {
2347
..Default::default()
2448
};
2549
let status = test
50+
.with_default_profile()
2651
.run()
2752
.await
2853
.unwrap_err()
@@ -39,6 +64,7 @@ async fn no_identity() {
3964
..Default::default()
4065
};
4166
let status = test
67+
.with_default_profile()
4268
.run()
4369
.await
4470
.unwrap_err()
@@ -57,6 +83,7 @@ async fn forward_loop() {
5783
..Default::default()
5884
};
5985
let status = test
86+
.with_default_profile()
6087
.run()
6188
.await
6289
.unwrap_err()
@@ -71,6 +98,7 @@ struct Test {
7198
target: NameAddr,
7299
client_id: Option<tls::ClientId>,
73100
orig_fwd: Option<&'static str>,
101+
profile: Option<profiles::Receiver>,
74102
}
75103

76104
impl Default for Test {
@@ -80,36 +108,31 @@ impl Default for Test {
80108
target: NameAddr::from_str("dst.test.example.com:4321").unwrap(),
81109
client_id: Some(tls::ClientId::from_str("client.id.test").unwrap()),
82110
orig_fwd: None,
111+
profile: None,
83112
}
84113
}
85114
}
86115

87116
impl Test {
88117
async fn run(self) -> Result<http::Response<http::BoxBody>, Error> {
89118
let Self {
90-
suffix,
119+
suffix: _,
91120
target,
92121
client_id,
93122
orig_fwd,
123+
profile,
94124
} = self;
95125

96126
let (outbound, mut handle) =
97127
mock::pair::<http::Request<http::BoxBody>, http::Response<http::BoxBody>>();
98128

99129
let new = NewGateway::new(
100-
move |_: outbound::http::Logical| outbound.clone(),
130+
move |_: svc::Either<outbound::http::Logical, outbound::http::Endpoint>| {
131+
outbound.clone()
132+
},
101133
Some(tls::LocalId(id::Name::from_str("gateway.id.test").unwrap())),
102134
);
103135

104-
let allow = NameMatch::new(Some(dns::Suffix::from_str(suffix).unwrap()));
105-
let profile = if allow.matches(target.name()) {
106-
Some(support::profile::only(profiles::Profile {
107-
addr: Some(target.clone().into()),
108-
..profiles::Profile::default()
109-
}))
110-
} else {
111-
None
112-
};
113136
let t = HttpTarget {
114137
target: target.clone(),
115138
version: http::Version::Http1,
@@ -146,4 +169,20 @@ impl Test {
146169
bg.await?;
147170
Ok(rsp)
148171
}
172+
173+
fn with_profile(mut self, profile: profiles::Receiver) -> Self {
174+
let allow = NameMatch::new(Some(dns::Suffix::from_str(self.suffix).unwrap()));
175+
if allow.matches(self.target.name()) {
176+
self.profile = Some(profile);
177+
}
178+
self
179+
}
180+
181+
fn with_default_profile(self) -> Self {
182+
let target = self.target.clone();
183+
self.with_profile(support::profile::only(profiles::Profile {
184+
addr: Some(target.into()),
185+
..profiles::Profile::default()
186+
}))
187+
}
149188
}

linkerd/app/outbound/src/endpoint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl Endpoint<()> {
3838
}
3939
}
4040

41-
pub(crate) fn from_metadata(
41+
pub fn from_metadata(
4242
addr: impl Into<SocketAddr>,
4343
metadata: Metadata,
4444
reason: tls::NoClientTls,

linkerd/app/outbound/src/http/logical.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ impl<E> Outbound<E> {
9999
// If the traffic split is empty/unavailable, eagerly fail requests.
100100
// When the split is in failfast, spawn the service in a background
101101
// task so it becomes ready without new requests.
102+
.check_new_service::<(ConcreteAddr, Logical), _>()
102103
.push(profiles::split::layer())
103104
.push_on_response(
104105
svc::layers()

0 commit comments

Comments
 (0)