Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
- Canceling AttachStream after calling the `DeleteSession` method.
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
- Fixed bug: Grpc.Core.StatusCode.Cancelled was mapped to server's Canceled status.
- ADO.NET: PoolingSessionSource 2.0 based on Npgsql pooling algorithm.
- Added new ADO.NET options:
Expand Down
3 changes: 2 additions & 1 deletion src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using Ydb.Sdk.Ado.Session;
using Ydb.Sdk.Pool;
using Ydb.Sdk.Services.Query;

Expand All @@ -9,7 +10,7 @@ internal static class PoolManager
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
private static readonly ConcurrentDictionary<string, SessionPool> Pools = new();

internal static async Task<Services.Query.Session> GetSession(
internal static async Task<ISession> GetSession(
YdbConnectionStringBuilder connectionString,
CancellationToken cancellationToken
)
Expand Down
2 changes: 2 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace Ydb.Sdk.Ado.Session;

internal interface ISession
{
IDriver Driver { get; }

bool IsBroken { get; }

ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
Expand Down
7 changes: 3 additions & 4 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ namespace Ydb.Sdk.Ado.Session;

internal class ImplicitSession : ISession
{
private readonly IDriver _driver;

public ImplicitSession(IDriver driver)
{
_driver = driver;
Driver = driver;
}

public IDriver Driver { get; }
public bool IsBroken => false;

public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
Expand All @@ -36,7 +35,7 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
};
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));

return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
}

public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
Expand Down
32 changes: 20 additions & 12 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@
private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5);
private static readonly CreateSessionRequest CreateSessionRequest = new();

private readonly IDriver _driver;
private readonly PoolingSessionSource _poolingSessionSource;
private readonly ILogger<PoolingSession> _logger;
private readonly CancellationTokenSource _attachStreamLifecycleCts = new();

private volatile bool _isBroken = true;
private volatile bool _isBadSession = false;

Check warning on line 23 in src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs

View workflow job for this annotation

GitHub Actions / Inspection (./src/YdbSdk.sln)

"[RedundantDefaultMemberInitializer] Initializing field by default value is redundant" on /home/runner/work/ydb-dotnet-sdk/ydb-dotnet-sdk/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs(23,41)

private readonly bool _disableServerBalancer;

private string SessionId { get; set; } = string.Empty;
private long NodeId { get; set; }

public IDriver Driver { get; }
public bool IsBroken => _isBroken;

internal PoolingSession(
Expand All @@ -35,10 +37,10 @@
ILogger<PoolingSession> logger
)
{
_driver = driver;
_poolingSessionSource = poolingSessionSource;
_disableServerBalancer = disableServerBalancer;
_logger = logger;
Driver = driver;
}

public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
Expand All @@ -60,15 +62,15 @@
};
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));

return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
}

