Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
- ADO.NET: Added `MinPoolSize` setting to keep a minimum number of sessions ready in the PoolingSessionSource.
- ADO.NET: Added `SessionIdleTimeout` to remove idle sessions from the PoolingSessionSource automatically.
- ADO.NET: Made `PoolingSessionSource` faster and more reliable by using a lock-free FIFO stack.
- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks and integration tests.
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
- Canceling AttachStream after calling the `DeleteSession` method.
Expand Down
19 changes: 6 additions & 13 deletions src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
using System.Collections.Concurrent;
using Ydb.Sdk.Ado.Session;
using Ydb.Sdk.Pool;
using Ydb.Sdk.Services.Query;

namespace Ydb.Sdk.Ado;

internal static class PoolManager
{
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
private static readonly ConcurrentDictionary<string, SessionPool> Pools = new();
private static readonly ConcurrentDictionary<string, ISessionSource> Pools = new();

internal static async Task<ISession> GetSession(
YdbConnectionStringBuilder settings,
Expand All @@ -17,7 +15,7 @@ CancellationToken cancellationToken
{
if (Pools.TryGetValue(settings.ConnectionString, out var sessionPool))
{
return await sessionPool.GetSession(cancellationToken);
return await sessionPool.OpenSession(cancellationToken);
}

try
Expand All @@ -26,21 +24,16 @@ CancellationToken cancellationToken

if (Pools.TryGetValue(settings.ConnectionString, out var pool))
{
return await pool.GetSession(cancellationToken);
return await pool.OpenSession(cancellationToken);
}

var newSessionPool = new SessionPool(
await settings.BuildDriver(),
new SessionPoolConfig(
MaxSessionPool: settings.MaxSessionPool,
CreateSessionTimeout: settings.CreateSessionTimeout,
DisposeDriver: true
)
var newSessionPool = new PoolingSessionSource<PoolingSession>(
await PoolingSessionFactory.Create(settings), settings
);

Pools[settings.ConnectionString] = newSessionPool;

return await newSessionPool.GetSession(cancellationToken);
return await newSessionPool.OpenSession(cancellationToken);
}
finally
{
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ internal override async Task Open(CancellationToken cancellationToken)
throw YdbException.FromServer(response.Status, response.Issues);
}

TaskCompletionSource completeTask = new();
TaskCompletionSource completeTask = new(TaskCreationOptions.RunContinuationsAsynchronously);

SessionId = response.SessionId;
NodeId = response.NodeId;
Expand Down
18 changes: 6 additions & 12 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ public async Task DisableDiscovery_WhenPropertyIsTrue_SimpleWorking()
connection.ConnectionString += ";DisableDiscovery=true";
await connection.OpenAsync();
Assert.True((bool)(await new YdbCommand(connection) { CommandText = "SELECT TRUE;" }.ExecuteScalarAsync())!);
await YdbConnection.ClearPool(connection);
}

[Fact]
Expand Down Expand Up @@ -307,16 +306,12 @@ private List<Task> GenerateTasks(string connectionString) => Enumerable.Range(0,
Interlocked.Add(ref _counter, scalar);
}).ToList();

protected override async Task OnDisposeAsync() =>
await YdbConnection.ClearPool(new YdbConnection(_connectionStringTls));

[Fact]
public async Task BulkUpsertImporter_HappyPath_Add_Flush()
{
var tableName = $"BulkImporter_{Guid.NewGuid():N}";

var conn = new YdbConnection(_connectionStringTls);
await conn.OpenAsync();
await using var conn = await CreateOpenConnectionAsync();
try
{
await using (var createCmd = conn.CreateCommand())
Expand Down Expand Up @@ -375,8 +370,7 @@ PRIMARY KEY (Id)
public async Task BulkUpsertImporter_ThrowsOnInvalidRowCount()
{
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
var conn = new YdbConnection(_connectionStringTls);
await conn.OpenAsync();
await using var conn = await CreateOpenConnectionAsync();
try
{
await using (var createCmd = conn.CreateCommand())
Expand Down Expand Up @@ -418,8 +412,7 @@ public async Task BulkUpsertImporter_MultipleImporters_Parallel()
var table1 = $"BulkImporter_{Guid.NewGuid():N}_1";
var table2 = $"BulkImporter_{Guid.NewGuid():N}_2";

var conn = new YdbConnection(_connectionStringTls);
await conn.OpenAsync();
var conn = await CreateOpenConnectionAsync();
try
{
foreach (var table in new[] { table1, table2 })
Expand Down Expand Up @@ -474,15 +467,16 @@ await Task.WhenAll(
dropCmd.CommandText = $"DROP TABLE {table}";
await dropCmd.ExecuteNonQueryAsync();
}

await conn.DisposeAsync();
}
}

[Fact]
public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()
{
var tableName = $"Nonexistent_{Guid.NewGuid():N}";
var conn = new YdbConnection(_connectionStringTls);
await conn.OpenAsync();
await using var conn = await CreateOpenConnectionAsync();

var columns = new[] { "Id", "Name" };

Expand Down
Loading