Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
strategy:
fail-fast: false
matrix:
ydb-version: [ 'trunk' ]
ydb-version: [ 'trunk', 'latest' ]
dotnet-version: [ 6.0.x, 7.0.x ]
include:
- dotnet-version: 6.0.x
Expand Down
7 changes: 4 additions & 3 deletions src/Ydb.Sdk/src/Ado/YdbCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,14 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
: new ExecuteQuerySettings();

var transaction = YdbConnection.CurrentTransaction;

var ydbDataReader = await YdbDataReader.CreateYdbDataReader(YdbConnection.Session.ExecuteQuery(
preparedSql.ToString(), ydbParameters, execSettings, Transaction?.TransactionControl),
YdbConnection.Session.OnStatus, Transaction);
preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl),
YdbConnection.Session.OnStatus, transaction);

YdbConnection.LastReader = ydbDataReader;
YdbConnection.LastCommand = CommandText;
YdbConnection.LastTransaction = Transaction;

return ydbDataReader;
}
Expand Down
33 changes: 23 additions & 10 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
using System.Data;
using System.Data.Common;
using System.Diagnostics.CodeAnalysis;
using Ydb.Sdk.Services.Query;
using static System.Data.IsolationLevel;

namespace Ydb.Sdk.Ado;

public sealed class YdbConnection : DbConnection
{
private static readonly YdbConnectionStringBuilder DefaultSettings = new();

private static readonly StateChangeEventArgs ClosedToOpenEventArgs =
new(ConnectionState.Closed, ConnectionState.Open);

private static readonly StateChangeEventArgs OpenToClosedEventArgs =
new(ConnectionState.Open, ConnectionState.Closed);

private bool _disposed;
private YdbConnectionStringBuilder? _connectionStringBuilder;

private YdbConnectionStringBuilder ConnectionStringBuilder { get; set; }
private YdbConnectionStringBuilder ConnectionStringBuilder
{
get => _connectionStringBuilder ??
throw new InvalidOperationException("The ConnectionString property has not been initialized.");
[param: AllowNull] init => _connectionStringBuilder = value;
}

internal Session Session
{
Expand All @@ -34,7 +39,6 @@ internal Session Session

public YdbConnection()
{
ConnectionStringBuilder = DefaultSettings;
}

public YdbConnection(string connectionString)
Expand Down Expand Up @@ -67,7 +71,16 @@ public YdbTransaction BeginTransaction(TxMode txMode = TxMode.SerializableRw)
{
EnsureConnectionOpen();

return new YdbTransaction(this, txMode);
if (CurrentTransaction is { Completed: false })
{
throw new InvalidOperationException(
"A transaction is already in progress; nested/concurrent transactions aren't supported."
);
}

CurrentTransaction = new YdbTransaction(this, txMode);

return CurrentTransaction;
}

public override void ChangeDatabase(string databaseName)
Expand Down Expand Up @@ -121,9 +134,9 @@ public override async Task CloseAsync()
await LastReader.CloseAsync();
}

if (LastTransaction is { Completed: false })
if (CurrentTransaction is { Completed: false })
{
await LastTransaction.RollbackAsync();
await CurrentTransaction.RollbackAsync();
}

OnStateChange(OpenToClosedEventArgs);
Expand All @@ -138,15 +151,15 @@ public override async Task CloseAsync()

public override string ConnectionString
{
get => ConnectionStringBuilder.ConnectionString;
get => _connectionStringBuilder?.ConnectionString ?? string.Empty;
#pragma warning disable CS8765 // Nullability of type of parameter doesn't match overridden member (possibly because of nullability attributes).
set
#pragma warning restore CS8765 // Nullability of type of parameter doesn't match overridden member (possibly because of nullability attributes).
{
EnsureConnectionClosed();

// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
ConnectionStringBuilder = value != null ? new YdbConnectionStringBuilder(value) : DefaultSettings;
_connectionStringBuilder = value != null ? new YdbConnectionStringBuilder(value) : null;
}
}

Expand All @@ -160,8 +173,8 @@ public override string ConnectionString

internal YdbDataReader? LastReader { get; set; }
internal string LastCommand { get; set; } = string.Empty;
internal YdbTransaction? LastTransaction { get; set; }
internal bool IsBusy => LastReader is { IsClosed: false };
internal YdbTransaction? CurrentTransaction { get; private set; }

public override string DataSource => string.Empty; // TODO

Expand Down
15 changes: 14 additions & 1 deletion src/Ydb.Sdk/src/Ado/YdbDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,22 @@ public override IEnumerator<YdbDataRecord> GetEnumerator()

public override async Task CloseAsync()
{
if (ReaderState == State.Closed)
{
return;
}

ReaderState = State.Closed;
_onNotSuccessStatus(new Status(StatusCode.SessionBusy));

await _stream.DisposeAsync();

if (_ydbTransaction != null)
{
_ydbTransaction.Failed = true;

throw new YdbException("YdbDataReader was closed during transaction execution. Transaction is broken!");
}
}

public override void Close()
Expand Down Expand Up @@ -497,7 +510,7 @@ private async Task<State> NextExecPart()
_currentResultSet = part.ResultSet?.FromProto();
ReaderMetadata = _currentResultSet != null ? new Metadata(_currentResultSet) : EmptyMetadata.Instance;

if (_ydbTransaction != null)
if (_ydbTransaction != null && part.TxMeta != null)
{
_ydbTransaction.TxId ??= part.TxMeta.Id;
}
Expand Down
73 changes: 60 additions & 13 deletions src/Ydb.Sdk/src/Ado/YdbTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public sealed class YdbTransaction : DbTransaction
private readonly TxMode _txMode;

private bool _failed;
private YdbConnection? _ydbConnection;
private bool _isDisposed;

internal string? TxId { get; set; }
internal bool Completed { get; private set; }
Expand All @@ -32,7 +34,7 @@ internal bool Failed

internal YdbTransaction(YdbConnection ydbConnection, TxMode txMode)
{
DbConnection = ydbConnection;
_ydbConnection = ydbConnection;
_txMode = txMode;
}

Expand All @@ -44,7 +46,7 @@ public override void Commit()
// TODO propagate cancellation token
public override async Task CommitAsync(CancellationToken cancellationToken = new())
{
await FinishTransaction(txId => DbConnection.Session.CommitTransaction(txId));
await FinishTransaction(txId => DbConnection!.Session.CommitTransaction(txId));
}

public override void Rollback()
Expand All @@ -62,36 +64,43 @@ public override void Rollback()
return;
}

