Skip to content

Commit b0cf2fb

Browse files
authored
cache: Ensure that actively held services are not evicted (#768)
Cache eviction is currently triggered when a service has not processed new requests for some idle timeout. This is fragile for caches that may process a few long-lived requests. In practice, we would prefer to only start tracking idleness when there are no *active* requests on a service. This change restructures the cache to return services that are wrapped with tracking handles, rather than passing a tracking handle into the inner service. When a returned service is dropped, it spawns a background task that retains the handle for an idle timeout and, if no new instances have acquired the handle within that timeout, removes the service from the cache. This change reduces latency as well as CPU and memory utilization in load tests. Furthermore, it ultimately eliminates the need for a specialized buffer implementation. Fixes linkerd/linkerd2#5334
1 parent 7a2f2b2 commit b0cf2fb

File tree

14 files changed

+725
-257
lines changed

14 files changed

+725
-257
lines changed

Cargo.lock

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1011,6 +1011,7 @@ dependencies = [
10111011
"http-body",
10121012
"hyper",
10131013
"linkerd2-app-core",
1014+
"linkerd2-identity",
10141015
"regex 0.1.80",
10151016
"tokio 0.3.5",
10161017
"tokio-test 0.3.0",
@@ -1044,9 +1045,11 @@ dependencies = [
10441045
"linkerd2-error",
10451046
"linkerd2-stack",
10461047
"parking_lot",
1047-
"tokio 0.2.23",
1048+
"tokio 0.3.5",
10481049
"tower",
10491050
"tracing",
1051+
"tracing-futures",
1052+
"tracing-subscriber",
10501053
]
10511054

10521055
[[package]]

linkerd/app/core/src/svc.rs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,6 @@ impl<L> Layers<L> {
7878
self.push(buffer::SpawnBufferLayer::new(capacity))
7979
}
8080

81-
pub fn push_spawn_buffer_with_idle_timeout<Req, Rsp>(
82-
self,
83-
capacity: usize,
84-
idle_timeout: Duration,
85-
) -> Layers<Pair<L, buffer::SpawnBufferLayer<Req, Rsp>>>
86-
where
87-
Req: Send + 'static,
88-
Rsp: Send + 'static,
89-
{
90-
self.push(buffer::SpawnBufferLayer::new(capacity).with_idle_timeout(idle_timeout))
91-
}
92-
9381
// Makes the service eagerly process and fail requests after the given timeout.
9482
pub fn push_failfast(self, timeout: Duration) -> Layers<Pair<L, timeout::FailFastLayer>> {
9583
self.push(timeout::FailFastLayer::new(timeout))
@@ -239,14 +227,13 @@ impl<S> Stack<S> {
239227
self.push(http::insert::target::layer())
240228
}
241229

242-
pub fn cache<T, L, U>(self, track: L) -> Stack<cache::Cache<T, cache::layer::NewTrack<L, S>>>
230+
pub fn push_cache<T>(self, idle: Duration) -> Stack<cache::Cache<T, S>>
243231
where
244-
T: Eq + std::hash::Hash + Send + 'static,
245-
S: NewService<T> + Clone,
246-
L: tower::layer::Layer<cache::layer::Track<S>> + Clone,
247-
L::Service: NewService<T, Service = U>,
232+
T: Eq + std::hash::Hash + Send + Sync + 'static,
233+
S: NewService<T> + 'static,
234+
S::Service: Send + Sync + 'static,
248235
{
249-
self.push(cache::CacheLayer::new(track))
236+
self.push(cache::Cache::layer(idle))
250237
}
251238

252239
/// Push a service that either calls the inner service if it is ready, or

linkerd/app/inbound/src/lib.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use linkerd2_app_core::{
2424
},
2525
reconnect,
2626
spans::SpanConverter,
27-
svc::{self},
27+
svc,
2828
transport::{self, io, listen, tls},
2929
Error, NameAddr, NameMatch, TraceContext, DST_OVERRIDE_HEADER,
3030
};
@@ -165,8 +165,9 @@ impl Config {
165165
Error = Error,
166166
Future = impl Send,
167167
> + Clone
168-
+ Unpin
169-
+ Send,
168+
+ Send
169+
+ Sync
170+
+ Unpin,
170171
> + Unpin
171172
+ Clone
172173
+ Send
@@ -275,14 +276,17 @@ impl Config {
275276
// fails, skips that stack to forward to the local endpoint.
276277
svc::stack(switch_loopback)
277278
.check_new_service::<Target, http::Request<http::boxed::BoxBody>>()
278-
.cache(
279-
svc::layers().push_on_response(
280-
svc::layers()
281-
.push_failfast(dispatch_timeout)
282-
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
283-
.push(metrics.stack.layer(stack_labels("logical")))
284-
.box_http_response(),
285-
),
279+
.push_on_response(
280+
svc::layers()
281+
.push_failfast(dispatch_timeout)
282+
.push_spawn_buffer(buffer_capacity)
283+
.push(metrics.stack.layer(stack_labels("logical"))),
284+
)
285+
.push_cache(cache_max_idle_age)
286+
.push_on_response(
287+
svc::layers()
288+
.push(http::Retain::layer())
289+
.box_http_response(),
286290
)
287291
// Boxing is necessary purely to limit the link-time overhead of
288292
// having enormous types.
@@ -358,9 +362,9 @@ impl Config {
358362
.push(svc::layer::mk(|inner| {
359363
svc::stack::NewRouter::new(RequestTarget::from, inner)
360364
}))
365+
.check_new_service::<TcpAccept, http::Request<_>>()
361366
// Used by tap.
362367
.push_http_insert_target()
363-
.check_new_service::<TcpAccept, http::Request<_>>()
364368
.push_on_response(
365369
svc::layers()
366370
.push(http_admit_request)

0 commit comments

Comments
 (0)