Skip to content

Commit f4de07d

Browse files
authored
Update the accept stack to use NewService (#675)
This change modifies `serve` to take a `NewService` instead of a `MakeService`. Services specific to the accept stack have been updated as well `DetectHttp` has been updated to work as either a `MakeService` or `NewService` -- the asynchronous version is still needed by the outbound proxy (until caching is changed). `DetectTls` is now purely a `NewService`.
1 parent 6701892 commit f4de07d

File tree

10 files changed

+113
-138
lines changed

10 files changed

+113
-138
lines changed

linkerd/app/core/src/admin/mod.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
use futures::{future, TryFutureExt};
1111
use http::StatusCode;
1212
use hyper::{Body, Request, Response};
13-
use linkerd2_error::{Error, Never};
13+
use linkerd2_error::Error;
1414
use linkerd2_metrics::{self as metrics, FmtMetrics};
1515
use std::{
1616
future::Future,
@@ -110,17 +110,11 @@ impl<M: FmtMetrics> Service<Request<Body>> for Admin<M> {
110110
}
111111
}
112112

113-
impl<M: FmtMetrics + Clone + Send + 'static> svc::Service<tls::accept::Meta> for Accept<M> {
114-
type Response = Serve<M>;
115-
type Error = Never;
116-
type Future = future::Ready<Result<Self::Response, Self::Error>>;
113+
impl<M: FmtMetrics + Clone + Send + 'static> svc::NewService<tls::accept::Meta> for Accept<M> {
114+
type Service = Serve<M>;
117115

118-
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119-
Poll::Ready(Ok(()))
120-
}
121-
122-
fn call(&mut self, meta: tls::accept::Meta) -> Self::Future {
123-
future::ok(Serve(meta, self.clone()))
116+
fn new_service(&mut self, meta: tls::accept::Meta) -> Self::Service {
117+
Serve(meta, self.clone())
124118
}
125119
}
126120

linkerd/app/core/src/serve.rs

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
use crate::svc;
12
use futures::prelude::*;
23
use linkerd2_error::Error;
34
use linkerd2_proxy_transport::listen::Addrs;
45
use tower::util::ServiceExt;
5-
use tracing::{debug, error, info, info_span};
6+
use tracing::{debug, info, info_span};
67
use tracing_futures::Instrument;
78

89
/// Spawns a task that binds an `L`-typed listener with an `A`-typed
@@ -11,15 +12,13 @@ use tracing_futures::Instrument;
1112
/// The task is driven until shutdown is signaled.
1213
pub async fn serve<M, A, I>(
1314
listen: impl Stream<Item = std::io::Result<(Addrs, I)>>,
14-
mut make_accept: M,
15+
mut new_accept: M,
1516
shutdown: impl Future,
1617
) -> Result<(), Error>
1718
where
1819
I: Send + 'static,
19-
M: tower::Service<Addrs, Response = A>,
20-
M::Error: Into<Error>,
21-
M::Future: Send + 'static,
22-
A: tower::Service<I, Response = ()> + Send,
20+
M: svc::NewService<Addrs, Service = A>,
21+
A: tower::Service<I, Response = ()> + Send + 'static,
2322
A::Error: Into<Error>,
2423
A::Future: Send + 'static,
2524
{
@@ -39,26 +38,14 @@ where
3938
target.addr = %addrs.target_addr(),
4039
);
4140

42-
// Ready the service before dispatching the request to it.
43-
//
44-
// This allows the service to propagate errors and to exert backpressure on the
45-
// listener. It also avoids a `Clone` requirement.
46-
let accept = make_accept
47-
.ready_and()
48-
.err_into::<Error>()
49-
.instrument(span.clone())
50-
.await?
51-
.call(addrs);
41+
let accept = new_accept.new_service(addrs);
5242

