Skip to content
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
a0927a4
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
f18fbf7
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
5c7cfc9
resolve conflict
LiamHamsters Aug 12, 2025
5c9ec84
Merge remote-tracking branch 'origin/add-implicit-session-flag' into …
LiamHamsters Aug 12, 2025
eab1450
fix ci
LiamHamsters Aug 12, 2025
72c53f3
feat: add integration tests and rework implicit session handling via …
LiamHamsters Aug 14, 2025
7c35439
`Warning` -> `Debug` on DeleteSession has been `RpcException` & fixes…
KirillKurdyukov Aug 13, 2025
4dfa0bd
Feat: Implement `YdbRetryPolicy` with AWS-inspired Exponential Backof…
KirillKurdyukov Aug 18, 2025
c44750b
feat: added support new datetime types (#517)
KirillKurdyukov Aug 29, 2025
a19f8bb
feat: enforce DECIMAL(p,s) overflow in parameters (#519)
KirillKurdyukov Aug 29, 2025
4ac1c01
Feat ADO.NET: cache gRPC transport by `gRPCConnectionString` to reuse…
KirillKurdyukov Sep 1, 2025
f7da918
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
812a077
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
0df1155
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 2, 2025
74eac7d
feat: update ImplicitSession for singleton driver
LiamHamsters Sep 3, 2025
c57c3d6
try fix DisableParallelization in PMTests and autoformat
LiamHamsters Sep 3, 2025
a5154b5
fix lint
LiamHamsters Sep 3, 2025
3addeae
fix: keep single command session; make ImplicitSessionSource dispose-…
LiamHamsters Sep 3, 2025
0c53efb
Move implicit session creation to PoolManager by flag
LiamHamsters Sep 4, 2025
46b9ca3
test: validate implicit session disallows transactions but supports n…
LiamHamsters Sep 4, 2025
c6d6a59
fix lint
LiamHamsters Sep 4, 2025
741e28b
feat(ado): add ImplicitSession with PoolManager integration and stres…
LiamHamsters Sep 12, 2025
4253895
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 12, 2025
09dd895
fix lint
LiamHamsters Sep 12, 2025
b8db711
fix
LiamHamsters Sep 12, 2025
fad0f85
hot fix
LiamHamsters Sep 12, 2025
1c2d3ba
hot fix
LiamHamsters Sep 12, 2025
04050d6
feat: add owner registration and dispose logic for implicit sessions
LiamHamsters Sep 12, 2025
274a0e3
refactor(ado): remove onEmpty callback from ImplicitSessionSource
LiamHamsters Sep 12, 2025
80ac532
hot fix
LiamHamsters Sep 12, 2025
275d874
refactor
LiamHamsters Sep 12, 2025
04a31a6
delete onEmpty in stressTest
LiamHamsters Sep 12, 2025
f0c9bff
Refactored implicit session handling and stress tests
LiamHamsters Sep 15, 2025
b8feb70
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 15, 2025
4075173
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
bdfdd11
fix ci
LiamHamsters Aug 12, 2025
eb6fcb8
feat: add integration tests and rework implicit session handling via …
LiamHamsters Aug 14, 2025
991a412
feat: update ImplicitSession for singleton driver
LiamHamsters Sep 3, 2025
04f41b0
test
LiamHamsters Sep 16, 2025
ed0de42
test
LiamHamsters Sep 17, 2025
f6c2ff3
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 17, 2025
ef47d68
test
LiamHamsters Sep 17, 2025
0d16841
Revert "test"
LiamHamsters Sep 17, 2025
d058f92
test
LiamHamsters Sep 17, 2025
c4158c2
test
LiamHamsters Sep 17, 2025
622b48e
test
LiamHamsters Sep 17, 2025
e748a85
fix lint
LiamHamsters Sep 17, 2025
08e6bc2
tests: add double OpenSession check in stress tests;
LiamHamsters Sep 18, 2025
448244f
try fix
LiamHamsters Sep 18, 2025
e001a79
test
LiamHamsters Sep 19, 2025
ad4f8c2
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 23, 2025
ff61b60
made dispose two-phase
LiamHamsters Sep 24, 2025
dd0ba06
Merge remote-tracking branch 'origin/add-implicit-session-flag' into …
LiamHamsters Sep 24, 2025
6852546
tried to make a “two-phase”
LiamHamsters Sep 24, 2025
401e7d2
try
LiamHamsters Sep 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,22 @@ CancellationToken cancellationToken
!cacheDriver.IsDisposed
? cacheDriver
: Drivers[settings.GrpcConnectionString] = await settings.BuildDriver();
driver.RegisterOwner();

var factory = new PoolingSessionFactory(driver, settings);
var newSessionPool = new PoolingSessionSource<PoolingSession>(factory, settings);
ISessionSource newSessionPool;
if (settings.EnableImplicitSession)
{
var key = settings.ConnectionString;
newSessionPool = new ImplicitSessionSource(
driver,
onEmpty: () => Pools.TryRemove(key, out _)
);
}
else
{
driver.RegisterOwner();
var factory = new PoolingSessionFactory(driver, settings);
newSessionPool = new PoolingSessionSource<PoolingSession>(factory, settings);
}

Pools[settings.ConnectionString] = newSessionPool;

Expand Down
9 changes: 5 additions & 4 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ namespace Ydb.Sdk.Ado.Session;

internal class ImplicitSession : ISession
{
public ImplicitSession(IDriver driver)
private readonly Action? _onClose;

public ImplicitSession(IDriver driver, Action? onClose = null)
{
Driver = driver;
_onClose = onClose;
}

public IDriver Driver { get; }
Expand Down Expand Up @@ -47,9 +50,7 @@ public void OnNotSuccessStatusCode(StatusCode code)
{
}

public void Close()
{
}
public void Close() => _onClose?.Invoke();

private static YdbException NotSupportedTransaction =>
new(StatusCode.BadRequest, "Transactions are not supported in implicit sessions");
Expand Down
61 changes: 61 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
namespace Ydb.Sdk.Ado.Session;

internal sealed class ImplicitSessionSource : ISessionSource
{
private readonly IDriver _driver;
private readonly Action? _onBecameEmpty;
private int _isDisposed;
private int _activeLeaseCount;

internal ImplicitSessionSource(IDriver driver, Action? onEmpty = null)
{
_driver = driver ?? throw new ArgumentNullException(nameof(driver));
_onBecameEmpty = onEmpty;
}

public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (!TryAcquireLease())
throw new ObjectDisposedException(nameof(ImplicitSessionSource));

return new ValueTask<ISession>(new ImplicitSession(_driver, ReleaseLease));
}

private bool TryAcquireLease()
{
if (Volatile.Read(ref _isDisposed) != 0)
return false;

Interlocked.Increment(ref _activeLeaseCount);

if (Volatile.Read(ref _isDisposed) != 0)
{
Interlocked.Decrement(ref _activeLeaseCount);
return false;
}

return true;
}

private void ReleaseLease()
{
if (Interlocked.Decrement(ref _activeLeaseCount) == 0)
{
_onBecameEmpty?.Invoke();
}
}

public ValueTask DisposeAsync()
{
Interlocked.Exchange(ref _isDisposed, 1);

if (Volatile.Read(ref _activeLeaseCount) == 0)
{
_onBecameEmpty?.Invoke();
}

return ValueTask.CompletedTask;
}
}
16 changes: 13 additions & 3 deletions src/Ydb.Sdk/src/Ado/YdbCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,19 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
throw new InvalidOperationException("Transaction mismatched! (Maybe using another connection)");
}

var ydbDataReader = await YdbDataReader.CreateYdbDataReader(await YdbConnection.Session.ExecuteQuery(
preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl
), YdbConnection.OnNotSuccessStatusCode, transaction, cancellationToken);
var execResult = await YdbConnection.Session.ExecuteQuery(
preparedSql.ToString(),
ydbParameters,
execSettings,
transaction?.TransactionControl
);

var ydbDataReader = await YdbDataReader.CreateYdbDataReader(
execResult,
YdbConnection.OnNotSuccessStatusCode,
transaction,
cancellationToken
);

YdbConnection.LastReader = ydbDataReader;
YdbConnection.LastCommand = CommandText;
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public override string ConnectionString

public override ConnectionState State => ConnectionState;

private ConnectionState ConnectionState { get; set; } = ConnectionState.Closed; // Invoke AsyncOpen()
private ConnectionState ConnectionState { get; set; } = ConnectionState.Closed;

internal void OnNotSuccessStatusCode(StatusCode code)
{
Expand Down
16 changes: 16 additions & 0 deletions src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ private void InitDefaultValues()
_maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize;
_disableDiscovery = GrpcDefaultSettings.DisableDiscovery;
_disableServerBalancer = false;
_enableImplicitSession = false;
}

public string Host
Expand Down Expand Up @@ -314,6 +315,18 @@ public int CreateSessionTimeout

private int _createSessionTimeout;

public bool EnableImplicitSession
{
get => _enableImplicitSession;
set
{
_enableImplicitSession = value;
SaveValue(nameof(EnableImplicitSession), value);
}
}

private bool _enableImplicitSession;

public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;

public ICredentialsProvider? CredentialsProvider { get; init; }
Expand Down Expand Up @@ -495,6 +508,9 @@ static YdbConnectionOption()
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
(builder, disableServerBalancer) => builder.DisableServerBalancer = disableServerBalancer),
"DisableServerBalancer", "Disable Server Balancer");
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
(builder, enableImplicitSession) => builder.EnableImplicitSession = enableImplicitSession),
"EnableImplicitSession", "ImplicitSession");
}

