Skip to content

Commit 88abe64

Browse files
feat: update ydb transaction semantics
1 parent 23e4818 commit 88abe64

File tree

5 files changed

+88
-21
lines changed

5 files changed

+88
-21
lines changed

src/Ydb.Sdk/src/Ado/YdbCommand.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,14 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
185185
? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
186186
: new ExecuteQuerySettings();
187187

188+
var transaction = YdbConnection.CurrentTransaction;
189+
188190
var ydbDataReader = await YdbDataReader.CreateYdbDataReader(YdbConnection.Session.ExecuteQuery(
189-
preparedSql.ToString(), ydbParameters, execSettings, Transaction?.TransactionControl),
190-
YdbConnection.Session.OnStatus, Transaction);
191+
preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl),
192+
YdbConnection.Session.OnStatus, transaction);
191193

192194
YdbConnection.LastReader = ydbDataReader;
193195
YdbConnection.LastCommand = CommandText;
194-
YdbConnection.LastTransaction = Transaction;
195196

196197
return ydbDataReader;
197198
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,16 @@ public YdbTransaction BeginTransaction(TxMode txMode = TxMode.SerializableRw)
6767
{
6868
EnsureConnectionOpen();
6969

70-
return new YdbTransaction(this, txMode);
70+
if (CurrentTransaction is { Completed: false })
71+
{
72+
throw new InvalidOperationException(
73+
"A transaction is already in progress; nested/concurrent transactions aren't supported."
74+
);
75+
}
76+
77+
CurrentTransaction = new YdbTransaction(this, txMode);
78+
79+
return CurrentTransaction;
7180
}
7281

7382
public override void ChangeDatabase(string databaseName)
@@ -121,9 +130,9 @@ public override async Task CloseAsync()
121130
await LastReader.CloseAsync();
122131
}
123132

124-
if (LastTransaction is { Completed: false })
133+
if (CurrentTransaction is { Completed: false })
125134
{
126-
await LastTransaction.RollbackAsync();
135+
await CurrentTransaction.RollbackAsync();
127136
}
128137

129138
OnStateChange(OpenToClosedEventArgs);
@@ -160,8 +169,8 @@ public override string ConnectionString
160169

161170
internal YdbDataReader? LastReader { get; set; }
162171
internal string LastCommand { get; set; } = string.Empty;
163-
internal YdbTransaction? LastTransaction { get; set; }
164172
internal bool IsBusy => LastReader is { IsClosed: false };
173+
internal YdbTransaction? CurrentTransaction { get; private set; }
165174

166175
public override string DataSource => string.Empty; // TODO
167176

src/Ydb.Sdk/src/Ado/YdbDataReader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ private async Task<State> NextExecPart()
497497
_currentResultSet = part.ResultSet?.FromProto();
498498
ReaderMetadata = _currentResultSet != null ? new Metadata(_currentResultSet) : EmptyMetadata.Instance;
499499

500-
if (_ydbTransaction != null)
500+
if (_ydbTransaction != null && part.TxMeta != null)
501501
{
502502
_ydbTransaction.TxId ??= part.TxMeta.Id;
503503
}

src/Ydb.Sdk/src/Ado/YdbTransaction.cs

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ public sealed class YdbTransaction : DbTransaction
1010
private readonly TxMode _txMode;
1111

1212
private bool _failed;
13+
private YdbConnection? _ydbConnection;
14+
private bool _isDisposed;
1315

1416
internal string? TxId { get; set; }
1517
internal bool Completed { get; private set; }
@@ -32,7 +34,7 @@ internal bool Failed
3234

3335
internal YdbTransaction(YdbConnection ydbConnection, TxMode txMode)
3436
{
35-
DbConnection = ydbConnection;
37+
_ydbConnection = ydbConnection;
3638
_txMode = txMode;
3739
}
3840

@@ -44,7 +46,7 @@ public override void Commit()
4446
// TODO propagate cancellation token
4547
public override async Task CommitAsync(CancellationToken cancellationToken = new())
4648
{
47-
await FinishTransaction(txId => DbConnection.Session.CommitTransaction(txId));
49+
await FinishTransaction(txId => DbConnection!.Session.CommitTransaction(txId));
4850
}
4951

5052
public override void Rollback()
@@ -62,36 +64,43 @@ public override void Rollback()
6264
return;
6365
}
6466

65-
await FinishTransaction(txId => DbConnection.Session.RollbackTransaction(txId));
67+
await FinishTransaction(txId => DbConnection!.Session.RollbackTransaction(txId));
6668
}
6769

68-
protected override YdbConnection DbConnection { get; }
70+
protected override YdbConnection? DbConnection
71+
{
72+
get
73+
{
74+
CheckDisposed();
75+
return _ydbConnection;
76+
}
77+
}
6978

7079
public override IsolationLevel IsolationLevel => _txMode == TxMode.SerializableRw
7180
? IsolationLevel.Serializable
7281
: IsolationLevel.Unspecified;
7382

7483
private async Task FinishTransaction(Func<string, Task<Status>> finishMethod)
7584
{
76-
if (Completed || DbConnection.State == ConnectionState.Closed)
85+
if (DbConnection?.State == ConnectionState.Closed || Completed)
7786
{
7887
throw new InvalidOperationException("This YdbTransaction has completed; it is no longer usable");
7988
}
8089

81-
if (DbConnection.IsBusy)
90+
if (DbConnection!.IsBusy)
8291
{
8392
throw new YdbOperationInProgressException(DbConnection);
8493
}
8594

86-
Completed = true;
87-
88-
if (TxId == null)
89-
{
90-
return; // transaction isn't started
91-
}
92-
9395
try
9496
{
97+
Completed = true;
98+
99+
if (TxId == null)
100+
{
101+
return; // transaction isn't started
102+
}
103+
95104
var status = await finishMethod(TxId); // Commit or Rollback
96105

97106
if (status.IsNotSuccess)
@@ -111,5 +120,43 @@ private async Task FinishTransaction(Func<string, Task<Status>> finishMethod)
111120

112121
throw new YdbException(e.Status);
113122
}
123+
finally
124+
{
125+
_ydbConnection = null;
126+
}
127+
}
128+
129+
protected override void Dispose(bool disposing)
130+
{
131+
if (_isDisposed || !disposing)
132+
return;
133+
134+
if (!Completed)
135+
{
136+
Rollback();
137+
}
138+
139+
_isDisposed = true;
140+
}
141+
142+
public override async ValueTask DisposeAsync()
143+
{
144+
if (_isDisposed)
145+
return;
146+
147+
if (!Completed)
148+
{
149+
await RollbackAsync();
150+
}
151+
152+
_isDisposed = true;
153+
}
154+
155+
private void CheckDisposed()
156+
{
157+
if (_isDisposed)
158+
{
159+
throw new ObjectDisposedException(nameof(YdbTransaction));
160+
}
114161
}
115162
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using AdoNet.Specification.Tests;
2+
3+
namespace Ydb.Sdk.Tests.Ado.Specification;
4+
5+
public class YdbTransactionTests : TransactionTestBase<YdbFactoryFixture>
6+
{
7+
public YdbTransactionTests(YdbFactoryFixture fixture) : base(fixture)
8+
{
9+
}
10+
}

0 commit comments

Comments
 (0)