5343
// Dispatch all of the work for a given connection onto a connection-specific task.
5444
tokio::spawn(
5545
async move {
56-
match accept.err_into::<Error>().await {
57-
Err(error) => error!(%error, "Failed to dispatch connection"),
58-
Ok(accept) => match accept.oneshot(io).err_into::<Error>().await {
59-
Ok(()) => debug!("Connection closed"),
60-
Err(error) => info!(%error, "Connection closed"),
61-
},
46+
match accept.oneshot(io).err_into::<Error>().await {
47+
Ok(()) => debug!("Connection closed"),
48+
Err(error) => info!(%error, "Connection closed"),
6249
}
6350
}
6451
.instrument(span),

linkerd/app/inbound/src/lib.rs

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,15 @@ impl Config {
5252
metrics: ProxyMetrics,
5353
span_sink: Option<mpsc::Sender<oc::Span>>,
5454
drain: drain::Watch,
55-
) -> impl tower::Service<
55+
) -> impl svc::NewService<
5656
listen::Addrs,
57-
Error = impl Into<Error>,
58-
Future = impl Send + 'static,
59-
Response = impl tower::Service<
57+
Service = impl tower::Service<
6058
tokio::net::TcpStream,
6159
Response = (),
6260
Error = impl Into<Error>,
6361
Future = impl Send + 'static,
6462
> + Send
65-
+ 'static,
63+
+ 'static,
6664
> + Send
6765
+ 'static
6866
where
@@ -147,17 +145,15 @@ impl Config {
147145
tap_layer: tap::Layer,
148146
metrics: ProxyMetrics,
149147
span_sink: Option<mpsc::Sender<oc::Span>>,
150-
) -> impl tower::Service<
148+
) -> impl svc::NewService<
151149
Target,
152-
Error = Error,
153-
Future = impl Send,
154-
Response = impl tower::Service<
150+
Service = impl tower::Service<
155151
http::Request<http::boxed::Payload>,
156152
Response = http::Response<http::boxed::Payload>,
157153
Error = Error,
158154
Future = impl Send,
159155
> + Unpin
160-
+ Send,
156+
+ Send,
161157
> + Unpin
162158
+ Clone
163159
+ Send
@@ -282,7 +278,8 @@ impl Config {
282278
)
283279
.into_make_service()
284280
.spawn_buffer(buffer_capacity)
285-
.check_make_service::<Target, http::Request<http::boxed::Payload>>()
281+
.into_new_service()
282+
.check_new_service::<Target, http::Request<http::boxed::Payload>>()
286283
.into_inner()
287284
}
288285

@@ -293,30 +290,25 @@ impl Config {
293290
metrics: ProxyMetrics,
294291
span_sink: Option<mpsc::Sender<oc::Span>>,
295292
drain: drain::Watch,
296-
) -> impl tower::Service<
293+
) -> impl svc::NewService<
297294
tls::accept::Meta,
298-
Error = impl Into<Error>,
299-
Future = impl Send + 'static,
300-
Response = impl tower::Service<
295+
Service = impl tower::Service<
301296
I,
302297
Response = (),
303298
Error = impl Into<Error>,
304299
Future = impl Send + 'static,
305300
> + Send
306-
+ 'static,
301+
+ 'static,
307302
> + Clone
308303
+ Send
309304
+ 'static
310305
where
311306
I: io::AsyncRead + io::AsyncWrite + Unpin + Send + 'static,
312-
F: tower::Service<TcpEndpoint, Response = A> + Unpin + Clone + Send + 'static,
313-
F::Error: Into<Error>,
314-
F::Future: Send,
307+
F: svc::NewService<TcpEndpoint, Service = A> + Unpin + Clone + Send + 'static,
315308
A: tower::Service<io::PrefixedIo<I>, Response = ()> + Clone + Send + 'static,
316309
A::Error: Into<Error>,
317310
A::Future: Send,
318-
H: tower::Service<Target, Response = S, Error = Error> + Unpin + Clone + Send + 'static,
319-
H::Future: Send,
311+
H: svc::NewService<Target, Service = S> + Unpin + Clone + Send + 'static,
320312
S: tower::Service<
321313
http::Request<http::boxed::Payload>,
322314
Response = http::Response<http::boxed::Payload>,
@@ -351,15 +343,13 @@ impl Config {
351343
));
352344

