Skip to content

Commit f4a2a6b

Browse files
authored
service-profiles: Wrap receiver types (#1038)
We should be able to cache profile resolutions independently of the stack; but the fact that the the profile modue's `Receiver` type is just an alias for `watch::Receiver<Profile>` means that we can't attach the additional metadata needed for cache entry retention. In anticipation of caching profiles by lookup address, this change creates an explicit `Receiver` type that hides the underlying `watch::Receiver`, exposing only methods used by other crates/modules. This change also replaces the boxed watch stream with a `tokio_stream::wrappers::WatchStream`--also wrapped by a profile-specific type--to avoid needless allocation and support retaining additional resources.
1 parent 2a87f89 commit f4a2a6b

File tree

16 files changed

+90
-60
lines changed

16 files changed

+90
-60
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1303,7 +1303,6 @@ dependencies = [
13031303
name = "linkerd-service-profiles"
13041304
version = "0.1.0"
13051305
dependencies = [
1306-
"async-stream",
13071306
"bytes",
13081307
"futures",
13091308
"http",
@@ -1323,6 +1322,7 @@ dependencies = [
13231322
"regex",
13241323
"thiserror",
13251324
"tokio",
1325+
"tokio-stream",
13261326
"tonic",
13271327
"tower",
13281328
"tracing",

linkerd/app/gateway/src/gateway.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ where
6363
None => return Gateway::BadDomain(http.target.name().clone()),
6464
};
6565

66-
let logical_addr = match profile.borrow().addr.clone() {
66+
let logical_addr = match profile.logical_addr() {
6767
Some(addr) => addr,
6868
None => return Gateway::BadDomain(http.target.name().clone()),
6969
};

linkerd/app/gateway/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ where
119119
let profile = p.ok_or_else(|| {
120120
DiscoveryRejected::new("no profile discovered for gateway target")
121121
})?;
122-
let logical_addr = profile.borrow().addr.clone().ok_or_else(|| {
122+
let logical_addr = profile.logical_addr().ok_or_else(|| {
123123
DiscoveryRejected::new(
124124
"profile for gateway target does not have a logical address",
125125
)

linkerd/app/inbound/fuzz/Cargo.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1132,7 +1132,6 @@ dependencies = [
11321132
name = "linkerd-service-profiles"
11331133
version = "0.1.0"
11341134
dependencies = [
1135-
"async-stream",
11361135
"bytes",
11371136
"futures",
11381137
"http",
@@ -1150,6 +1149,7 @@ dependencies = [
11501149
"regex",
11511150
"thiserror",
11521151
"tokio",
1152+
"tokio-stream",
11531153
"tonic",
11541154
"tower",
11551155
"tracing",

linkerd/app/outbound/src/ingress.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ impl Outbound<svc::BoxNewHttp<http::Endpoint>> {
9797
.push_request_filter(
9898
|(profile, http): (Option<profiles::Receiver>, Http<NameAddr>)| {
9999
if let Some(profile) = profile {
100-
let addr = profile.borrow().addr.clone();
101-
if let Some(logical_addr) = addr {
100+
if let Some(logical_addr) = profile.logical_addr() {
102101
return Ok(http::Logical {
103102
profile,
104103
logical_addr,

linkerd/app/outbound/src/logical.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl<P> svc::Param<LogicalAddr> for Logical<P> {
5858
// Used for skipping HTTP detection
5959
impl svc::Param<Option<http::detect::Skip>> for Logical<()> {
6060
fn param(&self) -> Option<http::detect::Skip> {
61-
if self.profile.borrow().opaque_protocol {
61+
if self.profile.is_opaque_protocol() {
6262
Some(http::detect::Skip)
6363
} else {
6464
None

linkerd/app/outbound/src/switch_logical.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,28 +36,21 @@ impl<S> Outbound<S> {
3636
.push_switch(
3737
move |(profile, target): (Option<profiles::Receiver>, T)| -> Result<_, Never> {
3838
if let Some(rx) = profile {
39-
let profiles::Profile {
40-
ref addr,
41-
ref endpoint,
42-
opaque_protocol,
43-
..
44-
} = *rx.borrow();
45-
4639
// If the profile provides an endpoint, then the target is single endpoint and
4740
// not a logical/load-balanced service.
48-
if let Some((addr, metadata)) = endpoint.clone() {
41+
if let Some((addr, metadata)) = rx.endpoint() {
4942
return Ok(svc::Either::A(Endpoint::from_metadata(
5043
addr,
5144
metadata,
5245
no_tls_reason,
53-
opaque_protocol,
46+
rx.is_opaque_protocol(),
5447
)));
5548
}
5649

5750
// Otherwise, if the profile provides a (named) logical address, then we build a
5851
// logical stack so we apply routes, traffic splits, and load balancing.
59-
if let Some(logical_addr) = addr.clone() {
60-
return Ok(svc::Either::B(Logical::new(logical_addr, rx.clone())));
52+
if let Some(logical_addr) = rx.logical_addr() {
53+
return Ok(svc::Either::B(Logical::new(logical_addr, rx)));
6154
}
6255
}
6356

@@ -151,7 +144,7 @@ mod tests {
151144
});
152145

153146
let orig_dst = OrigDstAddr(SocketAddr::new([192, 0, 2, 20].into(), 2020));
154-
let svc = stack.new_service((Some(profile), orig_dst));
147+
let svc = stack.new_service((Some(profile.into()), orig_dst));
155148
let (server_io, _client_io) = io::duplex(1);
156149
svc.oneshot(server_io).await.expect("service must succeed");
157150
}
@@ -182,7 +175,7 @@ mod tests {
182175
});
183176

184177
let orig_dst = OrigDstAddr(SocketAddr::new([192, 0, 2, 20].into(), 2020));
185-
let svc = stack.new_service((Some(profile), orig_dst));
178+
let svc = stack.new_service((Some(profile.into()), orig_dst));
186179
let (server_io, _client_io) = io::duplex(1);
187180
svc.oneshot(server_io).await.expect("service must succeed");
188181
}

linkerd/app/outbound/src/tcp/logical.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,12 @@ mod tests {
140140

141141
// We create a logical target to be resolved to endpoints.
142142
let logical_addr = LogicalAddr("xyz.example.com:4444".parse().unwrap());
143-
let (_tx, profile) = tokio::sync::watch::channel(Profile {
143+
let (_tx, rx) = tokio::sync::watch::channel(Profile {
144144
addr: Some(logical_addr.clone()),
145145
..Default::default()
146146
});
147147
let logical = Logical {
148-
profile,
148+
profile: rx.into(),
149149
logical_addr: logical_addr.clone(),
150150
protocol: (),
151151
};
@@ -203,12 +203,12 @@ mod tests {
203203

204204
// We create a logical target to be resolved to endpoints.
205205
let logical_addr = LogicalAddr("xyz.example.com:4444".parse().unwrap());
206-
let (_tx, profile) = tokio::sync::watch::channel(Profile {
206+
let (_tx, rx) = tokio::sync::watch::channel(Profile {
207207
addr: Some(logical_addr.clone()),
208208
..Default::default()
209209
});
210210
let logical = Logical {
211-
profile,
211+
profile: rx.into(),
212212
logical_addr: logical_addr.clone(),
213213
protocol: (),
214214
};

linkerd/app/test/src/profile.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub fn only_default() -> Receiver {
1616
pub fn only(profile: Profile) -> Receiver {
1717
let (tx, rx) = channel(profile);
1818
tokio::spawn(async move { tx.closed().await });
19-
rx
19+
rx.into()
2020
}
2121

2222
pub fn resolver() -> crate::resolver::Profiles {

linkerd/app/test/src/resolver.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ impl Profiles {
158158
.endpoints
159159
.lock()
160160
.unwrap()
161-
.insert(addr.into(), Some(rx));
161+
.insert(addr.into(), Some(rx.into()));
162162
tx
163163
}
164164

@@ -169,7 +169,7 @@ impl Profiles {
169169
.endpoints
170170
.lock()
171171
.unwrap()
172-
.insert(addr.into(), Some(rx));
172+
.insert(addr.into(), Some(rx.into()));
173173
self
174174
}
175175

0 commit comments

Comments
 (0)