11using System ;
2- using System . Data ;
2+ using System . Collections . Generic ;
33using System . Diagnostics ;
4- using System . Linq ;
4+ using System . Globalization ;
55using System . Threading ;
66using System . Threading . Tasks ;
77using EfCore . Ydb . Storage . Internal ;
88using Microsoft . EntityFrameworkCore . Diagnostics ;
99using Microsoft . EntityFrameworkCore . Migrations ;
10+ using Microsoft . EntityFrameworkCore . Migrations . Operations ;
11+ using Microsoft . Extensions . Logging ;
12+ using Ydb . Sdk ;
1013using Ydb . Sdk . Ado ;
1114
1215namespace EfCore . Ydb . Migrations . Internal ;
1316
14- // ReSharper disable once ClassNeverInstantiated.Global
15- public class YdbHistoryRepository ( HistoryRepositoryDependencies dependencies ) : HistoryRepository ( dependencies )
17+ public class YdbHistoryRepository ( HistoryRepositoryDependencies dependencies )
18+ : HistoryRepository ( dependencies ) , IHistoryRepository
1619{
20+ private const string LockKey = "LockMigration" ;
21+ private const int ReleaseMaxAttempt = 10 ;
22+
23+ private static readonly TimeSpan LockTimeout = TimeSpan . FromMinutes ( 2 ) ;
24+
1725 protected override bool InterpretExistsResult ( object ? value )
1826 => throw new InvalidOperationException ( "Shouldn't be called" ) ;
1927
@@ -25,152 +33,168 @@ public override async Task<IMigrationsDatabaseLock> AcquireDatabaseLockAsync(
2533 )
2634 {
2735 Dependencies . MigrationsLogger . AcquiringMigrationLock ( ) ;
28- var dbLock =
29- new YdbMigrationDatabaseLock ( "migrationLock" , this , ( YdbRelationalConnection ) Dependencies . Connection ) ;
30- await dbLock . Lock ( timeoutInSeconds : 60 , cancellationToken ) ;
31- return dbLock ;
32- }
3336
34- public override string GetCreateIfNotExistsScript ( )
35- => GetCreateScript ( ) . Replace ( "CREATE TABLE" , "CREATE TABLE IF NOT EXISTS" ) ;
37+ var deadline = DateTime . UtcNow + LockTimeout ;
38+ DateTime now ;
3639
37- public override LockReleaseBehavior LockReleaseBehavior => LockReleaseBehavior . Transaction ;
38-
39- protected override string ExistsSql
40- => throw new UnreachableException ( "Shouldn't be called. We check if exists using different approach" ) ;
40+ do
41+ {
42+ now = DateTime . UtcNow ;
4143
42- public override bool Exists ( )
43- => ExistsAsync ( ) . ConfigureAwait ( false ) . GetAwaiter ( ) . GetResult ( ) ;
44+ try
45+ {
46+ await Dependencies . MigrationCommandExecutor . ExecuteNonQueryAsync (
47+ AcquireDatabaseLockCommand ( ) ,
48+ ( ( IYdbRelationalConnection ) Dependencies . Connection ) . Clone ( ) , // TODO usage ExecutionContext
49+ new MigrationExecutionState ( ) ,
50+ commitTransaction : true ,
51+ cancellationToken : cancellationToken
52+ ) . ConfigureAwait ( false ) ;
53+
54+ return new YdbMigrationDatabaseLock ( this ) ;
55+ }
56+ catch ( YdbException )
57+ {
58+ await Task . Delay ( 100 + Random . Shared . Next ( 1000 ) , cancellationToken ) ;
59+ }
60+ } while ( now < deadline ) ;
4461
45- public override Task < bool > ExistsAsync ( CancellationToken cancellationToken = default )
46- {
47- var connection = ( YdbRelationalConnection ) Dependencies . Connection ;
48- var schema = ( YdbConnection ) connection . DbConnection ;
49- var tables = schema . GetSchema ( "tables" ) ;
50-
51- var foundTables =
52- from table in tables . AsEnumerable ( )
53- where table . Field < string > ( "table_type" ) == "TABLE"
54- && table . Field < string > ( "table_name" ) == TableName
55- select table ;
56- return Task . FromResult ( foundTables . Count ( ) == 1 ) ;
62+ throw new YdbException ( "Unable to obtain table lock - another EF instance may be running" ) ;
5763 }
5864
59- public override string GetBeginIfNotExistsScript ( string migrationId ) => throw new NotImplementedException ( ) ;
60-
61- public override string GetBeginIfExistsScript ( string migrationId ) => throw new NotImplementedException ( ) ;
62-
63- public override string GetEndIfScript ( ) => throw new NotImplementedException ( ) ;
64-
65- private sealed class YdbMigrationDatabaseLock (
66- string name ,
67- IHistoryRepository historyRepository ,
68- YdbRelationalConnection ydbConnection
69- ) : IMigrationsDatabaseLock
70- {
71- private IYdbRelationalConnection Connection { get ; } = ydbConnection . Clone ( ) ;
72- private volatile string _pid = null ! ;
73- private CancellationTokenSource ? _watchDogToken ;
74-
75- public async Task Lock ( int timeoutInSeconds , CancellationToken cancellationToken = default )
65+ private IReadOnlyList < MigrationCommand > AcquireDatabaseLockCommand ( ) =>
66+ Dependencies . MigrationsSqlGenerator . Generate ( new List < MigrationOperation >
7667 {
77- if ( _watchDogToken != null )
78- {
79- throw new InvalidOperationException ( "Already locked" ) ;
80- }
81-
82- await Connection . OpenAsync ( cancellationToken ) ;
83- await using ( var command = Connection . DbConnection . CreateCommand ( ) )
68+ new SqlOperation
8469 {
85- command . CommandText = """
86- CREATE TABLE IF NOT EXISTS shedlock (
87- name Text NOT NULL,
88- locked_at Timestamp NOT NULL,
89- lock_until Timestamp NOT NULL,
90- locked_by Text NOT NULL,
91- PRIMARY KEY(name)
92- );
93- """ ;
94- await command . ExecuteNonQueryAsync ( cancellationToken ) ;
70+ Sql = GetInsertScript (
71+ new HistoryRow (
72+ LockKey ,
73+ $ "LockTime: { DateTime . UtcNow . ToString ( CultureInfo . InvariantCulture ) } , PID: { Environment . ProcessId } "
74+ )
75+ )
9576 }
77+ } ) ;
9678
97- _pid = $ "PID:{ Environment . ProcessId } ";
79+ private async Task ReleaseDatabaseLockAsync ( )
80+ {
81+ for ( var i = 0 ; i < ReleaseMaxAttempt ; i ++ )
82+ {
83+ await using var connection = ( ( IYdbRelationalConnection ) Dependencies . Connection ) . Clone ( ) . DbConnection ;
9884
99- var lockAcquired = false ;
100- for ( var i = 0 ; i < 10 ; i ++ )
85+ try
10186 {
102- if ( await UpdateLock ( name , timeoutInSeconds ) )
103- {
104- lockAcquired = true ;
105- break ;
106- }
87+ await Dependencies . MigrationCommandExecutor . ExecuteNonQueryAsync (
88+ ReleaseDatabaseLockCommand ( ) ,
89+ ( ( IYdbRelationalConnection ) Dependencies . Connection ) . Clone ( )
90+ ) . ConfigureAwait ( false ) ;
10791
108- await Task . Delay ( TimeSpan . FromMilliseconds ( 500 ) , cancellationToken ) ;
92+ return ;
10993 }
110-
111- if ( ! lockAcquired )
94+ catch ( YdbException e )
11295 {
113- throw new TimeoutException ( "Failed to acquire lock for migration` " ) ;
96+ Dependencies . MigrationsLogger . Logger . LogError ( e , "Failed release database lock" ) ;
11497 }
98+ }
99+ }
115100
116- _watchDogToken = new CancellationTokenSource ( ) ;
117- _ = Task . Run ( ( async Task ( ) =>
118- {
119- while ( true )
120- {
121- // ReSharper disable once PossibleLossOfFraction
122- await Task . Delay ( TimeSpan . FromSeconds ( timeoutInSeconds / 2 ) , _watchDogToken . Token ) ;
123- await UpdateLock ( name , timeoutInSeconds ) ;
124- }
125- // ReSharper disable once FunctionNeverReturns
126- } ) ! , _watchDogToken . Token ) ;
101+ private IReadOnlyList < MigrationCommand > ReleaseDatabaseLockCommand ( ) =>
102+ Dependencies . MigrationsSqlGenerator . Generate ( new List < MigrationOperation >
103+ { new SqlOperation { Sql = GetDeleteScript ( LockKey ) } }
104+ ) ;
105+
106+ bool IHistoryRepository . CreateIfNotExists ( ) => CreateIfNotExistsAsync ( ) . GetAwaiter ( ) . GetResult ( ) ;
107+
108+ public async Task < bool > CreateIfNotExistsAsync ( CancellationToken cancellationToken = default )
109+ {
110+ if ( await ExistsAsync ( cancellationToken ) )
111+ {
112+ return false ;
127113 }
128114
129- private async Task < bool > UpdateLock ( string nameLock , int timeoutInSeconds )
115+ try
130116 {
131- var command = Connection . DbConnection . CreateCommand ( ) ;
132- command . CommandText =
133- $ """
134- UPSERT INTO shedlock (name, locked_at, lock_until, locked_by)
135- VALUES (
136- @name,
137- CurrentUtcTimestamp(),
138- Unwrap(CurrentUtcTimestamp() + Interval("PT{ timeoutInSeconds } S")),
139- @locked_by
140- );
141- """ ;
142- command . Parameters . Add ( new YdbParameter ( "name" , DbType . String , nameLock ) ) ;
143- command . Parameters . Add ( new YdbParameter ( "locked_by" , DbType . String , _pid ) ) ;
117+ await Dependencies . MigrationCommandExecutor . ExecuteNonQueryAsync (
118+ GetCreateIfNotExistsCommands ( ) ,
119+ Dependencies . Connection ,
120+ cancellationToken : cancellationToken
121+ ) . ConfigureAwait ( false ) ;
144122
145- try
123+ return true ;
124+ }
125+ catch ( YdbException e )
126+ {
127+ if ( e . Code == StatusCode . Overloaded )
146128 {
147- await command . ExecuteNonQueryAsync ( ) ;
148129 return true ;
149130 }
150- catch ( YdbException )
131+
132+ throw ;
133+ }
134+ }
135+
136+ private IReadOnlyList < MigrationCommand > GetCreateIfNotExistsCommands ( ) =>
137+ Dependencies . MigrationsSqlGenerator . Generate ( new List < MigrationOperation >
138+ {
139+ new SqlOperation
151140 {
152- return false ;
141+ Sql = GetCreateIfNotExistsScript ( ) ,
142+ SuppressTransaction = true
153143 }
154- }
144+ } ) ;
155145
156- public void Dispose ( )
157- => DisposeInternalAsync ( ) . GetAwaiter ( ) . GetResult ( ) ;
146+ public override string GetCreateIfNotExistsScript ( )
147+ => GetCreateScript ( ) . Replace ( "CREATE TABLE" , "CREATE TABLE IF NOT EXISTS" ) ;
158148
159- public async ValueTask DisposeAsync ( )
160- => await DisposeInternalAsync ( ) ;
149+ public override LockReleaseBehavior LockReleaseBehavior => LockReleaseBehavior . Transaction ;
161150
162- private async Task DisposeInternalAsync ( )
151+ protected override string ExistsSql
152+ => throw new UnreachableException ( "Shouldn't be called. We check if exists using different approach" ) ;
153+
154+ public override bool Exists ( )
155+ => ExistsAsync ( ) . ConfigureAwait ( false ) . GetAwaiter ( ) . GetResult ( ) ;
156+
157+ public override async Task < bool > ExistsAsync ( CancellationToken cancellationToken = default )
158+ {
159+ try
163160 {
164- if ( _watchDogToken != null )
161+ await Dependencies . MigrationCommandExecutor . ExecuteNonQueryAsync (
162+ SelectHistoryTableCommand ( ) ,
163+ Dependencies . Connection ,
164+ new MigrationExecutionState ( ) ,
165+ commitTransaction : true ,
166+ cancellationToken : cancellationToken
167+ ) . ConfigureAwait ( false ) ;
168+
169+ return true ;
170+ }
171+ catch ( YdbException )
172+ {
173+ return false ;
174+ }
175+ }
176+
177+ private IReadOnlyList < MigrationCommand > SelectHistoryTableCommand ( ) =>
178+ Dependencies . MigrationsSqlGenerator . Generate ( new List < MigrationOperation >
179+ {
180+ new SqlOperation
165181 {
166- await _watchDogToken . CancelAsync ( ) ;
182+ Sql = $ "SELECT * FROM { SqlGenerationHelper . DelimitIdentifier ( TableName , TableSchema ) } " +
183+ $ " WHERE MigrationId = '{ LockKey } ';"
167184 }
185+ } ) ;
168186
169- _watchDogToken = null ;
170- await using var connection = Connection . DbConnection . CreateCommand ( ) ;
171- connection . CommandText = "DELETE FROM shedlock WHERE name = '{_name}' AND locked_by = '{PID}';" ;
172- await connection . ExecuteNonQueryAsync ( ) ;
173- }
187+ public override string GetBeginIfNotExistsScript ( string migrationId ) => throw new NotSupportedException ( ) ;
188+
189+ public override string GetBeginIfExistsScript ( string migrationId ) => throw new NotSupportedException ( ) ;
190+
191+ public override string GetEndIfScript ( ) => throw new NotSupportedException ( ) ;
192+
193+ private sealed class YdbMigrationDatabaseLock ( YdbHistoryRepository historyRepository ) : IMigrationsDatabaseLock
194+ {
195+ public void Dispose ( ) => historyRepository . ReleaseDatabaseLockAsync ( ) . GetAwaiter ( ) . GetResult ( ) ;
196+
197+ public async ValueTask DisposeAsync ( ) => await historyRepository . ReleaseDatabaseLockAsync ( ) ;
174198
175199 public IHistoryRepository HistoryRepository { get ; } = historyRepository ;
176200 }
0 commit comments