@@ -238,21 +238,15 @@ where
238
238
}
239
239
240
240
/// Try to lock once, returns whether the lock was obtained or not.
241
+ ///
242
+ /// The lock can be obtained but it can be dirty. In all cases, the renew
243
+ /// task will run in the background.
241
244
#[ instrument( skip( self ) , fields( ?self . lock_key, ?self . lock_holder) ) ]
242
- pub async fn try_lock_once (
243
- & self ,
244
- ) -> Result < Option < CrossProcessLockGuard > , CrossProcessLockError > {
245
+ pub async fn try_lock_once ( & self ) -> Result < CrossProcessLockResult , CrossProcessLockError > {
245
246
// Hold onto the locking attempt mutex for the entire lifetime of this
246
247
// function, to avoid multiple reentrant calls.
247
248
let mut _attempt = self . locking_attempt . lock ( ) . await ;
248
249
249
- // The lock is already dirtied? Let's stop here.
250
- if self . is_dirty ( ) {
251
- return Err ( CrossProcessLockError :: LockDirtied ( CrossProcessLockGuard :: new (
252
- self . num_holders . clone ( ) ,
253
- ) ) ) ;
254
- }
255
-
256
250
// If another thread obtained the lock, make sure to only superficially increase
257
251
// the number of holders, and carry on.
258
252
if self . num_holders . load ( Ordering :: SeqCst ) > 0 {
@@ -261,9 +255,12 @@ where
261
255
// was taken by at least one thread, and after this call it will be
262
256
// taken by at least one thread.
263
257
trace ! ( "We already had the lock, incrementing holder count" ) ;
258
+
264
259
self . num_holders . fetch_add ( 1 , Ordering :: SeqCst ) ;
265
- let guard = CrossProcessLockGuard { num_holders : self . num_holders . clone ( ) } ;
266
- return Ok ( Some ( guard) ) ;
260
+
261
+ return Ok ( CrossProcessLockResult :: Clean ( CrossProcessLockGuard :: new (
262
+ self . num_holders . clone ( ) ,
263
+ ) ) ) ;
267
264
}
268
265
269
266
if let Some ( new_generation) = self
@@ -287,10 +284,6 @@ where
287
284
"The lock has been acquired, but it's been dirtied!"
288
285
) ;
289
286
self . is_dirty . store ( true , Ordering :: SeqCst ) ;
290
-
291
- return Err ( CrossProcessLockError :: LockDirtied ( CrossProcessLockGuard :: new (
292
- self . num_holders . clone ( ) ,
293
- ) ) ) ;
294
287
}
295
288
296
289
// This was the same generation, no problem.
@@ -302,7 +295,7 @@ where
302
295
trace ! ( "Lock acquired!" ) ;
303
296
} else {
304
297
trace ! ( "Couldn't acquire the lock immediately." ) ;
305
- return Ok ( None ) ;
298
+ return Ok ( CrossProcessLockResult :: Unobtained ) ;
306
299
}
307
300
308
301
trace ! ( "Acquired the lock, spawning the lease extension task." ) ;
@@ -357,42 +350,22 @@ where
357
350
}
358
351
}
359
352
360
- // The lock has been dirtied. Exit the loop.
361
- if this. is_dirty ( ) {
362
- break ;
363
- }
364
-
365
353
sleep ( Duration :: from_millis ( EXTEND_LEASE_EVERY_MS ) ) . await ;
366
354
367
355
match this
368
356
. locker
369
357
. try_lock ( LEASE_DURATION_MS , & this. lock_key , & this. lock_holder )
370
358
. await
371
359
{
372
- Ok ( Some ( new_generation) ) => {
373
- match this. lock_generation . swap ( new_generation, Ordering :: SeqCst ) {
374
- // It's impossible to renew the lock if the lock wasn't acquired at
375
- // least once. This is unreachable.
376
- NO_CROSS_PROCESS_LOCK_GENERATION => unreachable ! (
377
- "It's impossible to renew a lock lease that has not been acquired once"
378
- ) ,
379
-
380
- // This was NOT the same generation, the lock has been dirtied!
381
- previous_generation if previous_generation != new_generation => {
382
- warn ! (
383
- ?previous_generation,
384
- ?new_generation,
385
- "The lock lease has been renewed, but it's been dirtied!"
386
- ) ;
387
- this. is_dirty . store ( true , Ordering :: SeqCst ) ;
388
-
389
- // Exit the loop.
390
- break ;
391
- }
392
-
393
- // This was the same generation, no problem.
394
- _ => { }
395
- }
360
+ Ok ( Some ( _generation) ) => {
361
+ // It's impossible that the generation can be different
362
+ // from the previous generation.
363
+ //
364
+ // As long as the task runs, the lock is renewed, so the
365
+ // generation remains the same. If the lock is not
366
+ // taken, it's because the lease has expired, which is
367
+ // represented by the `Ok(None)` value, and the task
368
+ // must stop.
396
369
}
397
370
398
371
Ok ( None ) => {
@@ -414,7 +387,13 @@ where
414
387
415
388
self . num_holders . fetch_add ( 1 , Ordering :: SeqCst ) ;
416
389
417
- Ok ( Some ( CrossProcessLockGuard :: new ( self . num_holders . clone ( ) ) ) )
390
+ let guard = CrossProcessLockGuard :: new ( self . num_holders . clone ( ) ) ;
391
+
392
+ Ok ( if self . is_dirty ( ) {
393
+ CrossProcessLockResult :: Dirty ( guard)
394
+ } else {
395
+ CrossProcessLockResult :: Clean ( guard)
396
+ } )
418
397
}
419
398
420
399
/// Attempt to take the lock, with exponential backoff if the lock has
@@ -436,7 +415,7 @@ where
436
415
// lock in `try_lock_once` should sequentialize it all.
437
416
438
417
loop {
439
- if let Some ( guard) = self . try_lock_once ( ) . await ? {
418
+ if let Some ( guard) = self . try_lock_once ( ) . await ?. ok ( ) {
440
419
// Reset backoff before returning, for the next attempt to lock.
441
420
* self . backoff . lock ( ) . await = WaitingTime :: Some ( INITIAL_BACKOFF_MS ) ;
442
421
return Ok ( guard) ;
@@ -473,18 +452,53 @@ where
473
452
}
474
453
}
475
454
455
+ /// Represent the result of a locking attempt, either by
456
+ /// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
457
+ #[ derive( Debug ) ]
458
+ pub enum CrossProcessLockResult {
459
+ /// The lock has been obtained successfully, all good.
460
+ Clean ( CrossProcessLockGuard ) ,
461
+
462
+ /// The lock has been obtained successfully, but the lock is dirty!
463
+ ///
464
+ /// This holder has obtained this cross-process lock once, then another
465
+ /// holder has obtained this cross-process lock _before_ this holder
466
+ /// obtained it again. The lock is marked as dirty. It means the value
467
+ /// protected by the cross-process lock may need to be reloaded if
468
+ /// synchronisation is important.
469
+ Dirty ( CrossProcessLockGuard ) ,
470
+
471
+ /// The lock has not been obtained.
472
+ Unobtained ,
473
+ }
474
+
475
+ impl CrossProcessLockResult {
476
+ pub fn is_dirty ( & self ) -> bool {
477
+ matches ! ( self , Self :: Dirty ( _) )
478
+ }
479
+
480
+ /// Convert from [`CrossProcessLockResult`] to
481
+ /// [`Option<T>`] where `T` is [`CrossProcessLockGuard`].
482
+ pub fn ok ( self ) -> Option < CrossProcessLockGuard > {
483
+ match self {
484
+ Self :: Clean ( guard) | Self :: Dirty ( guard) => Some ( guard) ,
485
+ Self :: Unobtained => None ,
486
+ }
487
+ }
488
+
489
+ /// Return `true` if the lock has been obtained, `false` otherwise.
490
+ pub fn is_ok ( & self ) -> bool {
491
+ matches ! ( self , Self :: Clean ( _) | Self :: Dirty ( _) )
492
+ }
493
+ }
494
+
476
495
/// Error related to the locking API of the store.
477
496
#[ derive( Debug , thiserror:: Error ) ]
478
497
pub enum CrossProcessLockError {
479
498
/// Spent too long waiting for a database lock.
480
499
#[ error( "a lock timed out" ) ]
481
500
LockTimeout ,
482
501
483
- /// The lock has been dirtied, i.e. it means another process took the lock
484
- /// while this process was holding it.
485
- #[ error( "a lock has been dirtied" ) ]
486
- LockDirtied ( CrossProcessLockGuard ) ,
487
-
488
502
#[ error( transparent) ]
489
503
#[ cfg( not( target_family = "wasm" ) ) ]
490
504
TryLockError ( #[ from] Box < dyn Error + Send + Sync > ) ,
@@ -499,6 +513,7 @@ pub enum CrossProcessLockError {
499
513
mod tests {
500
514
use std:: {
501
515
collections:: HashMap ,
516
+ ops:: Not ,
502
517
sync:: { Arc , RwLock , atomic} ,
503
518
} ;
504
519
@@ -510,8 +525,8 @@ mod tests {
510
525
} ;
511
526
512
527
use super :: {
513
- CrossProcessLock , CrossProcessLockError , CrossProcessLockGeneration , CrossProcessLockGuard ,
514
- EXTEND_LEASE_EVERY_MS , TryLock ,
528
+ CrossProcessLock , CrossProcessLockError , CrossProcessLockGeneration ,
529
+ CrossProcessLockResult , EXTEND_LEASE_EVERY_MS , TryLock ,
515
530
memory_store_helper:: { Lease , try_take_leased_lock} ,
516
531
} ;
517
532
@@ -548,8 +563,8 @@ mod tests {
548
563
}
549
564
}
550
565
551
- async fn release_lock ( guard : Option < CrossProcessLockGuard > ) {
552
- drop ( guard ) ;
566
+ async fn release_lock ( result : CrossProcessLockResult ) {
567
+ drop ( result ) ;
553
568
sleep ( Duration :: from_millis ( EXTEND_LEASE_EVERY_MS ) ) . await ;
554
569
}
555
570
@@ -561,19 +576,19 @@ mod tests {
561
576
let lock = CrossProcessLock :: new ( store, "key" . to_owned ( ) , "first" . to_owned ( ) ) ;
562
577
563
578
// The lock plain works when used with a single holder.
564
- let acquired = lock. try_lock_once ( ) . await ?;
565
- assert ! ( acquired . is_some ( ) ) ;
579
+ let result = lock. try_lock_once ( ) . await ?;
580
+ assert ! ( result . is_ok ( ) ) ;
566
581
assert_eq ! ( lock. num_holders. load( atomic:: Ordering :: SeqCst ) , 1 ) ;
567
582
568
583
// Releasing works.
569
- release_lock ( acquired ) . await ;
584
+ release_lock ( result ) . await ;
570
585
assert_eq ! ( lock. num_holders. load( atomic:: Ordering :: SeqCst ) , 0 ) ;
571
586
572
587
// Spin locking on the same lock always works, assuming no concurrent access.
573
- let acquired = lock. spin_lock ( None ) . await . unwrap ( ) ;
588
+ let guard = lock. spin_lock ( None ) . await . unwrap ( ) ;
574
589
575
590
// Releasing still works.
576
- release_lock ( Some ( acquired ) ) . await ;
591
+ release_lock ( CrossProcessLockResult :: Clean ( guard ) ) . await ;
577
592
assert_eq ! ( lock. num_holders. load( atomic:: Ordering :: SeqCst ) , 0 ) ;
578
593
579
594
Ok ( ( ) )
@@ -585,8 +600,8 @@ mod tests {
585
600
let lock = CrossProcessLock :: new ( store. clone ( ) , "key" . to_owned ( ) , "first" . to_owned ( ) ) ;
586
601
587
602
// When a lock is acquired...
588
- let acquired = lock. try_lock_once ( ) . await ?;
589
- assert ! ( acquired . is_some ( ) ) ;
603
+ let result = lock. try_lock_once ( ) . await ?;
604
+ assert ! ( result . is_ok ( ) ) ;
590
605
assert_eq ! ( lock. num_holders. load( atomic:: Ordering :: SeqCst ) , 1 ) ;
591
606
592
607
// But then forgotten... (note: no need to release the guard)
@@ -596,8 +611,8 @@ mod tests {
596
611
let lock = CrossProcessLock :: new ( store. clone ( ) , "key" . to_owned ( ) , "first" . to_owned ( ) ) ;
597
612
598
613
// We still got it.
599
- let acquired = lock. try_lock_once ( ) . await ?;
600
- assert ! ( acquired . is_some ( ) ) ;
614
+ let result = lock. try_lock_once ( ) . await ?;
615
+ assert ! ( result . is_ok ( ) ) ;
601
616
assert_eq ! ( lock. num_holders. load( atomic:: Ordering :: SeqCst ) , 1 ) ;
602
617
603
618
Ok ( ( ) )
@@ -609,19 +624,19 @@ mod tests {
609
624
let lock = CrossProcessLock :: new ( store, "key" . to_owned ( ) , "first" . to_owned ( ) ) ;
610
625
611
626
// Taking the lock twice...
612
- let acquired = lock. try_lock_once ( ) . await ?;
613
- assert ! ( acquired . is_some ( ) ) ;
627
+ let result1 = lock. try_lock_once ( ) . await ?;
628
+ assert ! ( result1 . is_ok ( ) ) ;
614
629
615
- let acquired2 = lock. try_lock_once ( ) . await ?;
616
- assert ! ( acquired2 . is_some ( ) ) ;
630
+ let result2 = lock. try_lock_once ( ) . await ?;
631
+ assert ! ( result2 . is_ok ( ) ) ;
617
632
618
633
assert_eq ! ( lock. num_holders. load( atomic:: Ordering :: SeqCst ) , 2 ) ;
619
634
620
635
// ...means we can release it twice.
621
- release_lock ( acquired ) . await ;
636
+ release_lock ( result1 ) . await ;
622
637
assert_eq ! ( lock. num_holders. load( atomic:: Ordering :: SeqCst ) , 1 ) ;
623
638
624
- release_lock ( acquired2 ) . await ;
639
+ release_lock ( result2 ) . await ;
625
640
assert_eq ! ( lock. num_holders. load( atomic:: Ordering :: SeqCst ) , 0 ) ;
626
641
627
642
Ok ( ( ) )
@@ -634,22 +649,22 @@ mod tests {
634
649
let lock2 = CrossProcessLock :: new ( store, "key" . to_owned ( ) , "second" . to_owned ( ) ) ;
635
650
636
651
// When the first process takes the lock...
637
- let acquired1 = lock1. try_lock_once ( ) . await ?;
638
- assert ! ( acquired1 . is_some ( ) ) ;
652
+ let result1 = lock1. try_lock_once ( ) . await ?;
653
+ assert ! ( result1 . is_ok ( ) ) ;
639
654
640
655
// The second can't take it immediately.
641
- let acquired2 = lock2. try_lock_once ( ) . await ?;
642
- assert ! ( acquired2 . is_none ( ) ) ;
656
+ let result2 = lock2. try_lock_once ( ) . await ?;
657
+ assert ! ( result2 . is_ok ( ) . not ( ) ) ;
643
658
644
659
let lock2_clone = lock2. clone ( ) ;
645
660
let handle = spawn ( async move { lock2_clone. spin_lock ( Some ( 1000 ) ) . await } ) ;
646
661
647
662
sleep ( Duration :: from_millis ( 100 ) ) . await ;
648
663
649
- drop ( acquired1 ) ;
664
+ drop ( result1 ) ;
650
665
651
666
// lock2 in the background manages to get the lock at some point.
652
- let _acquired2 = handle
667
+ let _result2 = handle
653
668
. await
654
669
. expect ( "join handle is properly awaited" )
655
670
. expect ( "lock was obtained after spin-locking" ) ;
0 commit comments