Skip to content

Commit 571f4cb

Browse files
authored
discover: Make endpoints with NewService (#671)
This branch changes `linkerd2_proxy_discover::MakeEndpoint` to wrap a `NewService` rather than a `MakeService`, and construct endpoint services synchronously rather than asynchronously. This allows removing a great deal of complex logic for driving and cancelling `MakeService` futures that was previously necessary. For now, we just wrap the underlying endpoint stacks in `into_new_service` to create them lazily. In the future, these stacks will likely be changed to native `NewService` impls.
1 parent b7e8e5d commit 571f4cb

File tree

12 files changed

+190
-504
lines changed

12 files changed

+190
-504
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,7 @@ version = "0.1.0"
750750
dependencies = [
751751
"futures 0.3.5",
752752
"linkerd2-error",
753+
"linkerd2-stack",
753754
"tower",
754755
"tracing",
755756
]

linkerd/admit/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ publish = false
88
[dependencies]
99
futures = "0.3"
1010
linkerd2-error = { path = "../error" }
11+
linkerd2-stack = { path = "../stack" }
1112
tower = { version = "0.3", default-features = false, features = ["util"] }
12-
tracing = "0.1.19"
13+
tracing = "0.1.19"

linkerd/admit/src/lib.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
use futures::{future, TryFutureExt};
66
use linkerd2_error::Error;
7+
use linkerd2_stack::{NewService, ResultService};
78
use std::task::{Context, Poll};
89

910
pub struct AdmitLayer<A>(A);
@@ -37,6 +38,21 @@ impl<A: Clone, S> tower::layer::Layer<S> for AdmitLayer<A> {
3738
}
3839
}
3940

41+
impl<A, T, S> NewService<T> for AdmitService<A, S>
42+
where
43+
A: Admit<T>,
44+
S: NewService<T>,
45+
{
46+
type Service = ResultService<S::Service, A::Error>;
47+
48+
fn new_service(&mut self, t: T) -> Self::Service {
49+
match self.admit.admit(&t) {
50+
Ok(()) => ResultService::ok(self.inner.new_service(t)),
51+
Err(e) => ResultService::err(e),
52+
}
53+
}
54+
}
55+
4056
impl<A, T, S> tower::Service<T> for AdmitService<A, S>
4157
where
4258
A: Admit<T>,

