Skip to content

Commit 56d54c7

Browse files
feat: add EnableImplicitSession support (#532)
1 parent a5a2618 commit 56d54c7

File tree

10 files changed

+326
-14
lines changed

10 files changed

+326
-14
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
- Feat ADO.NET: Added `EnableImplicitSession` to support implicit sessions.
2+
13
## v0.23.1
24

35
- Fixed bug Topic Reader: NullReferenceException when handling StopPartitionSessionRequest ([#528](https://github.com/ydb-platform/ydb-dotnet-sdk/issues/528)).

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,9 @@ CancellationToken cancellationToken
3535
: Drivers[settings.GrpcConnectionString] = await settings.BuildDriver();
3636
driver.RegisterOwner();
3737

38-
var factory = new PoolingSessionFactory(driver, settings);
39-
var newSessionPool = new PoolingSessionSource<PoolingSession>(factory, settings);
40-
41-
Pools[settings.ConnectionString] = newSessionPool;
42-
43-
return newSessionPool;
38+
return Pools[settings.ConnectionString] = settings.EnableImplicitSession
39+
? new ImplicitSessionSource(driver, settings.LoggerFactory)
40+
: new PoolingSessionSource<PoolingSession>(new PoolingSessionFactory(driver, settings), settings);
4441
}
4542
finally
4643
{

src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ namespace Ydb.Sdk.Ado.Session;
55

66
internal class ImplicitSession : ISession
77
{
8-
public ImplicitSession(IDriver driver)
8+
private readonly ImplicitSessionSource _source;
9+
10+
public ImplicitSession(IDriver driver, ImplicitSessionSource source)
911
{
1012
Driver = driver;
13+
_source = source;
1114
}
1215

1316
public IDriver Driver { get; }
@@ -47,9 +50,7 @@ public void OnNotSuccessStatusCode(StatusCode code)
4750
{
4851
}
4952

50-
public void Dispose()
51-
{
52-
}
53+
public void Dispose() => _source.ReleaseLease();
5354

54-
private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit sessions");
55+
private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit session");
5556
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
using Microsoft.Extensions.Logging;
2+
3+
namespace Ydb.Sdk.Ado.Session;
4+
5+
internal sealed class ImplicitSessionSource : ISessionSource
6+
{
7+
private const int DisposeTimeoutSeconds = 10;
8+
9+
private readonly IDriver _driver;
10+
private readonly ILogger _logger;
11+
private readonly TaskCompletionSource _drainedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
12+
13+
private int _isDisposed;
14+
private int _activeLeaseCount;
15+
16+
internal ImplicitSessionSource(IDriver driver, ILoggerFactory loggerFactory)
17+
{
18+
_driver = driver;
19+
_logger = loggerFactory.CreateLogger<ImplicitSessionSource>();
20+
}
21+
22+
public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)
23+
{
24+
cancellationToken.ThrowIfCancellationRequested();
25+
26+
return TryAcquireLease()
27+
? new ValueTask<ISession>(new ImplicitSession(_driver, this))
28+
: throw new ObjectDisposedException(nameof(ImplicitSessionSource));
29+
}
30+
31+
private bool TryAcquireLease()
32+
{
33+
Interlocked.Increment(ref _activeLeaseCount);
34+
35+
if (Volatile.Read(ref _isDisposed) == 0)
36+
return true;
37+
38+
ReleaseLease();
39+
return false;
40+
}
41+
42+
internal void ReleaseLease()
43+
{
44+
if (Interlocked.Decrement(ref _activeLeaseCount) == 0 && Volatile.Read(ref _isDisposed) == 1)
45+
_drainedTcs.TrySetResult();
46+
}
47+
48+
public async ValueTask DisposeAsync()
49+
{
50+
if (Interlocked.CompareExchange(ref _isDisposed, 1, 0) != 0)
51+
return;
52+
53+
try
54+
{
55+
if (Volatile.Read(ref _activeLeaseCount) != 0)
56+
{
57+
await _drainedTcs.Task.WaitAsync(TimeSpan.FromSeconds(DisposeTimeoutSeconds));
58+
}
59+
}
60+
catch (TimeoutException)
61+
{
62+
_logger.LogCritical("Disposal timed out: Some implicit sessions are still active");
63+
64+
throw new YdbException("Timeout while disposing of the pool: some implicit sessions are still active. " +
65+
"This may indicate a connection leak or suspended operations.");
66+
}
67+
finally
68+
{
69+
try
70+
{
71+
await _driver.DisposeAsync();
72+
}
73+
catch (Exception e)
74+
{
75+
_logger.LogError(e, "Failed to dispose the transport driver");
76+
}
77+
}
78+
}
79+
}

src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ private void InitDefaultValues()
4040
_maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize;
4141
_disableDiscovery = GrpcDefaultSettings.DisableDiscovery;
4242
_disableServerBalancer = false;
43+
_enableImplicitSession = false;
4344
}
4445

4546
public string Host
@@ -314,6 +315,18 @@ public int CreateSessionTimeout
314315

315316
private int _createSessionTimeout;
316317

318+
public bool EnableImplicitSession
319+
{
320+
get => _enableImplicitSession;
321+
set
322+
{
323+
_enableImplicitSession = value;
324+
SaveValue(nameof(EnableImplicitSession), value);
325+
}
326+
}
327+
328+
private bool _enableImplicitSession;
329+
317330
public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;
318331

319332
public ICredentialsProvider? CredentialsProvider { get; init; }
@@ -495,6 +508,9 @@ static YdbConnectionOption()
495508
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
496509
(builder, disableServerBalancer) => builder.DisableServerBalancer = disableServerBalancer),
497510
"DisableServerBalancer", "Disable Server Balancer");
511+
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
512+
(builder, enableImplicitSession) => builder.EnableImplicitSession = enableImplicitSession),
513+
"EnableImplicitSession", "Enable Implicit Session");
498514
}
499515