353345
let http_server = svc::stack(http_router)
354-
// Limits the amount of time each request waits to obtain a
355-
// ready service.
356-
.push_timeout(dispatch_timeout)
357346
// Removes the override header after it has been used to
358347
// determine a reuquest target.
359348
.push_on_response(strip_header::request::layer(DST_OVERRIDE_HEADER))
360349
// Routes each request to a target, obtains a service for that
361350
// target, and dispatches the request.
362351
.instrument_from_target()
352+
.into_make_service()
363353
.push(router::Layer::new(RequestTarget::from))
364354
// Used by tap.
365355
.push_http_insert_target()
@@ -375,7 +365,6 @@ impl Config {
375365
)
376366
.instrument(|_: &_| debug_span!("source"))
377367
.check_new_service::<tls::accept::Meta, http::Request<_>>()
378-
.into_make_service()
379368
.into_inner();
380369

381370
DetectHttp::new(
@@ -395,29 +384,23 @@ impl Config {
395384
tcp_forward: F,
396385
identity: tls::Conditional<identity::Local>,
397386
metrics: ProxyMetrics,
398-
) -> impl tower::Service<
387+
) -> impl svc::NewService<
399388
listen::Addrs,
400-
Error = impl Into<Error>,
401-
Future = impl Send + 'static,
402-
Response = impl tower::Service<
389+
Service = impl tower::Service<
403390
TcpStream,
404391
Response = (),
405392
Error = impl Into<Error>,
406393
Future = impl Send + 'static,
407394
> + Send
408-
+ 'static,
395+
+ 'static,
409396
> + Send
410397
+ 'static
411398
where
412-
D: tower::Service<tls::accept::Meta, Response = A> + Unpin + Clone + Send + Sync + 'static,
413-
D::Error: Into<Error>,
414-
D::Future: Unpin + Send,
399+
D: svc::NewService<tls::accept::Meta, Service = A> + Unpin + Clone + Send + Sync + 'static,
415400
A: tower::Service<SensorIo<io::BoxedIo>, Response = ()> + Unpin + Send + 'static,
416401
A::Error: Into<Error>,
417402
A::Future: Send,
418-
F: tower::Service<TcpEndpoint, Response = B> + Unpin + Clone + Send + Sync + 'static,
419-
F::Error: Into<Error>,
420-
F::Future: Unpin + Send,
403+
F: svc::NewService<TcpEndpoint, Service = B> + Unpin + Clone + Send + Sync + 'static,
421404
B: tower::Service<SensorIo<TcpStream>, Response = ()> + Unpin + Send + 'static,
422405
B::Error: Into<Error>,
423406
B::Future: Send,

