
Commit f897208

cache: replace Lock with Buffer (#587)
The use of a buffer hides some of the type complexity of the inner service (all of the type complexity, after #586). However, Linkerd currently only uses buffers at the `Service` level, not at the `MakeService` level. Since `MakeService`/`NewService`s tend to be generic over both inner `MakeService` types _and_ over the produced `Service`s (as well as potentially two future types), they probably have much longer concrete types than `Service`s in most cases.

As @olix0r suggested, we can replace the `Lock` that's currently used to ensure exclusive access to `Cache`s (which are at the `MakeService` level) with `Buffer`s. The `Lock` is essentially doing something quite similar to adding a `Buffer` anyway. Introducing buffers around all caches erases the inner type for everything layered around the cache, which should make overall type length much shorter.

This seems to have a fairly noticeable impact on build time and memory use (see linkerd/linkerd2#4676). On my machine, running `make docker` on #586 gets SIGKILLed (presumably by the OOM killer) after 7m53s. After this change, `make docker` completes successfully in 1m44s.

Also, the `linkerd2-lock` crate can be removed, as it was used only by `Cache`.

To work around increased tail latencies when the buffer fills up, this branch also sets the default buffer capacity equal to the default concurrency limit, rather than 10. This is fine, since the buffer capacity isn't _actually_ what enforces a bound on proxy memory use: the spawned tasks waiting on a full buffer still sit on the Tokio executor, and it's the in-flight limit that stops us from accepting more requests when too many are in flight.

Depends on #586.
Fixes linkerd/linkerd2#4676.

Signed-off-by: Eliza Weisman <[email protected]>
1 parent 89d7a30 commit f897208
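
The commit message's claim about type erasure is easiest to see in miniature. The sketch below is hypothetical, not `linkerd2-buffer`'s real API (`Handle`, `spawn_buffer`, and the use of a current tokio/tower rather than the tokio 0.2 in this tree are all assumptions): the inner service is moved onto its own task, and callers keep a channel handle whose type names only the request and response types, never the inner service.

```rust
use futures::future::poll_fn;
use tokio::sync::{mpsc, oneshot};
use tower::Service;

/// Hypothetical caller-facing handle: its type names only `Req` and `Rsp`,
/// erasing the (possibly enormous) concrete type of the inner service.
pub struct Handle<Req, Rsp> {
    tx: mpsc::Sender<(Req, oneshot::Sender<Rsp>)>,
}

impl<Req, Rsp> Handle<Req, Rsp> {
    /// Enqueue a request and await its response. A full buffer makes this
    /// wait rather than fail, which is why capacity is tied to the
    /// in-flight limit in this commit.
    pub async fn call(&self, req: Req) -> Option<Rsp> {
        let (tx, rx) = oneshot::channel();
        self.tx.send((req, tx)).await.ok()?;
        rx.await.ok()
    }
}

/// Move the inner service onto its own task, leaving callers a `Handle`.
pub fn spawn_buffer<S, Req>(mut inner: S, capacity: usize) -> Handle<Req, S::Response>
where
    S: Service<Req> + Send + 'static,
    S::Future: Send,
    S::Response: Send + 'static,
    Req: Send + 'static,
{
    let (tx, mut rx) = mpsc::channel(capacity);
    tokio::spawn(async move {
        while let Some((req, rsp_tx)) = rx.recv().await {
            // Drive readiness, then dispatch. Error propagation and the
            // real buffer's idle timeout are elided in this sketch.
            if poll_fn(|cx| inner.poll_ready(cx)).await.is_ok() {
                if let Ok(rsp) = inner.call(req).await {
                    let _ = rsp_tx.send(rsp);
                }
            }
        }
    });
    Handle { tx }
}
```

Every layer stacked over a buffered cache then sees only `Handle<Req, Rsp>`, which is what shrinks the compiler's concrete type names (and, per the commit message, build time and memory).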

File tree: 16 files changed, +37 / -637 lines


Cargo.lock

Lines changed: 0 additions & 19 deletions
```diff
@@ -836,7 +836,6 @@ dependencies = [
  "linkerd2-exp-backoff",
  "linkerd2-http-classify",
  "linkerd2-http-metrics",
- "linkerd2-lock",
  "linkerd2-metrics",
  "linkerd2-opencensus",
  "linkerd2-proxy-api",
@@ -987,7 +986,6 @@ version = "0.1.0"
 dependencies = [
  "futures 0.3.5",
  "linkerd2-error",
- "linkerd2-lock",
  "linkerd2-stack",
  "tokio 0.2.21",
  "tower",
@@ -1167,23 +1165,6 @@ dependencies = [
  "tokio-test",
 ]
 
-[[package]]
-name = "linkerd2-lock"
-version = "0.1.0"
-dependencies = [
- "futures 0.3.5",
- "linkerd2-error",
- "rand 0.7.2",
- "tokio 0.2.21",
- "tokio-test",
- "tower",
- "tower-test",
- "tracing",
- "tracing-futures",
- "tracing-log",
- "tracing-subscriber",
-]
-
 [[package]]
 name = "linkerd2-metrics"
 version = "0.1.0"
```

Cargo.toml

Lines changed: 0 additions & 1 deletion
```diff
@@ -28,7 +28,6 @@ members = [
     "linkerd/http-metrics",
     "linkerd/identity",
     "linkerd/io",
-    "linkerd/lock",
     "linkerd/metrics",
     "linkerd/opencensus",
     "linkerd/proxy/api-resolve",
```

linkerd/app/core/Cargo.toml

Lines changed: 0 additions & 1 deletion
```diff
@@ -38,7 +38,6 @@ linkerd2-error-respond = { path = "../../error-respond" }
 linkerd2-exp-backoff = { path = "../../exp-backoff" }
 linkerd2-http-classify = { path = "../../http-classify" }
 linkerd2-http-metrics = { path = "../../http-metrics" }
-linkerd2-lock = { path = "../../lock" }
 linkerd2-metrics = { path = "../../metrics" }
 linkerd2-opencensus = { path = "../../opencensus" }
 linkerd2-proxy-core = { path = "../../proxy/core" }
```

linkerd/app/core/src/svc.rs

Lines changed: 0 additions & 6 deletions
```diff
@@ -5,7 +5,6 @@ use crate::transport::Connect;
 use crate::{cache, Error};
 pub use linkerd2_buffer as buffer;
 use linkerd2_concurrency_limit as concurrency_limit;
-pub use linkerd2_lock as lock;
 pub use linkerd2_stack::{self as stack, layer, NewService};
 pub use linkerd2_stack_tracing::{InstrumentMake, InstrumentMakeLayer};
 pub use linkerd2_timeout as timeout;
@@ -130,11 +129,6 @@ impl<L> Layers<L> {
         self.push(buffer::SpawnBufferLayer::new(capacity).with_idle_timeout(idle_timeout))
     }
 
-    /// Makes the inner service shareable in a mutually-exclusive fashion.
-    pub fn push_lock(self) -> Layers<Pair<L, lock::LockLayer>> {
-        self.push(lock::LockLayer::new())
-    }
-
     // Makes the service eagerly process and fail requests after the given timeout.
     pub fn push_failfast(self, timeout: Duration) -> Layers<Pair<L, timeout::FailFastLayer>> {
         self.push(timeout::FailFastLayer::new(timeout))
```
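
Aside: the `push_*` combinators removed and kept here all funnel through `Layers::push`, which nests the accumulated layers into pairs; that nesting is exactly why every combinator in a stack lengthens the concrete type the compiler must name. A sketch of the pattern with hypothetical `Layers`/`Pair` definitions (illustrative, not the crate's real code):

```rust
use tower::Layer;

// Hypothetical pair of layers: the previously accumulated layers `A`,
// plus one newly pushed layer `B` wrapped around their output.
pub struct Pair<A, B>(A, B);

pub struct Layers<L>(L);

impl<L> Layers<L> {
    // Each push nests the existing type inside another generic, e.g.
    // Layers<Pair<Pair<L, Buffer>, FailFast>>, and so on.
    pub fn push<O>(self, outer: O) -> Layers<Pair<L, O>> {
        Layers(Pair(self.0, outer))
    }
}

impl<S, A, B> Layer<S> for Pair<A, B>
where
    A: Layer<S>,
    B: Layer<A::Service>,
{
    type Service = B::Service;

    fn layer(&self, inner: S) -> Self::Service {
        // Apply the accumulated layers first, then the pushed one.
        self.1.layer(self.0.layer(inner))
    }
}
```

Removing a combinator (like `push_lock` above), or erasing the inner type behind a spawned buffer, therefore shortens every type downstream of it.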

linkerd/app/inbound/src/lib.rs

Lines changed: 2 additions & 5 deletions
```diff
@@ -237,10 +237,8 @@ impl Config {
                     .push(metrics.stack.layer(stack_labels("target"))),
             ),
         )
+        .spawn_buffer(buffer_capacity)
         .instrument(|_: &Target| info_span!("target"))
-        // Prevent the cache's lock from being acquired in poll_ready, ensuring this happens
-        // in the response future. This prevents buffers from holding the cache's lock.
-        .push_oneshot()
         .check_service::<Target>();
 
         // Routes targets to a Profile stack, i.e. so that profile
@@ -268,10 +266,9 @@ impl Config {
                     .push(metrics.stack.layer(stack_labels("profile"))),
             ),
         )
+        .spawn_buffer(buffer_capacity)
         .instrument(|p: &Profile| info_span!("profile", addr = %p.addr()))
         .check_make_service::<Profile, Target>()
-        // Ensures that cache's lock isn't held in poll_ready.
-        .push_oneshot()
         .push(router::Layer::new(|()| ProfileTarget))
         .check_new_service_routes::<(), Target>()
         .new_service(());
```

linkerd/app/outbound/src/lib.rs

Lines changed: 6 additions & 13 deletions
```diff
@@ -117,11 +117,10 @@ impl Config {
                     .push(metrics.layer(stack_labels("refine"))),
             ),
         )
+        .spawn_buffer(self.proxy.buffer_capacity)
         .instrument(|name: &dns::Name| info_span!("refine", %name))
         // Obtains the service, advances the state of the resolution
         .push(svc::make_response::Layer)
-        // Ensures that the cache isn't locked when polling readiness.
-        .push_oneshot()
         .into_inner()
     }
 
@@ -295,9 +294,8 @@ impl Config {
                     .push(metrics.stack.layer(stack_labels("balance"))),
             ),
         )
-        .instrument(|c: &Concrete<http::Settings>| info_span!("balance", addr = %c.addr))
-        // Ensure that buffers don't hold the cache's lock in poll_ready.
-        .push_oneshot();
+        .spawn_buffer(buffer_capacity)
+        .instrument(|c: &Concrete<http::Settings>| info_span!("balance", addr = %c.addr));
 
         // Caches clients that bypass discovery/balancing.
         //
@@ -319,16 +317,15 @@ impl Config {
                     .push(metrics.stack.layer(stack_labels("forward.endpoint"))),
             ),
         )
+        .spawn_buffer(buffer_capacity)
         .instrument(|endpoint: &Target<HttpEndpoint>| {
             info_span!("forward", peer.addr = %endpoint.addr, peer.id = ?endpoint.inner.identity)
         })
         .push_map_target(|t: Concrete<HttpEndpoint>| Target {
             addr: t.addr.into(),
             inner: t.inner.inner,
         })
-        .check_service::<Concrete<HttpEndpoint>>()
-        // Ensure that buffers don't hold the cache's lock in poll_ready.
-        .push_oneshot();
+        .check_service::<Concrete<HttpEndpoint>>();
 
         // If the balancer fails to be created, i.e., because it is unresolvable, fall back to
         // using a router that dispatches request to the application-selected original destination.
@@ -396,9 +393,8 @@ impl Config {
                     .push(metrics.stack.layer(stack_labels("profile"))),
             ),
         )
+        .spawn_buffer(buffer_capacity)
         .instrument(|_: &Profile| info_span!("profile"))
-        // Ensures that the cache isn't locked when polling readiness.
-        .push_oneshot()
         .check_make_service::<Profile, Logical<HttpEndpoint>>()
         .push(router::Layer::new(|()| ProfilePerTarget))
         .check_new_service_routes::<(), Logical<HttpEndpoint>>()
@@ -638,9 +634,6 @@ fn is_discovery_rejected(err: &Error) -> bool {
     if let Some(e) = err.downcast_ref::<svc::buffer::error::ServiceError>() {
         return is_discovery_rejected(e.inner());
     }
-    if let Some(e) = err.downcast_ref::<svc::lock::error::ServiceError>() {
-        return is_discovery_rejected(e.inner());
-    }
 
     err.is::<DiscoveryRejected>() || err.is::<profiles::InvalidProfileAddr>()
 }
```
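
The last hunk removes one of two identical downcast arms: with the lock gone, only the buffer's `svc::buffer::error::ServiceError` still wraps inner errors. The self-contained sketch below shows the same unwrap-then-classify pattern via the standard library's `Error::source` chain; it is a stand-in for illustration, not the proxy's actual downcast-based code:

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
struct DiscoveryRejected;

impl fmt::Display for DiscoveryRejected {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "discovery rejected")
    }
}

impl Error for DiscoveryRejected {}

/// Walk wrapper errors (such as a buffer's ServiceError) down to the
/// root cause before classifying it.
fn is_discovery_rejected(err: &(dyn Error + 'static)) -> bool {
    match err.source() {
        Some(inner) => is_discovery_rejected(inner),
        None => err.is::<DiscoveryRejected>(),
    }
}
```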

linkerd/app/src/env.rs

Lines changed: 5 additions & 7 deletions
```diff
@@ -206,17 +206,15 @@ const DEFAULT_RESOLV_CONF: &str = "/etc/resolv.conf";
 const DEFAULT_INITIAL_STREAM_WINDOW_SIZE: u32 = 65_535; // Protocol default
 const DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE: u32 = 1048576; // 1MB ~ 16 streams at capacity
 
-// Because buffers propagate readiness, they should only need enough capacity to satisfy the
-// process's concurrency. This should probably be derived from the number of CPUs, but the num-cpus
-// crate does not support cgroups yet [seanmonstar/num_cpus#80].
-const DEFAULT_BUFFER_CAPACITY: usize = 10;
+// 10_000 is arbitrarily chosen for now...
+const DEFAULT_BUFFER_CAPACITY: usize = 10_000;
 
 const DEFAULT_INBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(60);
 const DEFAULT_OUTBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(60);
 
-// 10_000 is arbitrarily chosen for now...
-const DEFAULT_INBOUND_MAX_IN_FLIGHT: usize = 10_000;
-const DEFAULT_OUTBOUND_MAX_IN_FLIGHT: usize = 10_000;
+// By default, don't accept more requests than we can buffer.
+const DEFAULT_INBOUND_MAX_IN_FLIGHT: usize = DEFAULT_BUFFER_CAPACITY;
+const DEFAULT_OUTBOUND_MAX_IN_FLIGHT: usize = DEFAULT_BUFFER_CAPACITY;
 
 const DEFAULT_DESTINATION_GET_SUFFIXES: &str = "svc.cluster.local.";
 const DEFAULT_DESTINATION_PROFILE_SUFFIXES: &str = "svc.cluster.local.";
```

linkerd/cache/Cargo.toml

Lines changed: 0 additions & 1 deletion
```diff
@@ -8,7 +8,6 @@ publish = false
 [dependencies]
 futures = "0.3"
 linkerd2-error = { path = "../error" }
-linkerd2-lock = { path = "../lock" }
 linkerd2-stack = { path = "../stack" }
 tokio = "0.2"
 tower = { version = "0.3", default-features = false, features = ["util"] }
```

linkerd/cache/src/lib.rs

Lines changed: 24 additions & 45 deletions
```diff
@@ -1,7 +1,6 @@
 #![deny(warnings, rust_2018_idioms)]
 use futures::future;
 use linkerd2_error::Never;
-use linkerd2_lock::{Guard, Lock};
 use linkerd2_stack::NewService;
 use std::collections::HashMap;
 use std::hash::Hash;
@@ -19,8 +18,7 @@ where
     N: NewService<(T, Handle)>,
 {
     new_service: N,
-    lock: Lock<Services<T, N::Service>>,
-    guard: Option<Guard<Services<T, N::Service>>>,
+    services: Services<T, N::Service>,
 }
 
 /// A tracker inserted into each inner service that, when dropped, indicates the service may be
@@ -34,68 +32,33 @@ type Services<T, S> = HashMap<T, (S, Weak<()>)>;
 
 impl<T, N> Cache<T, N>
 where
-    T: Eq + Hash + Send + 'static,
+    T: Eq + Hash + Send,
     N: NewService<(T, Handle)>,
 {
     pub fn new(new_service: N) -> Self {
         Self {
             new_service,
-            guard: None,
-            lock: Lock::new(Services::default()),
-        }
-    }
-}
-
-impl<T, N> Clone for Cache<T, N>
-where
-    T: Clone + Eq + Hash,
-    N: NewService<(T, Handle)> + Clone,
-    N::Service: Clone,
-{
-    fn clone(&self) -> Self {
-        Self {
-            new_service: self.new_service.clone(),
-            lock: self.lock.clone(),
-            guard: None,
+            services: Services::default(),
         }
     }
 }
 
 impl<T, N> tower::Service<T> for Cache<T, N>
 where
-    T: Clone + Eq + Hash + Send + 'static,
+    T: Clone + Eq + Hash + Send,
     N: NewService<(T, Handle)>,
-    N::Service: Clone + Send + 'static,
+    N::Service: Clone + Send,
 {
     type Response = N::Service;
     type Error = Never;
     type Future = future::Ready<Result<Self::Response, Self::Error>>;
 
-    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        if self.guard.is_none() {
-            let mut services = futures::ready!(self.lock.poll_acquire(cx));
-            // Drop defunct services before interacting with the cache.
-            let n = services.len();
-            services.retain(|_, (_, weak)| {
-                if weak.strong_count() > 0 {
-                    true
-                } else {
-                    trace!("Dropping defunct service");
-                    false
-                }
-            });
-            debug!(services = services.len(), dropped = n - services.len());
-            self.guard = Some(services);
-        }
-
-        debug_assert!(self.guard.is_some(), "guard must be acquired");
+    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(Ok(()))
     }
 
     fn call(&mut self, target: T) -> Self::Future {
-        let mut services = self.guard.take().expect("poll_ready must be called");
-
-        if let Some((service, weak)) = services.get(&target) {
+        if let Some((service, weak)) = self.services.get(&target) {
             if weak.upgrade().is_some() {
                 trace!("Using cached service");
                 return future::ok(service.clone());
@@ -109,8 +72,24 @@ where
             .new_service
             .new_service((target.clone(), Handle(handle)));
 
+        // Drop defunct services before inserting the new service into the
+        // cache.
+        let n = self.services.len();
+        self.services.retain(|_, (_, weak)| {
+            if weak.strong_count() > 0 {
+                true
+            } else {
+                trace!("Dropping defunct service");
+                false
+            }
+        });
+        debug!(
+            services = self.services.len(),
+            dropped = n - self.services.len()
+        );
+
         debug!("Caching new service");
-        services.insert(target, (service.clone(), weak));
+        self.services.insert(target, (service.clone(), weak));
 
         future::ok(service.into())
     }
```
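
The weak-handle eviction idiom survives this rewrite intact; only the lock around it is gone, and the sweep of defunct entries moves from `poll_ready` into `call`. A standalone sketch of the idiom, with hypothetical names rather than this crate's API:

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Weak};

/// Dropping the last clone of a service (and thus its Handle) marks the
/// corresponding cache entry as defunct.
pub struct Handle(Arc<()>);

pub struct Cache<K, S> {
    services: HashMap<K, (S, Weak<()>)>,
}

impl<K: Eq + Hash, S: Clone> Cache<K, S> {
    pub fn get_or_insert_with(&mut self, key: K, mk: impl FnOnce(Handle) -> S) -> S {
        if let Some((service, weak)) = self.services.get(&key) {
            // The entry is live only while the service still holds its Handle.
            if weak.upgrade().is_some() {
                return service.clone();
            }
        }
        let handle = Arc::new(());
        let weak = Arc::downgrade(&handle);
        // `mk` must store the Handle inside the service it builds.
        let service = mk(Handle(handle));
        // Evict entries whose handles have all been dropped.
        self.services.retain(|_, (_, w)| w.strong_count() > 0);
        self.services.insert(key, (service.clone(), weak));
        service
    }
}
```

Because liveness is tracked purely by the `Arc`'s strong count, no background task is needed: the next insertion sweeps out whatever was dropped in the meantime.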

linkerd/lock/Cargo.toml

Lines changed: 0 additions & 25 deletions
This file was deleted.