linkerd/app/core/src/control.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ impl Config {
6363
.push(self::client::layer())
6464
.push(reconnect::layer(backoff.clone()))
6565
.push_spawn_ready()
66+
.into_new_service()
6667
.push(self::resolve::layer(dns, backoff))
6768
.push_on_response(self::control::balance::layer())
6869
.push(metrics.into_layer::<classify::Response>())

linkerd/app/inbound/src/lib.rs

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ impl Config {
219219
let target = endpoint
220220
.push_map_target(HttpEndpoint::from)
221221
.push(observe)
222+
.push_on_response(svc::layers().box_http_response())
222223
.into_new_service()
223224
.check_new_service::<Target, http::Request<_>>();
224225

@@ -248,50 +249,41 @@ impl Config {
248249
.push_map_target(endpoint::Logical::from)
249250
.push(profiles::discover::layer(profiles_client))
250251
.into_new_service()
251-
.cache(
252-
svc::layers().push_on_response(
253-
svc::layers()
254-
.push_failfast(dispatch_timeout)
255-
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
256-
.push(metrics.stack.layer(stack_labels("profile")))
257-
.box_http_response(),
258-
),
259-
)
260-
.into_make_service()
261-
.spawn_buffer(buffer_capacity)
262252
.instrument(|_: &Target| debug_span!("profile"))
263-
.check_make_service::<Target, http::Request<_>>();
253+
.push_on_response(svc::layers().box_http_response())
254+
.check_new_service::<Target, http::Request<http::boxed::Payload>>();
264255

265256
let forward = target
266-
.cache(
267-
svc::layers().push_on_response(
268-
svc::layers()
269-
.push_failfast(dispatch_timeout)
270-
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
271-
.push(metrics.stack.layer(stack_labels("forward")))
272-
.box_http_response(),
273-
),
274-
)
275-
.into_make_service()
276-
.spawn_buffer(buffer_capacity)
277257
.instrument(|_: &Target| debug_span!("forward"))
278-
.check_make_service::<Target, http::Request<http::boxed::Payload>>();
258+
.check_new_service::<Target, http::Request<http::boxed::Payload>>();
279259

280260
// Attempts to resolve the target as a service profile or, if that
281261
// fails, skips that stack to forward to the local endpoint.
282262
profile
283-
.push_make_ready()
284263
.push_fallback(forward)
264+
.check_new_service::<Target, http::Request<http::boxed::Payload>>()
285265
// If the traffic is targeted at the inbound port, send it through
286-
// the loopback service (i.e. as a gateway). This is done before
287-
// caching so that the loopback stack can determine whether it
288-
// should cache or not.
266+
// the loopback service (i.e. as a gateway).
289267
.push(admit::AdmitLayer::new(prevent_loop))
268+
.check_new_service::<Target, http::Request<http::boxed::Payload>>()
290269
.push_fallback_on_error::<prevent_loop::LoopPrevented, _>(
291270
svc::stack(loopback)
292-
.check_make_service::<Target, http::Request<_>>()
271+
.into_new_service()
272+
.check_new_service::<Target, http::Request<_>>()
293273
.into_inner(),
294274
)
275+
.check_new_service::<Target, http::Request<http::boxed::Payload>>()
276+
.cache(
277+
svc::layers().push_on_response(
278+
svc::layers()
279+
.push_failfast(dispatch_timeout)
280+
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
281+
.push(metrics.stack.layer(stack_labels("logical")))
282+
.box_http_response(),
283+
),
284+
)
285+
.into_make_service()
286+
.spawn_buffer(buffer_capacity)
295287
.check_make_service::<Target, http::Request<http::boxed::Payload>>()
296288
.into_inner()
297289
}
@@ -361,9 +353,6 @@ impl Config {
361353
));
362354

363355
let http_server = svc::stack(http_router)
364-
// Ensures that the built service is ready before it is returned
365-
// to the router to dispatch a request.
366-
.push_make_ready()
367356
// Limits the amount of time each request waits to obtain a
368357
// ready service.
369358
.push_timeout(dispatch_timeout)

linkerd/app/outbound/src/lib.rs

Lines changed: 38 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,18 @@ impl Config {
8585
&self,
8686
connect: C,
8787
resolve: E,
88-
) -> impl tower::Service<
88+
) -> impl svc::NewService<
8989
SocketAddr,
90-
Error = impl Into<Error>,
91-
Future = impl Unpin + Send + 'static,
92-
Response = impl tower::Service<
90+
Service = impl tower::Service<
9391
I,
9492
Response = (),
9593
Future = impl Unpin + Send + 'static,
9694
Error = impl Into<Error>,
9795
> + Unpin
98-
+ Clone
99-
+ Send
100-
+ 'static,
101-
> + Unpin
102-
+ Clone
96+
+ Send
97+
+ 'static,
98+
> + Clone
99+
+ Unpin
103100
+ Send
104101
+ 'static
105102
where
@@ -112,10 +109,7 @@ impl Config {
112109
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::fmt::Debug + Unpin + Send + 'static,
113110
{
114111
let ProxyConfig {
115-
dispatch_timeout,
116-
cache_max_idle_age,
117-
buffer_capacity,
118-
..
112+
cache_max_idle_age, ..
119113
} = self.proxy;
120114

121115
svc::stack(connect)
@@ -132,18 +126,6 @@ impl Config {
132126
.push_on_response(svc::layer::mk(tcp::Forward::new))
133127
.into_new_service()
134128
.check_new_service::<SocketAddr, I>()
135-
.cache(
136-
svc::layers().push_on_response(
137-
svc::layers()
138-
.push_failfast(dispatch_timeout)
139-
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
140-
),
141-
)
142-
.into_make_service()
143-
.spawn_buffer(buffer_capacity)
144-
.push_make_ready()
145-
.instrument(|_: &_| info_span!("tcp"))
146-
.check_make_service::<SocketAddr, I>()
147129
}
148130

149131
pub fn build_dns_refine(
@@ -327,7 +309,8 @@ impl Config {
327309
.box_http_request(),
328310
)
329311
.push_spawn_ready()
330-
.check_make_service::<HttpEndpoint, http::Request<_>>()
312+
.into_new_service()
313+
.check_new_service::<HttpEndpoint, http::Request<_>>()
331314
.push(discover)
332315
.check_service::<HttpConcrete>()
333316
.push_on_response(
@@ -379,36 +362,13 @@ impl Config {
379362
// it to inner stack to build the router and traffic split.
380363
.push(profiles::discover::layer(profiles_client))
381364
.into_new_service()
382-
.cache(
383-
svc::layers().push_on_response(
384-
svc::layers()
385-
.push_failfast(dispatch_timeout)
386-
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
387-
.push(metrics.stack.layer(stack_labels("profile"))),
388-
),
389-
)
390-
.into_make_service()
391-
.spawn_buffer(buffer_capacity)
392-
.push_make_ready()
393-
.check_make_service::<HttpLogical, http::Request<_>>();
365+
.check_new_service::<HttpLogical, http::Request<_>>();
394366

395367
// Caches clients that bypass discovery/balancing.
396368
let forward = svc::stack(endpoint)
397-
.check_make_service::<HttpEndpoint, http::Request<http::boxed::Payload>>()
398369
.into_new_service()
399-
.cache(
400-
svc::layers().push_on_response(
401-
svc::layers()
402-
.push_failfast(dispatch_timeout)
403-
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
404-
.box_http_request()
405-
.push(metrics.stack.layer(stack_labels("forward.endpoint"))),
406-
),
407-
)
408-
.into_make_service()
409-
.spawn_buffer(buffer_capacity)
410370
.instrument(|t: &HttpEndpoint| debug_span!("forward", peer.id = ?t.identity))
411-
.check_make_service::<HttpEndpoint, http::Request<_>>();
371+
.check_new_service::<HttpEndpoint, http::Request<_>>();
412372

413373
// Attempts to route route request to a logical services that uses
414374
// control plane for discovery. If the discovery is rejected, the
@@ -422,6 +382,17 @@ impl Config {
422382
.into_inner(),
423383
is_discovery_rejected,
424384
)
385+
.cache(
386+
svc::layers().push_on_response(
387+
svc::layers()
388+
.push_failfast(dispatch_timeout)
389+
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
390+
.push(metrics.stack.layer(stack_labels("logical"))),
391+
),
392+
)
393+
.into_make_service()
394+
.spawn_buffer(buffer_capacity)
395+
.check_make_service::<HttpLogical, http::Request<_>>()
425396
.push(http::header_from_target::layer(CANONICAL_DST_HEADER))
426397
// Strips headers that may be set by this proxy.
427398
.push_on_response(http::strip_header::request::layer(DST_OVERRIDE_HEADER))
@@ -486,6 +457,8 @@ impl Config {
486457
dispatch_timeout,
487458
max_in_flight_requests,
488459
detect_protocol_timeout,
460+
cache_max_idle_age,
461+
buffer_capacity,
489462
..
490463
} = self.proxy;
491464
let canonicalize_timeout = self.canonicalize_timeout;
@@ -536,9 +509,22 @@ impl Config {
536509
// Load balances TCP streams that cannot be decoded as HTTP.
537510
let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve))
538511
.push_fallback_with_predicate(
539-
svc::stack(tcp_forward.clone()).push_map_target(TcpEndpoint::from),
512+
svc::stack(tcp_forward.clone())
513+
.check_new::<TcpEndpoint>()
514+
.push_map_target(TcpEndpoint::from)
515+
.into_inner(),
540516
is_discovery_rejected,
541517
)
518+
.cache(
519+
svc::layers().push_on_response(
520+
svc::layers()
521+
.push_failfast(dispatch_timeout)
522+
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age),
523+
),
524+
)
525+
.into_make_service()
526+
.spawn_buffer(buffer_capacity)
527+
.instrument(|_: &_| info_span!("tcp"))
542528
.push_map_target(|a: listen::Addrs| a.target_addr())
543529
.into_inner();
544530

linkerd/app/outbound/src/tests.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::Config;
22
use futures::prelude::*;
3-
use linkerd2_app_core::{config, exp_backoff, proxy::http::h2, transport::listen, Addr, Error};
3+
use linkerd2_app_core::{
4+
config, exp_backoff, proxy::http::h2, svc::NewService, transport::listen, Addr, Error,
5+
};
46
use linkerd2_app_test as test_support;
57
use std::{net::SocketAddr, time::Duration};
68
use tower::ServiceExt;
@@ -68,14 +70,12 @@ async fn plaintext_tcp() {
6870
);
6971

7072
// Build the outbound TCP balancer stack.
71-
let make = cfg
73+
let forward = cfg
7274
.build_tcp_balance(connect, resolver)
73-
.oneshot(target_addr)
74-
.err_into::<Error>()
75-
.await
76-
.expect("make service should succeed");
75+
.new_service(target_addr);
7776

78-
make.oneshot(client_io)
77+
forward
78+
.oneshot(client_io)
7979
.err_into::<Error>()
8080
.await
8181
.expect("conn should succeed");

linkerd/io/src/boxed.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ impl Io for BoxedIo {
8686
}
8787
}
8888

89+
impl std::fmt::Debug for BoxedIo {
90+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91+
f.debug_struct("BoxedIo").finish()
92+
}
93+
}
94+
8995
#[cfg(test)]
9096
mod tests {
9197
use super::*;

0 commit comments

Comments
 (0)