linkerd/app/outbound/src/lib.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -402,17 +402,15 @@ impl Config {
402402
metrics: ProxyMetrics,
403403
span_sink: Option<mpsc::Sender<oc::Span>>,
404404
drain: drain::Watch,
405-
) -> impl tower::Service<
405+
) -> impl svc::NewService<
406406
listen::Addrs,
407-
Error = impl Into<Error>,
408-
Future = impl Send + 'static,
409-
Response = impl tower::Service<
407+
Service = impl tower::Service<
410408
I,
411409
Response = (),
412410
Error = impl Into<Error>,
413411
Future = impl Send + 'static,
414412
> + Send
415-
+ 'static,
413+
+ 'static,
416414
> + Send
417415
+ 'static
418416
where
@@ -530,8 +528,8 @@ impl Config {
530528

531529
svc::stack(svc::stack::MakeSwitch::new(
532530
skip_detect.clone(),
533-
http,
534-
tcp_forward.push_map_target(TcpEndpoint::from),
531+
svc::stack(http).into_new_service().into_inner(),
532+
tcp_forward.push_map_target(TcpEndpoint::from).into_inner(),
535533
))
536534
.push(metrics.transport.layer_accept(TransportLabels))
537535
.into_inner()

linkerd/app/src/tap.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
use futures::{future, prelude::*};
1+
use futures::prelude::*;
22
use indexmap::IndexSet;
33
use linkerd2_app_core::{
44
config::ServerConfig,
55
drain,
66
proxy::{identity, tap},
77
serve,
88
transport::{io, tls},
9-
Error, Never,
9+
Error,
1010
};
1111
use std::net::SocketAddr;
1212
use std::pin::Pin;
@@ -55,15 +55,15 @@ impl Config {
5555
tap::AcceptPermittedClients::new(permitted_peer_identities.into(), server);
5656
let accept = tls::DetectTls::new(
5757
identity,
58-
service_fn(move |meta: tls::accept::Meta| {
58+
move |meta: tls::accept::Meta| {
5959
let service = service.clone();
60-
future::ok::<_, Never>(service_fn(move |io: io::BoxedIo| {
60+
service_fn(move |io: io::BoxedIo| {
6161
let fut = service.clone().oneshot((meta.clone(), io));
6262
Box::pin(async move {
6363
fut.err_into::<Error>().await?.err_into::<Error>().await
6464
})
65-
}))
66-
}),
65+
})
66+
},
6767
std::time::Duration::from_secs(1),
6868
);
6969

linkerd/proxy/http/src/detect.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use futures::prelude::*;
88
use linkerd2_drain as drain;
99
use linkerd2_error::Error;
1010
use linkerd2_io::{self as io, PrefixedIo};
11+
use linkerd2_stack::NewService;
1112
use std::{
1213
future::Future,
1314
pin::Pin,
@@ -69,6 +70,25 @@ impl<F, H> DetectHttp<F, H> {
6970
}
7071
}
7172

73+
impl<T, F, S> NewService<T> for DetectHttp<F, S>
74+
where
75+
T: Clone,
76+
F: NewService<T>,
77+
S: NewService<T>,
78+
{
79+
type Service = AcceptHttp<F::Service, S::Service>;
80+
81+
fn new_service(&mut self, target: T) -> Self::Service {
82+
AcceptHttp::new(
83+
self.server.clone(),
84+
self.timeout,
85+
self.http.new_service(target.clone()),
86+
self.tcp.new_service(target),
87+
self.drain.clone(),
88+
)
89+
}
90+
}
91+
7292
impl<T, F, S> Service<T> for DetectHttp<F, S>
7393
where
7494
T: Clone + Send + 'static,

linkerd/proxy/transport/src/metrics.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use linkerd2_io as io;
55
use linkerd2_metrics::{
66
latency, metrics, Counter, FmtLabels, FmtMetric, FmtMetrics, Gauge, Histogram, Metric,
77
};
8-
use linkerd2_stack::layer;
8+
use linkerd2_stack::{layer, NewService};
99
use pin_project::pin_project;
1010
use std::fmt;
1111
use std::future::Future;
@@ -246,22 +246,14 @@ where
246246
}
247247
}
248248

249-
impl<L, T, M> tower::Service<T> for MakeAccept<L, L::Labels, M>
249+
impl<L, T, M> NewService<T> for MakeAccept<L, L::Labels, M>
250250
where
251251
L: TransportLabels<T>,
252-
M: tower::Service<T>,
253-
M::Future: Send + 'static,
252+
M: NewService<T>,
254253
{
255-
type Response = Accept<M::Response>;
256-
type Error = M::Error;
257-
type Future =
258-
Pin<Box<dyn Future<Output = Result<Accept<M::Response>, M::Error>> + Send + 'static>>;
254+
type Service = Accept<M::Service>;
259255

260-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
261-
self.inner.poll_ready(cx)
262-
}
263-
264-
fn call(&mut self, target: T) -> Self::Future {
256+
fn new_service(&mut self, target: T) -> Self::Service {
265257
let labels = self.label.transport_labels(&target);
266258
let metrics = self
267259
.registry
@@ -270,11 +262,8 @@ where
270262
.get_or_default(labels)
271263
.clone();
272264

273-
let fut = self.inner.call(target);
274-
Box::pin(async move {
275-
let inner = fut.await?;
276-
Ok(Accept { metrics, inner })
277-
})
265+
let inner = self.inner.new_service(target);
266+
Accept { metrics, inner }
278267
}
279268
}
280269

0 commit comments

Comments
 (0)