Skip to content

Commit 2d1f392

Browse files
authored
Add: BulkUpsertImporter (#485)
1 parent 19dedc6 commit 2d1f392

File tree

5 files changed

+307
-0
lines changed

5 files changed

+307
-0
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
- ADO.NET: Added `BeginBulkUpsertImport` for batch upsert operations with transaction checks and integration tests.
12
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
23
- Canceling AttachStream after calling the `DeleteSession` method.
34
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
using Google.Protobuf.Collections;
2+
using Ydb.Sdk.Ado.Internal;
3+
using Ydb.Sdk.Value;
4+
using Ydb.Table;
5+
using Ydb.Table.V1;
6+
7+
namespace Ydb.Sdk.Ado.BulkUpsert;
8+
9+
public sealed class BulkUpsertImporter : IBulkUpsertImporter
10+
{
11+
private readonly IDriver _driver;
12+
private readonly string _tablePath;
13+
private readonly IReadOnlyList<string> _columns;
14+
private readonly int _maxBytes;
15+
private readonly RepeatedField<Ydb.Value> _rows = new();
16+
private readonly CancellationToken _cancellationToken;
17+
private StructType? _structType;
18+
private int _currentBytes;
19+
20+
internal BulkUpsertImporter(
21+
IDriver driver,
22+
string tableName,
23+
IReadOnlyList<string> columns,
24+
int maxBytes,
25+
CancellationToken cancellationToken = default)
26+
{
27+
_driver = driver;
28+
_tablePath = tableName;
29+
_columns = columns;
30+
_maxBytes = maxBytes / 2;
31+
_cancellationToken = cancellationToken;
32+
}
33+
34+
public async ValueTask AddRowAsync(object?[] values)
35+
{
36+
if (values.Length != _columns.Count)
37+
throw new ArgumentException("Values count must match columns count", nameof(values));
38+
39+
var ydbValues = values.Select(v =>
40+
v as YdbValue ?? (v is YdbParameter param ? param.YdbValue : new YdbParameter { Value = v }.YdbValue)
41+
).ToArray();
42+
43+
var protoStruct = new Ydb.Value();
44+
foreach (var value in ydbValues)
45+
protoStruct.Items.Add(value.GetProto().Value);
46+
47+
var rowSize = protoStruct.CalculateSize();
48+
49+
if (_currentBytes + rowSize > _maxBytes && _rows.Count > 0)
50+
{
51+
await FlushAsync();
52+
}
53+
54+
_rows.Add(protoStruct);
55+
_currentBytes += rowSize;
56+
57+
_structType ??= new StructType
58+
{
59+
Members =
60+
{
61+
_columns.Select((col, i) => new StructMember
62+
{
63+
Name = col,
64+
Type = ydbValues[i].GetProto().Type
65+
})
66+
}
67+
};
68+
}
69+
70+
public async ValueTask FlushAsync()
71+
{
72+
if (_rows.Count == 0) return;
73+
if (_structType == null)
74+
throw new InvalidOperationException("structType is undefined");
75+
76+
var typedValue = new TypedValue
77+
{
78+
Type = new Type { ListType = new ListType { Item = new Type { StructType = _structType } } },
79+
Value = new Ydb.Value { Items = { _rows } }
80+
};
81+
82+
var req = new BulkUpsertRequest { Table = _tablePath, Rows = typedValue };
83+
84+
var resp = await _driver.UnaryCall(
85+
TableService.BulkUpsertMethod,
86+
req,
87+
new GrpcRequestSettings { CancellationToken = _cancellationToken }
88+
).ConfigureAwait(false);
89+
90+
var operation = resp.Operation;
91+
if (operation.Status.IsNotSuccess())
92+
throw YdbException.FromServer(operation.Status, operation.Issues);
93+
94+
_rows.Clear();
95+
_currentBytes = 0;
96+
}
97+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Ydb.Sdk.Ado.BulkUpsert;
2+
3+
public interface IBulkUpsertImporter
4+
{
5+
ValueTask AddRowAsync(object?[] row);
6+
7+
ValueTask FlushAsync();
8+
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Data;
22
using System.Data.Common;
33
using System.Diagnostics.CodeAnalysis;
4+
using Ydb.Sdk.Ado.BulkUpsert;
45
using Ydb.Sdk.Ado.Session;
56
using Ydb.Sdk.Services.Query;
67
using static System.Data.IsolationLevel;
@@ -52,6 +53,23 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
5253
ConnectionStringBuilder = connectionStringBuilder;
5354
}
5455

56+
public IBulkUpsertImporter BeginBulkUpsertImport(
57+
string name,
58+
IReadOnlyList<string> columns,
59+
CancellationToken cancellationToken = default)
60+
{
61+
ThrowIfConnectionClosed();
62+
if (CurrentTransaction is { Completed: false })
63+
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");
64+
65+
var database = ConnectionStringBuilder.Database.TrimEnd('/');
66+
var tablePath = name.StartsWith(database) ? name : $"{database}/{name}";
67+
68+
var maxBytes = ConnectionStringBuilder.MaxSendMessageSize;
69+
70+
return new BulkUpsertImporter(Session.Driver, tablePath, columns, maxBytes, cancellationToken);
71+
}
72+
5573
protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
5674
{
5775
ThrowIfConnectionClosed();

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,4 +308,187 @@ private List<Task> GenerateTasks() => Enumerable.Range(0, 100).Select(async i =>
308308

309309
protected override async Task OnDisposeAsync() =>
310310
await YdbConnection.ClearPool(new YdbConnection(_connectionStringTls));
311+
312+
[Fact]
313+
public async Task BulkUpsertImporter_HappyPath_Add_Flush()
314+
{
315+
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
316+
317+
var conn = new YdbConnection(_connectionStringTls);
318+
await conn.OpenAsync();
319+
try
320+
{
321+
await using (var createCmd = conn.CreateCommand())
322+
{
323+
createCmd.CommandText = $@"
324+
CREATE TABLE {tableName} (
325+
Id Int32,
326+
Name Utf8,
327+
PRIMARY KEY (Id)
328+
)";
329+
await createCmd.ExecuteNonQueryAsync();
330+
}
331+
332+
var columns = new[] { "Id", "Name" };
333+
334+
var importer = conn.BeginBulkUpsertImport(tableName, columns);
335+
336+
await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice")]);
337+
await importer.AddRowAsync([YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob")]);
338+
await importer.FlushAsync();
339+
340+
await using (var checkCmd = conn.CreateCommand())
341+
{
342+
checkCmd.CommandText = $"SELECT COUNT(*) FROM {tableName}";
343+
var count = Convert.ToInt32(await checkCmd.ExecuteScalarAsync());
344+
Assert.Equal(2, count);
345+
}
346+
347+
importer = conn.BeginBulkUpsertImport(tableName, columns);
348+
await importer.AddRowAsync([YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie")]);
349+
await importer.AddRowAsync([YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana")]);
350+
await importer.FlushAsync();
351+
352+
await using (var checkCmd = conn.CreateCommand())
353+
{
354+
checkCmd.CommandText = $"SELECT Name FROM {tableName} ORDER BY Id";
355+
var names = new List<string>();
356+
await using var reader = await checkCmd.ExecuteReaderAsync();
357+
while (await reader.ReadAsync())
358+
names.Add(reader.GetString(0));
359+
Assert.Contains("Alice", names);
360+
Assert.Contains("Bob", names);
361+
Assert.Contains("Charlie", names);
362+
Assert.Contains("Diana", names);
363+
}
364+
}
365+
finally
366+
{
367+
await using var dropCmd = conn.CreateCommand();
368+
dropCmd.CommandText = $"DROP TABLE {tableName}";
369+
await dropCmd.ExecuteNonQueryAsync();
370+
}
371+
}
372+
373+
[Fact]
374+
public async Task BulkUpsertImporter_ThrowsOnInvalidRowCount()
375+
{
376+
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
377+
var conn = new YdbConnection(_connectionStringTls);
378+
await conn.OpenAsync();
379+
try
380+
{
381+
await using (var createCmd = conn.CreateCommand())
382+
{
383+
createCmd.CommandText = $@"
384+
CREATE TABLE {tableName} (
385+
Id Int32,
386+
Name Utf8,
387+
PRIMARY KEY (Id)
388+
)";
389+
await createCmd.ExecuteNonQueryAsync();
390+
}
391+
392+
var columns = new[] { "Id", "Name" };
393+
394+
var importer = conn.BeginBulkUpsertImport(tableName, columns);
395+
396+
var badRow = new object?[] { YdbValue.MakeInt32(1) };
397+
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowAsync([badRow]));
398+
399+
await Assert.ThrowsAsync<ArgumentException>(async () =>
400+
{
401+
await importer.AddRowAsync([
402+
new object?[] { YdbValue.MakeInt32(2) }
403+
]);
404+
});
405+
}
406+
finally
407+
{
408+
await using var dropCmd = conn.CreateCommand();
409+
dropCmd.CommandText = $"DROP TABLE {tableName}";
410+
await dropCmd.ExecuteNonQueryAsync();
411+
}
412+
}
413+
414+
[Fact]
415+
public async Task BulkUpsertImporter_MultipleImporters_Parallel()
416+
{
417+
var table1 = $"BulkImporter_{Guid.NewGuid():N}_1";
418+
var table2 = $"BulkImporter_{Guid.NewGuid():N}_2";
419+
420+
var conn = new YdbConnection(_connectionStringTls);
421+
await conn.OpenAsync();
422+
try
423+
{
424+
foreach (var table in new[] { table1, table2 })
425+
{
426+
await using var createCmd = conn.CreateCommand();
427+
createCmd.CommandText = $@"CREATE TABLE {table} (
428+
Id Int32,
429+
Name Utf8,
430+
PRIMARY KEY (Id)
431+
)";
432+
await createCmd.ExecuteNonQueryAsync();
433+
}
434+
435+
var columns = new[] { "Id", "Name" };
436+
437+
await Task.WhenAll(
438+
Task.Run(async () =>
439+
{
440+
var importer = conn.BeginBulkUpsertImport(table1, columns);
441+
var rows = Enumerable.Range(0, 20)
442+
.Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}") })
443+
.ToArray();
444+
foreach (var row in rows)
445+
await importer.AddRowAsync(row);
446+
await importer.FlushAsync();
447+
}),
448+
Task.Run(async () =>
449+
{
450+
var importer = conn.BeginBulkUpsertImport(table2, columns);
451+
var rows = Enumerable.Range(0, 20)
452+
.Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}") })
453+
.ToArray();
454+
foreach (var row in rows)
455+
await importer.AddRowAsync(row);
456+
await importer.FlushAsync();
457+
})
458+
);
459+
460+
foreach (var table in new[] { table1, table2 })
461+
{
462+
await using var checkCmd = conn.CreateCommand();
463+
checkCmd.CommandText = $"SELECT COUNT(*) FROM {table}";
464+
var count = Convert.ToInt32(await checkCmd.ExecuteScalarAsync());
465+
Assert.Equal(20, count);
466+
}
467+
}
468+
finally
469+
{
470+
foreach (var table in new[] { table1, table2 })
471+
{
472+
await using var dropCmd = conn.CreateCommand();
473+
dropCmd.CommandText = $"DROP TABLE {table}";
474+
await dropCmd.ExecuteNonQueryAsync();
475+
}
476+
}
477+
}
478+
479+
[Fact]
480+
public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()
481+
{
482+
var tableName = $"Nonexistent_{Guid.NewGuid():N}";
483+
var conn = new YdbConnection(_connectionStringTls);
484+
await conn.OpenAsync();
485+
486+
var columns = new[] { "Id", "Name" };
487+
488+
var importer = conn.BeginBulkUpsertImport(tableName, columns);
489+
490+
await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists")]);
491+
492+
await Assert.ThrowsAsync<YdbException>(async () => { await importer.FlushAsync(); });
493+
}
311494
}

0 commit comments

Comments
 (0)