Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
a0927a4
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
f18fbf7
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
5c7cfc9
resolve conflict
LiamHamsters Aug 12, 2025
5c9ec84
Merge remote-tracking branch 'origin/add-implicit-session-flag' into …
LiamHamsters Aug 12, 2025
eab1450
fix ci
LiamHamsters Aug 12, 2025
72c53f3
feat: add integration tests and rework implicit session handling via …
LiamHamsters Aug 14, 2025
7c35439
`Warning` -> `Debug` on DeleteSession has been `RpcException` & fixes…
KirillKurdyukov Aug 13, 2025
4dfa0bd
Feat: Implement `YdbRetryPolicy` with AWS-inspired Exponential Backof…
KirillKurdyukov Aug 18, 2025
c44750b
feat: added support new datetime types (#517)
KirillKurdyukov Aug 29, 2025
a19f8bb
feat: enforce DECIMAL(p,s) overflow in parameters (#519)
KirillKurdyukov Aug 29, 2025
4ac1c01
Feat ADO.NET: cache gRPC transport by `gRPCConnectionString` to reuse…
KirillKurdyukov Sep 1, 2025
f7da918
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
812a077
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
0df1155
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 2, 2025
74eac7d
feat: update ImplicitSession for singleton driver
LiamHamsters Sep 3, 2025
c57c3d6
try fix DisableParallelization in PMTests and autoformat
LiamHamsters Sep 3, 2025
a5154b5
fix lint
LiamHamsters Sep 3, 2025
3addeae
fix: keep single command session; make ImplicitSessionSource dispose-…
LiamHamsters Sep 3, 2025
0c53efb
Move implicit session creation to PoolManager by flag
LiamHamsters Sep 4, 2025
46b9ca3
test: validate implicit session disallows transactions but supports n…
LiamHamsters Sep 4, 2025
c6d6a59
fix lint
LiamHamsters Sep 4, 2025
741e28b
feat(ado): add ImplicitSession with PoolManager integration and stres…
LiamHamsters Sep 12, 2025
4253895
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 12, 2025
09dd895
fix lint
LiamHamsters Sep 12, 2025
b8db711
fix
LiamHamsters Sep 12, 2025
fad0f85
hot fix
LiamHamsters Sep 12, 2025
1c2d3ba
hot fix
LiamHamsters Sep 12, 2025
04050d6
feat: add owner registration and dispose logic for implicit sessions
LiamHamsters Sep 12, 2025
274a0e3
refactor(ado): remove onEmpty callback from ImplicitSessionSource
LiamHamsters Sep 12, 2025
80ac532
hot fix
LiamHamsters Sep 12, 2025
275d874
refactor
LiamHamsters Sep 12, 2025
04a31a6
delete onEmpty in stressTest
LiamHamsters Sep 12, 2025
f0c9bff
Refactored implicit session handling and stress tests
LiamHamsters Sep 15, 2025
b8feb70
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 15, 2025
4075173
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
bdfdd11
fix ci
LiamHamsters Aug 12, 2025
eb6fcb8
feat: add integration tests and rework implicit session handling via …
LiamHamsters Aug 14, 2025
991a412
feat: update ImplicitSession for singleton driver
LiamHamsters Sep 3, 2025
04f41b0
test
LiamHamsters Sep 16, 2025
ed0de42
test
LiamHamsters Sep 17, 2025
f6c2ff3
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 17, 2025
ef47d68
test
LiamHamsters Sep 17, 2025
0d16841
Revert "test"
LiamHamsters Sep 17, 2025
d058f92
test
LiamHamsters Sep 17, 2025
c4158c2
test
LiamHamsters Sep 17, 2025
622b48e
test
LiamHamsters Sep 17, 2025
e748a85
fix lint
LiamHamsters Sep 17, 2025
08e6bc2
tests: add double OpenSession check in stress tests;
LiamHamsters Sep 18, 2025
448244f
try fix
LiamHamsters Sep 18, 2025
e001a79
test
LiamHamsters Sep 19, 2025
ad4f8c2
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 23, 2025
ff61b60
made dispose two-phase
LiamHamsters Sep 24, 2025
dd0ba06
Merge remote-tracking branch 'origin/add-implicit-session-flag' into …
LiamHamsters Sep 24, 2025
6852546
tried to make a “two-phase”
LiamHamsters Sep 24, 2025
401e7d2
try
LiamHamsters Sep 24, 2025
624d9d2
push force
KirillKurdyukov Oct 6, 2025
2e3c87f
update
KirillKurdyukov Oct 6, 2025
508551c
Added logger
KirillKurdyukov Oct 6, 2025
b42602b
fix tests
KirillKurdyukov Oct 6, 2025
f6f3466
fix linter
KirillKurdyukov Oct 6, 2025
208f14d
fix test
KirillKurdyukov Oct 6, 2025
ee578a2
fixed race condition
KirillKurdyukov Oct 7, 2025
7f438fa
fix maybe problem
KirillKurdyukov Oct 7, 2025
5b9ccd0
delete Collection
KirillKurdyukov Oct 7, 2025
fc97eed
fix
KirillKurdyukov Oct 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Feat ADO.NET: Added `EnableImplicitSession` to support implicit sessions.

## v0.23.1

- Fixed bug Topic Reader: NullReferenceException when handling StopPartitionSessionRequest ([#528](https://github.com/ydb-platform/ydb-dotnet-sdk/issues/528)).
Expand Down
9 changes: 3 additions & 6 deletions src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@ CancellationToken cancellationToken
: Drivers[settings.GrpcConnectionString] = await settings.BuildDriver();
driver.RegisterOwner();

var factory = new PoolingSessionFactory(driver, settings);
var newSessionPool = new PoolingSessionSource<PoolingSession>(factory, settings);

Pools[settings.ConnectionString] = newSessionPool;

return newSessionPool;
return Pools[settings.ConnectionString] = settings.EnableImplicitSession
? new ImplicitSessionSource(driver, settings.LoggerFactory)
: new PoolingSessionSource<PoolingSession>(new PoolingSessionFactory(driver, settings), settings);
}
finally
{
Expand Down
11 changes: 6 additions & 5 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ namespace Ydb.Sdk.Ado.Session;

internal class ImplicitSession : ISession
{
public ImplicitSession(IDriver driver)
private readonly ImplicitSessionSource _source;

public ImplicitSession(IDriver driver, ImplicitSessionSource source)
{
Driver = driver;
_source = source;
}

public IDriver Driver { get; }
Expand Down Expand Up @@ -47,9 +50,7 @@ public void OnNotSuccessStatusCode(StatusCode code)
{
}

public void Dispose()
{
}
public void Dispose() => _source.ReleaseLease();

private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit sessions");
private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit session");
}
79 changes: 79 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using Microsoft.Extensions.Logging;

namespace Ydb.Sdk.Ado.Session;

internal sealed class ImplicitSessionSource : ISessionSource
{
private const int DisposeTimeoutSeconds = 10;

private readonly IDriver _driver;
private readonly ILogger _logger;
private readonly TaskCompletionSource _drainedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

private int _isDisposed;
private int _activeLeaseCount;

internal ImplicitSessionSource(IDriver driver, ILoggerFactory loggerFactory)
{
_driver = driver;
_logger = loggerFactory.CreateLogger<ImplicitSessionSource>();
}

public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

return TryAcquireLease()
? new ValueTask<ISession>(new ImplicitSession(_driver, this))
: throw new ObjectDisposedException(nameof(ImplicitSessionSource));
}

private bool TryAcquireLease()
{
Interlocked.Increment(ref _activeLeaseCount);

if (Volatile.Read(ref _isDisposed) == 0)
return true;

ReleaseLease();
return false;
}

internal void ReleaseLease()
{
if (Interlocked.Decrement(ref _activeLeaseCount) == 0 && Volatile.Read(ref _isDisposed) == 1)
_drainedTcs.TrySetResult();
}

public async ValueTask DisposeAsync()
{
if (Interlocked.CompareExchange(ref _isDisposed, 1, 0) != 0)
return;

try
{
if (Volatile.Read(ref _activeLeaseCount) != 0)
{
await _drainedTcs.Task.WaitAsync(TimeSpan.FromSeconds(DisposeTimeoutSeconds));
}
}
catch (TimeoutException)
{
_logger.LogCritical("Disposal timed out: Some implicit sessions are still active");

throw new YdbException("Timeout while disposing of the pool: some implicit sessions are still active. " +
"This may indicate a connection leak or suspended operations.");
}
finally
{
try
{
await _driver.DisposeAsync();
}
catch (Exception e)
{
_logger.LogError(e, "Failed to dispose the transport driver");
}
}
}
}
16 changes: 16 additions & 0 deletions src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ private void InitDefaultValues()
_maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize;
_disableDiscovery = GrpcDefaultSettings.DisableDiscovery;
_disableServerBalancer = false;
_enableImplicitSession = false;
}

public string Host
Expand Down Expand Up @@ -314,6 +315,18 @@ public int CreateSessionTimeout

private int _createSessionTimeout;

public bool EnableImplicitSession
{
get => _enableImplicitSession;
set
{
_enableImplicitSession = value;
SaveValue(nameof(EnableImplicitSession), value);
}
}

private bool _enableImplicitSession;

public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;

public ICredentialsProvider? CredentialsProvider { get; init; }
Expand Down Expand Up @@ -495,6 +508,9 @@ static YdbConnectionOption()
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
(builder, disableServerBalancer) => builder.DisableServerBalancer = disableServerBalancer),
"DisableServerBalancer", "Disable Server Balancer");
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
(builder, enableImplicitSession) => builder.EnableImplicitSession = enableImplicitSession),
"EnableImplicitSession", "Enable Implicit Session");
}