await FinishTransaction(txId => DbConnection.Session.RollbackTransaction(txId));
await FinishTransaction(txId => DbConnection!.Session.RollbackTransaction(txId));
}

protected override YdbConnection DbConnection { get; }
protected override YdbConnection? DbConnection
{
get
{
CheckDisposed();
return _ydbConnection;
}
}

public override IsolationLevel IsolationLevel => _txMode == TxMode.SerializableRw
? IsolationLevel.Serializable
: IsolationLevel.Unspecified;

private async Task FinishTransaction(Func<string, Task<Status>> finishMethod)
{
if (Completed || DbConnection.State == ConnectionState.Closed)
if (DbConnection?.State == ConnectionState.Closed || Completed)
{
throw new InvalidOperationException("This YdbTransaction has completed; it is no longer usable");
}

if (DbConnection.IsBusy)
if (DbConnection!.IsBusy)
{
throw new YdbOperationInProgressException(DbConnection);
}

Completed = true;

if (TxId == null)
{
return; // transaction isn't started
}

try
{
Completed = true;

if (TxId == null)
{
return; // transaction isn't started
}

var status = await finishMethod(TxId); // Commit or Rollback

if (status.IsNotSuccess)
Expand All @@ -111,5 +120,43 @@ private async Task FinishTransaction(Func<string, Task<Status>> finishMethod)

throw new YdbException(e.Status);
}
finally
{
_ydbConnection = null;
}
}

protected override void Dispose(bool disposing)
{
if (_isDisposed || !disposing)
return;

if (!Completed)
{
Rollback();
}

_isDisposed = true;
}

