Skip to content

Commit 5ec38b4

Browse files
Dev: added BulkUpsertImporterBenchmark.cs (#496)
1 parent 4786134 commit 5ec38b4

File tree

6 files changed

+120
-41
lines changed

6 files changed

+120
-41
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
- ADO.NET: Changed `IBulkUpsertImporter.AddRowAsync` signature: `object?[] row``params object[]`.
2+
13
## v0.21.0
24

35
- ADO.NET: Added `MinPoolSize` setting to keep a minimum number of sessions ready in the PoolingSessionSource.
46
- ADO.NET: Added `SessionIdleTimeout` to remove idle sessions from the PoolingSessionSource automatically.
57
- ADO.NET: Made `PoolingSessionSource` faster and more reliable by using a lock-free LIFO stack.
6-
- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks and integration tests.
8+
- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks.
79
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
810
- Canceling AttachStream after calling the `DeleteSession` method.
911
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).

src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public sealed class BulkUpsertImporter : IBulkUpsertImporter
1111
private readonly IDriver _driver;
1212
private readonly string _tablePath;
1313
private readonly IReadOnlyList<string> _columns;
14-
private readonly int _maxBytes;
14+
private readonly int _maxBatchByteSize;
1515
private readonly RepeatedField<Ydb.Value> _rows = new();
1616
private readonly CancellationToken _cancellationToken;
1717
private StructType? _structType;
@@ -21,17 +21,17 @@ internal BulkUpsertImporter(
2121
IDriver driver,
2222
string tableName,
2323
IReadOnlyList<string> columns,
24-
int maxBytes,
24+
int maxBatchByteSize,
2525
CancellationToken cancellationToken = default)
2626
{
2727
_driver = driver;
2828
_tablePath = tableName;
2929
_columns = columns;
30-
_maxBytes = maxBytes / 2;
30+
_maxBatchByteSize = maxBatchByteSize / 2;
3131
_cancellationToken = cancellationToken;
3232
}
3333

34-
public async ValueTask AddRowAsync(object?[] values)
34+
public async ValueTask AddRowAsync(params object[] values)
3535
{
3636
if (values.Length != _columns.Count)
3737
throw new ArgumentException("Values count must match columns count", nameof(values));
@@ -46,7 +46,7 @@ public async ValueTask AddRowAsync(object?[] values)
4646

4747
var rowSize = protoStruct.CalculateSize();
4848

49-
if (_currentBytes + rowSize > _maxBytes && _rows.Count > 0)
49+
if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0)
5050
{
5151
await FlushAsync();
5252
}

src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ namespace Ydb.Sdk.Ado.BulkUpsert;
22