private static void AddOption(YdbConnectionOption option, params string[] keys)
Expand Down
4 changes: 3 additions & 1 deletion src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/PoolManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

namespace Ydb.Sdk.Ado.Tests;

[Collection("PoolManagerTests")]
[CollectionDefinition("PoolManagerTests", DisableParallelization = true)]
public sealed class PoolManagerCollection;

[Collection("PoolManagerTests")]
public class PoolManagerTests
{
[Theory]
Expand Down
121 changes: 121 additions & 0 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,127 @@ public async Task ExecuteScalar_WhenSelectNoRows_ReturnNull()
.ExecuteScalarAsync());
}

[Fact]
public async Task ImplicitSession_SimpleScalar_Works()
{
await using var connection = CreateConnection();
connection.ConnectionString += ";EnableImplicitSession=true";
await connection.OpenAsync();

var cmd = connection.CreateCommand();
cmd.CommandText = "SELECT 40 + 2;";
var scalar = await cmd.ExecuteScalarAsync();
Assert.Equal(42, Convert.ToInt32(scalar));
}

[Fact]
public async Task ImplicitSession_RepeatedScalars_WorksManyTimes()
{
await using var connection = CreateConnection();
connection.ConnectionString += ";EnableImplicitSession=true";
await connection.OpenAsync();

for (var i = 0; i < 30; i++)
{
var cmd = connection.CreateCommand();
cmd.CommandText = $"SELECT {i};";
var scalar = await cmd.ExecuteScalarAsync();
Assert.Equal(i, Convert.ToInt32(scalar));
}
}

