Skip to content

Commit 374df57

Browse files
committed
test
1 parent 6b7a7de commit 374df57

File tree

9 files changed

+72
-87
lines changed

9 files changed

+72
-87
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
- Added provider support for implicit sessions.
2-
3-
## v0.23.0
41
- Feat ADO.NET: `YdbDataSource.OpenRetryableConnectionAsync` opens a retryable connection with automatic retries for transient failures.
52
- Fixed bug ADO.NET/PoolManager: `SemaphoreSlim.WaitAsync` over-release on cancellation.
63
- Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation.

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ internal static class PoolManager
1010
internal static readonly ConcurrentDictionary<string, IDriver> Drivers = new();
1111
internal static readonly ConcurrentDictionary<string, ISessionSource> Pools = new();
1212

13-
internal static async Task<ISession> GetSession(
13+
internal static async ValueTask<ISessionSource> Get(
1414
YdbConnectionStringBuilder settings,
1515
CancellationToken cancellationToken
1616
)
1717
{
1818
if (Pools.TryGetValue(settings.ConnectionString, out var sessionPool))
1919
{
20-
return await sessionPool.OpenSession(cancellationToken);
20+
return sessionPool;
2121
}
2222

2323
await SemaphoreSlim.WaitAsync(cancellationToken);
@@ -26,23 +26,21 @@ CancellationToken cancellationToken
2626
{
2727
if (Pools.TryGetValue(settings.ConnectionString, out var pool))
2828
{
29-
return await pool.OpenSession(cancellationToken);
29+
return pool;
3030
}
3131

3232
var driver = Drivers.TryGetValue(settings.GrpcConnectionString, out var cacheDriver) &&
3333
!cacheDriver.IsDisposed
3434
? cacheDriver
3535
: Drivers[settings.GrpcConnectionString] = await settings.BuildDriver();
36-
3736
driver.RegisterOwner();
3837

39-
ISessionSource newSessionPool = settings.EnableImplicitSession
40-
? new ImplicitSessionSource(driver)
41-
: new PoolingSessionSource<PoolingSession>(new PoolingSessionFactory(driver, settings), settings);
38+
var factory = new PoolingSessionFactory(driver, settings);
39+
var newSessionPool = new PoolingSessionSource<PoolingSession>(factory, settings);
4240

4341
Pools[settings.ConnectionString] = newSessionPool;
4442

45-
return await newSessionPool.OpenSession(cancellationToken);
43+
return newSessionPool;
4644
}
4745
finally
4846
{

src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,17 @@ namespace Ydb.Sdk.Ado.Session;
55

66
internal class ImplicitSession : ISession
77
{
8-
private readonly ImplicitSessionSource _source;
8+
private readonly ImplicitSessionSource? _owner;
99

10-
public ImplicitSession(IDriver driver, ImplicitSessionSource source)
10+
public ImplicitSession(IDriver driver)
1111
{
1212
Driver = driver;
13-
_source = source;
13+
}
14+
15+
public ImplicitSession(IDriver driver, ImplicitSessionSource owner)
16+
{
17+
Driver = driver;
18+
_owner = owner;
1419
}
1520

1621
public IDriver Driver { get; }
@@ -24,9 +29,7 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
2429
)
2530
{
2631
if (txControl is not null && !txControl.CommitTx)
27-
{
2832
throw NotSupportedTransaction;
29-
}
3033

3134
var request = new ExecuteQueryRequest
3235
{
@@ -46,11 +49,13 @@ public Task CommitTransaction(string txId, CancellationToken cancellationToken =
4649
public Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
4750
throw NotSupportedTransaction;
4851

49-
public void OnNotSuccessStatusCode(StatusCode code)
52+
public void OnNotSuccessStatusCode(StatusCode code) { }
53+
54+
public void Dispose()
5055
{
56+
_owner?.ReleaseLease();
5157
}
5258

53-
public void Dispose() => _source.ReleaseLease();
54-
55-
private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit sessions");
59+
private static YdbException NotSupportedTransaction =>
60+
new("Transactions are not supported in implicit sessions");
5661
}

src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,16 @@ internal ImplicitSessionSource(IDriver driver)
1515
public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)
1616
{
1717
cancellationToken.ThrowIfCancellationRequested();
18-
1918
if (!TryAcquireLease())
2019
throw new ObjectDisposedException(nameof(ImplicitSessionSource));
21-
2220
return new ValueTask<ISession>(new ImplicitSession(_driver, this));
2321
}
2422

2523
private bool TryAcquireLease()
2624
{
2725
if (Volatile.Read(ref _isDisposed) != 0)
2826
return false;
29-
3027
Interlocked.Increment(ref _activeLeaseCount);
31-
3228
if (Volatile.Read(ref _isDisposed) != 0)
3329
{
3430
Interlocked.Decrement(ref _activeLeaseCount);
@@ -51,7 +47,6 @@ public async ValueTask DisposeAsync()
5147

5248
if (Volatile.Read(ref _activeLeaseCount) != 0)
5349
_allReleased.Wait();
54-
5550
try
5651
{
5752
await _driver.DisposeAsync();

src/Ydb.Sdk/src/Ado/YdbCommand.cs

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Diagnostics.CodeAnalysis;
44
using System.Text;
55
using Ydb.Sdk.Ado.Internal;
6-
using Ydb.Sdk.Ado.Session;
76

87
namespace Ydb.Sdk.Ado;
98

@@ -166,7 +165,8 @@ protected override DbTransaction? DbTransaction
166165
protected override YdbDataReader ExecuteDbDataReader(CommandBehavior behavior) =>
167166
ExecuteReaderAsync(behavior).GetAwaiter().GetResult();
168167

169-
protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior,
168+
protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(
169+
CommandBehavior behavior,
170170
CancellationToken cancellationToken)
171171
{
172172
cancellationToken.ThrowIfCancellationRequested();
@@ -210,47 +210,28 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
210210
? new GrpcRequestSettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
211211
: new GrpcRequestSettings();
212212

213-
var connectionTx = YdbConnection.CurrentTransaction;
214-
var effectiveTx = Transaction ?? connectionTx;
213+
var transaction = YdbConnection.CurrentTransaction;
215214

216-
if (Transaction != null && Transaction != connectionTx)
215+
if (Transaction != null && Transaction != transaction) // assert on legacy DbTransaction property
217216
{
218217
throw new InvalidOperationException("Transaction mismatched! (Maybe using another connection)");
219218
}
220219

221-
var useImplicit = YdbConnection.EnableImplicitSession && effectiveTx is null;
222-
var session = useImplicit
223-
? new ImplicitSession(YdbConnection.Session.Driver, new ImplicitSessionSource(YdbConnection.Session.Driver))
224-
: YdbConnection.Session;
220+
var useImplicit = transaction is null && YdbConnection.EnableImplicitSession;
221+
var execSession = YdbConnection.GetExecutionSession(useImplicit);
225222

226-
YdbDataReader ydbDataReader;
227-
try
228-
{
229-
if (effectiveTx is not null && session is ImplicitSession)
230-
{
231-
throw new InvalidOperationException(
232-
"Invariant violated: pooled session expected inside a transaction, but ImplicitSession was selected.");
233-
}
234-
235-
var execResult = await session.ExecuteQuery(
223+
var ydbDataReader = await YdbDataReader.CreateYdbDataReader(
224+
await execSession.ExecuteQuery(
236225
preparedSql.ToString(),
237226
ydbParameters,
238227
execSettings,
239-
useImplicit ? null : effectiveTx?.TransactionControl
240-
);
241-
242-
ydbDataReader = await YdbDataReader.CreateYdbDataReader(
243-
execResult,
244-
YdbConnection.OnNotSuccessStatusCode,
245-
effectiveTx,
246-
cancellationToken
247-
);
248-
}
249-
finally
250-
{
251-
if (useImplicit)
252-
session.Dispose();
253-
}
228+
transaction?.TransactionControl
229+
),
230+
YdbConnection.OnNotSuccessStatusCode,
231+
transaction,
232+
cancellationToken);
233+
234+
YdbConnection.AdoptSession(execSession);
254235

255236
YdbConnection.LastReader = ydbDataReader;
256237
YdbConnection.LastCommand = CommandText;
@@ -269,7 +250,8 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
269250
ExecuteReaderAsync(behavior, CancellationToken.None);
270251

271252
// ReSharper disable once MemberCanBePrivate.Global
272-
public new async Task<YdbDataReader> ExecuteReaderAsync(CommandBehavior behavior,
253+
public new async Task<YdbDataReader> ExecuteReaderAsync(
254+
CommandBehavior behavior,
273255
CancellationToken cancellationToken) =>
274256
(YdbDataReader)await ExecuteDbDataReaderAsync(behavior, cancellationToken);
275257
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Data.Common;
33
using System.Diagnostics.CodeAnalysis;
44
using Ydb.Sdk.Ado.BulkUpsert;
5+
using Ydb.Sdk.Ado.RetryPolicy;
56
using Ydb.Sdk.Ado.Session;
67
using static System.Data.IsolationLevel;
78

@@ -37,8 +38,13 @@ internal ISession Session
3738
}
3839

3940
private ISession _session = null!;
40-
41-
internal void OverrideSession(ISession session) => _session = session ?? throw new ArgumentNullException(nameof(session));
41+
42+
internal bool EnableImplicitSession => ConnectionStringBuilder.EnableImplicitSession;
43+
44+
internal ISession GetExecutionSession(bool useImplicit) =>
45+
useImplicit ? new ImplicitSession(Session.Driver) : Session;
46+
47+
internal void AdoptSession(ISession session) => _session = session;
4248

4349
public YdbConnection()
4450
{
@@ -83,11 +89,7 @@ public YdbTransaction BeginTransaction(TransactionMode transactionMode = Transac
8389
return CurrentTransaction;
8490
}
8591

86-
public override void ChangeDatabase(string databaseName)
87-
{
88-
}
89-
90-
internal bool EnableImplicitSession => ConnectionStringBuilder.EnableImplicitSession;
92+
public override void ChangeDatabase(string databaseName) => throw new NotSupportedException();
9193

9294
public override void Close() => CloseAsync().GetAwaiter().GetResult();
9395

@@ -97,19 +99,26 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
9799
{
98100
ThrowIfConnectionOpen();
99101

100-
var sess = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken);
101-
102-
if (sess is ImplicitSession)
103-
{
104-
var factory = new PoolingSessionFactory(sess.Driver, ConnectionStringBuilder);
105-
var source = new PoolingSessionSource<PoolingSession>(factory, ConnectionStringBuilder);
106-
sess.Dispose();
107-
sess = await source.OpenSession(cancellationToken);
108-
}
102+
var sessionSource = await PoolManager.Get(ConnectionStringBuilder, cancellationToken);
109103

110-
Session = sess;
104+
Session = await sessionSource.OpenSession(cancellationToken);
111105

112106
OnStateChange(ClosedToOpenEventArgs);
107+
108+
ConnectionState = ConnectionState.Open;
109+
}
110+
111+
internal async ValueTask OpenAsync(
112+
YdbRetryPolicyExecutor retryPolicyExecutor,
113+
CancellationToken cancellationToken = default
114+
)
115+
{
116+
ThrowIfConnectionOpen();
117+
118+
var sessionSource = await PoolManager.Get(ConnectionStringBuilder, cancellationToken);
119+
120+
Session = new RetryableSession(sessionSource, retryPolicyExecutor);
121+
113122
ConnectionState = ConnectionState.Open;
114123
}
115124

@@ -171,7 +180,7 @@ public override string ConnectionString
171180
? ConnectionState.Broken
172181
: ConnectionState;
173182

174-
private ConnectionState ConnectionState { get; set; } = ConnectionState.Closed;
183+
private ConnectionState ConnectionState { get; set; } = ConnectionState.Closed; // Invoke AsyncOpen()
175184

176185
internal void OnNotSuccessStatusCode(StatusCode code) => _session.OnNotSuccessStatusCode(code);
177186

src/Ydb.Sdk/src/Ado/YdbDataSource.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -390,11 +390,8 @@ public async ValueTask<YdbConnection> OpenRetryableConnectionAsync(CancellationT
390390
var ydbConnection = CreateDbConnection();
391391
try
392392
{
393-
await ydbConnection.OpenAsync(cancellationToken);
394-
var driver = ydbConnection.Session.Driver;
395-
var source = new Session.ImplicitSessionSource(driver);
396-
var retryable = new Session.RetryableSession(source, _retryPolicyExecutor);
397-
ydbConnection.OverrideSession(retryable);
393+
await ydbConnection.OpenAsync(_retryPolicyExecutor, cancellationToken);
394+
398395
return ydbConnection;
399396
}
400397
catch
@@ -412,7 +409,8 @@ public async ValueTask<YdbConnection> OpenRetryableConnectionAsync(
412409
var ydbConnection = CreateDbConnection();
413410
try
414411
{
415-
await ydbConnection.OpenAsync(cancellationToken);
412+
await ydbConnection.OpenAsync(GetExecutor(ydbRetryPolicyConfig), cancellationToken);
413+
416414
return ydbConnection;
417415
}
418416
catch
@@ -430,7 +428,8 @@ public async ValueTask<YdbConnection> OpenRetryableConnectionAsync(
430428
var ydbConnection = CreateDbConnection();
431429
try
432430
{
433-
await ydbConnection.OpenAsync(cancellationToken);
431+
await ydbConnection.OpenAsync(new YdbRetryPolicyExecutor(retryPolicy), cancellationToken);
432+
434433
return ydbConnection;
435434
}
436435
catch

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,4 +142,4 @@ public async Task Open_RacingWithDispose_StateRemainsConsistent()
142142

143143
await Assert.ThrowsAsync<ObjectDisposedException>(() => source.OpenSession(CancellationToken.None).AsTask());
144144
}
145-
}
145+
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplictConnectionTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,4 @@ public async Task ImplicitSession_Cancellation_AfterFirstResult_StillReturnsFirs
125125
Assert.Equal(1, reader.GetValue(0));
126126
Assert.False(await reader.NextResultAsync());
127127
}
128-
}
128+
}

0 commit comments

Comments
 (0)