Skip to content

Commit 826612f

Browse files
committed
Lock implementation for schema generation
1 parent 27276c6 commit 826612f

File tree

2 files changed

+228
-19
lines changed

2 files changed

+228
-19
lines changed

src/EfCore.Ydb/src/Migrations/Internal/YdbHistoryRepository.cs

Lines changed: 110 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
using EfCore.Ydb.Storage.Internal;
88
using Microsoft.EntityFrameworkCore.Diagnostics;
99
using Microsoft.EntityFrameworkCore.Migrations;
10-
using Microsoft.EntityFrameworkCore.Storage;
1110
using Ydb.Sdk.Ado;
1211

1312
namespace EfCore.Ydb.Migrations.Internal;
@@ -22,16 +21,17 @@ protected override bool InterpretExistsResult(object? value)
2221
=> throw new InvalidOperationException("Shouldn't be called");
2322

2423
public override IMigrationsDatabaseLock AcquireDatabaseLock()
25-
{
26-
Dependencies.MigrationsLogger.AcquiringMigrationLock();
27-
return new YdbMigrationDatabaseLock(this, Dependencies.Connection);
28-
}
24+
=> AcquireDatabaseLockAsync().GetAwaiter().GetResult();
2925

30-
public override Task<IMigrationsDatabaseLock> AcquireDatabaseLockAsync(
26+
public override async Task<IMigrationsDatabaseLock> AcquireDatabaseLockAsync(
3127
CancellationToken cancellationToken = default
3228
)
3329
{
34-
throw new NotImplementedException();
30+
Dependencies.MigrationsLogger.AcquiringMigrationLock();
31+
var dbLock =
32+
new YdbMigrationDatabaseLock("migrationLock", this, (YdbRelationalConnection)Dependencies.Connection);
33+
await dbLock.Lock(timeoutInSeconds: 60, cancellationToken);
34+
return dbLock;
3535
}
3636

3737
public override string GetCreateIfNotExistsScript()
@@ -74,26 +74,122 @@ public override string GetEndIfScript()
7474
throw new NotImplementedException();
7575
}
7676

77-
// TODO Implement lock
7877
private sealed class YdbMigrationDatabaseLock : IMigrationsDatabaseLock
7978
{
80-
private YdbRelationalConnection _connection;
79+
public IYdbRelationalConnection Connection { get; }
80+
private readonly string _name;
81+
private volatile string _pid;
82+
private CancellationTokenSource? _watchDogToken;
8183

8284
public YdbMigrationDatabaseLock(
85+
string name,
8386
IHistoryRepository historyRepository,
84-
IRelationalConnection connection
87+
YdbRelationalConnection ydbConnection
8588
)
8689
{
90+
_name = name;
8791
HistoryRepository = historyRepository;
88-
_connection = (YdbRelationalConnection)connection;
92+
Connection = ydbConnection.Clone();
8993
}
9094

91-
public void Dispose()
95+
public async Task Lock(
96+
int timeoutInSeconds,
97+
CancellationToken cancellationToken = default
98+
)
99+
{
100+
if (_watchDogToken != null)
101+
{
102+
throw new InvalidOperationException("Already locked");
103+
}
104+
105+
await Connection.OpenAsync(cancellationToken);
106+
await using (var command = Connection.DbConnection.CreateCommand())
107+
{
108+
command.CommandText =
109+
"""
110+
CREATE TABLE IF NOT EXISTS shedlock (
111+
name STRING NOT NULL,
112+
locked_at TIMESTAMP NOT NULL,
113+
lock_until TIMESTAMP NOT NULL,
114+
locked_by STRING NOT NULL,
115+
PRIMARY KEY(name)
116+
);
117+
""";
118+
await command.ExecuteNonQueryAsync(cancellationToken);
119+
}
120+
121+
_pid = $"PID:{Environment.ProcessId}";
122+
123+
var lockAcquired = false;
124+
for (var i = 0; i < 10; i++)
125+
{
126+
if (await UpdateLock(_name, timeoutInSeconds))
127+
{
128+
lockAcquired = true;
129+
break;
130+
}
131+
await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
132+
}
133+
134+
if (!lockAcquired)
135+
{
136+
throw new TimeoutException("Failed to acquire lock for migration`");
137+
}
138+
139+
_watchDogToken = new CancellationTokenSource();
140+
_ = Task.Run((async Task () =>
141+
{
142+
while (true)
143+
{
144+
await Task.Delay(TimeSpan.FromSeconds(timeoutInSeconds / 2), _watchDogToken.Token);
145+
await UpdateLock(_name, timeoutInSeconds);
146+
}
147+
})!, _watchDogToken.Token);
148+
}
149+
150+
private async Task<bool> UpdateLock(
151+
string name,
152+
int timeoutInSeconds
153+
)
92154
{
155+
var command = Connection.DbConnection.CreateCommand();
156+
command.CommandText =
157+
$"""
158+
UPSERT INTO shedlock (name, locked_at, lock_until, locked_by)
159+
VALUES (
160+
@name,
161+
CurrentUtcTimestamp(),
162+
Unwrap(CurrentUtcTimestamp() + Interval("PT{timeoutInSeconds}S")),
163+
@locked_by
164+
);
165+
""";
166+
command.Parameters.Add(new YdbParameter("name", DbType.String, name));
167+
command.Parameters.Add(new YdbParameter("locked_by", DbType.String, _pid));
168+
try
169+
{
170+
await command.ExecuteNonQueryAsync(default);
171+
return true;
172+
}
173+
catch (YdbException _)
174+
{
175+
return false;
176+
}
93177
}
94178

95-
public ValueTask DisposeAsync()
96-
=> default;
179+
public void Dispose()
180+
=> DisposeInternalAsync().GetAwaiter().GetResult();
181+
182+
public async ValueTask DisposeAsync()
183+
=> await DisposeInternalAsync();
184+
185+
private async Task DisposeInternalAsync()
186+
{
187+
if (_watchDogToken != null) await _watchDogToken.CancelAsync();
188+
_watchDogToken = null;
189+
await using var connection = Connection.DbConnection.CreateCommand();
190+
connection.CommandText = "DELETE FROM shedlock WHERE name = '{_name}' AND locked_by = '{PID}';";
191+
await connection.ExecuteNonQueryAsync();
192+
}
97193

98194
public IHistoryRepository HistoryRepository { get; }
99195
}