private static void AddOption(YdbConnectionOption option, params string[] keys)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using Moq;
using Xunit;
using Ydb.Sdk.Ado.Session;
using Ydb.Sdk.Ado.Tests.Utils;

namespace Ydb.Sdk.Ado.Tests.Session;

public class YdbImplicitStressTests
{
private volatile bool _isDisposed;

private IDriver DummyDriver()
{
var m = new Mock<IDriver>(MockBehavior.Loose);
m.Setup(d => d.DisposeAsync())
.Callback(() => _isDisposed = true)
.Returns(ValueTask.CompletedTask);
return m.Object;
}

[Fact]
public async Task StressTest_OpenSession_RaceWithDispose_SuccessfulOpensAreNotDisposed()
{
for (var it = 0; it < 1000; it++)
{
var driver = DummyDriver();
var source = new ImplicitSessionSource(driver, TestUtils.LoggerFactory);

var workers = Enumerable.Range(0, 1000).Select(async _ =>
{
await Task.Delay(Random.Shared.Next(0, 5));
try
{
using var s = await source.OpenSession(CancellationToken.None);
Assert.False(_isDisposed);
}
catch (ObjectDisposedException)
{
}
}).ToArray();

await Task.WhenAll(workers.Append(Task.Run(async () =>
{
await Task.Delay(Random.Shared.Next(0, 3));
await source.DisposeAsync();
})));

Assert.True(_isDisposed);
await Assert.ThrowsAsync<ObjectDisposedException>(() =>
source.OpenSession(CancellationToken.None).AsTask());
_isDisposed = false;
}
}

[Fact]
public async Task DisposeAsync_WhenSessionIsLeaked_ThrowsYdbExceptionWithTimeoutMessage()
{
var driver = DummyDriver();
var source = new ImplicitSessionSource(driver, TestUtils.LoggerFactory);
#pragma warning disable CA2012
_ = source.OpenSession(CancellationToken.None);
#pragma warning restore CA2012

Assert.Equal("Timeout while disposing of the pool: some implicit sessions are still active. " +
"This may indicate a connection leak or suspended operations.",
(await Assert.ThrowsAsync<YdbException>(async () => await source.DisposeAsync())).Message);
Assert.True(_isDisposed);
await Assert.ThrowsAsync<ObjectDisposedException>(() => source.OpenSession(CancellationToken.None).AsTask());
}
}
1 change: 0 additions & 1 deletion src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ public async Task ExecuteScalar_WhenSelectNoRows_ReturnNull()
.ExecuteScalarAsync());
}


