Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/Ydb.Sdk/src/Ado/YdbCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,14 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
: new ExecuteQuerySettings();

var transaction = YdbConnection.CurrentTransaction;

var ydbDataReader = await YdbDataReader.CreateYdbDataReader(YdbConnection.Session.ExecuteQuery(
preparedSql.ToString(), ydbParameters, execSettings, Transaction?.TransactionControl),
YdbConnection.Session.OnStatus, Transaction);
preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl),
YdbConnection.Session.OnStatus, transaction);

YdbConnection.LastReader = ydbDataReader;
YdbConnection.LastCommand = CommandText;
YdbConnection.LastTransaction = Transaction;

return ydbDataReader;
}
Expand Down
17 changes: 13 additions & 4 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,16 @@ public YdbTransaction BeginTransaction(TxMode txMode = TxMode.SerializableRw)
{
EnsureConnectionOpen();

return new YdbTransaction(this, txMode);
if (CurrentTransaction is { Completed: false })
{
throw new InvalidOperationException(
"A transaction is already in progress; nested/concurrent transactions aren't supported."
);
}

CurrentTransaction = new YdbTransaction(this, txMode);

return CurrentTransaction;
}

public override void ChangeDatabase(string databaseName)
Expand Down Expand Up @@ -121,9 +130,9 @@ public override async Task CloseAsync()
await LastReader.CloseAsync();
}

if (LastTransaction is { Completed: false })
if (CurrentTransaction is { Completed: false })
{
await LastTransaction.RollbackAsync();
await CurrentTransaction.RollbackAsync();
}

OnStateChange(OpenToClosedEventArgs);
Expand Down Expand Up @@ -160,8 +169,8 @@ public override string ConnectionString

internal YdbDataReader? LastReader { get; set; }
internal string LastCommand { get; set; } = string.Empty;
internal YdbTransaction? LastTransaction { get; set; }
internal bool IsBusy => LastReader is { IsClosed: false };
internal YdbTransaction? CurrentTransaction { get; private set; }

public override string DataSource => string.Empty; // TODO

Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/YdbDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ private async Task<State> NextExecPart()
_currentResultSet = part.ResultSet?.FromProto();
ReaderMetadata = _currentResultSet != null ? new Metadata(_currentResultSet) : EmptyMetadata.Instance;

if (_ydbTransaction != null)
if (_ydbTransaction != null && part.TxMeta != null)
{
_ydbTransaction.TxId ??= part.TxMeta.Id;
}
Expand Down
73 changes: 60 additions & 13 deletions src/Ydb.Sdk/src/Ado/YdbTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public sealed class YdbTransaction : DbTransaction
private readonly TxMode _txMode;

private bool _failed;
private YdbConnection? _ydbConnection;
private bool _isDisposed;

internal string? TxId { get; set; }
internal bool Completed { get; private set; }
Expand All @@ -32,7 +34,7 @@ internal bool Failed

internal YdbTransaction(YdbConnection ydbConnection, TxMode txMode)
{
DbConnection = ydbConnection;
_ydbConnection = ydbConnection;
_txMode = txMode;
}

Expand All @@ -44,7 +46,7 @@ public override void Commit()
// TODO propagate cancellation token
public override async Task CommitAsync(CancellationToken cancellationToken = new())
{
await FinishTransaction(txId => DbConnection.Session.CommitTransaction(txId));
await FinishTransaction(txId => DbConnection!.Session.CommitTransaction(txId));
}

public override void Rollback()
Expand All @@ -62,36 +64,43 @@ public override void Rollback()
return;
}

await FinishTransaction(txId => DbConnection.Session.RollbackTransaction(txId));
await FinishTransaction(txId => DbConnection!.Session.RollbackTransaction(txId));
}

protected override YdbConnection DbConnection { get; }
protected override YdbConnection? DbConnection
{
get
{
CheckDisposed();
return _ydbConnection;
}
}

public override IsolationLevel IsolationLevel => _txMode == TxMode.SerializableRw
? IsolationLevel.Serializable
: IsolationLevel.Unspecified;

private async Task FinishTransaction(Func<string, Task<Status>> finishMethod)
{
if (Completed || DbConnection.State == ConnectionState.Closed)
if (DbConnection?.State == ConnectionState.Closed || Completed)
{
throw new InvalidOperationException("This YdbTransaction has completed; it is no longer usable");
}

if (DbConnection.IsBusy)
if (DbConnection!.IsBusy)
{
throw new YdbOperationInProgressException(DbConnection);
}

Completed = true;

if (TxId == null)
{
return; // transaction isn't started
}

try
{
Completed = true;

if (TxId == null)
{
return; // transaction isn't started
}

var status = await finishMethod(TxId); // Commit or Rollback

if (status.IsNotSuccess)
Expand All @@ -111,5 +120,43 @@ private async Task FinishTransaction(Func<string, Task<Status>> finishMethod)

throw new YdbException(e.Status);
}
finally
{
_ydbConnection = null;
}
}

protected override void Dispose(bool disposing)
{
if (_isDisposed || !disposing)
return;

if (!Completed)
{
Rollback();
}

_isDisposed = true;
}

public override async ValueTask DisposeAsync()
{
if (_isDisposed)
return;

if (!Completed)
{
await RollbackAsync();
}

_isDisposed = true;
}

private void CheckDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(YdbTransaction));
}
}
}
10 changes: 10 additions & 0 deletions src/Ydb.Sdk/tests/Ado/Specification/YdbTransactionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using AdoNet.Specification.Tests;

namespace Ydb.Sdk.Tests.Ado.Specification;

public class YdbTransactionTests : TransactionTestBase<YdbFactoryFixture>
{
public YdbTransactionTests(YdbFactoryFixture fixture) : base(fixture)
{
}
}
Loading