public async Task CommitTransaction(
string txId,
CancellationToken cancellationToken = default
)
{
var response = await _driver.UnaryCall(
var response = await Driver.UnaryCall(
QueryService.CommitTransactionMethod,
new CommitTransactionRequest { SessionId = SessionId, TxId = txId },
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
Expand All @@ -85,7 +87,7 @@
CancellationToken cancellationToken = default
)
{
var response = await _driver.UnaryCall(
var response = await Driver.UnaryCall(
QueryService.RollbackTransactionMethod,
new RollbackTransactionRequest { SessionId = SessionId, TxId = txId },
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
Expand All @@ -99,6 +101,8 @@

public void OnNotSuccessStatusCode(StatusCode statusCode)
{
_isBadSession = _isBadSession || statusCode is StatusCode.BadSession;

if (statusCode is
StatusCode.BadSession or
StatusCode.SessionBusy or
Expand All @@ -121,7 +125,7 @@
requestSettings.ClientCapabilities.Add(SessionBalancer);
}

var response = await _driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);
var response = await Driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);

if (response.Status.IsNotSuccess())
{
Expand All @@ -138,7 +142,7 @@
{
try
{
using var stream = await _driver.ServerStreamCall(
using var stream = await Driver.ServerStreamCall(
QueryService.AttachSessionMethod,
new AttachSessionRequest { SessionId = SessionId },
new GrpcRequestSettings { NodeId = NodeId }
Expand All @@ -161,10 +165,12 @@

completeTask.SetResult();

var lifecycleAttachToken = _attachStreamLifecycleCts.Token;

try
{
// ReSharper disable once MethodSupportsCancellation
while (await stream.MoveNextAsync())
while (await stream.MoveNextAsync(lifecycleAttachToken))
{
var sessionState = stream.Current;

Expand Down Expand Up @@ -215,14 +221,16 @@
{
try
{
if (_isBroken)
_isBroken = true;
_attachStreamLifecycleCts.CancelAfter(DeleteSessionTimeout);

if (_isBadSession)
{
return;
}

_isBroken = true;

var deleteSessionResponse = await _driver.UnaryCall(
_isBadSession = true;
var deleteSessionResponse = await Driver.UnaryCall(
QueryService.DeleteSessionMethod,
new DeleteSessionRequest { SessionId = SessionId },
new GrpcRequestSettings { TransportTimeout = DeleteSessionTimeout, NodeId = NodeId }
Expand Down
5 changes: 0 additions & 5 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
// This file contains session pooling algorithms adapted from Npgsql
// Original source: https://github.com/npgsql/npgsql
// Copyright (c) 2002-2025, Npgsql
// Licence https://github.com/npgsql/npgsql?tab=PostgreSQL-1-ov-file

using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
Expand Down
12 changes: 0 additions & 12 deletions src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs

This file was deleted.

5 changes: 2 additions & 3 deletions src/Ydb.Sdk/src/Ado/YdbCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Text;
using Ydb.Sdk.Ado.Internal;
using Ydb.Sdk.Services.Query;
using Ydb.Sdk.Value;

namespace Ydb.Sdk.Ado;
Expand Down Expand Up @@ -206,8 +205,8 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
preparedSql.Append(sql);

var execSettings = CommandTimeout > 0
? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
: new ExecuteQuerySettings();
? new GrpcRequestSettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
: new GrpcRequestSettings();

var transaction = YdbConnection.CurrentTransaction;

Expand Down
9 changes: 5 additions & 4 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Data;
using System.Data.Common;
using System.Diagnostics.CodeAnalysis;
using Ydb.Sdk.Ado.Session;
using Ydb.Sdk.Services.Query;
using static System.Data.IsolationLevel;

Expand All @@ -24,7 +25,7 @@ private YdbConnectionStringBuilder ConnectionStringBuilder
[param: AllowNull] init => _connectionStringBuilder = value;
}

internal Services.Query.Session Session
internal ISession Session
{
get
{
Expand All @@ -35,7 +36,7 @@ internal Services.Query.Session Session
private set => _session = value;
}

private Services.Query.Session _session = null!;
private ISession _session = null!;

public YdbConnection()
{
Expand Down Expand Up @@ -124,7 +125,7 @@ public override async Task CloseAsync()
}
finally
{
await _session.Release();
_session.Close();
}
}

Expand Down Expand Up @@ -152,7 +153,7 @@ internal void OnNotSuccessStatusCode(StatusCode code)
{
_session.OnNotSuccessStatusCode(code);

if (!_session.IsActive)
if (_session.IsBroken)
{
ConnectionState = ConnectionState.Broken;
}
Expand Down
8 changes: 4 additions & 4 deletions src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ public YdbConnectionStringBuilder(string connectionString)
// Init default connection string
private void InitDefaultValues()
{
_host = YdbAdoDefaultSettings.Host;
_port = YdbAdoDefaultSettings.Port;
_database = YdbAdoDefaultSettings.Database;
_host = "localhost";
_port = 2136;
_database = "/local";
_minSessionPool = 0;
_maxSessionPool = SessionPoolDefaultSettings.MaxSessionPool;
_createSessionTimeout = SessionPoolDefaultSettings.CreateSessionTimeoutSeconds;
_sessionIdleTimeout = 300;
_sessionPruningInterval = 10;
_useTls = YdbAdoDefaultSettings.UseTls;
_useTls = false;
_connectTimeout = GrpcDefaultSettings.ConnectTimeoutSeconds;
_keepAlivePingDelay = GrpcDefaultSettings.KeepAlivePingSeconds;
_keepAlivePingTimeout = GrpcDefaultSettings.KeepAlivePingTimeoutSeconds;
Expand Down
60 changes: 35 additions & 25 deletions src/Ydb.Sdk/src/Ado/YdbSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
using System.Globalization;
using Ydb.Scheme;
using Ydb.Scheme.V1;
using Ydb.Sdk.Ado.Internal;
using Ydb.Sdk.Ado.Schema;
using Ydb.Sdk.Services.Table;
using Ydb.Table;
using Ydb.Table.V1;

namespace Ydb.Sdk.Ado;

Expand Down Expand Up @@ -59,10 +62,28 @@ public static async Task<YdbTable> DescribeTable(
{
try
{
var describeResponse = await ydbConnection.Session
.DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings);
describeTableSettings ??= new DescribeTableSettings();

return new YdbTable(tableName, describeResponse);
var describeResponse = await ydbConnection.Session.Driver.UnaryCall(
TableService.DescribeTableMethod,
new DescribeTableRequest
{
Path = WithSuffix(ydbConnection.Database) + tableName,
IncludeTableStats = describeTableSettings.IncludeTableStats,
IncludePartitionStats = describeTableSettings.IncludePartitionStats,
IncludeShardKeyBounds = describeTableSettings.IncludeShardKeyBounds
},
describeTableSettings
);

if (describeResponse.Operation.Status.IsNotSuccess())
{
throw YdbException.FromServer(describeResponse.Operation.Status, describeResponse.Operation.Issues);
}

var describeResult = describeResponse.Operation.Result.Unpack<DescribeTableResult>();

return new YdbTable(tableName, describeResult);
}
catch (YdbException e)
{
Expand Down Expand Up @@ -241,31 +262,20 @@ private static async Task AppendDescribeTable(
string? tableType,
Action<YdbTable, string> appendInTable)
{
try
{
var describeResponse = await ydbConnection.Session
.DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings);
var ydbTable = new YdbTable(tableName, describeResponse);
var ydbTable = await DescribeTable(ydbConnection, tableName, describeTableSettings);

var type = ydbTable.IsSystem
? "SYSTEM_TABLE"
: ydbTable.Type switch
{
YdbTable.TableType.Table => "TABLE",
YdbTable.TableType.ColumnTable => "COLUMN_TABLE",
YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE",
_ => throw new ArgumentOutOfRangeException(nameof(tableType))
};
if (type.IsPattern(tableType))
var type = ydbTable.IsSystem
? "SYSTEM_TABLE"
: ydbTable.Type switch
{
appendInTable(ydbTable, type);
}
}
catch (YdbException e)
YdbTable.TableType.Table => "TABLE",
YdbTable.TableType.ColumnTable => "COLUMN_TABLE",
YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE",
_ => throw new ArgumentOutOfRangeException(nameof(tableType))
};
if (type.IsPattern(tableType))
{
ydbConnection.OnNotSuccessStatusCode(e.Code);

throw;
appendInTable(ydbTable, type);
}
}

Expand Down
Loading
Loading