public class Data<T>(DbType dbType, T expected, bool isNullable = false)
{
public bool IsNullable { get; } = isNullable || expected == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnection
Assert.False(ydbConnectionStringBuilder.DisableDiscovery);
Assert.False(ydbConnectionStringBuilder.DisableServerBalancer);
Assert.False(ydbConnectionStringBuilder.UseTls);
Assert.False(ydbConnectionStringBuilder.EnableImplicitSession);

Assert.Equal("UseTls=False;Host=localhost;Port=2136;Database=/local;User=;Password=;ConnectTimeout=5;" +
"KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" +
Expand All @@ -54,7 +55,7 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
"Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;MinSessionPool=10;MaxSessionPool=50;" +
"CreateSessionTimeout=30;SessionIdleTimeout=600;ConnectTimeout=30;KeepAlivePingDelay=30;" +
"KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=true;MaxSendMessageSize=1000000;" +
"MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;"
"MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;EnableImplicitSession=true;"
);

Assert.Equal(2135, ydbConnectionStringBuilder.Port);
Expand All @@ -78,9 +79,11 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
"ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
"EnableMultipleHttp2Connections=True;" +
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" +
"DisableDiscovery=True;DisableServerBalancer=True", ydbConnectionStringBuilder.ConnectionString);
"DisableDiscovery=True;DisableServerBalancer=True;EnableImplicitSession=True",
ydbConnectionStringBuilder.ConnectionString);
Assert.True(ydbConnectionStringBuilder.DisableDiscovery);
Assert.True(ydbConnectionStringBuilder.DisableServerBalancer);
Assert.True(ydbConnectionStringBuilder.EnableImplicitSession);
Assert.Equal("UseTls=True;Host=server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=30;" +
"KeepAlivePingDelay=30;KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=True;" +
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;DisableDiscovery=True",
Expand Down
47 changes: 47 additions & 0 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -473,4 +473,51 @@ public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()

await Assert.ThrowsAsync<YdbException>(async () => { await importer.FlushAsync(); });
}