public override async ValueTask DisposeAsync()
{
if (_isDisposed)
return;

if (!Completed)
{
await RollbackAsync();
}

_isDisposed = true;
}

private void CheckDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(YdbTransaction));
}
}
}
8 changes: 0 additions & 8 deletions src/Ydb.Sdk/tests/Ado/Specification/YdbConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@ public override Task DisposeAsync_raises_Disposed()
return base.DisposeAsync_raises_Disposed();
}

#pragma warning disable xUnit1004
[Fact(Skip = "Connect to default settings 'grpc://localhost:2136/local'.")]
#pragma warning restore xUnit1004
public override void Open_throws_when_no_connection_string()
{
base.Open_throws_when_no_connection_string();
}

#pragma warning disable xUnit1004
[Fact(Skip = "TODO Supported this field.")]
#pragma warning restore xUnit1004
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/tests/Ado/Specification/YdbFactoryFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ public class YdbFactoryFixture : IDbFactoryFixture
{
public DbProviderFactory Factory => YdbProviderFactory.Instance;

public string ConnectionString => "Host=localhost;Port=2136;Database=local";
public string ConnectionString => "Host=localhost;Port=2136;Database=local;MaxSessionPool=10";
}
10 changes: 10 additions & 0 deletions src/Ydb.Sdk/tests/Ado/Specification/YdbTransactionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using AdoNet.Specification.Tests;

namespace Ydb.Sdk.Tests.Ado.Specification;

public class YdbTransactionTests : TransactionTestBase<YdbFactoryFixture>
{
public YdbTransactionTests(YdbFactoryFixture fixture) : base(fixture)
{
}
}
30 changes: 23 additions & 7 deletions src/Ydb.Sdk/tests/Ado/YdbAdoUserPasswordTests.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,48 @@
using Xunit;
using Ydb.Sdk.Ado;
using Ydb.Sdk.Tests.Ado.Specification;
using Ydb.Sdk.Tests.Fixture;

namespace Ydb.Sdk.Tests.Ado;

public class YdbAdoUserPasswordTests
public class YdbAdoUserPasswordTests : YdbAdoNetFixture
{
public YdbAdoUserPasswordTests(YdbFactoryFixture fixture) : base(fixture)
{
}

[Fact]
public async Task Authentication_WhenUserAndPassword_ReturnValidConnection()
{
await using var connection = new YdbConnection();
await connection.OpenAsync();

await using var connection = await CreateOpenConnectionAsync();
var ydbCommand = connection.CreateCommand();
var kurdyukovkirya = "kurdyukovkirya" + Random.Shared.Next();
ydbCommand.CommandText = $"CREATE USER {kurdyukovkirya} PASSWORD 'password'";
await ydbCommand.ExecuteNonQueryAsync();
await connection.CloseAsync();

await using var userPasswordConnection = new YdbConnection($"User={kurdyukovkirya};Password=password;");
await using var userPasswordConnection =
new YdbConnection($"{ConnectionString};User={kurdyukovkirya};Password=password;");
await userPasswordConnection.OpenAsync();
ydbCommand = userPasswordConnection.CreateCommand();
ydbCommand.CommandText = "SELECT 1 + 2";
Assert.Equal(3, await ydbCommand.ExecuteScalarAsync());

await using var newConnection = new YdbConnection();
await newConnection.OpenAsync();
await using var newConnection = await CreateOpenConnectionAsync();
ydbCommand = newConnection.CreateCommand();
ydbCommand.CommandText = $"DROP USER {kurdyukovkirya};";
await ydbCommand.ExecuteNonQueryAsync();
}

[Fact]
public async Task ExecuteNonQueryAsync_WhenCreateUser_ReturnEmptyResultSet()
{
await using var connection = await CreateOpenConnectionAsync();
var dbCommand = connection.CreateCommand();
var user = "user" + Random.Shared.Next();
dbCommand.CommandText = $"CREATE USER {user} PASSWORD '123qweqwe'";
await dbCommand.ExecuteNonQueryAsync();
dbCommand.CommandText = $"DROP USER {user};";
await dbCommand.ExecuteNonQueryAsync();
}
}
Loading
Loading