src/EfCore.Ydb/src/Migrations/YdbMigrationsSqlGenerator.cs

Lines changed: 118 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Text;
23
using EfCore.Ydb.Metadata.Internal;
34
using Microsoft.EntityFrameworkCore.Metadata;
45
using Microsoft.EntityFrameworkCore.Migrations;
@@ -104,12 +105,124 @@ MigrationCommandListBuilder builder
104105
IndexOptions(operation, model, builder);
105106
}
106107

107-
protected override void Generate(SqlOperation operation, IModel? model, MigrationCommandListBuilder builder)
108+
protected override void Generate(RenameTableOperation operation, IModel? model, MigrationCommandListBuilder builder)
108109
{
109-
builder.AppendLine(operation.Sql);
110-
// TODO: Find out how to apply migration without suppressing transaction
111-
// We suppress transaction because CREATE/DROP operations cannot be executed during them
112-
EndStatement(builder, suppressTransaction: true);
110+
if (operation.NewSchema is not null && operation.NewSchema != operation.Schema)
111+
{
112+
throw new NotImplementedException("Rename table with schema is not supported");
113+
}
114+
115+
if (operation.NewName is not null && operation.NewName != operation.Name)
116+
{
117+
builder
118+
.Append("ALTER TABLE ")
119+
.Append(DelimitIdentifier(operation.Name, operation.Schema))
120+
.AppendLine("RENAME TO")
121+
.Append(DelimitIdentifier(operation.NewName, operation.Schema));
122+
EndStatement(builder);
123+
}
124+
}
125+
126+
protected override void Generate(
127+
InsertDataOperation operation, IModel? model, MigrationCommandListBuilder builder, bool terminate = true
128+
)
129+
{
130+
var sqlBuilder = new StringBuilder();
131+
foreach (var modificationCommand in GenerateModificationCommands(operation, model))
132+
{
133+
SqlGenerator.AppendInsertOperation(
134+
sqlBuilder,
135+
modificationCommand,
136+
0);
137+
}
138+
139+
builder.Append(sqlBuilder.ToString());
140+
141+
if (terminate)
142+
{
143+
EndStatement(builder, suppressTransaction: false);
144+
}
145+
}
146+
147+
protected override void Generate(DeleteDataOperation operation, IModel? model, MigrationCommandListBuilder builder)
148+
{
149+
var sqlBuilder = new StringBuilder();
150+
foreach (var modificationCommand in GenerateModificationCommands(operation, model))
151+
{
152+
SqlGenerator.AppendDeleteOperation(
153+
sqlBuilder,
154+
modificationCommand,
155+
0);
156+
}
157+
158+
builder.Append(sqlBuilder.ToString());
159+
EndStatement(builder, suppressTransaction: false);
160+
}
161+
162+
protected override void Generate(UpdateDataOperation operation, IModel? model, MigrationCommandListBuilder builder)
163+
{
164+
var sqlBuilder = new StringBuilder();
165+
foreach (var modificationCommand in GenerateModificationCommands(operation, model))
166+
{
167+
SqlGenerator.AppendUpdateOperation(
168+
sqlBuilder,
169+
modificationCommand,
170+
0);
171+
}
172+
173+
builder.Append(sqlBuilder.ToString());
174+
EndStatement(builder, suppressTransaction: false);
175+
}
176+
177+
protected override void Generate(
178+
DropForeignKeyOperation operation,
179+
IModel? model,
180+
MigrationCommandListBuilder builder,
181+
bool terminate = true
182+
)
183+
{
184+
// Ignore bc YDB doesn't have foreign keys
185+
}
186+
187+
protected override void Generate(
188+
DropPrimaryKeyOperation operation,
189+
IModel? model,
190+
MigrationCommandListBuilder builder,
191+
bool terminate = true
192+
)
193+
{
194+
// Ignore bc YDB automatically drops primary keys
195+
}
196+
197+
protected override void Generate(
198+
AddForeignKeyOperation operation,
199+
IModel? model,
200+
MigrationCommandListBuilder builder,
201+
bool terminate = true
202+
)
203+
{
204+
// Ignore bc YDB doesn't have foreign keys
205+
}
206+
207+
protected override void Generate(
208+
AddPrimaryKeyOperation operation,
209+
IModel? model,
210+
MigrationCommandListBuilder builder,
211+
bool terminate = true
212+
)
213+
{
214+
// Ignore bc YDB doesn't support adding keys outside table creation
215+
}
216+
217+
218+
// ReSharper disable once RedundantOverriddenMember
219+
protected override void EndStatement(
220+
MigrationCommandListBuilder builder,
221+
// ReSharper disable once OptionalParameterHierarchyMismatch
222+
bool suppressTransaction = true
223+
)
224+
{
225+
base.EndStatement(builder, suppressTransaction);
113226
}
114227

115228
private string DelimitIdentifier(string name, string? schema)

0 commit comments

Comments
 (0)