[Fact]
public void ImplicitSession_ConcurrentCommand_IsStillBlockedByBusyCheck()
{
using var connection = CreateConnection();
connection.ConnectionString += ";EnableImplicitSession=true";
connection.Open();

var cmd = connection.CreateCommand();
cmd.CommandText = "SELECT 1; SELECT 1;";
using var reader = cmd.ExecuteReader();

var ex = Assert.Throws<YdbOperationInProgressException>(() => cmd.ExecuteReader());
Assert.Equal("A command is already in progress: SELECT 1; SELECT 1;", ex.Message);
}

[Fact]
public async Task ImplicitSession_DisallowsTransactions_And_AllowsNonTransactionalCommands()
{
var table = $"Implicit_{Guid.NewGuid():N}";

await using var connection = CreateConnection();
connection.ConnectionString += ";EnableImplicitSession=true";
await connection.OpenAsync();

try
{
await using (var create = connection.CreateCommand())
{
create.CommandText = $"""
CREATE TABLE {table} (
Id Int32,
Name Text,
PRIMARY KEY (Id)
)
""";
await create.ExecuteNonQueryAsync();
}

await using (var insert = connection.CreateCommand())
{
insert.CommandText = $"INSERT INTO {table} (Id, Name) VALUES (1, 'A');";
await insert.ExecuteNonQueryAsync();
}

var tx = connection.BeginTransaction();
await using (var insertTx = connection.CreateCommand())
{
insertTx.Transaction = tx;
insertTx.CommandText = $"INSERT INTO {table} (Id, Name) VALUES (2, 'B');";
var ex = await Assert.ThrowsAsync<YdbException>(async () => await insertTx.ExecuteNonQueryAsync());
Assert.Contains("Transactions are not supported in implicit sessions", ex.Message);
}

await tx.RollbackAsync();

await using (var check = connection.CreateCommand())
{
check.CommandText = $"SELECT COUNT(*) FROM {table};";
var count = Convert.ToInt32(await check.ExecuteScalarAsync());
Assert.Equal(1, count);
}
}
finally
{
await using var drop = connection.CreateCommand();
drop.CommandText = $"DROP TABLE {table}";
await drop.ExecuteNonQueryAsync();
}
}

