Skip to content

Commit 78b7352

Browse files
feat: switch on PoolingSessionSource.cs (#492)
1 parent 08e82c5 commit 78b7352

File tree

4 files changed

+16
-26
lines changed

4 files changed

+16
-26
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
- ADO.NET: Added `MinPoolSize` setting to keep a minimum number of sessions ready in the PoolingSessionSource.
2+
- ADO.NET: Added `SessionIdleTimeout` to remove idle sessions from the PoolingSessionSource automatically.
3+
- ADO.NET: Made `PoolingSessionSource` faster and more reliable by using a lock-free FIFO stack.
14
- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks and integration tests.
25
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
36
- Canceling AttachStream after calling the `DeleteSession` method.

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
using System.Collections.Concurrent;
22
using Ydb.Sdk.Ado.Session;
3-
using Ydb.Sdk.Pool;
4-
using Ydb.Sdk.Services.Query;
53

64
namespace Ydb.Sdk.Ado;
75

86
internal static class PoolManager
97
{
108
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
11-
private static readonly ConcurrentDictionary<string, SessionPool> Pools = new();
9+
private static readonly ConcurrentDictionary<string, ISessionSource> Pools = new();
1210

1311
internal static async Task<ISession> GetSession(
1412
YdbConnectionStringBuilder settings,
@@ -17,7 +15,7 @@ CancellationToken cancellationToken
1715
{
1816
if (Pools.TryGetValue(settings.ConnectionString, out var sessionPool))
1917
{
20-
return await sessionPool.GetSession(cancellationToken);
18+
return await sessionPool.OpenSession(cancellationToken);
2119
}
2220

2321
try
@@ -26,21 +24,16 @@ CancellationToken cancellationToken
2624

2725
if (Pools.TryGetValue(settings.ConnectionString, out var pool))
2826
{
29-
return await pool.GetSession(cancellationToken);
27+
return await pool.OpenSession(cancellationToken);
3028
}
3129

32-
var newSessionPool = new SessionPool(
33-
await settings.BuildDriver(),
34-
new SessionPoolConfig(
35-
MaxSessionPool: settings.MaxSessionPool,
36-
CreateSessionTimeout: settings.CreateSessionTimeout,
37-
DisposeDriver: true
38-
)
30+
var newSessionPool = new PoolingSessionSource<PoolingSession>(
31+
await PoolingSessionFactory.Create(settings), settings
3932
);
4033

4134
Pools[settings.ConnectionString] = newSessionPool;
4235

43-
return await newSessionPool.GetSession(cancellationToken);
36+
return await newSessionPool.OpenSession(cancellationToken);
4437
}
4538
finally
4639
{

src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ internal override async Task Open(CancellationToken cancellationToken)
124124
throw YdbException.FromServer(response.Status, response.Issues);
125125
}
126126

127-
TaskCompletionSource completeTask = new();
127+
TaskCompletionSource completeTask = new(TaskCreationOptions.RunContinuationsAsynchronously);
128128

129129
SessionId = response.SessionId;
130130
NodeId = response.NodeId;

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ public async Task DisableDiscovery_WhenPropertyIsTrue_SimpleWorking()
192192
connection.ConnectionString += ";DisableDiscovery=true";
193193
await connection.OpenAsync();
194194
Assert.True((bool)(await new YdbCommand(connection) { CommandText = "SELECT TRUE;" }.ExecuteScalarAsync())!);
195-
await YdbConnection.ClearPool(connection);
196195
}
197196

198197
[Fact]
@@ -307,16 +306,12 @@ private List<Task> GenerateTasks(string connectionString) => Enumerable.Range(0,
307306
Interlocked.Add(ref _counter, scalar);
308307
}).ToList();
309308

310-
protected override async Task OnDisposeAsync() =>
311-
await YdbConnection.ClearPool(new YdbConnection(_connectionStringTls));
312-
313309
[Fact]
314310
public async Task BulkUpsertImporter_HappyPath_Add_Flush()
315311
{
316312
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
317313

318-
var conn = new YdbConnection(_connectionStringTls);
319-
await conn.OpenAsync();
314+
await using var conn = await CreateOpenConnectionAsync();
320315
try
321316
{
322317
await using (var createCmd = conn.CreateCommand())
@@ -375,8 +370,7 @@ PRIMARY KEY (Id)
375370
public async Task BulkUpsertImporter_ThrowsOnInvalidRowCount()
376371
{
377372
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
378-
var conn = new YdbConnection(_connectionStringTls);
379-
await conn.OpenAsync();
373+
await using var conn = await CreateOpenConnectionAsync();
380374
try
381375
{
382376
await using (var createCmd = conn.CreateCommand())
@@ -418,8 +412,7 @@ public async Task BulkUpsertImporter_MultipleImporters_Parallel()
418412
var table1 = $"BulkImporter_{Guid.NewGuid():N}_1";
419413
var table2 = $"BulkImporter_{Guid.NewGuid():N}_2";
420414

421-
var conn = new YdbConnection(_connectionStringTls);
422-
await conn.OpenAsync();
415+
var conn = await CreateOpenConnectionAsync();
423416
try
424417
{
425418
foreach (var table in new[] { table1, table2 })
@@ -474,15 +467,16 @@ await Task.WhenAll(
474467
dropCmd.CommandText = $"DROP TABLE {table}";
475468
await dropCmd.ExecuteNonQueryAsync();
476469
}
470+
471+
await conn.DisposeAsync();
477472
}
478473
}
479474

480475
[Fact]
481476
public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()
482477
{
483478
var tableName = $"Nonexistent_{Guid.NewGuid():N}";
484-
var conn = new YdbConnection(_connectionStringTls);
485-
await conn.OpenAsync();
479+
await using var conn = await CreateOpenConnectionAsync();
486480

487481
var columns = new[] { "Id", "Name" };
488482

0 commit comments

Comments
 (0)