77using EfCore . Ydb . Storage . Internal ;
88using Microsoft . EntityFrameworkCore . Diagnostics ;
99using Microsoft . EntityFrameworkCore . Migrations ;
10- using Microsoft . EntityFrameworkCore . Storage ;
1110using Ydb . Sdk . Ado ;
1211
1312namespace EfCore . Ydb . Migrations . Internal ;
@@ -22,16 +21,17 @@ protected override bool InterpretExistsResult(object? value)
2221 => throw new InvalidOperationException ( "Shouldn't be called" ) ;
2322
2423 public override IMigrationsDatabaseLock AcquireDatabaseLock ( )
25- {
26- Dependencies . MigrationsLogger . AcquiringMigrationLock ( ) ;
27- return new YdbMigrationDatabaseLock ( this , Dependencies . Connection ) ;
28- }
24+ => AcquireDatabaseLockAsync ( ) . GetAwaiter ( ) . GetResult ( ) ;
2925
30- public override Task < IMigrationsDatabaseLock > AcquireDatabaseLockAsync (
26+ public override async Task < IMigrationsDatabaseLock > AcquireDatabaseLockAsync (
3127 CancellationToken cancellationToken = default
3228 )
3329 {
34- throw new NotImplementedException ( ) ;
30+ Dependencies . MigrationsLogger . AcquiringMigrationLock ( ) ;
31+ var dbLock =
32+ new YdbMigrationDatabaseLock ( "migrationLock" , this , ( YdbRelationalConnection ) Dependencies . Connection ) ;
33+ await dbLock . Lock ( timeoutInSeconds : 60 , cancellationToken ) ;
34+ return dbLock ;
3535 }
3636
3737 public override string GetCreateIfNotExistsScript ( )
@@ -74,26 +74,122 @@ public override string GetEndIfScript()
7474 throw new NotImplementedException ( ) ;
7575 }
7676
77- // TODO Implement lock
7877 private sealed class YdbMigrationDatabaseLock : IMigrationsDatabaseLock
7978 {
80- private YdbRelationalConnection _connection ;
79+ public IYdbRelationalConnection Connection { get ; }
80+ private readonly string _name ;
81+ private volatile string _pid ;
82+ private CancellationTokenSource ? _watchDogToken ;
8183
8284 public YdbMigrationDatabaseLock (
85+ string name ,
8386 IHistoryRepository historyRepository ,
84- IRelationalConnection connection
87+ YdbRelationalConnection ydbConnection
8588 )
8689 {
90+ _name = name ;
8791 HistoryRepository = historyRepository ;
88- _connection = ( YdbRelationalConnection ) connection ;
92+ Connection = ydbConnection . Clone ( ) ;
8993 }
9094
91- public void Dispose ( )
95+ public async Task Lock (
96+ int timeoutInSeconds ,
97+ CancellationToken cancellationToken = default
98+ )
99+ {
100+ if ( _watchDogToken != null )
101+ {
102+ throw new InvalidOperationException ( "Already locked" ) ;
103+ }
104+
105+ await Connection . OpenAsync ( cancellationToken ) ;
106+ await using ( var command = Connection . DbConnection . CreateCommand ( ) )
107+ {
108+ command . CommandText =
109+ """
110+ CREATE TABLE IF NOT EXISTS shedlock (
111+ name STRING NOT NULL,
112+ locked_at TIMESTAMP NOT NULL,
113+ lock_until TIMESTAMP NOT NULL,
114+ locked_by STRING NOT NULL,
115+ PRIMARY KEY(name)
116+ );
117+ """ ;
118+ await command . ExecuteNonQueryAsync ( cancellationToken ) ;
119+ }
120+
121+ _pid = $ "PID:{ Environment . ProcessId } ";
122+
123+ var lockAcquired = false ;
124+ for ( var i = 0 ; i < 10 ; i ++ )
125+ {
126+ if ( await UpdateLock ( _name , timeoutInSeconds ) )
127+ {
128+ lockAcquired = true ;
129+ break ;
130+ }
131+ await Task . Delay ( TimeSpan . FromMilliseconds ( 500 ) , cancellationToken ) ;
132+ }
133+
134+ if ( ! lockAcquired )
135+ {
136+ throw new TimeoutException ( "Failed to acquire lock for migration`" ) ;
137+ }
138+
139+ _watchDogToken = new CancellationTokenSource ( ) ;
140+ _ = Task . Run ( ( async Task ( ) =>
141+ {
142+ while ( true )
143+ {
144+ await Task . Delay ( TimeSpan . FromSeconds ( timeoutInSeconds / 2 ) , _watchDogToken . Token ) ;
145+ await UpdateLock ( _name , timeoutInSeconds ) ;
146+ }
147+ } ) ! , _watchDogToken . Token ) ;
148+ }
149+
150+ private async Task < bool > UpdateLock (
151+ string name ,
152+ int timeoutInSeconds
153+ )
92154 {
155+ var command = Connection . DbConnection . CreateCommand ( ) ;
156+ command . CommandText =
157+ $ """
158+ UPSERT INTO shedlock (name, locked_at, lock_until, locked_by)
159+ VALUES (
160+ @name,
161+ CurrentUtcTimestamp(),
162+ Unwrap(CurrentUtcTimestamp() + Interval("PT{ timeoutInSeconds } S")),
163+ @locked_by
164+ );
165+ """ ;
166+ command . Parameters . Add ( new YdbParameter ( "name" , DbType . String , name ) ) ;
167+ command . Parameters . Add ( new YdbParameter ( "locked_by" , DbType . String , _pid ) ) ;
168+ try
169+ {
170+ await command . ExecuteNonQueryAsync ( default ) ;
171+ return true ;
172+ }
173+ catch ( YdbException _ )
174+ {
175+ return false ;
176+ }
93177 }
94178
95- public ValueTask DisposeAsync ( )
96- => default ;
179+ public void Dispose ( )
180+ => DisposeInternalAsync ( ) . GetAwaiter ( ) . GetResult ( ) ;
181+
182+ public async ValueTask DisposeAsync ( )
183+ => await DisposeInternalAsync ( ) ;
184+
185+ private async Task DisposeInternalAsync ( )
186+ {
187+ if ( _watchDogToken != null ) await _watchDogToken . CancelAsync ( ) ;
188+ _watchDogToken = null ;
189+ await using var connection = Connection . DbConnection . CreateCommand ( ) ;
190+ connection . CommandText = "DELETE FROM shedlock WHERE name = '{_name}' AND locked_by = '{PID}';" ;
191+ await connection . ExecuteNonQueryAsync ( ) ;
192+ }
97193
98194 public IHistoryRepository HistoryRepository { get ; }
99195 }
0 commit comments