Skip to content

Commit 74eac7d

Browse files
committed
feat: update ImplicitSession for singleton driver
1 parent 0df1155 commit 74eac7d

File tree

7 files changed

+206
-10
lines changed

7 files changed

+206
-10
lines changed

src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ namespace Ydb.Sdk.Ado.Session;
55

66
internal class ImplicitSession : ISession
77
{
8-
public ImplicitSession(IDriver driver)
8+
private readonly Action? _onClose;
9+
10+
public ImplicitSession(IDriver driver, Action? onClose = null)
911
{
1012
Driver = driver;
13+
_onClose = onClose;
1114
}
1215

1316
public IDriver Driver { get; }
@@ -49,6 +52,7 @@ public void OnNotSuccessStatusCode(StatusCode code)
4952

5053
public void Close()
5154
{
55+
_onClose?.Invoke();
5256
}
5357

5458
private static YdbException NotSupportedTransaction =>

src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,30 @@ namespace Ydb.Sdk.Ado.Session;
33
internal sealed class ImplicitSessionSource : ISessionSource
44
{
55
private readonly IDriver _driver;
6+
private readonly Action? _onEmpty;
7+
private int _leased;
68

7-
internal ImplicitSessionSource(IDriver driver)
9+
internal ImplicitSessionSource(IDriver driver, Action? onEmpty = null)
810
{
911
_driver = driver;
12+
_onEmpty = onEmpty;
1013
}
1114

12-
public ValueTask<ISession> OpenSession(CancellationToken cancellationToken) => new(new ImplicitSession(_driver));
15+
public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)
16+
{
17+
cancellationToken.ThrowIfCancellationRequested();
18+
Interlocked.Increment(ref _leased);
19+
20+
return new ValueTask<ISession>(new ImplicitSession(_driver, Release));
21+
}
22+
23+
private void Release()
24+
{
25+
if (Interlocked.Decrement(ref _leased) == 0)
26+
{
27+
_onEmpty?.Invoke();
28+
}
29+
}
1330

14-
public async ValueTask DisposeAsync() => await _driver.DisposeAsync();
15-
}
31+
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
32+
}

src/Ydb.Sdk/src/Ado/YdbCommand.cs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,33 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
216216
throw new InvalidOperationException("Transaction mismatched! (Maybe using another connection)");
217217
}
218218

219-
var ydbDataReader = await YdbDataReader.CreateYdbDataReader(await YdbConnection.Session.ExecuteQuery(
220-
preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl
221-
), YdbConnection.OnNotSuccessStatusCode, transaction, cancellationToken);
219+
var useImplicit = YdbConnection.EnableImplicitSession && transaction is null;
220+
var session = YdbConnection.GetExecutionSession(useImplicit);
221+
222+
YdbDataReader ydbDataReader;
223+
try
224+
{
225+
var execResult = await session.ExecuteQuery(
226+
preparedSql.ToString(),
227+
ydbParameters,
228+
execSettings,
229+
transaction?.TransactionControl
230+
);
231+
232+
ydbDataReader = await YdbDataReader.CreateYdbDataReader(
233+
execResult,
234+
YdbConnection.OnNotSuccessStatusCode,
235+
transaction,
236+
cancellationToken
237+
);
238+
}
239+
finally
240+
{
241+
if (useImplicit)
242+
{
243+
session.Close();
244+
}
245+
}
222246

223247
YdbConnection.LastReader = ydbDataReader;
224248
YdbConnection.LastCommand = CommandText;

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,21 @@ internal ISession Session
3939

4040
private ISession _session = null!;
4141

42+
private ImplicitSessionSource? _implicitSessionSource;
43+
44+
internal bool EnableImplicitSession => ConnectionStringBuilder.EnableImplicitSession;
45+
46+
internal ISession GetExecutionSession(bool useImplicit)
47+
{
48+
ThrowIfConnectionClosed();
49+
50+
if (!useImplicit)
51+
return Session;
52+
53+
_implicitSessionSource ??= new ImplicitSessionSource(Session.Driver, onEmpty: () => _implicitSessionSource = null);
54+
return _implicitSessionSource.OpenSession(CancellationToken.None).GetAwaiter().GetResult();
55+
}
56+
4257
public YdbConnection()
4358
{
4459
}
@@ -127,6 +142,7 @@ public override async Task CloseAsync()
127142
finally
128143
{
129144
_session.Close();
145+
_implicitSessionSource = null;
130146
}
131147
}
132148

