Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
- ADO.NET: Changed `IBulkUpsertImporter.AddRowAsync` signature: `object?[] row` → `params object[]`.

## v0.21.0

- ADO.NET: Added `MinPoolSize` setting to keep a minimum number of sessions ready in the PoolingSessionSource.
- ADO.NET: Added `SessionIdleTimeout` to remove idle sessions from the PoolingSessionSource automatically.
- ADO.NET: Made `PoolingSessionSource` faster and more reliable by using a lock-free LIFO stack.
- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks and integration tests.
- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks.
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
- Canceling AttachStream after calling the `DeleteSession` method.
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
Expand Down
10 changes: 5 additions & 5 deletions src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public sealed class BulkUpsertImporter : IBulkUpsertImporter
private readonly IDriver _driver;
private readonly string _tablePath;
private readonly IReadOnlyList<string> _columns;
private readonly int _maxBytes;
private readonly int _maxBatchByteSize;
private readonly RepeatedField<Ydb.Value> _rows = new();
private readonly CancellationToken _cancellationToken;
private StructType? _structType;
Expand All @@ -21,17 +21,17 @@ internal BulkUpsertImporter(
IDriver driver,
string tableName,
IReadOnlyList<string> columns,
int maxBytes,
int maxBatchByteSize,
CancellationToken cancellationToken = default)
{
_driver = driver;
_tablePath = tableName;
_columns = columns;
_maxBytes = maxBytes / 2;
_maxBatchByteSize = maxBatchByteSize / 2;
_cancellationToken = cancellationToken;
}

public async ValueTask AddRowAsync(object?[] values)
public async ValueTask AddRowAsync(params object[] values)
{
if (values.Length != _columns.Count)
throw new ArgumentException("Values count must match columns count", nameof(values));
Expand All @@ -46,7 +46,7 @@ public async ValueTask AddRowAsync(object?[] values)

var rowSize = protoStruct.CalculateSize();

if (_currentBytes + rowSize > _maxBytes && _rows.Count > 0)
if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0)
{
await FlushAsync();
}
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace Ydb.Sdk.Ado.BulkUpsert;

public interface IBulkUpsertImporter
{
ValueTask AddRowAsync(object?[] row);
ValueTask AddRowAsync(params object[] row);

ValueTask FlushAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using BenchmarkDotNet.Attributes;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Ydb.Sdk.Ado.BulkUpsert;
using Ydb.Table;

namespace Ydb.Sdk.Ado.Benchmarks;

public class BulkUpsertImporterBenchmark
{
private const int BatchSize = 15000;
private readonly int _maxBatchByteSize = new YdbConnectionStringBuilder().MaxSendMessageSize;
private IDriver _driver = null!;
private IList<(Guid Guid, string String, int Int, double Double, DateTime DateTime)> _rows = null!;

[GlobalSetup]
public void Setup()
{
_driver = new BulkUpsertMockDriver();
_rows = Enumerable.Range(0, BatchSize)
.Select(i => new ValueTuple<Guid, string, int, double, DateTime>(Guid.NewGuid(),
Guid.NewGuid() + "_" + Guid.NewGuid(), i, i * 1.324, DateTime.Now))
.ToList();
}

[Benchmark]
public async Task FlushAsync_BulkUpsertImporter()
{
var bulkUpsertImporter = new BulkUpsertImporter(_driver, "table_name", ["c1", "c2", "c3", "c4", "c5"],
_maxBatchByteSize, CancellationToken.None);

foreach (var row in _rows)
{
await bulkUpsertImporter.AddRowAsync(row.Guid, row.String, row.Int, row.Double, row.DateTime);
}

await bulkUpsertImporter.FlushAsync();
}
}

internal class BulkUpsertMockDriver : IDriver
{
public ValueTask DisposeAsync() => throw new NotImplementedException();

public void Dispose() => throw new NotImplementedException();

public Task<TResponse>
UnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, TRequest request,
GrpcRequestSettings settings) where TRequest : class where TResponse : class =>
Task.FromResult((TResponse)(object)new BulkUpsertResponse
{
Operation = new Operations.Operation { Status = StatusIds.Types.StatusCode.Success }
});

public ValueTask<IServerStream<TResponse>> ServerStreamCall<TRequest,
TResponse>(Method<TRequest, TResponse> method, TRequest request, GrpcRequestSettings settings) where
TRequest : class
where TResponse : class => throw new NotImplementedException();

public ValueTask<IBidirectionalStream<TRequest,
TResponse>> BidirectionalStreamCall<TRequest, TResponse>(Method<TRequest, TResponse> method,
GrpcRequestSettings settings) where TRequest : class where TResponse : class => throw new
NotImplementedException();

public ILoggerFactory LoggerFactory => null!;
}
16 changes: 15 additions & 1 deletion src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,18 @@ DefaultJob : .NET 8.0.2 (8.0.224.6711), Arm64 RyuJIT AdvSIMD
| HighContention_OpenClose | 91,700.72 ns | 1,803.206 ns | 3,842.780 ns | 204.6633 | 0.0007 | 5.0049 | - | 41951 B |
| SessionReuse_Pattern | 117,545.11 ns | 2,226.365 ns | 4,014.595 ns | 220.0000 | 0.0001 | 1.5869 | - | 13656 B |
| SessionReuse_HighContention_Pattern | 7,463,819.00 ns | 148,409.083 ns | 364,050.038 ns | 19044.6172 | 1.1719 | 765.6250 | 125.0000 | 6367528 B |
| SessionReuse_HighIterations_Pattern | 70,844,972.06 ns | 1,400,128.942 ns | 3,589,066.009 ns | 200020.0000 | - | 750.0000 | - | 6407440 B |
| SessionReuse_HighIterations_Pattern | 70,844,972.06 ns | 1,400,128.942 ns | 3,589,066.009 ns | 200020.0000 | - | 750.0000 | - | 6407440 B |