33
public interface IBulkUpsertImporter
44
{
5-
ValueTask AddRowAsync(object?[] row);
5+
ValueTask AddRowAsync(params object[] row);
66

77
ValueTask FlushAsync();
88
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
using BenchmarkDotNet.Attributes;
2+
using Grpc.Core;
3+
using Microsoft.Extensions.Logging;
4+
using Ydb.Sdk.Ado.BulkUpsert;
5+
using Ydb.Table;
6+
7+
namespace Ydb.Sdk.Ado.Benchmarks;
8+
9+
public class BulkUpsertImporterBenchmark
10+
{
11+
private const int BatchSize = 15000;
12+
private readonly int _maxBatchByteSize = new YdbConnectionStringBuilder().MaxSendMessageSize;
13+
private IDriver _driver = null!;
14+
private IList<(Guid Guid, string String, int Int, double Double, DateTime DateTime)> _rows = null!;
15+
16+
[GlobalSetup]
17+
public void Setup()
18+
{
19+
_driver = new BulkUpsertMockDriver();
20+
_rows = Enumerable.Range(0, BatchSize)
21+
.Select(i => new ValueTuple<Guid, string, int, double, DateTime>(Guid.NewGuid(),
22+
Guid.NewGuid() + "_" + Guid.NewGuid(), i, i * 1.324, DateTime.Now))
23+
.ToList();
24+
}
25+
26+
[Benchmark]
27+
public async Task FlushAsync_BulkUpsertImporter()
28+
{
29+
var bulkUpsertImporter = new BulkUpsertImporter(_driver, "table_name", ["c1", "c2", "c3", "c4", "c5"],
30+
_maxBatchByteSize, CancellationToken.None);
31+
32+
foreach (var row in _rows)
33+
{
34+
await bulkUpsertImporter.AddRowAsync(row.Guid, row.String, row.Int, row.Double, row.DateTime);
35+
}
36+
37+
await bulkUpsertImporter.FlushAsync();
38+
}
39+
}
40+
41+
internal class BulkUpsertMockDriver : IDriver
42+
{
43+
public ValueTask DisposeAsync() => throw new NotImplementedException();
44+
45+
public void Dispose() => throw new NotImplementedException();
46+
47+
public Task<TResponse>
48+
UnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, TRequest request,
49+
GrpcRequestSettings settings) where TRequest : class where TResponse : class =>
50+
Task.FromResult((TResponse)(object)new BulkUpsertResponse
51+
{
52+
Operation = new Operations.Operation { Status = StatusIds.Types.StatusCode.Success }
53+
});
54+
55+
public ValueTask<IServerStream<TResponse>> ServerStreamCall<TRequest,
56+
TResponse>(Method<TRequest, TResponse> method, TRequest request, GrpcRequestSettings settings) where
57+
TRequest : class
58+
where TResponse : class => throw new NotImplementedException();
59+
60+
public ValueTask<IBidirectionalStream<TRequest,
61+
TResponse>> BidirectionalStreamCall<TRequest, TResponse>(Method<TRequest, TResponse> method,
62+
GrpcRequestSettings settings) where TRequest : class where TResponse : class => throw new
63+
NotImplementedException();
64+
65+
public ILoggerFactory LoggerFactory => null!;
66+
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,18 @@ DefaultJob : .NET 8.0.2 (8.0.224.6711), Arm64 RyuJIT AdvSIMD
5454
| HighContention_OpenClose | 91,700.72 ns | 1,803.206 ns | 3,842.780 ns | 204.6633 | 0.0007 | 5.0049 | - | 41951 B |
5555
| SessionReuse_Pattern | 117,545.11 ns | 2,226.365 ns | 4,014.595 ns | 220.0000 | 0.0001 | 1.5869 | - | 13656 B |
5656
| SessionReuse_HighContention_Pattern | 7,463,819.00 ns | 148,409.083 ns | 364,050.038 ns | 19044.6172 | 1.1719 | 765.6250 | 125.0000 | 6367528 B |
57-
| SessionReuse_HighIterations_Pattern | 70,844,972.06 ns | 1,400,128.942 ns | 3,589,066.009 ns | 200020.0000 | - | 750.0000 | - | 6407440 B |
57+
| SessionReuse_HighIterations_Pattern | 70,844,972.06 ns | 1,400,128.942 ns | 3,589,066.009 ns | 200020.0000 | - | 750.0000 | - | 6407440 B |
58+
59+
# BulkUpsertImporter Benchmarks
60+
61+
ButchSize = 15_000
62+
63+
| Method | Mean | Error | StdDev |
64+
|-------------------------------|---------:|---------:|---------:|
65+
| FlushAsync_BulkUpsertImporter | 32.41 ms | 0.546 ms | 0.456 ms |
66+
67+
ButchSize = 1_000
68+
69+
| Method | Mean | Error | StdDev |
70+
|-------------------------------|---------:|---------:|---------:|
71+
| FlushAsync_BulkUpsertImporter | 583.3 us | 11.32 us | 13.03 us |

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -316,21 +316,22 @@ public async Task BulkUpsertImporter_HappyPath_Add_Flush()
316316
{
317317
await using (var createCmd = conn.CreateCommand())
318318
{
319-
createCmd.CommandText = $@"
320-
CREATE TABLE {tableName} (
321-
Id Int32,
322-
Name Utf8,
323-
PRIMARY KEY (Id)
324-
)";
319+
createCmd.CommandText = $"""
320+
CREATE TABLE {tableName} (
321+
Id Int32,
322+
Name Utf8,
323+
PRIMARY KEY (Id)
324+
)
325+
""";
325326
await createCmd.ExecuteNonQueryAsync();
326327
}
327328

328329
var columns = new[] { "Id", "Name" };
329330

330331
var importer = conn.BeginBulkUpsertImport(tableName, columns);
331332

332-
await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice")]);
333-
await importer.AddRowAsync([YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob")]);
333+
await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice"));
334+
await importer.AddRowAsync(YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob"));
334335
await importer.FlushAsync();
335336

336337
await using (var checkCmd = conn.CreateCommand())
@@ -341,8 +342,8 @@ PRIMARY KEY (Id)
341342
}
342343

343344
importer = conn.BeginBulkUpsertImport(tableName, columns);
344-
await importer.AddRowAsync([YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie")]);
345-
await importer.AddRowAsync([YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana")]);
345+
await importer.AddRowAsync(YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie"));
346+
await importer.AddRowAsync(YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana"));
346347
await importer.FlushAsync();
347348

348349
await using (var checkCmd = conn.CreateCommand())
@@ -375,28 +376,22 @@ public async Task BulkUpsertImporter_ThrowsOnInvalidRowCount()
375376
{
376377
await using (var createCmd = conn.CreateCommand())
377378
{
378-
createCmd.CommandText = $@"
379-
CREATE TABLE {tableName} (
380-
Id Int32,
381-
Name Utf8,
382-
PRIMARY KEY (Id)
383-
)";
379+
createCmd.CommandText = $"""
380+
CREATE TABLE {tableName} (
381+
Id Int32,
382+
Name Utf8,
383+
PRIMARY KEY (Id)
384+
)
385+
""";
384386
await createCmd.ExecuteNonQueryAsync();
385387
}
386388

387389
var columns = new[] { "Id", "Name" };
388390

389391
var importer = conn.BeginBulkUpsertImport(tableName, columns);
390392

391-
var badRow = new object?[] { YdbValue.MakeInt32(1) };
392-
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowAsync([badRow]));
393-
394-
await Assert.ThrowsAsync<ArgumentException>(async () =>
395-
{
396-
await importer.AddRowAsync([
397-
new object?[] { YdbValue.MakeInt32(2) }
398-
]);
399-
});
393+
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowAsync(1));
394+
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowAsync(2));
400395
}
401396
finally
402397
{
@@ -418,11 +413,13 @@ public async Task BulkUpsertImporter_MultipleImporters_Parallel()
418413
foreach (var table in new[] { table1, table2 })
419414
{
420415
await using var createCmd = conn.CreateCommand();
421-
createCmd.CommandText = $@"CREATE TABLE {table} (
422-
Id Int32,
423-
Name Utf8,
424-
PRIMARY KEY (Id)
425-
)";
416+
createCmd.CommandText = $"""
417+
CREATE TABLE {table} (
418+
Id Int32,
419+
Name Utf8,
420+
PRIMARY KEY (Id)
421+
)
422+
""";
426423
await createCmd.ExecuteNonQueryAsync();
427424
}
428425

@@ -433,7 +430,7 @@ await Task.WhenAll(
433430
{
434431
var importer = conn.BeginBulkUpsertImport(table1, columns);
435432
var rows = Enumerable.Range(0, 20)
436-
.Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}") })
433+
.Select(i => new object[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}") })
437434
.ToArray();
438435
foreach (var row in rows)
439436
await importer.AddRowAsync(row);
@@ -443,7 +440,7 @@ await Task.WhenAll(
443440
{
444441
var importer = conn.BeginBulkUpsertImport(table2, columns);
445442
var rows = Enumerable.Range(0, 20)
446-
.Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}") })
443+
.Select(i => new object[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}") })
447444
.ToArray();
448445
foreach (var row in rows)
449446
await importer.AddRowAsync(row);
@@ -482,7 +479,7 @@ public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()
482479

483480
var importer = conn.BeginBulkUpsertImport(tableName, columns);
484481

485-
await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists")]);
482+
await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists"));
486483

487484
await Assert.ThrowsAsync<YdbException>(async () => { await importer.FlushAsync(); });
488485
}

0 commit comments

Comments
 (0)