src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ private void InitDefaultValues()
4040
_maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize;
4141
_disableDiscovery = GrpcDefaultSettings.DisableDiscovery;
4242
_disableServerBalancer = false;
43+
_enableImplicitSession = false;
4344
}
4445

4546
public string Host
@@ -314,6 +315,18 @@ public int CreateSessionTimeout
314315

315316
private int _createSessionTimeout;
316317

318+
public bool EnableImplicitSession
319+
{
320+
get => _enableImplicitSession;
321+
set
322+
{
323+
_enableImplicitSession = value;
324+
SaveValue(nameof(EnableImplicitSession), value);
325+
}
326+
}
327+
328+
private bool _enableImplicitSession;
329+
317330
public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;
318331

319332
public ICredentialsProvider? CredentialsProvider { get; init; }
@@ -495,6 +508,9 @@ static YdbConnectionOption()
495508
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
496509
(builder, disableServerBalancer) => builder.DisableServerBalancer = disableServerBalancer),
497510
"DisableServerBalancer", "Disable Server Balancer");
511+
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
512+
(builder, enableImplicitSession) => builder.EnableImplicitSession = enableImplicitSession),
513+
"EnableImplicitSession", "ImplicitSession");
498514
}
499515

500516
private static void AddOption(YdbConnectionOption option, params string[] keys)

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,122 @@ public async Task ExecuteScalar_WhenSelectNoRows_ReturnNull()
204204
.ExecuteScalarAsync());
205205
}
206206

207+
[Fact]
208+
public async Task ImplicitSession_SimpleScalar_Works()
209+
{
210+
await using var connection = CreateConnection();
211+
connection.ConnectionString += ";EnableImplicitSession=true";
212+
await connection.OpenAsync();
213+
214+
var cmd = connection.CreateCommand();
215+
cmd.CommandText = "SELECT 40 + 2;";
216+
var scalar = await cmd.ExecuteScalarAsync();
217+
Assert.Equal(42, Convert.ToInt32(scalar));
218+
}
219+
220+
[Fact]
221+
public async Task ImplicitSession_RepeatedScalars_WorksManyTimes()
222+
{
223+
await using var connection = CreateConnection();
224+
connection.ConnectionString += ";EnableImplicitSession=true";
225+
await connection.OpenAsync();
226+
227+
for (var i = 0; i < 30; i++)
228+
{
229+
var cmd = connection.CreateCommand();
230+
cmd.CommandText = $"SELECT {i};";
231+
var scalar = await cmd.ExecuteScalarAsync();
232+
Assert.Equal(i, Convert.ToInt32(scalar));
233+
}
234+
}
235+
236+
[Fact]
237+
public void ImplicitSession_ConcurrentCommand_IsStillBlockedByBusyCheck()
238+
{
239+
using var connection = CreateConnection();
240+
connection.ConnectionString += ";EnableImplicitSession=true";
241+
connection.Open();
242+
243+
var cmd = connection.CreateCommand();
244+
cmd.CommandText = "SELECT 1; SELECT 1;";
245+
using var reader = cmd.ExecuteReader();
246+
247+
var ex = Assert.Throws<YdbOperationInProgressException>(() => cmd.ExecuteReader());
248+
Assert.Equal("A command is already in progress: SELECT 1; SELECT 1;", ex.Message);
249+
}
250+
251+
[Fact]
252+
public async Task ImplicitSession_WithExplicitTransaction_UsesExplicitSessionAndCommits()
253+
{
254+
var table = $"Implicit_{Guid.NewGuid():N}";
255+
256+
await using var connection = CreateConnection();
257+
connection.ConnectionString += ";EnableImplicitSession=true";
258+
await connection.OpenAsync();
259+
260+
try
261+
{
262+
await using (var create = connection.CreateCommand())
263+
{
264+
create.CommandText = $"""
265+
CREATE TABLE {table} (
266+
Id Int32,
267+
Name Text,
268+
PRIMARY KEY (Id)
269+
)
270+
""";
271+
await create.ExecuteNonQueryAsync();
272+
}
273+
274+
var tx = connection.BeginTransaction();
275+
await using (var insert = connection.CreateCommand())
276+
{
277+
insert.Transaction = tx;
278+
insert.CommandText = $"INSERT INTO {table} (Id, Name) VALUES (1, 'A');";
279+
await insert.ExecuteNonQueryAsync();
280+
insert.CommandText = $"INSERT INTO {table} (Id, Name) VALUES (2, 'B');";
281+
await insert.ExecuteNonQueryAsync();
282+
}
283+
284+
await tx.CommitAsync();
285+
286+
await using (var check = connection.CreateCommand())
287+
{
288+
check.CommandText = $"SELECT COUNT(*) FROM {table};";
289+
var count = Convert.ToInt32(await check.ExecuteScalarAsync());
290+
Assert.Equal(2, count);
291+
}
292+
}
293+
finally
294+
{
295+
await using var drop = connection.CreateCommand();
296+
drop.CommandText = $"DROP TABLE {table}";
297+
await drop.ExecuteNonQueryAsync();
298+
}
299+
}
300+
301+
[Fact]
302+
public async Task ImplicitSession_Cancellation_AfterFirstResult_StillReturnsFirst()
303+
{
304+
await using var connection = CreateConnection();
305+
connection.ConnectionString += ";EnableImplicitSession=true";
306+
await connection.OpenAsync();
307+
308+
var cmd = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1;" };
309+
using var cts = new CancellationTokenSource();
310+
311+
var reader = await cmd.ExecuteReaderAsync(cts.Token);
312+
313+
await reader.ReadAsync(cts.Token);
314+
Assert.Equal(1, reader.GetValue(0));
315+
Assert.True(await reader.NextResultAsync(cts.Token));
316+
317+
await cts.CancelAsync();
318+
319+
await reader.ReadAsync(cts.Token);
320+
Assert.Equal(1, reader.GetValue(0));
321+
Assert.False(await reader.NextResultAsync());
322+
}
207323

