@@ -42,13 +42,13 @@ use std::{
42
42
future:: Future ,
43
43
sync:: {
44
44
Arc ,
45
- atomic:: { self , AtomicU32 } ,
45
+ atomic:: { AtomicBool , AtomicU32 , AtomicU64 , Ordering } ,
46
46
} ,
47
47
time:: Duration ,
48
48
} ;
49
49
50
50
use tokio:: sync:: Mutex ;
51
- use tracing:: { debug, error, instrument, trace} ;
51
+ use tracing:: { debug, error, instrument, trace, warn } ;
52
52
53
53
use crate :: {
54
54
SendOutsideWasm ,
@@ -60,9 +60,13 @@ use crate::{
60
60
/// holder.
61
61
pub type CrossProcessLockGeneration = u64 ;
62
62
63
/// Atomic storage for a [`CrossProcessLockGeneration`] (a `u64`), so the last
/// observed generation can be read and swapped through a shared reference.
+ type AtomicCrossProcessLockGeneration = AtomicU64 ;
64
+
63
65
/// Describe the first lock generation value (see [`CrossProcessLockGeneration`]).
64
66
pub const FIRST_CROSS_PROCESS_LOCK_GENERATION : CrossProcessLockGeneration = 1 ;
65
67
68
/// Sentinel value meaning that no lock generation has been recorded.
///
/// When acquiring the lock, finding this value as the previously recorded
/// generation is treated as a first acquisition, which cannot be poisoned.
+ pub const NO_CROSS_PROCESS_LOCK_GENERATION : CrossProcessLockGeneration = 0 ;
69
+
66
70
/// Trait used to try to take a lock. Foundation of [`CrossProcessLock`].
67
71
pub trait TryLock {
68
72
#[ cfg( not( target_family = "wasm" ) ) ]
@@ -81,7 +85,11 @@ pub trait TryLock {
81
85
/// - If there was no previous lease, we will acquire the lock.
82
86
/// - Otherwise, we don't get the lock.
83
87
///
84
- /// Returns whether taking the lock succeeded.
88
+ /// Returns `Some(_)` to indicate the lock succeeded, `None` otherwise. The
89
+ /// cross-process lock generation must be compared to the generation before
90
+ /// the call to see if the lock has been poisoned: a different generation
91
+ /// means the lock has been poisoned, i.e. taken by a different holder in
92
+ /// the meantime.
85
93
fn try_lock (
86
94
& self ,
87
95
lease_duration_ms : u32 ,
@@ -111,7 +119,7 @@ pub struct CrossProcessLockGuard {
111
119
112
120
impl Drop for CrossProcessLockGuard {
113
121
fn drop ( & mut self ) {
114
- self . num_holders . fetch_sub ( 1 , atomic :: Ordering :: SeqCst ) ;
122
+ self . num_holders . fetch_sub ( 1 , Ordering :: SeqCst ) ;
115
123
}
116
124
}
117
125
@@ -154,6 +162,10 @@ where
154
162
155
163
/// Backoff time, in milliseconds.
156
164
backoff : Arc < Mutex < WaitingTime > > ,
165
+
166
+ lock_generation : Arc < AtomicCrossProcessLockGeneration > ,
167
+
168
+ is_poisoned : Arc < AtomicBool > ,
157
169
}
158
170
159
171
/// Amount of time a lease of the lock should last, in milliseconds.
@@ -193,9 +205,32 @@ where
193
205
num_holders : Arc :: new ( 0 . into ( ) ) ,
194
206
locking_attempt : Arc :: new ( Mutex :: new ( ( ) ) ) ,
195
207
renew_task : Default :: default ( ) ,
208
+ lock_generation : Arc :: new ( AtomicCrossProcessLockGeneration :: new (
209
+ FIRST_CROSS_PROCESS_LOCK_GENERATION ,
210
+ ) ) ,
211
+ is_poisoned : Arc :: new ( AtomicBool :: new ( false ) ) ,
196
212
}
197
213
}
198
214
215
+ /// Determine whether the cross-process lock is poisoned.
216
+ ///
217
+ /// If another process has taken the lock, then this lock becomes poisoned.
218
+ /// You should not trust a `false` value for program correctness without
219
+ /// additional synchronisation.
220
+ pub fn is_poisoned ( & self ) -> bool {
221
+ self . is_poisoned . load ( Ordering :: SeqCst )
222
+ }
223
+
224
+ /// Clear the poisoned state from this cross-process lock.
225
+ ///
226
+ /// If the cross-process lock is poisoned, it will remain poisoned until
227
+ /// this method is called. This allows recovering from a poisoned
228
+ /// state and marking that it has recovered.
229
+ pub fn clear_poison ( & self ) {
230
+ self . is_poisoned . store ( false , Ordering :: SeqCst ) ;
231
+ self . lock_generation . store ( NO_CROSS_PROCESS_LOCK_GENERATION , Ordering :: SeqCst ) ;
232
+ }
233
+
199
234
/// Try to lock once, returns whether the lock was obtained or not.
200
235
#[ instrument( skip( self ) , fields( ?self . lock_key, ?self . lock_holder) ) ]
201
236
pub async fn try_lock_once (
@@ -207,25 +242,48 @@ where
207
242
208
243
// If another thread obtained the lock, make sure to only superficially increase
209
244
// the number of holders, and carry on.
210
- if self . num_holders . load ( atomic :: Ordering :: SeqCst ) > 0 {
245
+ if self . num_holders . load ( Ordering :: SeqCst ) > 0 {
211
246
// Note: between the above load and the fetch_add below, another thread may
212
247
// decrement `num_holders`. That's fine because that means the lock
213
248
// was taken by at least one thread, and after this call it will be
214
249
// taken by at least one thread.
215
250
trace ! ( "We already had the lock, incrementing holder count" ) ;
216
- self . num_holders . fetch_add ( 1 , atomic :: Ordering :: SeqCst ) ;
251
+ self . num_holders . fetch_add ( 1 , Ordering :: SeqCst ) ;
217
252
let guard = CrossProcessLockGuard { num_holders : self . num_holders . clone ( ) } ;
218
253
return Ok ( Some ( guard) ) ;
219
254
}
220
255
221
- let acquired = self
256
+ if let Some ( new_generation ) = self
222
257
. locker
223
258
. try_lock ( LEASE_DURATION_MS , & self . lock_key , & self . lock_holder )
224
259
. await
225
260
. map_err ( |err| CrossProcessLockError :: TryLockError ( Box :: new ( err) ) ) ?
226
- . is_some ( ) ;
261
+ {
262
+ match self . lock_generation . swap ( new_generation, Ordering :: SeqCst ) {
263
+ // If there was no lock generation, it means this is the first time the lock is
264
+ // acquired. It cannot be poisoned.
265
+ NO_CROSS_PROCESS_LOCK_GENERATION => {
266
+ trace ! ( ?new_generation, "Setting the lock generation for the first time" ) ;
267
+ }
227
268
228
- if !acquired {
269
+ // This was NOT the same generation, the lock has been poisoned!
270
+ previous_generation if previous_generation != new_generation => {
271
+ warn ! (
272
+ ?previous_generation,
273
+ ?new_generation,
274
+ "The lock has been acquired, but it's been poisoned!"
275
+ ) ;
276
+ self . is_poisoned . store ( true , Ordering :: SeqCst ) ;
277
+ }
278
+
279
+ // This was the same generation, no problem.
280
+ _ => {
281
+ trace ! ( "Same lock generation; no problem" ) ;
282
+ }
283
+ }
284
+
285
+ trace ! ( "Lock acquired!" ) ;
286
+ } else {
229
287
trace ! ( "Couldn't acquire the lock immediately." ) ;
230
288
return Ok ( None ) ;
231
289
}
@@ -269,7 +327,7 @@ where
269
327
let _guard = this. locking_attempt . lock ( ) . await ;
270
328
271
329
// If there are no more users, we can quit.
272
- if this. num_holders . load ( atomic :: Ordering :: SeqCst ) == 0 {
330
+ if this. num_holders . load ( Ordering :: SeqCst ) == 0 {
273
331
trace ! ( "exiting the lease extension loop" ) ;
274
332
275
333
// Cancel the lease with another 0ms lease.
@@ -284,18 +342,55 @@ where
284
342
285
343
sleep ( Duration :: from_millis ( EXTEND_LEASE_EVERY_MS ) ) . await ;
286
344
287
- let fut =
288
- this. locker . try_lock ( LEASE_DURATION_MS , & this. lock_key , & this. lock_holder ) ;
345
+ match this
346
+ . locker
347
+ . try_lock ( LEASE_DURATION_MS , & this. lock_key , & this. lock_holder )
348
+ . await
349
+ {
350
+ Ok ( Some ( new_generation) ) => {
351
+ match this. lock_generation . swap ( new_generation, Ordering :: SeqCst ) {
352
+ // It's impossible to renew the lock if the lock wasn't acquired at
353
+ // least once. This is unreachable.
354
+ NO_CROSS_PROCESS_LOCK_GENERATION => unreachable ! (
355
+ "It's impossible to renew a lock lease that has not been acquired once"
356
+ ) ,
357
+
358
+ // This was NOT the same generation, the lock has been poisoned!
359
+ previous_generation if previous_generation != new_generation => {
360
+ warn ! (
361
+ ?previous_generation,
362
+ ?new_generation,
363
+ "The lock lease has been renewed, but it's been poisoned!"
364
+ ) ;
365
+ this. is_poisoned . store ( true , Ordering :: SeqCst ) ;
366
+
367
+ // Exit the loop.
368
+ break ;
369
+ }
370
+
371
+ // This was the same generation, no problem.
372
+ _ => { }
373
+ }
374
+ }
289
375
290
- if let Err ( err) = fut. await {
291
- error ! ( "error when extending lock lease: {err:#}" ) ;
292
- // Exit the loop.
293
- break ;
376
+ Ok ( None ) => {
377
+ error ! ( "Failed to renew the lock lease: the lock could not be acquired" ) ;
378
+
379
+ // Exit the loop.
380
+ break ;
381
+ }
382
+
383
+ Err ( err) => {
384
+ error ! ( "Error when extending the lock lease: {err:#}" ) ;
385
+
386
+ // Exit the loop.
387
+ break ;
388
+ }
294
389
}
295
390
}
296
391
} ) ) ;
297
392
298
- self . num_holders . fetch_add ( 1 , atomic :: Ordering :: SeqCst ) ;
393
+ self . num_holders . fetch_add ( 1 , Ordering :: SeqCst ) ;
299
394
300
395
let guard = CrossProcessLockGuard { num_holders : self . num_holders . clone ( ) } ;
301
396
Ok ( Some ( guard) )
0 commit comments