Skip to content

Commit 28b039b

Browse files
Dev: added BulkUpsertImporterBenchmark.cs
1 parent 4786134 commit 28b039b

File tree

5 files changed

+87
-8
lines changed

5 files changed

+87
-8
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
- ADO.NET: Added `MinPoolSize` setting to keep a minimum number of sessions ready in the PoolingSessionSource.
44
- ADO.NET: Added `SessionIdleTimeout` to remove idle sessions from the PoolingSessionSource automatically.
55
- ADO.NET: Made `PoolingSessionSource` faster and more reliable by using a lock-free LIFO stack.
6-
- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks and integration tests.
6+
- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks.
77
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
88
- Canceling AttachStream after calling the `DeleteSession` method.
99
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).

src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public sealed class BulkUpsertImporter : IBulkUpsertImporter
1111
private readonly IDriver _driver;
1212
private readonly string _tablePath;
1313
private readonly IReadOnlyList<string> _columns;
14-
private readonly int _maxBytes;
14+
private readonly int _maxBatchByteSize;
1515
private readonly RepeatedField<Ydb.Value> _rows = new();
1616
private readonly CancellationToken _cancellationToken;
1717
private StructType? _structType;
@@ -21,17 +21,17 @@ internal BulkUpsertImporter(
2121
IDriver driver,
2222
string tableName,
2323
IReadOnlyList<string> columns,
24-
int maxBytes,
24+
int maxBatchByteSize,
2525
CancellationToken cancellationToken = default)
2626
{
2727
_driver = driver;
2828
_tablePath = tableName;
2929
_columns = columns;
30-
_maxBytes = maxBytes / 2;
30+
_maxBatchByteSize = maxBatchByteSize / 2;
3131
_cancellationToken = cancellationToken;
3232
}
3333

34-
public async ValueTask AddRowAsync(object?[] values)
34+
public async ValueTask AddRowAsync(params object[] values)
3535
{
3636
if (values.Length != _columns.Count)
3737
throw new ArgumentException("Values count must match columns count", nameof(values));
@@ -46,7 +46,7 @@ public async ValueTask AddRowAsync(object?[] values)
4646

4747
var rowSize = protoStruct.CalculateSize();
4848

49-
if (_currentBytes + rowSize > _maxBytes && _rows.Count > 0)
49+
if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0)
5050
{
5151
await FlushAsync();
5252
}

src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ namespace Ydb.Sdk.Ado.BulkUpsert;
22

33
public interface IBulkUpsertImporter
44
{
5-
ValueTask AddRowAsync(object?[] row);
5+
ValueTask AddRowAsync(params object[] row);
66

77
ValueTask FlushAsync();
88
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
using BenchmarkDotNet.Attributes;
2+
using Grpc.Core;
3+
using Microsoft.Extensions.Logging;
4+
using Ydb.Sdk.Ado.BulkUpsert;
5+
using Ydb.Table;
6+
7+
namespace Ydb.Sdk.Ado.Benchmarks;
8+
9+
public class BulkUpsertImporterBenchmark
10+
{
11+
private const int BatchSize = 15000;
12+
private readonly int _maxBatchByteSize = new YdbConnectionStringBuilder().MaxSendMessageSize;
13+
private IDriver _driver = null!;
14+
private IList<(Guid Guid, string String, int Int, double Double, DateTime DateTime)> _rows = null!;
15+
16+
[GlobalSetup]
17+
public void Setup()
18+
{
19+
_driver = new BulkUpsertMockDriver();
20+
_rows = Enumerable.Range(0, BatchSize)
21+
.Select(i => new ValueTuple<Guid, string, int, double, DateTime>(Guid.NewGuid(),
22+
Guid.NewGuid() + "_" + Guid.NewGuid(), i, i * 1.324, DateTime.Now))
23+
.ToList();
24+
}
25+
26+
[Benchmark]
27+
public async Task FlushAsync_BulkUpsertImporter()
28+
{
29+
var bulkUpsertImporter = new BulkUpsertImporter(_driver, "table_name", ["c1", "c2", "c3", "c4", "c5"],
30+
_maxBatchByteSize, CancellationToken.None);
31+
32+
foreach (var row in _rows)
33+
{
34+
await bulkUpsertImporter.AddRowAsync(row.Guid, row.String, row.Int, row.Double, row.DateTime);
35+
}
36+
await bulkUpsertImporter.FlushAsync();
37+
}
38+
}
39+
40+
internal class BulkUpsertMockDriver : IDriver
41+
{
42+
public ValueTask DisposeAsync() => throw new NotImplementedException();
43+
44+
public void Dispose() => throw new NotImplementedException();
45+
46+
public Task<TResponse>
47+
UnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, TRequest request,
48+
GrpcRequestSettings settings) where TRequest : class where TResponse : class =>
49+
Task.FromResult((TResponse)(object)new BulkUpsertResponse
50+
{
51+
Operation = new Operations.Operation { Status = StatusIds.Types.StatusCode.Success }
52+
});
53+
54+
public ValueTask<IServerStream<TResponse>> ServerStreamCall<TRequest,
55+
TResponse>(Method<TRequest, TResponse> method, TRequest request, GrpcRequestSettings settings) where
56+
TRequest : class
57+
where TResponse : class => throw new NotImplementedException();
58+
59+
public ValueTask<IBidirectionalStream<TRequest,
60+
TResponse>> BidirectionalStreamCall<TRequest, TResponse>(Method<TRequest, TResponse> method,
61+
GrpcRequestSettings settings) where TRequest : class where TResponse : class => throw new
62+
NotImplementedException();
63+
64+
public ILoggerFactory LoggerFactory { get; }
65+
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,18 @@ DefaultJob : .NET 8.0.2 (8.0.224.6711), Arm64 RyuJIT AdvSIMD
5454
| HighContention_OpenClose | 91,700.72 ns | 1,803.206 ns | 3,842.780 ns | 204.6633 | 0.0007 | 5.0049 | - | 41951 B |
5555
| SessionReuse_Pattern | 117,545.11 ns | 2,226.365 ns | 4,014.595 ns | 220.0000 | 0.0001 | 1.5869 | - | 13656 B |
5656
| SessionReuse_HighContention_Pattern | 7,463,819.00 ns | 148,409.083 ns | 364,050.038 ns | 19044.6172 | 1.1719 | 765.6250 | 125.0000 | 6367528 B |
57-
| SessionReuse_HighIterations_Pattern | 70,844,972.06 ns | 1,400,128.942 ns | 3,589,066.009 ns | 200020.0000 | - | 750.0000 | - | 6407440 B |
57+
| SessionReuse_HighIterations_Pattern | 70,844,972.06 ns | 1,400,128.942 ns | 3,589,066.009 ns | 200020.0000 | - | 750.0000 | - | 6407440 B |
58+
59+
# BulkUpsertImporter Benchmarks
60+
61+
ButchSize = 15_000
62+
63+
| Method | Mean | Error | StdDev |
64+
|-------------------------------|---------:|---------:|---------:|
65+
| FlushAsync_BulkUpsertImporter | 32.41 ms | 0.546 ms | 0.456 ms |
66+
67+
ButchSize = 1_000
68+
69+
| Method | Mean | Error | StdDev |
70+
|-------------------------------|---------:|---------:|---------:|
71+
| FlushAsync_BulkUpsertImporter | 583.3 us | 11.32 us | 13.03 us |

0 commit comments

Comments
 (0)