Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d904ed8
Add: hint "session-balancer"
LiamHamsters Jul 3, 2025
b757bf4
Added DisableServerBalancer: support for server-side load balancing o…
LiamHamsters Jul 3, 2025
eca3da5
Added server balancer disabling check in Query/SessionPool
LiamHamsters Jul 4, 2025
e63dc1c
Removed server balancer flag from driver config and renamed session list
LiamHamsters Jul 4, 2025
81c68a8
Fixed initialization of SessionPoolConfig
LiamHamsters Jul 4, 2025
6ca42e8
Update CHANGELOG.md
LiamHamsters Jul 4, 2025
9f29644
Changed field type in SessionPool config
LiamHamsters Jul 7, 2025
c9c2bae
Fix warning "Inspection (./src/YdbSdk.sln)"
LiamHamsters Jul 7, 2025
3bf1220
finally fixes
LiamHamsters Jul 7, 2025
52891c0
Final fix
LiamHamsters Jul 7, 2025
d4dda32
Add bulk upsert
LiamHamsters Jul 23, 2025
c07b88d
dev SLO: Dapper over ADO.NET (#466)
KirillKurdyukov Jul 7, 2025
b426020
PID header added (#464)
Travonka Jul 7, 2025
b94b27b
feat: supported Guid (Uuid YDB type) (#465)
KirillKurdyukov Jul 8, 2025
6013dee
dev: prepare repository for benchmarks testing (#467)
KirillKurdyukov Jul 10, 2025
4768078
fixed unhandled HttpIOException & delete legacy exceptions (#474)
KirillKurdyukov Jul 14, 2025
dcb95c4
dev: Ydb.Sdk.Ado.Benchmarks & prepared support for new pooling sessio…
KirillKurdyukov Jul 15, 2025
d6f9956
dev: MinSessionPool, SessionIdleTimeout, SessionPruningInterval (#476)
KirillKurdyukov Jul 16, 2025
17ad798
feat: PoolingSessionSource 2.0 based on Npgsql pooling algorithm (#477)
KirillKurdyukov Jul 17, 2025
891c408
Fixed bug: Grpc.Core.StatusCode.Cancelled was mapped to server's Canc…
KirillKurdyukov Jul 17, 2025
1e23c76
dev: Added PoolingSessionTests.cs (#479)
KirillKurdyukov Jul 18, 2025
1241e8f
dev: Ydb.Sdk.Ado.Stress.Loader (#480)
KirillKurdyukov Jul 21, 2025
ae57645
dev: Prepare ISession for switching implementation (#481)
KirillKurdyukov Jul 22, 2025
b574624
Add: hint "session-balancer"
LiamHamsters Jul 3, 2025
270c446
Added server balancer disabling check in Query/SessionPool
LiamHamsters Jul 4, 2025
c2a9a81
Removed server balancer flag from driver config and renamed session list
LiamHamsters Jul 4, 2025
9b007fa
Fixed initialization of SessionPoolConfig
LiamHamsters Jul 4, 2025
7597068
Fix warning "Inspection (./src/YdbSdk.sln)"
LiamHamsters Jul 7, 2025
0214e9a
Hot fix
LiamHamsters Jul 23, 2025
42e0018
Hot fix
LiamHamsters Jul 23, 2025
5740614
fix conflict
LiamHamsters Jul 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Ydb.Sdk.Ado.BulkUpsert
{
public sealed record BulkUpsertOptions(
TimeSpan? Timeout = null,
CancellationToken CancellationToken = default
);
}
80 changes: 80 additions & 0 deletions src/Ydb.Sdk/src/Ado/BulkUpsert/TypedValueFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System.Reflection;
using Ydb.Sdk.Value;

namespace Ydb.Sdk.Ado.BulkUpsert;

internal static class TypedValueFactory
{
public static TypedValue FromObjects<T>(IReadOnlyCollection<T> rows)
{
if (rows.Count == 0)
throw new ArgumentException("Rows collection is empty.", nameof(rows));

var props = typeof(T).GetProperties(BindingFlags.Instance | BindingFlags.Public)
.Where(p => p.CanRead).ToArray();

var structs = new List<YdbValue>(rows.Count);

foreach (var row in rows)
{
var members = new Dictionary<string, YdbValue>(props.Length);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это можно сделать один раз до цикла

foreach (var p in props)
members[p.Name] = ToYdbValue(p.GetValue(row), p.PropertyType);

structs.Add(YdbValue.MakeStruct(members));
}

var list = YdbValue.MakeList(structs);
return list.GetProto();
}

private static YdbValue ToYdbValue(object? value, global::System.Type clr)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

зачем писать global:: ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

У него какая-то очень странная коллизия имён была, ну можно убрать, вдруг пропала

{
if (value is null) return MakeOptional(clr);

if (clr == typeof(bool)) return (YdbValue)(bool)value;
if (clr == typeof(sbyte)) return (YdbValue)(sbyte)value;
if (clr == typeof(short)) return (YdbValue)(short)value;
if (clr == typeof(int)) return (YdbValue)(int)value;
if (clr == typeof(long)) return (YdbValue)(long)value;
if (clr == typeof(byte)) return (YdbValue)(byte)value;
if (clr == typeof(ushort)) return (YdbValue)(ushort)value;
if (clr == typeof(uint)) return (YdbValue)(uint)value;
if (clr == typeof(ulong)) return (YdbValue)(ulong)value;
if (clr == typeof(float)) return (YdbValue)(float)value;
if (clr == typeof(double)) return (YdbValue)(double)value;
if (clr == typeof(decimal)) return (YdbValue)(decimal)value;
if (clr == typeof(DateTime)) return YdbValue.MakeTimestamp((DateTime)value);
if (clr == typeof(TimeSpan)) return (YdbValue)(TimeSpan)value;
if (clr == typeof(Guid)) return YdbValue.MakeUuid((Guid)value);
if (clr == typeof(string)) return YdbValue.MakeUtf8((string)value);
if (clr == typeof(byte[])) return YdbValue.MakeString((byte[])value);

throw new NotSupportedException($"Type '{clr.FullName}' is not supported.");
}

private static YdbValue MakeOptional(global::System.Type clr)
{
var t = Nullable.GetUnderlyingType(clr) ?? clr;

if (t == typeof(bool)) return YdbValue.MakeOptionalBool();
if (t == typeof(sbyte)) return YdbValue.MakeOptionalInt8();
if (t == typeof(short)) return YdbValue.MakeOptionalInt16();
if (t == typeof(int)) return YdbValue.MakeOptionalInt32();
if (t == typeof(long)) return YdbValue.MakeOptionalInt64();
if (t == typeof(byte)) return YdbValue.MakeOptionalUint8();
if (t == typeof(ushort)) return YdbValue.MakeOptionalUint16();
if (t == typeof(uint)) return YdbValue.MakeOptionalUint32();
if (t == typeof(ulong)) return YdbValue.MakeOptionalUint64();
if (t == typeof(float)) return YdbValue.MakeOptionalFloat();
if (t == typeof(double)) return YdbValue.MakeOptionalDouble();
if (t == typeof(decimal)) return YdbValue.MakeOptionalDecimal();
if (t == typeof(DateTime)) return YdbValue.MakeOptionalTimestamp();
if (t == typeof(TimeSpan)) return YdbValue.MakeOptionalInterval();
if (t == typeof(Guid)) return YdbValue.MakeOptionalUuid();
if (t == typeof(string)) return YdbValue.MakeOptionalUtf8();
if (t == typeof(byte[])) return YdbValue.MakeOptionalString();

throw new NotSupportedException($"Null for '{t.FullName}' is not supported.");
Comment on lines +56 to +78
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это все написано в YdbParameter, давай попробуем переиспользовать существующий код

}
}
73 changes: 73 additions & 0 deletions src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Ydb.Sdk.Ado.BulkUpsert;

namespace Ydb.Sdk.Ado

Check warning on line 7 in src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs

View workflow job for this annotation

GitHub Actions / Inspection (./src/YdbSdk.sln)

"[CheckNamespace] Namespace does not correspond to file location, must be: 'Ydb.Sdk.Ado.BulkUpsert'" on /home/runner/work/ydb-dotnet-sdk/ydb-dotnet-sdk/src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs(7,11)
{
public sealed class YdbBulkUpsertImporter<T> : IAsyncDisposable
{
private readonly YdbConnection _connection;
private readonly string _tablePath;
private readonly BulkUpsertOptions _options;
private readonly List<T> _buffer = new();
private readonly int _maxBatchSizeBytes;
private int _bufferSizeBytes;
private bool _isCompleted;

public YdbBulkUpsertImporter(
YdbConnection connection,
string tablePath,
BulkUpsertOptions? options = null,
int maxBatchSizeBytes = 64 * 1024 * 1024)
{
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не вижу, чтобы закрывался connection

_tablePath = tablePath ?? throw new ArgumentNullException(nameof(tablePath));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

зачем проверки на null, если параметр not nullable

_options = options ?? new BulkUpsertOptions();
_maxBatchSizeBytes = maxBatchSizeBytes;
}

public async Task WriteRowAsync(T row, CancellationToken cancellationToken = default)
{
if (_isCompleted)
throw new InvalidOperationException("BulkUpsertImporter уже завершён.");

_buffer.Add(row);

var rowProto = TypedValueFactory.FromObjects(new[] { row });
var rowSize = rowProto.CalculateSize();

_bufferSizeBytes += rowSize;

if (_bufferSizeBytes >= _maxBatchSizeBytes)
{
await FlushAsync(cancellationToken).ConfigureAwait(false);
}
}

public async Task FlushAsync(CancellationToken cancellationToken = default)
{
if (_buffer.Count == 0)
return;

await _connection.BulkUpsertInternalAsync(_tablePath, _buffer, _options, cancellationToken).ConfigureAwait(false);

_buffer.Clear();
_bufferSizeBytes = 0;
}

public async Task CompleteAsync(CancellationToken cancellationToken = default)
{
if (_isCompleted) return;
await FlushAsync(cancellationToken).ConfigureAwait(false);
_isCompleted = true;
}

public async ValueTask DisposeAsync()
{
if (!_isCompleted)
await CompleteAsync();
}
}
}
84 changes: 57 additions & 27 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
using System.Data;
using System.Data.Common;
using System.Diagnostics.CodeAnalysis;
using Ydb.Operations;
using Ydb.Sdk.Ado.BulkUpsert;
using Ydb.Sdk.Ado.Session;
using Ydb.Sdk.Services.Query;
using Ydb.Table;
using static System.Data.IsolationLevel;

namespace Ydb.Sdk.Ado;

/// <summary>
/// YDB database connection implementation.
/// </summary>
public sealed class YdbConnection : DbConnection
{
private static readonly StateChangeEventArgs ClosedToOpenEventArgs =
Expand All @@ -25,22 +31,20 @@
[param: AllowNull] init => _connectionStringBuilder = value;
}

// ISession для абстракции!
internal ISession Session
{
get
{
ThrowIfConnectionClosed();

return _session;
}
private set => _session = value;
}

private ISession _session = null!;

public YdbConnection()
{
}
public YdbConnection() { }

public YdbConnection(string connectionString)
{
Expand All @@ -52,6 +56,37 @@
ConnectionStringBuilder = connectionStringBuilder;
}

public YdbBulkUpsertImporter<T> BeginBulkUpsert<T>(
string tablePath,
BulkUpsertOptions? options = null,
int maxBatchSizeBytes = 64 * 1024 * 1024) // 64 Mb
{
return new YdbBulkUpsertImporter<T>(this, tablePath, options, maxBatchSizeBytes);
}

internal async Task BulkUpsertInternalAsync<T>(
string tablePath,
IReadOnlyCollection<T> rows,
BulkUpsertOptions options,
CancellationToken cancellationToken)
{
if (CurrentTransaction is { Completed: false })
throw new InvalidOperationException("BulkUpsert does not support working within a transaction");
var req = new BulkUpsertRequest
{
Table = tablePath,
OperationParams = new OperationParams(),
Rows = TypedValueFactory.FromObjects(rows)
};

var sessionImpl = Session as Services.Query.Session
?? throw new InvalidOperationException("Underlying session does not support BulkUpsertAsync");

var resp = await sessionImpl.BulkUpsertAsync(req, cancellationToken).ConfigureAwait(false);
var status = Status.FromProto(resp.Operation.Status, resp.Operation.Issues);
status.EnsureSuccess();
}

protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
{
ThrowIfConnectionClosed();
Expand Down Expand Up @@ -92,21 +127,11 @@
public override async Task OpenAsync(CancellationToken cancellationToken)
{
ThrowIfConnectionOpen();
try
{
Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken);
}
catch (OperationCanceledException e)
{
throw new YdbException(StatusCode.Cancelled,
$"The connection pool has been exhausted, either raise 'MaxSessionPool' " +
$"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " +
$"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e
);
}

OnStateChange(ClosedToOpenEventArgs);
// Получаем сессию через PoolManager, приводим к ISession (интерфейс)
Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken);

OnStateChange(ClosedToOpenEventArgs);
ConnectionState = ConnectionState.Open;
}

Expand All @@ -130,12 +155,14 @@
}

OnStateChange(OpenToClosedEventArgs);

ConnectionState = ConnectionState.Closed;
}
finally
{
_session.Close();
if (_session is Services.Query.Session realSession)
await realSession.Release();
else
_session.Close();
}
}

Expand All @@ -144,12 +171,10 @@
get => _connectionStringBuilder?.ConnectionString ?? string.Empty;
#pragma warning disable CS8765 // Nullability of type of parameter doesn't match overridden member (possibly because of nullability attributes).
set
#pragma warning restore CS8765 // Nullability of type of parameter doesn't match overridden member (possibly because of nullability attributes).
#pragma warning restore CS8765
{
ThrowIfConnectionOpen();

// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
_connectionStringBuilder = value != null ? new YdbConnectionStringBuilder(value) : null;

Check warning on line 177 in src/Ydb.Sdk/src/Ado/YdbConnection.cs

View workflow job for this annotation

GitHub Actions / Inspection (./src/YdbSdk.sln)

"[ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract] Expression is always true according to nullable reference types' annotations" on /home/runner/work/ydb-dotnet-sdk/ydb-dotnet-sdk/src/Ydb.Sdk/src/Ado/YdbConnection.cs(177,40)
}
}

Expand All @@ -163,7 +188,13 @@
{
_session.OnNotSuccessStatusCode(code);

if (_session.IsBroken)
// Проверяем статус сессии: если есть IsActive или IsBroken — корректно переключаем состояние подключения
if (_session is Services.Query.Session sessionImpl)
{
if (!sessionImpl.IsActive)
ConnectionState = ConnectionState.Broken;
}
else if (_session.IsBroken)
{
ConnectionState = ConnectionState.Broken;
}
Expand All @@ -174,15 +205,14 @@
internal bool IsBusy => LastReader is { IsOpen: true };
internal YdbTransaction? CurrentTransaction { get; private set; }

public override string DataSource => string.Empty; // TODO
public override string DataSource => string.Empty; // TODO: DataSource

public override string ServerVersion
{
get
{
ThrowIfConnectionClosed();

return string.Empty; // TODO ServerVersion
return string.Empty; // TODO: ServerVersion
}
}

Expand Down Expand Up @@ -271,4 +301,4 @@
/// to their pool.
/// </summary>
public static Task ClearAllPools() => PoolManager.ClearAllPools();
}
}
11 changes: 11 additions & 0 deletions src/Ydb.Sdk/src/Ado/YdbDataSource.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#if NET7_0_OR_GREATER
using System.Data.Common;
using Ydb.Sdk.Ado.BulkUpsert;

namespace Ydb.Sdk.Ado;

Expand Down Expand Up @@ -58,6 +59,16 @@ protected override YdbConnection OpenDbConnection()
throw;
}
}

public async Task<YdbBulkUpsertImporter<T>> BeginBulkUpsertAsync<T>(
string tablePath,
BulkUpsertOptions? options = null,
int maxBatchSizeBytes = 64 * 1024 * 1024,
CancellationToken cancellationToken = default)
{
var conn = await OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
return new YdbBulkUpsertImporter<T>(conn, tablePath, options, maxBatchSizeBytes);
}

public override string ConnectionString => _ydbConnectionStringBuilder.ConnectionString;

Expand Down
8 changes: 8 additions & 0 deletions src/Ydb.Sdk/src/Services/Query/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using Ydb.Sdk.Ado.Session;
using Ydb.Sdk.Pool;
using Ydb.Sdk.Value;
using Ydb.Table;
using Ydb.Table.V1;
using CommitTransactionRequest = Ydb.Query.CommitTransactionRequest;
using CreateSessionRequest = Ydb.Query.CreateSessionRequest;
using DeleteSessionRequest = Ydb.Query.DeleteSessionRequest;
Expand Down Expand Up @@ -226,4 +228,10 @@ internal override async Task DeleteSession()
throw YdbException.FromServer(deleteSessionResponse.Status, deleteSessionResponse.Issues);
}
}

internal async Task<BulkUpsertResponse> BulkUpsertAsync(BulkUpsertRequest req, CancellationToken ct = default)
{
var settings = MakeGrpcRequestSettings(new GrpcRequestSettings { CancellationToken = ct });
return await Driver.UnaryCall(TableService.BulkUpsertMethod, req, settings);
}
}
2 changes: 2 additions & 0 deletions src/Ydb.Sdk/src/Ydb.Sdk.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Build.Tasks.Core" Version="17.7.2" />
<PackageReference Include="Microsoft.NETCore.Platforms" Version="8.0.0-preview.7.23375.6" />
<PackageReference Include="Ydb.Protos" Version="1.1.1" />
<PackageReference Include="Portable.BouncyCastle" Version="1.9.0" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.5.0" />
Expand Down
Loading
Loading