# BulkUpsertImporter Benchmarks

ButchSize = 15_000

| Method | Mean | Error | StdDev |
|-------------------------------|---------:|---------:|---------:|
| FlushAsync_BulkUpsertImporter | 32.41 ms | 0.546 ms | 0.456 ms |

ButchSize = 1_000

| Method | Mean | Error | StdDev |
|-------------------------------|---------:|---------:|---------:|
| FlushAsync_BulkUpsertImporter | 583.3 us | 11.32 us | 13.03 us |
63 changes: 30 additions & 33 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,21 +316,22 @@ public async Task BulkUpsertImporter_HappyPath_Add_Flush()
{
await using (var createCmd = conn.CreateCommand())
{
createCmd.CommandText = $@"
CREATE TABLE {tableName} (
Id Int32,
Name Utf8,
PRIMARY KEY (Id)
)";
createCmd.CommandText = $"""
CREATE TABLE {tableName} (
Id Int32,
Name Utf8,
PRIMARY KEY (Id)
)
""";
await createCmd.ExecuteNonQueryAsync();
}

var columns = new[] { "Id", "Name" };

var importer = conn.BeginBulkUpsertImport(tableName, columns);

await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice")]);
await importer.AddRowAsync([YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob")]);
await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice"));
await importer.AddRowAsync(YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob"));
await importer.FlushAsync();

await using (var checkCmd = conn.CreateCommand())
Expand All @@ -341,8 +342,8 @@ PRIMARY KEY (Id)
}

importer = conn.BeginBulkUpsertImport(tableName, columns);
await importer.AddRowAsync([YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie")]);
await importer.AddRowAsync([YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana")]);
await importer.AddRowAsync(YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie"));
await importer.AddRowAsync(YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana"));
await importer.FlushAsync();

await using (var checkCmd = conn.CreateCommand())
Expand Down Expand Up @@ -375,28 +376,22 @@ public async Task BulkUpsertImporter_ThrowsOnInvalidRowCount()
{
await using (var createCmd = conn.CreateCommand())
{
createCmd.CommandText = $@"
CREATE TABLE {tableName} (
Id Int32,
Name Utf8,
PRIMARY KEY (Id)
)";
createCmd.CommandText = $"""
CREATE TABLE {tableName} (
Id Int32,
Name Utf8,
PRIMARY KEY (Id)
)
""";
await createCmd.ExecuteNonQueryAsync();
}

var columns = new[] { "Id", "Name" };

var importer = conn.BeginBulkUpsertImport(tableName, columns);

var badRow = new object?[] { YdbValue.MakeInt32(1) };
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowAsync([badRow]));

await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await importer.AddRowAsync([
new object?[] { YdbValue.MakeInt32(2) }
]);
});
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowAsync(1));
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowAsync(2));
}
finally
{
Expand All @@ -418,11 +413,13 @@ public async Task BulkUpsertImporter_MultipleImporters_Parallel()
foreach (var table in new[] { table1, table2 })
{
await using var createCmd = conn.CreateCommand();
createCmd.CommandText = $@"CREATE TABLE {table} (
Id Int32,
Name Utf8,
PRIMARY KEY (Id)
)";
createCmd.CommandText = $"""
CREATE TABLE {table} (
Id Int32,
Name Utf8,
PRIMARY KEY (Id)
)
""";
await createCmd.ExecuteNonQueryAsync();
}

Expand All @@ -433,7 +430,7 @@ await Task.WhenAll(
{
var importer = conn.BeginBulkUpsertImport(table1, columns);
var rows = Enumerable.Range(0, 20)
.Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}") })
.Select(i => new object[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}") })
.ToArray();
foreach (var row in rows)
await importer.AddRowAsync(row);
Expand All @@ -443,7 +440,7 @@ await Task.WhenAll(
{
var importer = conn.BeginBulkUpsertImport(table2, columns);
var rows = Enumerable.Range(0, 20)
.Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}") })
.Select(i => new object[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}") })
.ToArray();
foreach (var row in rows)
await importer.AddRowAsync(row);
Expand Down Expand Up @@ -482,7 +479,7 @@ public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()

var importer = conn.BeginBulkUpsertImport(tableName, columns);

await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists")]);
await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists"));

await Assert.ThrowsAsync<YdbException>(async () => { await importer.FlushAsync(); });
}
Expand Down
Loading