500516
private static void AddOption(YdbConnectionOption option, params string[] keys)
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
using Moq;
2+
using Xunit;
3+
using Ydb.Sdk.Ado.Session;
4+
using Ydb.Sdk.Ado.Tests.Utils;
5+
6+
namespace Ydb.Sdk.Ado.Tests.Session;
7+
8+
public class YdbImplicitStressTests
9+
{
10+
private volatile bool _isDisposed;
11+
12+
private IDriver DummyDriver()
13+
{
14+
var m = new Mock<IDriver>(MockBehavior.Loose);
15+
m.Setup(d => d.DisposeAsync())
16+
.Callback(() => _isDisposed = true)
17+
.Returns(ValueTask.CompletedTask);
18+
return m.Object;
19+
}
20+
21+
[Fact]
22+
public async Task StressTest_OpenSession_RaceWithDispose_SuccessfulOpensAreNotDisposed()
23+
{
24+
for (var it = 0; it < 1000; it++)
25+
{
26+
var driver = DummyDriver();
27+
var source = new ImplicitSessionSource(driver, TestUtils.LoggerFactory);
28+
29+
var workers = Enumerable.Range(0, 1000).Select(async _ =>
30+
{
31+
await Task.Delay(Random.Shared.Next(0, 5));
32+
try
33+
{
34+
using var s = await source.OpenSession(CancellationToken.None);
35+
Assert.False(_isDisposed);
36+
}
37+
catch (ObjectDisposedException)
38+
{
39+
}
40+
}).ToArray();
41+
42+
await Task.WhenAll(workers.Append(Task.Run(async () =>
43+
{
44+
await Task.Delay(Random.Shared.Next(0, 3));
45+
await source.DisposeAsync();
46+
})));
47+
48+
Assert.True(_isDisposed);
49+
await Assert.ThrowsAsync<ObjectDisposedException>(() =>
50+
source.OpenSession(CancellationToken.None).AsTask());
51+
_isDisposed = false;
52+
}
53+
}
54+
55+
[Fact]
56+
public async Task DisposeAsync_WhenSessionIsLeaked_ThrowsYdbExceptionWithTimeoutMessage()
57+
{
58+
var driver = DummyDriver();
59+
var source = new ImplicitSessionSource(driver, TestUtils.LoggerFactory);
60+
#pragma warning disable CA2012
61+
_ = source.OpenSession(CancellationToken.None);
62+
#pragma warning restore CA2012
63+
64+
Assert.Equal("Timeout while disposing of the pool: some implicit sessions are still active. " +
65+
"This may indicate a connection leak or suspended operations.",
66+
(await Assert.ThrowsAsync<YdbException>(async () => await source.DisposeAsync())).Message);
67+
Assert.True(_isDisposed);
68+
await Assert.ThrowsAsync<ObjectDisposedException>(() => source.OpenSession(CancellationToken.None).AsTask());
69+
}
70+
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ public async Task ExecuteScalar_WhenSelectNoRows_ReturnNull()
204204
.ExecuteScalarAsync());
205205
}
206206

