11using System ;
2- using System . Data ;
2+ using System . Collections . Generic ;
33using System . Diagnostics ;
4- using System . Linq ;
4+ using System . Globalization ;
55using System . Threading ;
66using System . Threading . Tasks ;
77using EfCore . Ydb . Storage . Internal ;
88using Microsoft . EntityFrameworkCore . Diagnostics ;
99using Microsoft . EntityFrameworkCore . Migrations ;
10+ using Microsoft . EntityFrameworkCore . Migrations . Operations ;
11+ using Microsoft . Extensions . Logging ;
12+ using Ydb . Sdk ;
1013using Ydb . Sdk . Ado ;
1114
1215namespace EfCore . Ydb . Migrations . Internal ;
1316
14- // ReSharper disable once ClassNeverInstantiated.Global
15- public class YdbHistoryRepository ( HistoryRepositoryDependencies dependencies ) : HistoryRepository ( dependencies )
17+ public class YdbHistoryRepository ( HistoryRepositoryDependencies dependencies )
18+ : HistoryRepository ( dependencies ) , IHistoryRepository
1619{
20+ private const string LockKey = "LockMigration" ;
21+ private const int ReleaseMaxAttempt = 10 ;
22+
23+ private static readonly TimeSpan LockTimeout = TimeSpan . FromMinutes ( 2 ) ;
24+
1725 protected override bool InterpretExistsResult ( object ? value )
1826 => throw new InvalidOperationException ( "Shouldn't be called" ) ;
1927
@@ -25,152 +33,141 @@ public override async Task<IMigrationsDatabaseLock> AcquireDatabaseLockAsync(
2533 )
2634 {
2735 Dependencies . MigrationsLogger . AcquiringMigrationLock ( ) ;
28- var dbLock =
29- new YdbMigrationDatabaseLock ( "migrationLock" , this , ( YdbRelationalConnection ) Dependencies . Connection ) ;
30- await dbLock . Lock ( timeoutInSeconds : 60 , cancellationToken ) ;
31- return dbLock ;
32- }
3336
34- public override string GetCreateIfNotExistsScript ( )
35- => GetCreateScript ( ) . Replace ( "CREATE TABLE" , "CREATE TABLE IF NOT EXISTS" ) ;
37+ var deadline = DateTime . UtcNow + LockTimeout ;
38+ DateTime now ;
3639
37- public override LockReleaseBehavior LockReleaseBehavior => LockReleaseBehavior . Transaction ;
40+ do
41+ {
42+ now = DateTime . UtcNow ;
3843
39- protected override string ExistsSql
40- => throw new UnreachableException ( "Shouldn't be called. We check if exists using different approach" ) ;
44+ await using var connection = ( ( IYdbRelationalConnection ) Dependencies . Connection ) . Clone ( ) . DbConnection ;
45+ try
46+ {
47+ await connection . OpenAsync ( cancellationToken ) ;
4148
42- public override bool Exists ( )
43- => ExistsAsync ( ) . ConfigureAwait ( false ) . GetAwaiter ( ) . GetResult ( ) ;
49+ var command = connection . CreateCommand ( ) ;
50+ command . CommandText = GetInsertScript ( new HistoryRow ( LockKey ,
51+ $ "LockTime: { DateTime . UtcNow . ToString ( CultureInfo . InvariantCulture ) } , PID: { Environment . ProcessId } ") ) ;
52+ await command . ExecuteNonQueryAsync ( cancellationToken ) ;
4453
45- public override Task < bool > ExistsAsync ( CancellationToken cancellationToken = default )
46- {
47- var connection = ( YdbRelationalConnection ) Dependencies . Connection ;
48- var schema = ( YdbConnection ) connection . DbConnection ;
49- var tables = schema . GetSchema ( "tables" ) ;
50-
51- var foundTables =
52- from table in tables . AsEnumerable ( )
53- where table . Field < string > ( "table_type" ) == "TABLE"
54- && table . Field < string > ( "table_name" ) == TableName
55- select table ;
56- return Task . FromResult ( foundTables . Count ( ) == 1 ) ;
54+ return new YdbMigrationDatabaseLock ( this ) ;
55+ }
56+ catch ( YdbException )
57+ {
58+ await Task . Delay ( 100 + Random . Shared . Next ( 1000 ) , cancellationToken ) ;
59+ }
60+ } while ( now < deadline ) ;
61+
62+ throw new YdbException ( "Unable to obtain table lock - another EF instance may be running" ) ;
5763 }
5864
59- public override string GetBeginIfNotExistsScript ( string migrationId ) => throw new NotImplementedException ( ) ;
65+ private async Task ReleaseDatabaseLockAsync ( )
66+ {
67+ for ( var i = 0 ; i < ReleaseMaxAttempt ; i ++ )
68+ {
69+ await using var connection = ( ( IYdbRelationalConnection ) Dependencies . Connection ) . Clone ( ) . DbConnection ;
6070
61- public override string GetBeginIfExistsScript ( string migrationId ) => throw new NotImplementedException ( ) ;
71+ try
72+ {
73+ await connection . OpenAsync ( ) ;
74+ var command = connection . CreateCommand ( ) ;
75+ command . CommandText = GetDeleteScript ( LockKey ) ;
76+ await command . ExecuteNonQueryAsync ( ) ;
77+
78+ return ;
79+ }
80+ catch ( YdbException e )
81+ {
82+ Dependencies . MigrationsLogger . Logger . LogError ( e , "Failed release database lock" ) ;
83+ }
84+ }
85+ }
6286
63- public override string GetEndIfScript ( ) => throw new NotImplementedException ( ) ;
87+ bool IHistoryRepository . CreateIfNotExists ( ) => CreateIfNotExistsAsync ( ) . GetAwaiter ( ) . GetResult ( ) ;
6488
65- private sealed class YdbMigrationDatabaseLock (
66- string name ,
67- IHistoryRepository historyRepository ,
68- YdbRelationalConnection ydbConnection
69- ) : IMigrationsDatabaseLock
89+ public async Task < bool > CreateIfNotExistsAsync ( CancellationToken cancellationToken = default )
7090 {
71- private IYdbRelationalConnection Connection { get ; } = ydbConnection . Clone ( ) ;
72- private volatile string _pid = null ! ;
73- private CancellationTokenSource ? _watchDogToken ;
91+ if ( await ExistsAsync ( cancellationToken ) )
92+ {
93+ return false ;
94+ }
7495
75- public async Task Lock ( int timeoutInSeconds , CancellationToken cancellationToken = default )
96+ try
7697 {
77- if ( _watchDogToken != null )
98+ await Dependencies . MigrationCommandExecutor . ExecuteNonQueryAsync (
99+ GetCreateIfNotExistsCommands ( ) ,
100+ Dependencies . Connection ,
101+ new MigrationExecutionState ( ) ,
102+ true ,
103+ cancellationToken : cancellationToken
104+ ) . ConfigureAwait ( false ) ;
105+
106+ return true ;
107+ }
108+ catch ( YdbException e )
109+ {
110+ if ( e . Code == StatusCode . Overloaded )
78111 {
79- throw new InvalidOperationException ( "Already locked" ) ;
112+ return true ;
80113 }
81114
82- await Connection . OpenAsync ( cancellationToken ) ;
83- await using ( var command = Connection . DbConnection . CreateCommand ( ) )
115+ throw ;
116+ }
117+ }
118+
119+ private IReadOnlyList < MigrationCommand > GetCreateIfNotExistsCommands ( ) =>
120+ Dependencies . MigrationsSqlGenerator . Generate ( new List < MigrationOperation >
121+ {
122+ new SqlOperation
84123 {
85- command . CommandText = """
86- CREATE TABLE IF NOT EXISTS shedlock (
87- name Text NOT NULL,
88- locked_at Timestamp NOT NULL,
89- lock_until Timestamp NOT NULL,
90- locked_by Text NOT NULL,
91- PRIMARY KEY(name)
92- );
93- """ ;
94- await command . ExecuteNonQueryAsync ( cancellationToken ) ;
124+ Sql = GetCreateIfNotExistsScript ( ) ,
125+ SuppressTransaction = true
95126 }
127+ } ) ;
96128
97- _pid = $ "PID:{ Environment . ProcessId } ";
129+ public override string GetCreateIfNotExistsScript ( )
130+ => GetCreateScript ( ) . Replace ( "CREATE TABLE" , "CREATE TABLE IF NOT EXISTS" ) ;
98131
99- var lockAcquired = false ;
100- for ( var i = 0 ; i < 10 ; i ++ )
101- {
102- if ( await UpdateLock ( name , timeoutInSeconds ) )
103- {
104- lockAcquired = true ;
105- break ;
106- }
132+ public override LockReleaseBehavior LockReleaseBehavior => LockReleaseBehavior . Transaction ;
107133
108- await Task . Delay ( TimeSpan . FromMilliseconds ( 500 ) , cancellationToken ) ;
109- }
134+ protected override string ExistsSql
135+ => throw new UnreachableException ( "Shouldn't be called. We check if exists using different approach" ) ;
110136
111- if ( ! lockAcquired )
112- {
113- throw new TimeoutException ( "Failed to acquire lock for migration`" ) ;
114- }
137+ public override bool Exists ( )
138+ => ExistsAsync ( ) . ConfigureAwait ( false ) . GetAwaiter ( ) . GetResult ( ) ;
115139
116- _watchDogToken = new CancellationTokenSource ( ) ;
117- _ = Task . Run ( ( async Task ( ) =>
118- {
119- while ( true )
120- {
121- // ReSharper disable once PossibleLossOfFraction
122- await Task . Delay ( TimeSpan . FromSeconds ( timeoutInSeconds / 2 ) , _watchDogToken . Token ) ;
123- await UpdateLock ( name , timeoutInSeconds ) ;
124- }
125- // ReSharper disable once FunctionNeverReturns
126- } ) ! , _watchDogToken . Token ) ;
127- }
140+ public override async Task < bool > ExistsAsync ( CancellationToken cancellationToken = default )
141+ {
142+ await using var ydbConnection = ( YdbConnection )
143+ ( ( IYdbRelationalConnection ) Dependencies . Connection ) . Clone ( ) . DbConnection ;
144+ await ydbConnection . OpenAsync ( cancellationToken ) ;
128145
129- private async Task < bool > UpdateLock ( string nameLock , int timeoutInSeconds )
146+ try
130147 {
131- var command = Connection . DbConnection . CreateCommand ( ) ;
132- command . CommandText =
133- $ """
134- UPSERT INTO shedlock (name, locked_at, lock_until, locked_by)
135- VALUES (
136- @name,
137- CurrentUtcTimestamp(),
138- Unwrap(CurrentUtcTimestamp() + Interval("PT{ timeoutInSeconds } S")),
139- @locked_by
140- );
141- """ ;
142- command . Parameters . Add ( new YdbParameter ( "name" , DbType . String , nameLock ) ) ;
143- command . Parameters . Add ( new YdbParameter ( "locked_by" , DbType . String , _pid ) ) ;
148+ await new YdbCommand ( ydbConnection )
149+ { CommandText = $ "SELECT * FROM { TableName } WHERE MigrationId = '{ LockKey } ';" }
150+ . ExecuteNonQueryAsync ( cancellationToken ) ;
144151
145- try
146- {
147- await command . ExecuteNonQueryAsync ( ) ;
148- return true ;
149- }
150- catch ( YdbException )
151- {
152- return false ;
153- }
152+ return true ;
154153 }
154+ catch ( YdbException )
155+ {
156+ return false ;
157+ }
158+ }
155159
156- public void Dispose ( )
157- => DisposeInternalAsync ( ) . GetAwaiter ( ) . GetResult ( ) ;
160+ public override string GetBeginIfNotExistsScript ( string migrationId ) => throw new NotSupportedException ( ) ;
158161
159- public async ValueTask DisposeAsync ( )
160- => await DisposeInternalAsync ( ) ;
162+ public override string GetBeginIfExistsScript ( string migrationId ) => throw new NotSupportedException ( ) ;
161163
162- private async Task DisposeInternalAsync ( )
163- {
164- if ( _watchDogToken != null )
165- {
166- await _watchDogToken . CancelAsync ( ) ;
167- }
164+ public override string GetEndIfScript ( ) => throw new NotSupportedException ( ) ;
168165
169- _watchDogToken = null ;
170- await using var connection = Connection . DbConnection . CreateCommand ( ) ;
171- connection . CommandText = "DELETE FROM shedlock WHERE name = '{_name}' AND locked_by = '{PID}';" ;
172- await connection . ExecuteNonQueryAsync ( ) ;
173- }
166+ private sealed class YdbMigrationDatabaseLock ( YdbHistoryRepository historyRepository ) : IMigrationsDatabaseLock
167+ {
168+ public void Dispose ( ) => historyRepository . ReleaseDatabaseLockAsync ( ) . GetAwaiter ( ) . GetResult ( ) ;
169+
170+ public async ValueTask DisposeAsync ( ) => await historyRepository . ReleaseDatabaseLockAsync ( ) ;
174171
175172 public IHistoryRepository HistoryRepository { get ; } = historyRepository ;
176173 }
0 commit comments