Skip to content

Commit 72f0e3d

Browse files
authored
Ensure that services are held as long they are being used (#767)
Middlewares, especially the cache, may want to use RAII to detect when a service is idle, but the TCP server and HTTP routers drop services as soon as the request is dispatched. This change modifies the TCP server to hold the TCP stack until all work on that connection is complete. It also introduces a new `http::Retain` middleware that clones its inner service into response bodies. This is necessary for upcoming cache eviction changes to address linkerd/linkerd2#5334.
1 parent 7f6159d commit 72f0e3d

File tree

4 files changed

+150
-8
lines changed

4 files changed

+150
-8
lines changed

linkerd/app/core/src/serve.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use futures::prelude::*;
33
use linkerd2_error::Error;
44
use linkerd2_proxy_transport::listen::Addrs;
55
use tower::util::ServiceExt;
6-
use tracing::{debug, info, info_span};
6+
use tracing::{debug, info, info_span, warn};
77
use tracing_futures::Instrument;
88

99
/// Spawns a task that binds an `L`-typed listener with an `A`-typed
@@ -28,7 +28,7 @@ where
2828
match listen.next().await {
2929
None => return Ok(()),
3030
Some(conn) => {
31-
// If the listener returned an error, complete the task
31+
// If the listener returned an error, complete the task.
3232
let (addrs, io) = conn?;
3333

3434
// The local addr should be instrumented from the listener's context.
@@ -43,9 +43,20 @@ where
4343
// Dispatch all of the work for a given connection onto a connection-specific task.
4444
tokio::spawn(
4545
async move {
46-
match accept.oneshot(io).err_into::<Error>().await {
47-
Ok(()) => debug!("Connection closed"),
48-
Err(error) => info!(%error, "Connection closed"),
46+
match accept.ready_oneshot().err_into::<Error>().await {
47+
Ok(mut accept) => {
48+
match accept.call(io).err_into::<Error>().await {
49+
Ok(()) => debug!("Connection closed"),
50+
Err(error) => info!(%error, "Connection closed"),
51+
}
52+
// Hold the service until the connection is
53+
// complete. This helps tie any inner cache
54+
// lifetimes to the services they return.
55+
drop(accept);
56+
}
57+
Err(error) => {
58+
warn!(%error, "Server failed to become ready");
59+
}
4960
}
5061
}
5162
.instrument(span),

linkerd/app/inbound/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ impl Config {
164164
Response = http::Response<http::boxed::BoxBody>,
165165
Error = Error,
166166
Future = impl Send,
167-
> + Unpin
167+
> + Clone
168+
+ Unpin
168169
+ Send,
169170
> + Unpin
170171
+ Clone
@@ -320,7 +321,8 @@ impl Config {
320321
http::Request<http::boxed::BoxBody>,
321322
Response = http::Response<http::boxed::BoxBody>,
322323
Error = Error,
323-
> + Send
324+
> + Clone
325+
+ Send
324326
+ 'static,
325327
S::Future: Send,
326328
{

linkerd/proxy/http/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub mod insert;
1616
pub mod normalize_uri;
1717
pub mod orig_proto;
1818
pub mod override_authority;
19+
mod retain;
1920
mod server;
2021
pub mod strip_header;
2122
pub mod timeout;
@@ -25,8 +26,9 @@ mod version;
2526

2627
pub use self::{
2728
client_handle::{ClientHandle, SetClientHandle},
28-
glue::HyperServerSvc,
29+
glue::{HyperServerSvc, UpgradeBody},
2930
override_authority::CanOverrideAuthority,
31+
retain::Retain,
3032
server::NewServeHttp,
3133
timeout::MakeTimeoutLayer,
3234
version::Version,

linkerd/proxy/http/src/retain.rs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
//! Provides a middleware that holds an inner service as long as responses are
2+
//! being processed.
3+
4+
use linkerd2_stack::layer;
5+
use pin_project::pin_project;
6+
use std::{
7+
future::Future,
8+
pin::Pin,
9+
task::{Context, Poll},
10+
};
11+
12+
/// Retains an inner service as long as its HTTP responses are being processsed
13+
/// so that RAII-guarded resources are held. This is mostly intended to support
14+
/// cache eviction.
15+
#[derive(Debug)]
16+
pub struct Retain<S, B> {
17+
inner: S,
18+
_marker: std::marker::PhantomData<fn() -> B>,
19+
}
20+
21+
/// Wraps a `B` typed HTTP body to ensure that a `T` typed instance is held until
22+
/// the body is fully processed.
23+
#[pin_project]
24+
#[derive(Debug)]
25+
pub struct RetainBody<T, B> {
26+
#[pin]
27+
inner: B,
28+
29+
_retain: Option<T>,
30+
}
31+
32+
// === impl Retain ===
33+
34+
impl<S, B> Retain<S, B> {
35+
pub fn new(inner: S) -> Self {
36+
Self {
37+
inner,
38+
_marker: std::marker::PhantomData,
39+
}
40+
}
41+
42+
pub fn layer() -> impl layer::Layer<S, Service = Self> + Copy + Clone {
43+
layer::mk(Self::new)
44+
}
45+
}
46+
47+
impl<S: Clone, B> Clone for Retain<S, B> {
48+
fn clone(&self) -> Self {
49+
Self {
50+
inner: self.inner.clone(),
51+
_marker: std::marker::PhantomData,
52+
}
53+
}
54+
}
55+
56+
impl<S, Req, RspB> tower::Service<Req> for Retain<S, RspB>
57+
where
58+
S: tower::Service<Req, Response = http::Response<RspB>> + Clone + Send + 'static,
59+
S::Future: Send + 'static,
60+
{
61+
type Response = http::Response<RetainBody<S, RspB>>;
62+
type Error = S::Error;
63+
type Future = Pin<
64+
Box<
65+
dyn Future<Output = Result<http::Response<RetainBody<S, RspB>>, S::Error>>
66+
+ Send
67+
+ 'static,
68+
>,
69+
>;
70+
71+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72+
self.inner.poll_ready(cx)
73+
}
74+
75+
fn call(&mut self, req: Req) -> Self::Future {
76+
let fut = self.inner.call(req);
77+
78+
// Retain a handle to the inner service until the response is completely
79+
// processed.
80+
let service = self.inner.clone();
81+
Box::pin(async move {
82+
let rsp = fut.await?;
83+
Ok(rsp.map(move |inner| RetainBody {
84+
inner,
85+
_retain: Some(service),
86+
}))
87+
})
88+
}
89+
}
90+
91+
// === impl RetainBody ===
92+
93+
impl<T, B: Default> Default for RetainBody<T, B> {
94+
fn default() -> Self {
95+
Self {
96+
inner: B::default(),
97+
_retain: None,
98+
}
99+
}
100+
}
101+
102+
impl<T, B: http_body::Body> http_body::Body for RetainBody<T, B> {
103+
type Data = B::Data;
104+
type Error = B::Error;
105+
106+
fn is_end_stream(&self) -> bool {
107+
self.inner.is_end_stream()
108+
}
109+
110+
fn poll_data(
111+
self: Pin<&mut Self>,
112+
cx: &mut Context<'_>,
113+
) -> Poll<Option<Result<B::Data, B::Error>>> {
114+
self.project().inner.poll_data(cx)
115+
}
116+
117+
fn poll_trailers(
118+
self: Pin<&mut Self>,
119+
cx: &mut Context<'_>,
120+
) -> Poll<Result<Option<http::HeaderMap<http::HeaderValue>>, B::Error>> {
121+
self.project().inner.poll_trailers(cx)
122+
}
123+
124+
fn size_hint(&self) -> http_body::SizeHint {
125+
self.inner.size_hint()
126+
}
127+
}

0 commit comments

Comments
 (0)