Skip to content

Commit 5ac5127

Browse files
committed
feat: make BulkUpsert API more user-friendly
1 parent 8d170b8 commit 5ac5127

File tree

6 files changed

+53
-63
lines changed

6 files changed

+53
-63
lines changed

src/Ydb.Sdk/src/Ado/BulkUpsert/TypedValueFactory.cs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,17 @@ internal static class TypedValueFactory
77
public static TypedValue FromObjects<T>(
88
IReadOnlyCollection<T> rows,
99
IReadOnlyList<string> columnNames,
10-
IReadOnlyList<Func<T, YdbValue>> columnSelectors)
10+
IReadOnlyList<Func<T, object?>> columnSelectors)
1111
{
12-
if (rows.Count == 0)
13-
throw new ArgumentException("Rows collection is empty.", nameof(rows));
14-
if (columnNames.Count != columnSelectors.Count)
15-
throw new ArgumentException("Column names count must match selectors count.");
16-
1712
var structs = new List<YdbValue>(rows.Count);
1813

1914
foreach (var row in rows)
2015
{
2116
var members = new Dictionary<string, YdbValue>(columnNames.Count);
22-
for (int i = 0; i < columnNames.Count; i++)
17+
for (var i = 0; i < columnNames.Count; i++)
2318
{
24-
var value = columnSelectors[i](row);
25-
members[columnNames[i]] = value;
19+
var val = columnSelectors[i](row);
20+
members[columnNames[i]] = new YdbParameter { Value = val }.YdbValue;
2621
}
2722

2823
structs.Add(YdbValue.MakeStruct(members));

src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using Ydb.Sdk.Services.Table;
2-
using Ydb.Sdk.Value;
32

43
namespace Ydb.Sdk.Ado.BulkUpsert;
54

@@ -8,23 +7,27 @@ public sealed class YdbBulkUpsertImporter<T> : IAsyncDisposable
87
private readonly YdbConnection _connection;
98
private readonly string _tablePath;
109
private readonly IReadOnlyList<string> _columns;
11-
private readonly IReadOnlyList<Func<T, YdbValue>> _selectors;
12-
private readonly int _maxRowsInBatch;
10+
private readonly Func<T, object?>[] _selectors;
1311
private readonly List<T> _buffer = new();
1412
private bool _isCompleted;
13+
private readonly int _maxRowsInBatch;
1514

1615
public YdbBulkUpsertImporter(
1716
YdbConnection connection,
1817
string tablePath,
1918
IReadOnlyList<string> columns,
20-
IReadOnlyList<Func<T, YdbValue>> selectors,
2119
int maxRowsInBatch = 1000)
2220
{
23-
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
24-
_tablePath = tablePath ?? throw new ArgumentNullException(nameof(tablePath));
25-
_columns = columns ?? throw new ArgumentNullException(nameof(columns));
26-
_selectors = selectors ?? throw new ArgumentNullException(nameof(selectors));
21+
_connection = connection;
22+
_tablePath = tablePath;
23+
_columns = columns;
2724
_maxRowsInBatch = maxRowsInBatch;
25+
26+
var props = columns.Select(name =>
27+
typeof(T).GetProperty(name) ??
28+
throw new ArgumentException($"Type {typeof(T).Name} does not have a property '{name}'")
29+
).ToArray();
30+
_selectors = props.Select(p => (Func<T, object?>)(x => p.GetValue(x))).ToArray();
2831
}
2932

3033
public async Task WriteRowAsync(T row, CancellationToken cancellationToken = default)
@@ -35,12 +38,10 @@ public async Task WriteRowAsync(T row, CancellationToken cancellationToken = def
3538
_buffer.Add(row);
3639

3740
if (_buffer.Count >= _maxRowsInBatch)
38-
{
3941
await FlushAsync(cancellationToken).ConfigureAwait(false);
40-
}
4142
}
4243

43-
public async Task FlushAsync(CancellationToken cancellationToken = default)
44+
private async Task FlushAsync(CancellationToken cancellationToken = default)
4445
{
4546
if (_buffer.Count == 0)
4647
return;
@@ -49,14 +50,13 @@ await _connection.BulkUpsertWithRetry(
4950
_tablePath,
5051
_buffer,
5152
_columns,
52-
_selectors,
5353
cancellationToken
5454
).ConfigureAwait(false);
5555

5656
_buffer.Clear();
5757
}
5858

59-
public async Task CompleteAsync(CancellationToken cancellationToken = default)
59+
private async Task CompleteAsync(CancellationToken cancellationToken = default)
6060
{
6161
if (_isCompleted) return;
6262
await FlushAsync(cancellationToken).ConfigureAwait(false);

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using Ydb.Sdk.Ado.BulkUpsert;
66
using Ydb.Sdk.Ado.Session;
77
using Ydb.Sdk.Services.Query;
8-
using Ydb.Sdk.Value;
98
using Ydb.Table;
109
using Ydb.Table.V1;
1110
using static System.Data.IsolationLevel;
@@ -61,7 +60,7 @@ internal async Task BulkUpsertInternalAsync<T>(
6160
string tablePath,
6261
IReadOnlyCollection<T> rows,
6362
IReadOnlyList<string> columns,
64-
IReadOnlyList<Func<T, YdbValue>> selectors,
63+
IReadOnlyList<Func<T, object?>> selectors,
6564
CancellationToken cancellationToken)
6665
{
6766
if (CurrentTransaction is { Completed: false })

src/Ydb.Sdk/src/Ado/YdbDataSource.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ public async Task<YdbBulkUpsertImporter<T>> BeginBulkUpsertAsync<T>(
8181
conn,
8282
tablePath,
8383
columns,
84-
selectors,
8584
maxRowsInBatch
8685
);
8786
}
Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
using System.Reflection;
12
using Ydb.Sdk.Ado;
2-
using Ydb.Sdk.Value;
33

44
namespace Ydb.Sdk.Services.Table;
55

@@ -10,13 +10,24 @@ public static async Task BulkUpsertWithRetry<T>(
1010
string tablePath,
1111
IReadOnlyCollection<T> rows,
1212
IReadOnlyList<string> columns,
13-
IReadOnlyList<Func<T, YdbValue>> selectors,
14-
CancellationToken cancellationToken = default) =>
13+
CancellationToken cancellationToken = default)
14+
{
15+
var type = typeof(T);
16+
var props = columns
17+
.Select(col => type.GetProperty(col) ??
18+
throw new ArgumentException($"Type {typeof(T).Name} does not have a property '{col}'"))
19+
.ToArray();
20+
21+
var selectors = props
22+
.Select<PropertyInfo, Func<T, object?>>(p => x => p.GetValue(x))
23+
.ToArray();
24+
1525
await connection.BulkUpsertInternalAsync(
1626
tablePath,
1727
rows,
1828
columns,
1929
selectors,
2030
cancellationToken
2131
);
32+
}
2233
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -329,11 +329,11 @@ public async Task BulkUpsert_HappyPath_C()
329329
await using (var createCmd = conn.CreateCommand())
330330
{
331331
createCmd.CommandText = $@"
332-
CREATE TABLE {tableName} (
333-
Id Int32,
334-
Name Utf8,
335-
PRIMARY KEY (Id)
336-
)";
332+
CREATE TABLE {tableName} (
333+
Id Int32,
334+
Name Utf8,
335+
PRIMARY KEY (Id)
336+
)";
337337
await createCmd.ExecuteNonQueryAsync();
338338
}
339339

@@ -344,13 +344,8 @@ PRIMARY KEY (Id)
344344
.ToList();
345345

346346
var columns = new[] { "Id", "Name" };
347-
var selectors = new Func<TestEntity, YdbValue>[]
348-
{
349-
x => YdbValue.MakeInt32(x.Id),
350-
x => YdbValue.MakeUtf8(x.Name)
351-
};
352347

353-
await conn.BulkUpsertWithRetry(absTablePath, rows, columns, selectors, default);
348+
await conn.BulkUpsertWithRetry(absTablePath, rows, columns, default);
354349

355350
await using (var checkCmd = conn.CreateCommand())
356351
{
@@ -379,36 +374,31 @@ public async Task BulkUpsert_InsertsNewRows_С()
379374
await using (var createCmd = conn.CreateCommand())
380375
{
381376
createCmd.CommandText = $@"
382-
CREATE TABLE {tableName} (
383-
Id Int32,
384-
Name Utf8,
385-
PRIMARY KEY (Id)
386-
)";
377+
CREATE TABLE {tableName} (
378+
Id Int32,
379+
Name Utf8,
380+
PRIMARY KEY (Id)
381+
)";
387382
await createCmd.ExecuteNonQueryAsync();
388383
}
389384

390385
await Task.Delay(500);
391386

392387
var columns = new[] { "Id", "Name" };
393-
var selectors = new Func<TestEntity, YdbValue>[]
394-
{
395-
x => YdbValue.MakeInt32(x.Id),
396-
x => YdbValue.MakeUtf8(x.Name)
397-
};
398388

399389
var firstRows = new List<TestEntity>
400390
{
401391
new() { Id = 1, Name = "Alice" },
402392
new() { Id = 2, Name = "Bob" }
403393
};
404-
await conn.BulkUpsertWithRetry(absTablePath, firstRows, columns, selectors, default);
394+
await conn.BulkUpsertWithRetry(absTablePath, firstRows, columns, default);
405395

406396
var newRows = new List<TestEntity>
407397
{
408398
new() { Id = 3, Name = "Charlie" },
409399
new() { Id = 4, Name = "Diana" }
410400
};
411-
await conn.BulkUpsertWithRetry(absTablePath, newRows, columns, selectors, default);
401+
await conn.BulkUpsertWithRetry(absTablePath, newRows, columns, default);
412402

413403
await using (var selectCmd = conn.CreateCommand())
414404
{
@@ -447,28 +437,23 @@ public async Task BulkUpsert_UpdatesExistingRows_С()
447437
await using (var createCmd = conn.CreateCommand())
448438
{
449439
createCmd.CommandText = $@"
450-
CREATE TABLE {tableName} (
451-
Id Int32,
452-
Name Utf8,
453-
PRIMARY KEY (Id)
454-
)";
440+
CREATE TABLE {tableName} (
441+
Id Int32,
442+
Name Utf8,
443+
PRIMARY KEY (Id)
444+
)";
455445
await createCmd.ExecuteNonQueryAsync();
456446
}
457447

458448
await Task.Delay(500);
459449

460450
var columns = new[] { "Id", "Name" };
461-
var selectors = new Func<TestEntity, YdbValue>[]
462-
{
463-
x => YdbValue.MakeInt32(x.Id),
464-
x => YdbValue.MakeUtf8(x.Name)
465-
};
466451

467452
var row = new TestEntity { Id = 1, Name = "Alice" };
468-
await conn.BulkUpsertWithRetry(absTablePath, [row], columns, selectors, default);
453+
await conn.BulkUpsertWithRetry(absTablePath, new[] { row }, columns, default);
469454

470455
var updated = new TestEntity { Id = 1, Name = "Alice Updated" };
471-
await conn.BulkUpsertWithRetry(absTablePath, [updated], columns, selectors, default);
456+
await conn.BulkUpsertWithRetry(absTablePath, new[] { updated }, columns, default);
472457

473458
await using (var selectCmd = conn.CreateCommand())
474459
{
@@ -483,4 +468,5 @@ PRIMARY KEY (Id)
483468
await dropCmd.ExecuteNonQueryAsync();
484469
}
485470
}
471+
486472
}

0 commit comments

Comments
 (0)