@@ -42,13 +42,13 @@ use std::{
42
42
future:: Future ,
43
43
sync:: {
44
44
Arc ,
45
- atomic:: { self , AtomicU32 } ,
45
+ atomic:: { AtomicBool , AtomicU32 , AtomicU64 , Ordering } ,
46
46
} ,
47
47
time:: Duration ,
48
48
} ;
49
49
50
50
use tokio:: sync:: Mutex ;
51
- use tracing:: { debug, error, instrument, trace} ;
51
+ use tracing:: { debug, error, instrument, trace, warn } ;
52
52
53
53
use crate :: {
54
54
SendOutsideWasm ,
@@ -60,9 +60,13 @@ use crate::{
60
60
/// holder.
61
61
pub type CrossProcessLockGeneration = u64 ;
62
62
63
+ type AtomicCrossProcessLockGeneration = AtomicU64 ;
64
+
63
65
/// Describe the first lock generation value (see [`LockGeneration`]).
64
66
pub const FIRST_CROSS_PROCESS_LOCK_GENERATION : CrossProcessLockGeneration = 1 ;
65
67
68
+ pub const NO_CROSS_PROCESS_LOCK_GENERATION : CrossProcessLockGeneration = 0 ;
69
+
66
70
/// Trait used to try to take a lock. Foundation of [`CrossProcessLock`].
67
71
pub trait TryLock {
68
72
#[ cfg( not( target_family = "wasm" ) ) ]
@@ -111,7 +115,7 @@ pub struct CrossProcessLockGuard {
111
115
112
116
impl Drop for CrossProcessLockGuard {
113
117
fn drop ( & mut self ) {
114
- self . num_holders . fetch_sub ( 1 , atomic :: Ordering :: SeqCst ) ;
118
+ self . num_holders . fetch_sub ( 1 , Ordering :: SeqCst ) ;
115
119
}
116
120
}
117
121
@@ -154,6 +158,10 @@ where
154
158
155
159
/// Backoff time, in milliseconds.
156
160
backoff : Arc < Mutex < WaitingTime > > ,
161
+
162
+ lock_generation : Arc < AtomicCrossProcessLockGeneration > ,
163
+
164
+ is_poisoned : Arc < AtomicBool > ,
157
165
}
158
166
159
167
/// Amount of time a lease of the lock should last, in milliseconds.
@@ -193,9 +201,31 @@ where
193
201
num_holders : Arc :: new ( 0 . into ( ) ) ,
194
202
locking_attempt : Arc :: new ( Mutex :: new ( ( ) ) ) ,
195
203
renew_task : Default :: default ( ) ,
204
+ lock_generation : Arc :: new ( AtomicCrossProcessLockGeneration :: new (
205
+ FIRST_CROSS_PROCESS_LOCK_GENERATION ,
206
+ ) ) ,
207
+ is_poisoned : Arc :: new ( AtomicBool :: new ( false ) ) ,
196
208
}
197
209
}
198
210
211
+ /// Determine whether the cross-process lock is poisoned.
212
+ ///
213
+ /// If another process has taken the lock, then this lock becomes poisoned.
214
+ /// You should not trust a `false` value for program correctness without
215
+ /// additional synchronisation.
216
+ pub fn is_poisoned ( & self ) -> bool {
217
+ self . is_poisoned . load ( Ordering :: SeqCst )
218
+ }
219
+
220
+ /// Clear the poisoned state from this cross-process lock.
221
+ ///
222
+ /// If the cross-process lock is poisoned, it will remain poisoned until
223
+ /// this method is called. This allows recovering from a poisoned
224
+ /// state and marking that it has recovered.
225
+ pub fn clear_poison ( & self ) {
226
+ self . is_poisoned . store ( false , Ordering :: SeqCst ) ;
227
+ }
228
+
199
229
/// Try to lock once, returns whether the lock was obtained or not.
200
230
#[ instrument( skip( self ) , fields( ?self . lock_key, ?self . lock_holder) ) ]
201
231
pub async fn try_lock_once (
@@ -207,25 +237,49 @@ where
207
237
208
238
// If another thread obtained the lock, make sure to only superficially increase
209
239
// the number of holders, and carry on.
210
- if self . num_holders . load ( atomic :: Ordering :: SeqCst ) > 0 {
240
+ if self . num_holders . load ( Ordering :: SeqCst ) > 0 {
211
241
// Note: between the above load and the fetch_add below, another thread may
212
242
// decrement `num_holders`. That's fine because that means the lock
213
243
// was taken by at least one thread, and after this call it will be
214
244
// taken by at least one thread.
215
245
trace ! ( "We already had the lock, incrementing holder count" ) ;
216
- self . num_holders . fetch_add ( 1 , atomic :: Ordering :: SeqCst ) ;
246
+ self . num_holders . fetch_add ( 1 , Ordering :: SeqCst ) ;
217
247
let guard = CrossProcessLockGuard { num_holders : self . num_holders . clone ( ) } ;
218
248
return Ok ( Some ( guard) ) ;
219
249
}
220
250
221
- let acquired = self
251
+ if let Some ( generation ) = self
222
252
. locker
223
253
. try_lock ( LEASE_DURATION_MS , & self . lock_key , & self . lock_holder )
224
254
. await
225
255
. map_err ( |err| CrossProcessLockError :: TryLockError ( Box :: new ( err) ) ) ?
226
- . is_some ( ) ;
256
+ {
257
+ match self . lock_generation . load ( Ordering :: SeqCst ) {
258
+ // If there is no lock generation, it means this is the first time the lock is
259
+ // acquired. Let's remember the generation.
260
+ NO_CROSS_PROCESS_LOCK_GENERATION => {
261
+ trace ! ( ?generation, "Setting the lock generation for the first time" ) ;
262
+
263
+ self . lock_generation . store ( generation, Ordering :: SeqCst ) ;
264
+ }
265
+
266
+ // This is NOT the same generation, the lock has been poisoned!
267
+ current_generation if current_generation != generation => {
268
+ warn ! (
269
+ ?current_generation,
270
+ ?generation,
271
+ "The lock has been acquired, but it's been poisoned!"
272
+ ) ;
273
+ }
274
+
275
+ // This is the same generation, no problem.
276
+ _ => {
277
+ trace ! ( "Same lock generation; no problem" ) ;
278
+ }
279
+ }
227
280
228
- if !acquired {
281
+ trace ! ( "Lock acquired!" ) ;
282
+ } else {
229
283
trace ! ( "Couldn't acquire the lock immediately." ) ;
230
284
return Ok ( None ) ;
231
285
}
@@ -269,7 +323,7 @@ where
269
323
let _guard = this. locking_attempt . lock ( ) . await ;
270
324
271
325
// If there are no more users, we can quit.
272
- if this. num_holders . load ( atomic :: Ordering :: SeqCst ) == 0 {
326
+ if this. num_holders . load ( Ordering :: SeqCst ) == 0 {
273
327
trace ! ( "exiting the lease extension loop" ) ;
274
328
275
329
// Cancel the lease with another 0ms lease.
@@ -295,7 +349,7 @@ where
295
349
}
296
350
} ) ) ;
297
351
298
- self . num_holders . fetch_add ( 1 , atomic :: Ordering :: SeqCst ) ;
352
+ self . num_holders . fetch_add ( 1 , Ordering :: SeqCst ) ;
299
353
300
354
let guard = CrossProcessLockGuard { num_holders : self . num_holders . clone ( ) } ;
301
355
Ok ( Some ( guard) )
0 commit comments