[Fact]
public async Task ImplicitSession_Cancellation_AfterFirstResult_StillReturnsFirst()
{
await using var connection = CreateConnection();
connection.ConnectionString += ";EnableImplicitSession=true";
await connection.OpenAsync();

var cmd = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1;" };
using var cts = new CancellationTokenSource();

var reader = await cmd.ExecuteReaderAsync(cts.Token);

await reader.ReadAsync(cts.Token);
Assert.Equal(1, reader.GetValue(0));
Assert.True(await reader.NextResultAsync(cts.Token));

await cts.CancelAsync();

await reader.ReadAsync(cts.Token);
Assert.Equal(1, reader.GetValue(0));
Assert.False(await reader.NextResultAsync());
}

public class Data<T>(DbType dbType, T expected, bool isNullable = false)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnection
Assert.False(ydbConnectionStringBuilder.DisableDiscovery);
Assert.False(ydbConnectionStringBuilder.DisableServerBalancer);
Assert.False(ydbConnectionStringBuilder.UseTls);
Assert.False(ydbConnectionStringBuilder.EnableImplicitSession);

Assert.Equal("UseTls=False;Host=localhost;Port=2136;Database=/local;User=;Password=;ConnectTimeout=5;" +
"KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" +
Expand All @@ -54,7 +55,7 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
"Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;MinSessionPool=10;MaxSessionPool=50;" +
"CreateSessionTimeout=30;SessionIdleTimeout=600;ConnectTimeout=30;KeepAlivePingDelay=30;" +
"KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=true;MaxSendMessageSize=1000000;" +
"MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;"
"MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;EnableImplicitSession=true;"
);

Assert.Equal(2135, ydbConnectionStringBuilder.Port);
Expand All @@ -78,9 +79,11 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
"ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
"EnableMultipleHttp2Connections=True;" +
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" +
"DisableDiscovery=True;DisableServerBalancer=True", ydbConnectionStringBuilder.ConnectionString);
"DisableDiscovery=True;DisableServerBalancer=True;EnableImplicitSession=True",
ydbConnectionStringBuilder.ConnectionString);
Assert.True(ydbConnectionStringBuilder.DisableDiscovery);
Assert.True(ydbConnectionStringBuilder.DisableServerBalancer);
Assert.True(ydbConnectionStringBuilder.EnableImplicitSession);
Assert.Equal("UseTls=True;Host=server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=30;" +
"KeepAlivePingDelay=30;KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=True;" +
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;DisableDiscovery=True",
Expand Down
Loading