Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/src/EF/EF.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@
<ItemGroup>
<ProjectReference Include="..\..\..\src\EfCore.Ydb\src\EfCore.Ydb.csproj"/>
</ItemGroup>

</Project>
1 change: 1 addition & 0 deletions examples/src/EF/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

await using var db = new BloggingContext();

await db.Database.EnsureDeletedAsync();
await db.Database.EnsureCreatedAsync();

Console.WriteLine("Inserting a new blog");
Expand Down
3 changes: 2 additions & 1 deletion examples/src/EF_YC/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ await Parser.Default.ParseArguments<CmdOptions>(args).WithParsedAsync(async cmd
.Options;

await using var db = new AppDbContext(options);
db.Database.EnsureCreated();
await db.Database.EnsureDeletedAsync();
await db.Database.EnsureCreatedAsync();

db.Users.Add(new User { Name = "Alex", Email = "[email protected]" });
db.Users.Add(new User { Name = "Kirill", Email = "[email protected]" });
Expand Down
258 changes: 141 additions & 117 deletions src/EfCore.Ydb/src/Migrations/Internal/YdbHistoryRepository.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
using System;
using System.Data;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using EfCore.Ydb.Storage.Internal;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Migrations.Operations;
using Microsoft.Extensions.Logging;
using Ydb.Sdk;
using Ydb.Sdk.Ado;

namespace EfCore.Ydb.Migrations.Internal;

// ReSharper disable once ClassNeverInstantiated.Global
public class YdbHistoryRepository(HistoryRepositoryDependencies dependencies) : HistoryRepository(dependencies)
public class YdbHistoryRepository(HistoryRepositoryDependencies dependencies)
: HistoryRepository(dependencies), IHistoryRepository
{
private const string LockKey = "LockMigration";
private const int ReleaseMaxAttempt = 10;

private static readonly TimeSpan LockTimeout = TimeSpan.FromMinutes(2);

protected override bool InterpretExistsResult(object? value)
=> throw new InvalidOperationException("Shouldn't be called");

Expand All @@ -25,152 +33,168 @@ public override async Task<IMigrationsDatabaseLock> AcquireDatabaseLockAsync(
)
{
Dependencies.MigrationsLogger.AcquiringMigrationLock();
var dbLock =
new YdbMigrationDatabaseLock("migrationLock", this, (YdbRelationalConnection)Dependencies.Connection);
await dbLock.Lock(timeoutInSeconds: 60, cancellationToken);
return dbLock;
}

public override string GetCreateIfNotExistsScript()
=> GetCreateScript().Replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS");
var deadline = DateTime.UtcNow + LockTimeout;
DateTime now;

public override LockReleaseBehavior LockReleaseBehavior => LockReleaseBehavior.Transaction;

protected override string ExistsSql
=> throw new UnreachableException("Shouldn't be called. We check if exists using different approach");
do
{
now = DateTime.UtcNow;

public override bool Exists()
=> ExistsAsync().ConfigureAwait(false).GetAwaiter().GetResult();
try
{
await Dependencies.MigrationCommandExecutor.ExecuteNonQueryAsync(
AcquireDatabaseLockCommand(),
((IYdbRelationalConnection)Dependencies.Connection).Clone(), // TODO usage ExecutionContext
new MigrationExecutionState(),
commitTransaction: true,
cancellationToken: cancellationToken
).ConfigureAwait(false);

return new YdbMigrationDatabaseLock(this);
}
catch (YdbException)
{
await Task.Delay(100 + Random.Shared.Next(1000), cancellationToken);
}
} while (now < deadline);

public override Task<bool> ExistsAsync(CancellationToken cancellationToken = default)
{
var connection = (YdbRelationalConnection)Dependencies.Connection;
var schema = (YdbConnection)connection.DbConnection;
var tables = schema.GetSchema("tables");

var foundTables =
from table in tables.AsEnumerable()
where table.Field<string>("table_type") == "TABLE"
&& table.Field<string>("table_name") == TableName
select table;
return Task.FromResult(foundTables.Count() == 1);
throw new YdbException("Unable to obtain table lock - another EF instance may be running");
}

public override string GetBeginIfNotExistsScript(string migrationId) => throw new NotImplementedException();

public override string GetBeginIfExistsScript(string migrationId) => throw new NotImplementedException();

public override string GetEndIfScript() => throw new NotImplementedException();

private sealed class YdbMigrationDatabaseLock(
string name,
IHistoryRepository historyRepository,
YdbRelationalConnection ydbConnection
) : IMigrationsDatabaseLock
{
private IYdbRelationalConnection Connection { get; } = ydbConnection.Clone();
private volatile string _pid = null!;
private CancellationTokenSource? _watchDogToken;

public async Task Lock(int timeoutInSeconds, CancellationToken cancellationToken = default)
private IReadOnlyList<MigrationCommand> AcquireDatabaseLockCommand() =>
Dependencies.MigrationsSqlGenerator.Generate(new List<MigrationOperation>
{
if (_watchDogToken != null)
{
throw new InvalidOperationException("Already locked");
}

await Connection.OpenAsync(cancellationToken);
await using (var command = Connection.DbConnection.CreateCommand())
new SqlOperation
{
command.CommandText = """
CREATE TABLE IF NOT EXISTS shedlock (
name Text NOT NULL,
locked_at Timestamp NOT NULL,
lock_until Timestamp NOT NULL,
locked_by Text NOT NULL,
PRIMARY KEY(name)
);
""";
await command.ExecuteNonQueryAsync(cancellationToken);
Sql = GetInsertScript(
new HistoryRow(
LockKey,
$"LockTime: {DateTime.UtcNow.ToString(CultureInfo.InvariantCulture)}, PID: {Environment.ProcessId}"
)
)
}
});

_pid = $"PID:{Environment.ProcessId}";
private async Task ReleaseDatabaseLockAsync()
{
for (var i = 0; i < ReleaseMaxAttempt; i++)
{
await using var connection = ((IYdbRelationalConnection)Dependencies.Connection).Clone().DbConnection;

var lockAcquired = false;
for (var i = 0; i < 10; i++)
try
{
if (await UpdateLock(name, timeoutInSeconds))
{
lockAcquired = true;
break;
}
await Dependencies.MigrationCommandExecutor.ExecuteNonQueryAsync(
ReleaseDatabaseLockCommand(),
((IYdbRelationalConnection)Dependencies.Connection).Clone()
).ConfigureAwait(false);

await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
return;
}

if (!lockAcquired)
catch (YdbException e)
{
throw new TimeoutException("Failed to acquire lock for migration`");
Dependencies.MigrationsLogger.Logger.LogError(e, "Failed release database lock");
}
}
}

_watchDogToken = new CancellationTokenSource();
_ = Task.Run((async Task () =>
{
while (true)
{
// ReSharper disable once PossibleLossOfFraction
await Task.Delay(TimeSpan.FromSeconds(timeoutInSeconds / 2), _watchDogToken.Token);
await UpdateLock(name, timeoutInSeconds);
}
// ReSharper disable once FunctionNeverReturns
})!, _watchDogToken.Token);
private IReadOnlyList<MigrationCommand> ReleaseDatabaseLockCommand() =>
Dependencies.MigrationsSqlGenerator.Generate(new List<MigrationOperation>
{ new SqlOperation { Sql = GetDeleteScript(LockKey) } }
);

bool IHistoryRepository.CreateIfNotExists() => CreateIfNotExistsAsync().GetAwaiter().GetResult();

public async Task<bool> CreateIfNotExistsAsync(CancellationToken cancellationToken = default)
{
if (await ExistsAsync(cancellationToken))
{
return false;
}

private async Task<bool> UpdateLock(string nameLock, int timeoutInSeconds)
try
{
var command = Connection.DbConnection.CreateCommand();
command.CommandText =
$"""
UPSERT INTO shedlock (name, locked_at, lock_until, locked_by)
VALUES (
@name,
CurrentUtcTimestamp(),
Unwrap(CurrentUtcTimestamp() + Interval("PT{timeoutInSeconds}S")),
@locked_by
);
""";
command.Parameters.Add(new YdbParameter("name", DbType.String, nameLock));
command.Parameters.Add(new YdbParameter("locked_by", DbType.String, _pid));
await Dependencies.MigrationCommandExecutor.ExecuteNonQueryAsync(
GetCreateIfNotExistsCommands(),
Dependencies.Connection,
cancellationToken: cancellationToken
).ConfigureAwait(false);

try
return true;
}
catch (YdbException e)
{
if (e.Code == StatusCode.Overloaded)
{
await command.ExecuteNonQueryAsync();
return true;
}
catch (YdbException)

throw;
}
}

private IReadOnlyList<MigrationCommand> GetCreateIfNotExistsCommands() =>
Dependencies.MigrationsSqlGenerator.Generate(new List<MigrationOperation>
{
new SqlOperation
{
return false;
Sql = GetCreateIfNotExistsScript(),
SuppressTransaction = true
}
}
});

public void Dispose()
=> DisposeInternalAsync().GetAwaiter().GetResult();
public override string GetCreateIfNotExistsScript()
=> GetCreateScript().Replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS");

public async ValueTask DisposeAsync()
=> await DisposeInternalAsync();
public override LockReleaseBehavior LockReleaseBehavior => LockReleaseBehavior.Transaction;

private async Task DisposeInternalAsync()
protected override string ExistsSql
=> throw new UnreachableException("Shouldn't be called. We check if exists using different approach");

public override bool Exists()
=> ExistsAsync().ConfigureAwait(false).GetAwaiter().GetResult();

public override async Task<bool> ExistsAsync(CancellationToken cancellationToken = default)
{
try
{
if (_watchDogToken != null)
await Dependencies.MigrationCommandExecutor.ExecuteNonQueryAsync(
SelectHistoryTableCommand(),
Dependencies.Connection,
new MigrationExecutionState(),
commitTransaction: true,
cancellationToken: cancellationToken
).ConfigureAwait(false);

return true;
}
catch (YdbException)
{
return false;
}
}

private IReadOnlyList<MigrationCommand> SelectHistoryTableCommand() =>
Dependencies.MigrationsSqlGenerator.Generate(new List<MigrationOperation>
{
new SqlOperation
{
await _watchDogToken.CancelAsync();
Sql = $"SELECT * FROM {SqlGenerationHelper.DelimitIdentifier(TableName, TableSchema)}" +
$" WHERE MigrationId = '{LockKey}';"
}
});

_watchDogToken = null;
await using var connection = Connection.DbConnection.CreateCommand();
connection.CommandText = "DELETE FROM shedlock WHERE name = '{_name}' AND locked_by = '{PID}';";
await connection.ExecuteNonQueryAsync();
}
public override string GetBeginIfNotExistsScript(string migrationId) => throw new NotSupportedException();

public override string GetBeginIfExistsScript(string migrationId) => throw new NotSupportedException();

public override string GetEndIfScript() => throw new NotSupportedException();

private sealed class YdbMigrationDatabaseLock(YdbHistoryRepository historyRepository) : IMigrationsDatabaseLock
{
public void Dispose() => historyRepository.ReleaseDatabaseLockAsync().GetAwaiter().GetResult();

public async ValueTask DisposeAsync() => await historyRepository.ReleaseDatabaseLockAsync();

public IHistoryRepository HistoryRepository { get; } = historyRepository;
}
Expand Down
Loading
Loading