[Fact]
public async Task ClearPool_FireAndForget_DoesNotBlock_And_PoolsRecreate()
{
var csPooled = ConnectionString +
";UseTls=false;DisableDiscovery=true" +
";CreateSessionTimeout=3;ConnectTimeout=3" +
";KeepAlivePingDelay=0;KeepAlivePingTimeout=0";
var csImplicit = csPooled + ";EnableImplicitSession=true";

await using (var warmPooled = new YdbConnection(csPooled))
{
await warmPooled.OpenAsync();
await using var cmd = warmPooled.CreateCommand();
cmd.CommandText = "SELECT 1";
Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync()));
}

await using (var warmImplicit = new YdbConnection(csImplicit))
{
await warmImplicit.OpenAsync();
await using var cmd = warmImplicit.CreateCommand();
cmd.CommandText = "SELECT 1";
Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync()));
}

var clearPooledTask = YdbConnection.ClearPool(new YdbConnection(csPooled));
var clearImplicitTask = YdbConnection.ClearPool(new YdbConnection(csImplicit));

await Task.WhenAll(clearPooledTask, clearImplicitTask);

await using (var checkPooled = new YdbConnection(csPooled))
{
await checkPooled.OpenAsync();
await using var cmd = checkPooled.CreateCommand();
cmd.CommandText = "SELECT 1";
Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync()));
}

await using (var checkImplicit = new YdbConnection(csImplicit))
{
await checkImplicit.OpenAsync();
await using var cmd = checkImplicit.CreateCommand();
cmd.CommandText = "SELECT 1";
Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync()));
}
}
}
Loading
Loading