22
33namespace CFLookup
44{
5- public class RedisJobLock : IDisposable
5+ public sealed class RedisJobLock : IAsyncDisposable
66 {
7- private readonly ConnectionMultiplexer _connectionMultiplexer ;
8- private string TypeName { get ; set ; }
9- private string Handle { get ; set ; }
7+ private readonly ILogger < RedisJobLock > _logger ;
8+ private readonly TimeSpan _expiryTime ;
9+ private string _lockKey { get ; }
1010
11- private IDatabase RDB { get { return _connectionMultiplexer . GetDatabase ( 0 ) ; } }
12- private Guid LockInstance ;
13- private CancellationToken _cancellationToken ;
14- private readonly CancellationTokenSource CTS ;
11+ private IDatabase _redisDatabase { get ; }
12+ private readonly string _lockOwner ;
13+ private readonly CancellationTokenSource _cts ;
14+ private Task ? _refreshLockTask ;
1515
16- public RedisJobLock ( ConnectionMultiplexer connectionMultiplexer , string lockName )
16+ private RedisJobLock ( IDatabase database , string lockName , ILogger < RedisJobLock > logger , TimeSpan expiryTime )
1717 {
18- _connectionMultiplexer = connectionMultiplexer ;
19- CTS = new CancellationTokenSource ( ) ;
18+ _logger = logger ;
19+ _expiryTime = expiryTime ;
20+ _redisDatabase = database ;
2021
21- TypeName = lockName ;
22- LockInstance = Guid . NewGuid ( ) ;
23- _cancellationToken = CTS . Token ;
22+ _lockOwner = Guid . NewGuid ( ) . ToString ( ) ;
2423
25- Handle = GetHandle ( TypeName ) ;
24+ _lockKey = $ "joblock:{ lockName } ";
25+ _cts = new CancellationTokenSource ( ) ;
2626 }
2727
28- public bool IsLocked ( string lockName )
28+ public async static Task < RedisJobLock ? > CreateAsync (
29+ IDatabase database ,
30+ string lockName ,
31+ ILogger < RedisJobLock > logger ,
32+ TimeSpan expiryTime
33+ )
2934 {
30- var handle = GetHandle ( lockName ) ;
31- return RDB . LockQuery ( handle ) != RedisValue . Null ;
32- }
33-
34- private static string GetHandle ( string lockName )
35- {
36- return $ "joblock:{ lockName } ";
37- }
35+ var distributedLock = new RedisJobLock ( database , lockName , logger , expiryTime ) ;
3836
39- TimeSpan LockTime = TimeSpan . FromSeconds ( 15 ) ;
37+ var lockAcquired = await database . LockTakeAsync (
38+ distributedLock . _lockKey ,
39+ distributedLock . _lockOwner ,
40+ distributedLock . _expiryTime ) ;
4041
41- Task _refreshLockTask ;
42- async Task RefreshLockAsync ( )
43- {
44- try
45- {
46- await Task . Delay ( LockTime . Subtract ( TimeSpan . FromMilliseconds ( LockTime . TotalMilliseconds / 2 ) ) , _cancellationToken ) ;
47- KeepLocked ( ) ;
48- _refreshLockTask = RefreshLockAsync ( ) ;
49- _ = PubSubLog ( $ "Refreshing lock { Handle } / { LockInstance } ") ;
50- }
51- catch ( TaskCanceledException )
42+ if ( lockAcquired )
5243 {
53- /* This is totally ok, we cancelled it */
44+ distributedLock . StartRenewalTask ( ) ;
45+ logger . LogDebug ( "Lock acquired for key {LockKey} by owner {LockOwner}" , distributedLock . _lockKey , distributedLock . _lockOwner ) ;
46+ return distributedLock ;
5447 }
48+
49+ logger . LogDebug ( "Failed to acquire lock for key {LockKey}" , distributedLock . _lockKey ) ;
50+ return null ;
5551 }
5652
57- private void KeepLocked ( )
53+ private void StartRenewalTask ( )
5854 {
59- RDB . LockExtend ( Handle , LockInstance . ToString ( ) , LockTime ) ;
60- }
55+ _refreshLockTask = Task . Run ( async ( ) =>
56+ {
57+ var renewalDelay = TimeSpan . FromMilliseconds ( _expiryTime . TotalMilliseconds / 2.5 ) ;
6158
62- public async Task KeepLockedAsync ( )
63- {
64- await RDB . LockExtendAsync ( Handle , LockInstance . ToString ( ) , LockTime ) ;
65- }
59+ while ( ! _cts . IsCancellationRequested )
60+ {
61+ try
62+ {
63+ await Task . Delay ( renewalDelay , _cts . Token ) ;
6664
67- public bool TryTakeLock ( )
68- {
69- var couldTakeLock = RDB . LockTake ( Handle , LockInstance . ToString ( ) , LockTime ) ;
70- if ( couldTakeLock ) _refreshLockTask = RefreshLockAsync ( ) ;
71- _ = PubSubLog ( $ "Could { ( couldTakeLock ? "" : "not " ) } take lock for { Handle } / { LockInstance } ") ;
72- return couldTakeLock ;
73- }
65+ var renewed = await _redisDatabase . LockExtendAsync ( _lockKey , _lockOwner , _expiryTime ) ;
7466
75- public async Task < bool > TryTakeLockAsync ( )
76- {
77- var couldTakeLock = await RDB . LockTakeAsync ( Handle , LockInstance . ToString ( ) , LockTime ) ;
78- if ( couldTakeLock ) _refreshLockTask = RefreshLockAsync ( ) ;
79- _ = PubSubLog ( $ "Could { ( couldTakeLock ? "" : "not " ) } take lock for { Handle } / { LockInstance } ") ;
80- return couldTakeLock ;
67+ if ( renewed )
68+ {
69+ _logger . LogDebug ( "Renewed lock for key {LockKey}" , _lockKey ) ;
70+ }
71+ else
72+ {
73+ _logger . LogError ( "Failed to renew lock for key {LockKey}. Lock has been lost." , _lockKey ) ;
74+ break ;
75+ }
76+ }
77+ catch ( TaskCanceledException )
78+ {
79+ break ;
80+ }
81+ catch ( Exception ex )
82+ {
83+ _logger . LogError ( ex , "Failed to renew lock for key {LockKey} due to exception." , _lockKey ) ;
84+ break ;
85+ }
86+ }
87+ } ) ;
8188 }
8289
83- internal async Task PubSubLog ( string logmessage )
90+ public async ValueTask DisposeAsync ( )
8491 {
85- await _connectionMultiplexer . GetSubscriber ( ) . PublishAsync ( "LockMessages/CFLookup" , logmessage ) ;
86- }
92+ if ( _refreshLockTask == null )
93+ {
94+ return ;
95+ }
96+
97+ _logger . LogDebug ( $ "Releasing lock { _lockKey } / { _lockOwner } ") ;
8798
88- public void Dispose ( )
89- {
90- _ = PubSubLog ( $ "Releasing lock { Handle } / { LockInstance } ") ;
91- RDB . LockRelease ( Handle , LockInstance . ToString ( ) ) ;
92- CTS . Cancel ( ) ;
99+ if ( ! _cts . IsCancellationRequested )
100+ {
101+ await _cts . CancelAsync ( ) ;
102+ }
103+
104+ var released = await _redisDatabase . LockReleaseAsync ( _lockKey , _lockOwner ) ;
105+ if ( ! released )
106+ {
107+ _logger . LogWarning ( "Failed to release lock for key {LockKey}, it may have expired already" , _lockKey ) ;
108+ }
109+ else
110+ {
111+ _logger . LogDebug ( "Released lock for key {LockKey}" , _lockKey ) ;
112+ }
113+
114+ try
115+ {
116+ await _refreshLockTask ;
117+ }
118+ catch ( Exception ex )
119+ {
120+ _logger . LogError ( ex , "Failed to refresh lock for key {LockKey}" , _lockKey ) ;
121+ }
122+
123+ _cts . Dispose ( ) ;
93124 }
94125 }
95126}
0 commit comments