Skip to content

Commit b29747d

Browse files
committed
start! implementing poison
1 parent 500c124 commit b29747d

File tree

1 file changed

+64
-10
lines changed

1 file changed

+64
-10
lines changed

crates/matrix-sdk-common/src/cross_process_lock.rs

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ use std::{
4242
future::Future,
4343
sync::{
4444
Arc,
45-
atomic::{self, AtomicU32},
45+
atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
4646
},
4747
time::Duration,
4848
};
4949

5050
use tokio::sync::Mutex;
51-
use tracing::{debug, error, instrument, trace};
51+
use tracing::{debug, error, instrument, trace, warn};
5252

5353
use crate::{
5454
SendOutsideWasm,
@@ -60,9 +60,13 @@ use crate::{
6060
/// holder.
6161
pub type CrossProcessLockGeneration = u64;
6262

63+
type AtomicCrossProcessLockGeneration = AtomicU64;
64+
6365
/// Describe the first lock generation value (see [`LockGeneration`]).
6466
pub const FIRST_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 1;
6567

68+
pub const NO_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 0;
69+
6670
/// Trait used to try to take a lock. Foundation of [`CrossProcessLock`].
6771
pub trait TryLock {
6872
#[cfg(not(target_family = "wasm"))]
@@ -111,7 +115,7 @@ pub struct CrossProcessLockGuard {
111115

112116
impl Drop for CrossProcessLockGuard {
113117
fn drop(&mut self) {
114-
self.num_holders.fetch_sub(1, atomic::Ordering::SeqCst);
118+
self.num_holders.fetch_sub(1, Ordering::SeqCst);
115119
}
116120
}
117121

@@ -154,6 +158,10 @@ where
154158

155159
/// Backoff time, in milliseconds.
156160
backoff: Arc<Mutex<WaitingTime>>,
161+
162+
lock_generation: Arc<AtomicCrossProcessLockGeneration>,
163+
164+
is_poisoned: Arc<AtomicBool>,
157165
}
158166

159167
/// Amount of time a lease of the lock should last, in milliseconds.
@@ -193,9 +201,31 @@ where
193201
num_holders: Arc::new(0.into()),
194202
locking_attempt: Arc::new(Mutex::new(())),
195203
renew_task: Default::default(),
204+
lock_generation: Arc::new(AtomicCrossProcessLockGeneration::new(
205+
FIRST_CROSS_PROCESS_LOCK_GENERATION,
206+
)),
207+
is_poisoned: Arc::new(AtomicBool::new(false)),
196208
}
197209
}
198210

211+
/// Determine whether the cross-process lock is poisoned.
212+
///
213+
/// If another process has taken the lock, then this lock becomes poisoned.
214+
/// You should not trust a `false` value for program correctness without
215+
/// additional synchronisation.
216+
pub fn is_poisoned(&self) -> bool {
217+
self.is_poisoned.load(Ordering::SeqCst)
218+
}
219+
220+
/// Clear the poisoned state from this cross-process lock.
221+
///
222+
/// If the cross-process lock is poisoned, it will remain poisoned until
223+
/// this method is called. This allows recovering from a poisoned
224+
/// state and marking that it has recovered.
225+
pub fn clear_poison(&self) {
226+
self.is_poisoned.store(false, Ordering::SeqCst);
227+
}
228+
199229
/// Try to lock once, returns whether the lock was obtained or not.
200230
#[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
201231
pub async fn try_lock_once(
@@ -207,25 +237,49 @@ where
207237

208238
// If another thread obtained the lock, make sure to only superficially increase
209239
// the number of holders, and carry on.
210-
if self.num_holders.load(atomic::Ordering::SeqCst) > 0 {
240+
if self.num_holders.load(Ordering::SeqCst) > 0 {
211241
// Note: between the above load and the fetch_add below, another thread may
212242
// decrement `num_holders`. That's fine because that means the lock
213243
// was taken by at least one thread, and after this call it will be
214244
// taken by at least one thread.
215245
trace!("We already had the lock, incrementing holder count");
216-
self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);
246+
self.num_holders.fetch_add(1, Ordering::SeqCst);
217247
let guard = CrossProcessLockGuard { num_holders: self.num_holders.clone() };
218248
return Ok(Some(guard));
219249
}
220250

221-
let acquired = self
251+
if let Some(generation) = self
222252
.locker
223253
.try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder)
224254
.await
225255
.map_err(|err| CrossProcessLockError::TryLockError(Box::new(err)))?
226-
.is_some();
256+
{
257+
match self.lock_generation.load(Ordering::SeqCst) {
258+
// If there is no lock generation, it means this is the first time the lock is
259+
// acquired. Let's remember the generation.
260+
NO_CROSS_PROCESS_LOCK_GENERATION => {
261+
trace!(?generation, "Setting the lock generation for the first time");
262+
263+
self.lock_generation.store(generation, Ordering::SeqCst);
264+
}
265+
266+
// This is NOT the same generation, the lock has been poisoned!
267+
current_generation if current_generation != generation => {
268+
warn!(
269+
?current_generation,
270+
?generation,
271+
"The lock has been acquired, but it's been poisoned!"
272+
);
273+
}
274+
275+
// This is the same generation, no problem.
276+
_ => {
277+
trace!("Same lock generation; no problem");
278+
}
279+
}
227280

228-
if !acquired {
281+
trace!("Lock acquired!");
282+
} else {
229283
trace!("Couldn't acquire the lock immediately.");
230284
return Ok(None);
231285
}
@@ -269,7 +323,7 @@ where
269323
let _guard = this.locking_attempt.lock().await;
270324

271325
// If there are no more users, we can quit.
272-
if this.num_holders.load(atomic::Ordering::SeqCst) == 0 {
326+
if this.num_holders.load(Ordering::SeqCst) == 0 {
273327
trace!("exiting the lease extension loop");
274328

275329
// Cancel the lease with another 0ms lease.
@@ -295,7 +349,7 @@ where
295349
}
296350
}));
297351

298-
self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);
352+
self.num_holders.fetch_add(1, Ordering::SeqCst);
299353

300354
let guard = CrossProcessLockGuard { num_holders: self.num_holders.clone() };
301355
Ok(Some(guard))

0 commit comments

Comments
 (0)