Skip to content

Commit cf9900d

Browse files
authored
cache: Only spawn a single task per cache entry (#770)
In b0cf2fb, the proxy moved eviction to be controlled by a background task; but this scheme spawned a task every time a service was dropped. This change modifies the cache to only spawn a single task per cached service and use a notification channel to signal drops. This reduces memory pressure, especially for HTTP caches.
1 parent b0cf2fb commit cf9900d

File tree

3 files changed

+115
-115
lines changed

3 files changed

+115
-115
lines changed

linkerd/app/core/src/svc.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ use linkerd2_concurrency_limit as concurrency_limit;
88
pub use linkerd2_stack::{self as stack, layer, NewService};
99
pub use linkerd2_stack_tracing::{InstrumentMake, InstrumentMakeLayer};
1010
pub use linkerd2_timeout as timeout;
11-
use std::task::{Context, Poll};
12-
use std::time::Duration;
11+
use std::{
12+
task::{Context, Poll},
13+
time::Duration,
14+
};
1315
use tower::layer::util::{Identity, Stack as Pair};
1416
pub use tower::layer::Layer;
1517
pub use tower::make::MakeService;
@@ -229,7 +231,7 @@ impl<S> Stack<S> {
229231

230232
pub fn push_cache<T>(self, idle: Duration) -> Stack<cache::Cache<T, S>>
231233
where
232-
T: Eq + std::hash::Hash + Send + Sync + 'static,
234+
T: Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static,
233235
S: NewService<T> + 'static,
234236
S::Service: Send + Sync + 'static,
235237
{

linkerd/cache/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ futures = "0.3"
1010
linkerd2-error = { path = "../error" }
1111
linkerd2-stack = { path = "../stack" }
1212
parking_lot = "0.11"
13-
tokio = { version = "0.3", default-features = false, features = ["rt", "time"] }
13+
tokio = { version = "0.3", default-features = false, features = ["rt", "sync", "time"] }
1414
tower = { version = "0.4", default-features = false, features = ["util"] }
1515
tracing = "0.1.22"
1616
tracing-futures = "0.2"

linkerd/cache/src/lib.rs

Lines changed: 109 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![deny(warnings, rust_2018_idioms)]
22

3+
use futures::prelude::*;
34
use linkerd2_stack::{layer, NewService};
45
use parking_lot::RwLock;
56
use std::{
@@ -8,9 +9,8 @@ use std::{
89
sync::{Arc, Weak},
910
task::{Context, Poll},
1011
};
11-
use tokio::time;
12-
use tracing::{debug, debug_span, trace};
13-
use tracing_futures::Instrument;
12+
use tokio::{sync::Notify, time};
13+
use tracing::{debug, instrument, trace};
1414

1515
#[derive(Clone)]
1616
pub struct Cache<T, N>
@@ -19,72 +19,125 @@ where
1919
N: NewService<T>,
2020
{
2121
inner: N,
22-
services: Services<T, N::Service>,
22+
services: Arc<Services<T, N::Service>>,
2323
idle: time::Duration,
2424
}
2525

2626
#[derive(Clone, Debug)]
27-
pub struct Cached<S, T>
27+
pub struct Cached<S>
2828
where
2929
S: Send + Sync + 'static,
30-
T: Eq + Hash + Send + Sync + 'static,
3130
{
3231
inner: S,
33-
handle: Option<Handle<S, T>>,
34-
idle: time::Duration,
35-
}
36-
37-
#[derive(Debug)]
38-
struct Handle<S, T> {
39-
handle: Arc<T>,
40-
cache: Weak<RwLock<HashMap<T, (S, Weak<T>)>>>,
32+
// Notifies entry's eviction task that a drop has occurred.
33+
handle: Arc<Notify>,
4134
}
4235

43-
type Services<T, S> = Arc<RwLock<HashMap<T, (S, Weak<T>)>>>;
36+
type Services<T, S> = RwLock<HashMap<T, (S, Weak<Notify>)>>;
4437

4538
// === impl Cache ===
4639

4740
impl<T, N> Cache<T, N>
4841
where
49-
T: Eq + Hash + Send + Sync + 'static,
42+
T: Clone + std::fmt::Debug + Eq + Hash + Send + Sync + 'static,
5043
N: NewService<T> + 'static,
5144
N::Service: Send + Sync + 'static,
5245
{
5346
pub fn layer(idle: time::Duration) -> impl layer::Layer<N, Service = Self> + Clone {
5447
layer::mk(move |inner| Self::new(idle, inner))
5548
}
5649

57-
pub fn new(idle: time::Duration, inner: N) -> Self {
58-
let services = Services::default();
59-
50+
fn new(idle: time::Duration, inner: N) -> Self {
51+
let services = Arc::new(Services::default());
6052
Self {
6153
inner,
6254
services,
6355
idle,
6456
}
6557
}
58+
59+
fn spawn_idle(
60+
target: T,
61+
idle: time::Duration,
62+
cache: &Arc<Services<T, N::Service>>,
63+
) -> Arc<Notify> {
64+
// Spawn a background task that holds the handle. Every time the handle
65+
// is notified, it resets the idle timeout. Every time the idle timeout
66+
// expires, the handle is checked and the service is dropped if there
67+
// are no active handles.
68+
let handle = Arc::new(Notify::new());
69+
tokio::spawn(Self::evict(
70+
target,
71+
idle,
72+
handle.clone(),
73+
Arc::downgrade(&cache),
74+
));
75+
handle
76+
}
77+
78+
#[instrument(level = "debug", skip(idle, reset, cache))]
79+
async fn evict(
80+
target: T,
81+
idle: time::Duration,
82+
mut reset: Arc<Notify>,
83+
cache: Weak<Services<T, N::Service>>,
84+
) {
85+
// Wait for the handle to be notified before starting to track idleness.
86+
reset.notified().await;
87+
debug!("Awaiting idleness");
88+
89+
// Wait for either the reset to be notified or the idle timeout to
90+
// elapse.
91+
loop {
92+
futures::select_biased! {
93+
// If the reset was notified, restart the timer.
94+
_ = reset.notified().fuse() => {
95+
trace!("Reset");
96+
}
97+
_ = time::sleep(idle).fuse() => match cache.upgrade() {
98+
Some(cache) => match Arc::try_unwrap(reset) {
99+
// If this is the last reference to the handle after the
100+
// idle timeout, remove the cache entry.
101+
Ok(_) => {
102+
let removed = cache.write().remove(&target).is_some();
103+
debug_assert!(removed, "Cache item must exist: {:?}", target);
104+
debug!("Cache entry dropped");
105+
return;
106+
}
107+
// Otherwise, another handle has been acquired, so
108+
// restore our reset reference for the next iteration.
109+
Err(r) => {
110+
trace!("The handle is still active");
111+
reset = r;
112+
}
113+
},
114+
None => {
115+
trace!("Cache already dropped");
116+
return;
117+
}
118+
},
119+
}
120+
}
121+
}
66122
}
67123

68124
impl<T, N> NewService<T> for Cache<T, N>
69125
where
70-
T: Clone + Eq + Hash + Send + Sync + 'static,
71-
N: NewService<T>,
126+
T: Clone + std::fmt::Debug + Eq + Hash + Send + Sync + 'static,
127+
N: NewService<T> + 'static,
72128
N::Service: Clone + Send + Sync + 'static,
73129
{
74-
type Service = Cached<N::Service, T>;
75-
76-
fn new_service(&mut self, target: T) -> Cached<N::Service, T> {
77-
let cache = Arc::downgrade(&self.services);
130+
type Service = Cached<N::Service>;
78131

132+
fn new_service(&mut self, target: T) -> Cached<N::Service> {
79133
// We expect the item to be available in most cases, so initially obtain
80134
// only a read lock.
81135
if let Some((svc, weak)) = self.services.read().get(&target) {
82136
if let Some(handle) = weak.upgrade() {
83137
trace!("Using cached service");
84138
return Cached {
85139
inner: svc.clone(),
86-
idle: self.idle,
87-
handle: Some(Handle { cache, handle }),
140+
handle,
88141
};
89142
}
90143
}
@@ -97,58 +150,37 @@ where
97150
let (svc, weak) = entry.get();
98151
match weak.upgrade() {
99152
Some(handle) => {
100-
trace!("Using cached service");
153+
trace!(?target, "Using cached service");
101154
Cached {
102155
inner: svc.clone(),
103-
idle: self.idle,
104-
handle: Some(Handle { cache, handle }),
156+
handle,
105157
}
106158
}
107159
None => {
108-
debug!("Replacing defunct service");
109-
let inner = self.inner.new_service(target.clone());
110-
let handle = Arc::new(target);
160+
debug!(?target, "Replacing defunct service");
161+
let handle = Self::spawn_idle(target.clone(), self.idle, &self.services);
162+
let inner = self.inner.new_service(target);
111163
entry.insert((inner.clone(), Arc::downgrade(&handle)));
112-
Cached {
113-
inner,
114-
idle: self.idle,
115-
handle: Some(Handle { cache, handle }),
116-
}
164+
Cached { inner, handle }
117165
}
118166
}
119167
}
120168
Entry::Vacant(entry) => {
121-
debug!("Caching new service");
122-
let inner = self.inner.new_service(target.clone());
123-
let handle = Arc::new(target);
169+
debug!(?target, "Caching new service");
170+
let handle = Self::spawn_idle(target.clone(), self.idle, &self.services);
171+
let inner = self.inner.new_service(target);
124172
entry.insert((inner.clone(), Arc::downgrade(&handle)));
125-
Cached {
126-
inner,
127-
idle: self.idle,
128-
handle: Some(Handle { cache, handle }),
129-
}
173+
Cached { inner, handle }
130174
}
131175
}
132176
}
133177
}
134178

135-
// === impl Handle ===
136-
137-
impl<S, T> Clone for Handle<S, T> {
138-
fn clone(&self) -> Self {
139-
Self {
140-
cache: self.cache.clone(),
141-
handle: self.handle.clone(),
142-
}
143-
}
144-
}
145-
146179
// === impl Cached ===
147180

148-
impl<Req, S, T> tower::Service<Req> for Cached<S, T>
181+
impl<Req, S> tower::Service<Req> for Cached<S>
149182
where
150183
S: tower::Service<Req> + Send + Sync + 'static,
151-
T: Eq + Hash + Send + Sync + 'static,
152184
{
153185
type Response = S::Response;
154186
type Error = S::Error;
@@ -165,66 +197,35 @@ where
165197
}
166198
}
167199

168-
impl<S, T> Drop for Cached<S, T>
200+
impl<S> Drop for Cached<S>
169201
where
170202
S: Send + Sync + 'static,
171-
T: Eq + Hash + Send + Sync + 'static,
172203
{
173204
fn drop(&mut self) {
174-
// If the eviction task is still running, wait for an idle timeout,
175-
// retaining the handle for this service. If there are no new instances
176-
// holding the handle after that timeout, drop the last strong reference
177-
// and signal the eviction task.
178-
trace!("Dropping cached service");
179-
if let Some(Handle { cache, handle }) = self.handle.take() {
180-
let idle = self.idle;
181-
tokio::spawn(
182-
async move {
183-
trace!(?idle, "Waiting for timeout to elapse");
184-
time::sleep(idle).await;
185-
if let Some(cache) = cache.upgrade() {
186-
// Only evict the service if we can claim the target
187-
// from the handle, ensuring that there are no other
188-
// strong references to the handle.
189-
if let Ok(target) = Arc::try_unwrap(handle) {
190-
trace!("Evicting target");
191-
let mut services = cache.write();
192-
if services.remove(&target).is_some() {
193-
trace!("Evicted");
194-
} else {
195-
trace!("Service already evicted");
196-
}
197-
} else {
198-
trace!("Handle acquired by another instance");
199-
}
200-
} else {
201-
trace!("Cache dropped");
202-
}
203-
}
204-
.instrument(debug_span!("evict")),
205-
);
206-
}
205+
self.handle.notify_one();
207206
}
208207
}
209208

210209
#[cfg(test)]
211210
#[tokio::test]
212-
async fn cached_idle_retain() {
211+
async fn test_idle_retain() {
213212
let _ = tracing_subscriber::fmt::try_init();
214213
time::pause();
215214

216-
let cache = Services::default();
217-
let handle = Arc::new(());
215+
let idle = time::Duration::from_secs(10);
216+
let cache = Arc::new(Services::default());
217+
218+
let handle = Cache::<(), fn(()) -> ()>::spawn_idle((), idle, &cache);
218219
cache.write().insert((), ((), Arc::downgrade(&handle)));
219-
let c0 = Cached {
220-
inner: (),
221-
idle: time::Duration::from_secs(10),
222-
handle: Some(Handle {
223-
handle,
224-
cache: Arc::downgrade(&cache),
225-
}),
226-
};
227-
let handle = Arc::downgrade(&c0.handle.as_ref().unwrap().handle);
220+
let c0 = Cached { inner: (), handle };
221+
222+
let handle = Arc::downgrade(&c0.handle);
223+
224+
// Let an idle timeout elapse and ensure the held service has not been
225+
// evicted.
226+
time::sleep(idle * 2).await;
227+
assert!(handle.upgrade().is_some());
228+
assert!(cache.read().contains_key(&()));
228229

229230
// Drop the original cached instance and elapse only half of the idle
230231
// timeout.
@@ -237,11 +238,8 @@ async fn cached_idle_retain() {
237238
// new cached instance.
238239
let c1 = Cached {
239240
inner: (),
240-
idle: time::Duration::from_secs(10),
241-
handle: Some(Handle {
242-
handle: handle.upgrade().unwrap(),
243-
cache: Arc::downgrade(&cache),
244-
}),
241+
// Retain the handle from the first instance.
242+
handle: handle.upgrade().unwrap(),
245243
};
246244

247245
// Drop the new cache instance. Wait the remainder of the first idle timeout

0 commit comments

Comments
 (0)