diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index f7d99293..8760ab88 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,9 +1,11 @@ +- ADO.NET: Changed `IBulkUpsertImporter.AddRowAsync` signature: `object?[] row` → `params object[]`. + ## v0.21.0 - ADO.NET: Added `MinPoolSize` setting to keep a minimum number of sessions ready in the PoolingSessionSource. - ADO.NET: Added `SessionIdleTimeout` to remove idle sessions from the PoolingSessionSource automatically. - ADO.NET: Made `PoolingSessionSource` faster and more reliable by using a lock-free LIFO stack. -- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks and integration tests. +- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks. - Optimization: On BadSession, do not invoke the `DeleteSession()` method. - Canceling AttachStream after calling the `DeleteSession` method. - Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`). diff --git a/src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs b/src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs index 38a4f0cc..6a87a096 100644 --- a/src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs +++ b/src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs @@ -11,7 +11,7 @@ public sealed class BulkUpsertImporter : IBulkUpsertImporter private readonly IDriver _driver; private readonly string _tablePath; private readonly IReadOnlyList _columns; - private readonly int _maxBytes; + private readonly int _maxBatchByteSize; private readonly RepeatedField _rows = new(); private readonly CancellationToken _cancellationToken; private StructType? _structType; @@ -21,17 +21,17 @@ internal BulkUpsertImporter( IDriver driver, string tableName, IReadOnlyList columns, - int maxBytes, + int maxBatchByteSize, CancellationToken cancellationToken = default) { _driver = driver; _tablePath = tableName; _columns = columns; - _maxBytes = maxBytes / 2; + _maxBatchByteSize = maxBatchByteSize / 2; _cancellationToken = cancellationToken; } - public async ValueTask AddRowAsync(object?[] values) + public async ValueTask AddRowAsync(params object[] values) { if (values.Length != _columns.Count) throw new ArgumentException("Values count must match columns count", nameof(values)); @@ -46,7 +46,7 @@ public async ValueTask AddRowAsync(object?[] values) var rowSize = protoStruct.CalculateSize(); - if (_currentBytes + rowSize > _maxBytes && _rows.Count > 0) + if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0) { await FlushAsync(); } diff --git a/src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs b/src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs index da8d65b8..fa687609 100644 --- a/src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs +++ b/src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs @@ -2,7 +2,7 @@ namespace Ydb.Sdk.Ado.BulkUpsert; public interface IBulkUpsertImporter { - ValueTask AddRowAsync(object?[] row); + ValueTask AddRowAsync(params object[] row); ValueTask FlushAsync(); } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/BulkUpsertImporterBenchmark.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/BulkUpsertImporterBenchmark.cs new file mode 100644 index 00000000..06fbc0e7 --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/BulkUpsertImporterBenchmark.cs @@ -0,0 +1,66 @@ +using BenchmarkDotNet.Attributes; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using Ydb.Sdk.Ado.BulkUpsert; +using Ydb.Table; + +namespace Ydb.Sdk.Ado.Benchmarks; + +public class BulkUpsertImporterBenchmark +{ + private const int BatchSize = 15000; + private readonly int _maxBatchByteSize = new YdbConnectionStringBuilder().MaxSendMessageSize; + private IDriver _driver = null!; + private IList<(Guid Guid, string String, int Int, double Double, DateTime DateTime)> _rows = null!; + + [GlobalSetup] + public void Setup() + { + _driver = new BulkUpsertMockDriver(); + _rows = Enumerable.Range(0, BatchSize) + .Select(i => new ValueTuple(Guid.NewGuid(), + Guid.NewGuid() + "_" + Guid.NewGuid(), i, i * 1.324, DateTime.Now)) + .ToList(); + } + + [Benchmark] + public async Task FlushAsync_BulkUpsertImporter() + { + var bulkUpsertImporter = new BulkUpsertImporter(_driver, "table_name", ["c1", "c2", "c3", "c4", "c5"], + _maxBatchByteSize, CancellationToken.None); + + foreach (var row in _rows) + { + await bulkUpsertImporter.AddRowAsync(row.Guid, row.String, row.Int, row.Double, row.DateTime); + } + + await bulkUpsertImporter.FlushAsync(); + } +} + +internal class BulkUpsertMockDriver : IDriver +{ + public ValueTask DisposeAsync() => throw new NotImplementedException(); + + public void Dispose() => throw new NotImplementedException(); + + public Task + UnaryCall(Method method, TRequest request, + GrpcRequestSettings settings) where TRequest : class where TResponse : class => + Task.FromResult((TResponse)(object)new BulkUpsertResponse + { + Operation = new Operations.Operation { Status = StatusIds.Types.StatusCode.Success } + }); + + public ValueTask> ServerStreamCall(Method method, TRequest request, GrpcRequestSettings settings) where + TRequest : class + where TResponse : class => throw new NotImplementedException(); + + public ValueTask> BidirectionalStreamCall(Method method, + GrpcRequestSettings settings) where TRequest : class where TResponse : class => throw new + NotImplementedException(); + + public ILoggerFactory LoggerFactory => null!; +} diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md index 393bb083..2ca6d833 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md @@ -54,4 +54,18 @@ DefaultJob : .NET 8.0.2 (8.0.224.6711), Arm64 RyuJIT AdvSIMD | HighContention_OpenClose | 91,700.72 ns | 1,803.206 ns | 3,842.780 ns | 204.6633 | 0.0007 | 5.0049 | - | 41951 B | | SessionReuse_Pattern | 117,545.11 ns | 2,226.365 ns | 4,014.595 ns | 220.0000 | 0.0001 | 1.5869 | - | 13656 B | | SessionReuse_HighContention_Pattern | 7,463,819.00 ns | 148,409.083 ns | 364,050.038 ns | 19044.6172 | 1.1719 | 765.6250 | 125.0000 | 6367528 B | -| SessionReuse_HighIterations_Pattern | 70,844,972.06 ns | 1,400,128.942 ns | 3,589,066.009 ns | 200020.0000 | - | 750.0000 | - | 6407440 B | \ No newline at end of file +| SessionReuse_HighIterations_Pattern | 70,844,972.06 ns | 1,400,128.942 ns | 3,589,066.009 ns | 200020.0000 | - | 750.0000 | - | 6407440 B | + +# BulkUpsertImporter Benchmarks + +ButchSize = 15_000 + +| Method | Mean | Error | StdDev | +|-------------------------------|---------:|---------:|---------:| +| FlushAsync_BulkUpsertImporter | 32.41 ms | 0.546 ms | 0.456 ms | + +ButchSize = 1_000 + +| Method | Mean | Error | StdDev | +|-------------------------------|---------:|---------:|---------:| +| FlushAsync_BulkUpsertImporter | 583.3 us | 11.32 us | 13.03 us | \ No newline at end of file diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs index ce51963b..3001816a 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs @@ -316,12 +316,13 @@ public async Task BulkUpsertImporter_HappyPath_Add_Flush() { await using (var createCmd = conn.CreateCommand()) { - createCmd.CommandText = $@" - CREATE TABLE {tableName} ( - Id Int32, - Name Utf8, - PRIMARY KEY (Id) - )"; + createCmd.CommandText = $""" + CREATE TABLE {tableName} ( + Id Int32, + Name Utf8, + PRIMARY KEY (Id) + ) + """; await createCmd.ExecuteNonQueryAsync(); } @@ -329,8 +330,8 @@ PRIMARY KEY (Id) var importer = conn.BeginBulkUpsertImport(tableName, columns); - await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice")]); - await importer.AddRowAsync([YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob")]); + await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice")); + await importer.AddRowAsync(YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob")); await importer.FlushAsync(); await using (var checkCmd = conn.CreateCommand()) @@ -341,8 +342,8 @@ PRIMARY KEY (Id) } importer = conn.BeginBulkUpsertImport(tableName, columns); - await importer.AddRowAsync([YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie")]); - await importer.AddRowAsync([YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana")]); + await importer.AddRowAsync(YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie")); + await importer.AddRowAsync(YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana")); await importer.FlushAsync(); await using (var checkCmd = conn.CreateCommand()) @@ -375,12 +376,13 @@ public async Task BulkUpsertImporter_ThrowsOnInvalidRowCount() { await using (var createCmd = conn.CreateCommand()) { - createCmd.CommandText = $@" - CREATE TABLE {tableName} ( - Id Int32, - Name Utf8, - PRIMARY KEY (Id) - )"; + createCmd.CommandText = $""" + CREATE TABLE {tableName} ( + Id Int32, + Name Utf8, + PRIMARY KEY (Id) + ) + """; await createCmd.ExecuteNonQueryAsync(); } @@ -388,15 +390,8 @@ PRIMARY KEY (Id) var importer = conn.BeginBulkUpsertImport(tableName, columns); - var badRow = new object?[] { YdbValue.MakeInt32(1) }; - await Assert.ThrowsAsync(async () => await importer.AddRowAsync([badRow])); - - await Assert.ThrowsAsync(async () => - { - await importer.AddRowAsync([ - new object?[] { YdbValue.MakeInt32(2) } - ]); - }); + await Assert.ThrowsAsync(async () => await importer.AddRowAsync(1)); + await Assert.ThrowsAsync(async () => await importer.AddRowAsync(2)); } finally { @@ -418,11 +413,13 @@ public async Task BulkUpsertImporter_MultipleImporters_Parallel() foreach (var table in new[] { table1, table2 }) { await using var createCmd = conn.CreateCommand(); - createCmd.CommandText = $@"CREATE TABLE {table} ( - Id Int32, - Name Utf8, - PRIMARY KEY (Id) - )"; + createCmd.CommandText = $""" + CREATE TABLE {table} ( + Id Int32, + Name Utf8, + PRIMARY KEY (Id) + ) + """; await createCmd.ExecuteNonQueryAsync(); } @@ -433,7 +430,7 @@ await Task.WhenAll( { var importer = conn.BeginBulkUpsertImport(table1, columns); var rows = Enumerable.Range(0, 20) - .Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}") }) + .Select(i => new object[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}") }) .ToArray(); foreach (var row in rows) await importer.AddRowAsync(row); @@ -443,7 +440,7 @@ await Task.WhenAll( { var importer = conn.BeginBulkUpsertImport(table2, columns); var rows = Enumerable.Range(0, 20) - .Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}") }) + .Select(i => new object[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}") }) .ToArray(); foreach (var row in rows) await importer.AddRowAsync(row); @@ -482,7 +479,7 @@ public async Task BulkUpsertImporter_ThrowsOnNonexistentTable() var importer = conn.BeginBulkUpsertImport(tableName, columns); - await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists")]); + await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists")); await Assert.ThrowsAsync(async () => { await importer.FlushAsync(); }); }