Skip to content

Commit 2a19082

Browse files
authored
Implement profile discovery as a NewService (#673)
The profile discovery layer is always coerced to a NewService. This change formalizes this to avoid stack boilerplate. While we're here, the GetProfile trait has ben simplified to avoid exposing poll_ready in its public API. Instead, we provide an impl of Service for GetProfile that does this (though it's not currently needed).
1 parent 571f4cb commit 2a19082

File tree

4 files changed

+40
-27
lines changed

4 files changed

+40
-27
lines changed

linkerd/app/inbound/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,6 @@ impl Config {
248248
))
249249
.push_map_target(endpoint::Logical::from)
250250
.push(profiles::discover::layer(profiles_client))
251-
.into_new_service()
252251
.instrument(|_: &Target| debug_span!("profile"))
253252
.push_on_response(svc::layers().box_http_response())
254253
.check_new_service::<Target, http::Request<http::boxed::Payload>>();

linkerd/app/outbound/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,6 @@ impl Config {
361361
// Discovers the service profile from the control plane and passes
362362
// it to inner stack to build the router and traffic split.
363363
.push(profiles::discover::layer(profiles_client))
364-
.into_new_service()
365364
.check_new_service::<HttpLogical, http::Request<_>>();
366365

367366
// Caches clients that bypass discovery/balancing.
Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
use super::{GetProfile, Receiver};
22
use futures::prelude::*;
3-
use linkerd2_stack::{layer, NewService};
4-
use std::{
5-
future::Future,
6-
pin::Pin,
7-
task::{Context, Poll},
8-
};
3+
use linkerd2_stack::{layer, FutureService, NewService};
4+
use std::{future::Future, pin::Pin};
95

106
pub fn layer<G: Clone, M>(
117
get_profile: G,
@@ -22,28 +18,25 @@ pub struct Discover<G, M> {
2218
inner: M,
2319
}
2420

25-
impl<T, G, M> tower::Service<T> for Discover<G, M>
21+
impl<T, G, M> NewService<T> for Discover<G, M>
2622
where
2723
T: Clone + Send + 'static,
2824
G: GetProfile<T>,
2925
G::Future: Send + 'static,
3026
G::Error: Send,
3127
M: NewService<(Receiver, T)> + Clone + Send + 'static,
3228
{
33-
type Response = M::Service;
34-
type Error = G::Error;
35-
type Future = Pin<Box<dyn Future<Output = Result<M::Service, G::Error>> + Send + 'static>>;
29+
type Service = FutureService<
30+
Pin<Box<dyn Future<Output = Result<M::Service, G::Error>> + Send + 'static>>,
31+
M::Service,
32+
>;
3633

37-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38-
self.get_profile.poll_ready(cx)
39-
}
40-
41-
fn call(&mut self, target: T) -> Self::Future {
34+
fn new_service(&mut self, target: T) -> Self::Service {
4235
let mut inner = self.inner.clone();
43-
Box::pin(
36+
FutureService::new(Box::pin(
4437
self.get_profile
4538
.get_profile(target.clone())
4639
.map_ok(move |rx| inner.new_service((rx, target))),
47-
)
40+
))
4841
}
4942
}

linkerd/service-profiles/src/lib.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
future::Future,
77
task::{Context, Poll},
88
};
9+
use tower::util::{Oneshot, ServiceExt};
910

1011
mod client;
1112
pub mod discover;
@@ -28,29 +29,50 @@ pub struct Target {
2829
pub weight: u32,
2930
}
3031

32+
#[derive(Clone, Debug)]
33+
pub struct GetProfileService<P>(P);
34+
3135
/// Watches a destination's Profile.
3236
pub trait GetProfile<T> {
3337
type Error: Into<Error>;
3438
type Future: Future<Output = Result<Receiver, Self::Error>>;
3539

36-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
37-
3840
fn get_profile(&mut self, target: T) -> Self::Future;
41+
42+
fn into_service(self) -> GetProfileService<Self>
43+
where
44+
Self: Sized,
45+
{
46+
GetProfileService(self)
47+
}
3948
}
4049

4150
impl<T, S> GetProfile<T> for S
4251
where
43-
S: tower::Service<T, Response = Receiver>,
52+
S: tower::Service<T, Response = Receiver> + Clone,
4453
S::Error: Into<Error>,
4554
{
4655
type Error = S::Error;
47-
type Future = S::Future;
56+
type Future = Oneshot<S, T>;
4857

49-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
50-
tower::Service::poll_ready(self, cx)
58+
fn get_profile(&mut self, target: T) -> Self::Future {
59+
self.clone().oneshot(target)
5160
}
61+
}
5262

53-
fn get_profile(&mut self, target: T) -> Self::Future {
54-
tower::Service::call(self, target)
63+
impl<T, P> tower::Service<T> for GetProfileService<P>
64+
where
65+
P: GetProfile<T>,
66+
{
67+
type Response = Receiver;
68+
type Error = P::Error;
69+
type Future = P::Future;
70+
71+
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72+
Poll::Ready(Ok(()))
73+
}
74+
75+
fn call(&mut self, target: T) -> Self::Future {
76+
self.0.get_profile(target)
5577
}
5678
}

0 commit comments

Comments
 (0)