Skip to content

Commit 5d72f78

Browse files
LiamHamstersKirillKurdyukov
authored andcommitted
Add: BulkUpsertImporter (#485)
1 parent c99713d commit 5d72f78

File tree

5 files changed

+307
-0
lines changed

5 files changed

+307
-0
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks and integration tests.
12
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
23
- Canceling AttachStream after calling the `DeleteSession` method.
34
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
using Google.Protobuf.Collections;
2+
using Ydb.Sdk.Ado.Internal;
3+
using Ydb.Sdk.Value;
4+
using Ydb.Table;
5+
using Ydb.Table.V1;
6+
7+
namespace Ydb.Sdk.Ado.BulkUpsert;
8+
9+
public sealed class BulkUpsertImporter : IBulkUpsertImporter
10+
{
11+
private readonly IDriver _driver;
12+
private readonly string _tablePath;
13+
private readonly IReadOnlyList<string> _columns;
14+
private readonly int _maxBytes;
15+
private readonly RepeatedField<Ydb.Value> _rows = new();
16+
private readonly CancellationToken _cancellationToken;
17+
private StructType? _structType;
18+
private int _currentBytes;
19+
20+
internal BulkUpsertImporter(
21+
IDriver driver,
22+
string tableName,
23+
IReadOnlyList<string> columns,
24+
int maxBytes,
25+
CancellationToken cancellationToken = default)
26+
{
27+
_driver = driver;
28+
_tablePath = tableName;
29+
_columns = columns;
30+
_maxBytes = maxBytes / 2;
31+
_cancellationToken = cancellationToken;
32+
}
33+
34+
public async ValueTask AddRowAsync(object?[] values)
35+
{
36+
if (values.Length != _columns.Count)
37+
throw new ArgumentException("Values count must match columns count", nameof(values));
38+
39+
var ydbValues = values.Select(v =>
40+
v as YdbValue ?? (v is YdbParameter param ? param.YdbValue : new YdbParameter { Value = v }.YdbValue)
41+
).ToArray();
42+
43+
var protoStruct = new Ydb.Value();
44+
foreach (var value in ydbValues)
45+
protoStruct.Items.Add(value.GetProto().Value);
46+
47+
var rowSize = protoStruct.CalculateSize();
48+
49+
if (_currentBytes + rowSize > _maxBytes && _rows.Count > 0)
50+
{
51+
await FlushAsync();
52+
}
53+
54+
_rows.Add(protoStruct);
55+
_currentBytes += rowSize;
56+
57+
_structType ??= new StructType
58+
{
59+
Members =
60+
{
61+
_columns.Select((col, i) => new StructMember
62+
{
63+
Name = col,
64+
Type = ydbValues[i].GetProto().Type
65+
})
66+
}
67+
};
68+
}
69+
70+
public async ValueTask FlushAsync()
71+
{
72+
if (_rows.Count == 0) return;
73+
if (_structType == null)
74+
throw new InvalidOperationException("structType is undefined");
75+
76+
var typedValue = new TypedValue
77+
{
78+
Type = new Type { ListType = new ListType { Item = new Type { StructType = _structType } } },
79+
Value = new Ydb.Value { Items = { _rows } }
80+
};
81+
82+
var req = new BulkUpsertRequest { Table = _tablePath, Rows = typedValue };
83+
84+
var resp = await _driver.UnaryCall(
85+
TableService.BulkUpsertMethod,
86+
req,
87+
new GrpcRequestSettings { CancellationToken = _cancellationToken }
88+
).ConfigureAwait(false);
89+
90+
var operation = resp.Operation;
91+
if (operation.Status.IsNotSuccess())
92+
throw YdbException.FromServer(operation.Status, operation.Issues);
93+
94+
_rows.Clear();
95+
_currentBytes = 0;
96+
}
97+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Ydb.Sdk.Ado.BulkUpsert;
2+
3+
public interface IBulkUpsertImporter
4+
{
5+
ValueTask AddRowAsync(object?[] row);
6+
7+
ValueTask FlushAsync();
8+
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Data;
22
using System.Data.Common;
33
using System.Diagnostics.CodeAnalysis;
4+
using Ydb.Sdk.Ado.BulkUpsert;
45
using Ydb.Sdk.Ado.Session;
56
using Ydb.Sdk.Services.Query;
67
using static System.Data.IsolationLevel;
@@ -52,6 +53,23 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
5253
ConnectionStringBuilder = connectionStringBuilder;
5354
}
5455

56+
public IBulkUpsertImporter BeginBulkUpsertImport(
57+
string name,
58+
IReadOnlyList<string> columns,
59+
CancellationToken cancellationToken = default)
60+
{
61+
ThrowIfConnectionClosed();
62+
if (CurrentTransaction is { Completed: false })
63+
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");
64+
65+
var database = ConnectionStringBuilder.Database.TrimEnd('/');
66+
var tablePath = name.StartsWith(database) ? name : $"{database}/{name}";
67+
68+
var maxBytes = ConnectionStringBuilder.MaxSendMessageSize;
69+
70+
return new BulkUpsertImporter(Session.Driver, tablePath, columns, maxBytes, cancellationToken);
71+
}
72+
5573
protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
5674
{
5775
ThrowIfConnectionClosed();

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,4 +314,187 @@ private List<Task> GenerateTasks(string connectionString) => Enumerable.Range(0,
314314

315315
protected override async Task OnDisposeAsync() =>
316316
await YdbConnection.ClearPool(new YdbConnection(_connectionStringTls));
317+
318+
[Fact]
319+
public async Task BulkUpsertImporter_HappyPath_Add_Flush()
320+
{
321+
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
322+
323+
var conn = new YdbConnection(_connectionStringTls);
324+
await conn.OpenAsync();
325+
try
326+
{
327+
await using (var createCmd = conn.CreateCommand())
328+
{
329+
createCmd.CommandText = $@"
330+
CREATE TABLE {tableName} (
331+
Id Int32,
332+
Name Utf8,
333+
PRIMARY KEY (Id)
334+
)";
335+
await createCmd.ExecuteNonQueryAsync();
336+
}
337+
338+
var columns = new[] { "Id", "Name" };
339+
340+
var importer = conn.BeginBulkUpsertImport(tableName, columns);
341+
342+
await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice")]);
343+
await importer.AddRowAsync([YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob")]);
344+
await importer.FlushAsync();
345+
346+
await using (var checkCmd = conn.CreateCommand())
347+
{
348+
checkCmd.CommandText = $"SELECT COUNT(*) FROM {tableName}";
349+
var count = Convert.ToInt32(await checkCmd.ExecuteScalarAsync());
350+
Assert.Equal(2, count);
351+
}
352+
353+
importer = conn.BeginBulkUpsertImport(tableName, columns);
354+
await importer.AddRowAsync([YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie")]);
355+
await importer.AddRowAsync([YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana")]);
356+
await importer.FlushAsync();
357+
358+
await using (var checkCmd = conn.CreateCommand())
359+
{
360+
checkCmd.CommandText = $"SELECT Name FROM {tableName} ORDER BY Id";
361+
var names = new List<string>();
362+
await using var reader = await checkCmd.ExecuteReaderAsync();
363+
while (await reader.ReadAsync())
364+
names.Add(reader.GetString(0));
365+
Assert.Contains("Alice", names);
366+
Assert.Contains("Bob", names);
367+
Assert.Contains("Charlie", names);
368+
Assert.Contains("Diana", names);
369+
}
370+
}
371+
finally
372+
{
373+
await using var dropCmd = conn.CreateCommand();
374+
dropCmd.CommandText = $"DROP TABLE {tableName}";
375+
await dropCmd.ExecuteNonQueryAsync();
376+
}
377+
}
378+
379+
[Fact]
380+
public async Task BulkUpsertImporter_ThrowsOnInvalidRowCount()
381+
{
382+
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
383+
var conn = new YdbConnection(_connectionStringTls);
384+
await conn.OpenAsync();
385+
try
386+
{
387+
await using (var createCmd = conn.CreateCommand())
388+
{
389+
createCmd.CommandText = $@"
390+
CREATE TABLE {tableName} (
391+
Id Int32,
392+
Name Utf8,
393+
PRIMARY KEY (Id)
394+
)";
395+
await createCmd.ExecuteNonQueryAsync();
396+
}
397+
398+
var columns = new[] { "Id", "Name" };
399+
400+
var importer = conn.BeginBulkUpsertImport(tableName, columns);
401+
402+
var badRow = new object?[] { YdbValue.MakeInt32(1) };
403+
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowAsync([badRow]));
404+
405+
await Assert.ThrowsAsync<ArgumentException>(async () =>
406+
{
407+
await importer.AddRowAsync([
408+
new object?[] { YdbValue.MakeInt32(2) }
409+
]);
410+
});
411+
}
412+
finally
413+
{
414+
await using var dropCmd = conn.CreateCommand();
415+
dropCmd.CommandText = $"DROP TABLE {tableName}";
416+
await dropCmd.ExecuteNonQueryAsync();
417+
}
418+
}
419+
420+
[Fact]
421+
public async Task BulkUpsertImporter_MultipleImporters_Parallel()
422+
{
423+
var table1 = $"BulkImporter_{Guid.NewGuid():N}_1";
424+
var table2 = $"BulkImporter_{Guid.NewGuid():N}_2";
425+
426+
var conn = new YdbConnection(_connectionStringTls);
427+
await conn.OpenAsync();
428+
try
429+
{
430+
foreach (var table in new[] { table1, table2 })
431+
{
432+
await using var createCmd = conn.CreateCommand();
433+
createCmd.CommandText = $@"CREATE TABLE {table} (
434+
Id Int32,
435+
Name Utf8,
436+
PRIMARY KEY (Id)
437+
)";
438+
await createCmd.ExecuteNonQueryAsync();
439+
}
440+
441+
var columns = new[] { "Id", "Name" };
442+
443+
await Task.WhenAll(
444+
Task.Run(async () =>
445+
{
446+
var importer = conn.BeginBulkUpsertImport(table1, columns);
447+
var rows = Enumerable.Range(0, 20)
448+
.Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}") })
449+
.ToArray();
450+
foreach (var row in rows)
451+
await importer.AddRowAsync(row);
452+
await importer.FlushAsync();
453+
}),
454+
Task.Run(async () =>
455+
{
456+
var importer = conn.BeginBulkUpsertImport(table2, columns);
457+
var rows = Enumerable.Range(0, 20)
458+
.Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}") })
459+
.ToArray();
460+
foreach (var row in rows)
461+
await importer.AddRowAsync(row);
462+
await importer.FlushAsync();
463+
})
464+
);
465+
466+
foreach (var table in new[] { table1, table2 })
467+
{
468+
await using var checkCmd = conn.CreateCommand();
469+
checkCmd.CommandText = $"SELECT COUNT(*) FROM {table}";
470+
var count = Convert.ToInt32(await checkCmd.ExecuteScalarAsync());
471+
Assert.Equal(20, count);
472+
}
473+
}
474+
finally
475+
{
476+
foreach (var table in new[] { table1, table2 })
477+
{
478+
await using var dropCmd = conn.CreateCommand();
479+
dropCmd.CommandText = $"DROP TABLE {table}";
480+
await dropCmd.ExecuteNonQueryAsync();
481+
}
482+
}
483+
}
484+
485+
[Fact]
486+
public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()
487+
{
488+
var tableName = $"Nonexistent_{Guid.NewGuid():N}";
489+
var conn = new YdbConnection(_connectionStringTls);
490+
await conn.OpenAsync();
491+
492+
var columns = new[] { "Id", "Name" };
493+
494+
var importer = conn.BeginBulkUpsertImport(tableName, columns);
495+
496+
await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists")]);
497+
498+
await Assert.ThrowsAsync<YdbException>(async () => { await importer.FlushAsync(); });
499+
}
317500
}

0 commit comments

Comments
 (0)