Skip to content

Commit edb1f52

Browse files
authored
lock: Generalize to protect a guarded value (#431)
We used Tokio's Lock implementation in the router's cache implementation, though we know it can leak under contention. This change generalizes the Lock to return a guarded value (like Tokio's lock). This change simplifies the Lock's state management: the Lock may no longer hold a value, nor can it fail. The `lock::Service` implementation now holds a `Result<Service, ServiceError>` so that lock services may still broadcast the inner service's failure.
1 parent d6d94d5 commit edb1f52

File tree

12 files changed

+222
-234
lines changed

12 files changed

+222
-234
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,6 +1216,7 @@ dependencies = [
12161216
"futures",
12171217
"indexmap",
12181218
"linkerd2-error",
1219+
"linkerd2-lock",
12191220
"tokio",
12201221
"tokio-sync",
12211222
"tokio-timer",

linkerd/lock/src/error.rs

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,9 @@
11
pub use linkerd2_error::Error;
22
use std::sync::Arc;
33

4-
#[derive(Debug)]
5-
pub struct Poisoned(());
6-
7-
#[derive(Debug)]
4+
#[derive(Clone, Debug)]
85
pub struct ServiceError(Arc<Error>);
96

10-
// === impl Poisoned ===
11-
12-
impl Poisoned {
13-
pub fn new() -> Self {
14-
Poisoned(())
15-
}
16-
}
17-
18-
impl std::fmt::Display for Poisoned {
19-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20-
write!(f, "poisoned")
21-
}
22-
}
23-
24-
impl std::error::Error for Poisoned {}
25-
267
// === impl ServiceError ===
278

289
impl ServiceError {

linkerd/lock/src/layer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::Lock;
1+
use super::LockService;
22

33
#[derive(Clone, Debug, Default)]
44
pub struct LockLayer(());
@@ -12,7 +12,7 @@ impl LockLayer {
1212
}
1313

1414
impl<S> tower::layer::Layer<S> for LockLayer {
15-
type Service = Lock<S>;
15+
type Service = LockService<S>;
1616

1717
fn layer(&self, inner: S) -> Self::Service {
1818
Self::Service::new(inner)

linkerd/lock/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@
44

55
pub mod error;
66
mod layer;
7+
mod lock;
78
mod service;
89
mod shared;
910
#[cfg(test)]
1011
mod test;
1112

12-
pub use self::{layer::LockLayer, service::Lock};
13+
pub use self::{
14+
layer::LockLayer,
15+
lock::{Guard, Lock},
16+
service::LockService,
17+
};

linkerd/lock/src/lock.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use crate::shared::{Shared, Wait};
2+
use futures::Async;
3+
use std::sync::{Arc, Mutex};
4+
5+
/// Provides mutually exclusive to a `T`-typed value, asynchronously.
6+
///
7+
/// Note that, when the lock is contested, waiters are notified in a LIFO
8+
/// fashion. This is done to minimize latency at the expense of unfairness.
9+
pub struct Lock<T> {
10+
/// Set when this Lock is interested in acquiring the value.
11+
waiting: Option<Wait>,
12+
shared: Arc<Mutex<Shared<T>>>,
13+
}
14+
15+
/// Guards access to a `T`-typed value, ensuring the value is released on Drop.
16+
pub struct Guard<T> {
17+
/// Must always be Some; Used to reclaim the value in Drop.
18+
value: Option<T>,
19+
20+
shared: Arc<Mutex<Shared<T>>>,
21+
}
22+
23+
// === impl Lock ===
24+
25+
impl<S> Lock<S> {
26+
pub fn new(service: S) -> Self {
27+
Self {
28+
waiting: None,
29+
shared: Arc::new(Mutex::new(Shared::new(service))),
30+
}
31+
}
32+
}
33+
34+
impl<S> Clone for Lock<S> {
35+
fn clone(&self) -> Self {
36+
Self {
37+
// Clones have an independent local lock state.
38+
waiting: None,
39+
shared: self.shared.clone(),
40+
}
41+
}
42+
}
43+
44+
impl<T> Lock<T> {
45+
fn guard(&self, value: T) -> Guard<T> {
46+
Guard {
47+
value: Some(value),
48+
shared: self.shared.clone(),
49+
}
50+
}
51+
52+
pub fn poll_acquire(&mut self) -> Async<Guard<T>> {
53+
let mut shared = self.shared.lock().expect("Lock poisoned");
54+
55+
loop {
56+
self.waiting = match self.waiting {
57+
// This instance has not registered interest in the lock.
58+
None => match shared.acquire() {
59+
// This instance has exclusive access to the inner service.
60+
Some(value) => {
61+
// The state remains Released.
62+
return Async::Ready(self.guard(value));
63+
}
64+
None => Some(Wait::default()),
65+
},
66+
67+
// This instance is interested in the lock.
68+
Some(ref waiter) => match shared.poll_acquire(waiter) {
69+
Async::NotReady => return Async::NotReady,
70+
Async::Ready(value) => {
71+
self.waiting = None;
72+
return Async::Ready(self.guard(value));
73+
}
74+
},
75+
};
76+
}
77+
}
78+
}
79+
80+
impl<T> Drop for Lock<T> {
81+
fn drop(&mut self) {
82+
if let Some(wait) = self.waiting.take() {
83+
if let Ok(mut shared) = self.shared.lock() {
84+
shared.release_waiter(wait);
85+
}
86+
}
87+
}
88+
}
89+
90+
impl<T> std::ops::Deref for Guard<T> {
91+
type Target = T;
92+
fn deref(&self) -> &Self::Target {
93+
self.value.as_ref().expect("Value dropped from guard")
94+
}
95+
}
96+
97+
impl<T> std::ops::DerefMut for Guard<T> {
98+
fn deref_mut(&mut self) -> &mut Self::Target {
99+
self.value.as_mut().expect("Value dropped from guard")
100+
}
101+
}
102+
103+
impl<T: std::fmt::Debug> std::fmt::Debug for Guard<T> {
104+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105+
write!(f, "Guard({:?})", &**self)
106+
}
107+
}
108+
109+
impl<T: std::fmt::Display> std::fmt::Display for Guard<T> {
110+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111+
std::fmt::Display::fmt(&**self, f)
112+
}
113+
}
114+
115+
impl<T> Drop for Guard<T> {
116+
fn drop(&mut self) {
117+
let value = self.value.take().expect("Guard may not be dropped twice");
118+
if let Ok(mut shared) = self.shared.lock() {
119+
shared.release_and_notify(value);
120+
}
121+
}
122+
}

0 commit comments

Comments
 (0)