208324
public class Data<T>(DbType dbType, T expected, bool isNullable = false)
209325
{

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnection
3030
Assert.False(ydbConnectionStringBuilder.DisableDiscovery);
3131
Assert.False(ydbConnectionStringBuilder.DisableServerBalancer);
3232
Assert.False(ydbConnectionStringBuilder.UseTls);
33+
Assert.False(ydbConnectionStringBuilder.EnableImplicitSession);
3334

3435
Assert.Equal("UseTls=False;Host=localhost;Port=2136;Database=/local;User=;Password=;ConnectTimeout=5;" +
3536
"KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" +
@@ -54,7 +55,7 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
5455
"Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;MinSessionPool=10;MaxSessionPool=50;" +
5556
"CreateSessionTimeout=30;SessionIdleTimeout=600;ConnectTimeout=30;KeepAlivePingDelay=30;" +
5657
"KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=true;MaxSendMessageSize=1000000;" +
57-
"MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;"
58+
"MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;EnableImplicitSession=true;"
5859
);
5960

6061
Assert.Equal(2135, ydbConnectionStringBuilder.Port);
@@ -78,9 +79,11 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
7879
"ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
7980
"EnableMultipleHttp2Connections=True;" +
8081
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" +
81-
"DisableDiscovery=True;DisableServerBalancer=True", ydbConnectionStringBuilder.ConnectionString);
82+
"DisableDiscovery=True;DisableServerBalancer=True;EnableImplicitSession=True",
83+
ydbConnectionStringBuilder.ConnectionString);
8284
Assert.True(ydbConnectionStringBuilder.DisableDiscovery);
8385
Assert.True(ydbConnectionStringBuilder.DisableServerBalancer);
86+
Assert.True(ydbConnectionStringBuilder.EnableImplicitSession);
8487
Assert.Equal("UseTls=True;Host=server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=30;" +
8588
"KeepAlivePingDelay=30;KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=True;" +
8689
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;DisableDiscovery=True",

0 commit comments

Comments
 (0)