207-
208207
public class Data<T>(DbType dbType, T expected, bool isNullable = false)
209208
{
210209
public bool IsNullable { get; } = isNullable || expected == null;

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnection
3030
Assert.False(ydbConnectionStringBuilder.DisableDiscovery);
3131
Assert.False(ydbConnectionStringBuilder.DisableServerBalancer);
3232
Assert.False(ydbConnectionStringBuilder.UseTls);
33+
Assert.False(ydbConnectionStringBuilder.EnableImplicitSession);
3334

3435
Assert.Equal("UseTls=False;Host=localhost;Port=2136;Database=/local;User=;Password=;ConnectTimeout=5;" +
3536
"KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" +
@@ -54,7 +55,7 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
5455
"Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;MinSessionPool=10;MaxSessionPool=50;" +
5556
"CreateSessionTimeout=30;SessionIdleTimeout=600;ConnectTimeout=30;KeepAlivePingDelay=30;" +
5657
"KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=true;MaxSendMessageSize=1000000;" +
57-
"MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;"
58+
"MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;EnableImplicitSession=true;"
5859
);
5960

6061
Assert.Equal(2135, ydbConnectionStringBuilder.Port);
@@ -78,9 +79,11 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
7879
"ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
7980
"EnableMultipleHttp2Connections=True;" +
8081
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" +
81-
"DisableDiscovery=True;DisableServerBalancer=True", ydbConnectionStringBuilder.ConnectionString);
82+
"DisableDiscovery=True;DisableServerBalancer=True;EnableImplicitSession=True",
83+
ydbConnectionStringBuilder.ConnectionString);
8284
Assert.True(ydbConnectionStringBuilder.DisableDiscovery);
8385
Assert.True(ydbConnectionStringBuilder.DisableServerBalancer);
86+
Assert.True(ydbConnectionStringBuilder.EnableImplicitSession);
8487
Assert.Equal("UseTls=True;Host=server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=30;" +
8588
"KeepAlivePingDelay=30;KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=True;" +
8689
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;DisableDiscovery=True",

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,4 +473,51 @@ public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()
473473

474474
await Assert.ThrowsAsync<YdbException>(async () => { await importer.FlushAsync(); });
475475
}
476+
477+
[Fact]
478+
public async Task ClearPool_FireAndForget_DoesNotBlock_And_PoolsRecreate()
479+
{
480+
var csPooled = ConnectionString +
481+
";UseTls=false;DisableDiscovery=true" +
482+
";CreateSessionTimeout=3;ConnectTimeout=3" +
483+
";KeepAlivePingDelay=0;KeepAlivePingTimeout=0";
484+
var csImplicit = csPooled + ";EnableImplicitSession=true";
485+
486+
await using (var warmPooled = new YdbConnection(csPooled))
487+
{
488+
await warmPooled.OpenAsync();
489+
await using var cmd = warmPooled.CreateCommand();
490+
cmd.CommandText = "SELECT 1";
491+
Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync()));
492+
}
493+
494+
await using (var warmImplicit = new YdbConnection(csImplicit))
495+
{
496+
await warmImplicit.OpenAsync();
497+
await using var cmd = warmImplicit.CreateCommand();
498+
cmd.CommandText = "SELECT 1";
499+
Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync()));
500+
}
501+
502+
var clearPooledTask = YdbConnection.ClearPool(new YdbConnection(csPooled));
503+
var clearImplicitTask = YdbConnection.ClearPool(new YdbConnection(csImplicit));
504+
505+
await Task.WhenAll(clearPooledTask, clearImplicitTask);
506+
507+
await using (var checkPooled = new YdbConnection(csPooled))
508+
{
509+
await checkPooled.OpenAsync();
510+
await using var cmd = checkPooled.CreateCommand();
511+
cmd.CommandText = "SELECT 1";
512+
Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync()));
513+
}
514+
515+
await using (var checkImplicit = new YdbConnection(csImplicit))
516+
{
517+
await checkImplicit.OpenAsync();
518+
await using var cmd = checkImplicit.CreateCommand();
519+
cmd.CommandText = "SELECT 1";
520+
Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync()));
521+
}
522+
}
476523
}

0 commit comments

Comments
 (0)