Skip to content

Commit 6463d36

Browse files
committed
feat: switch BulkUpsertImporter to plain proto, remove extra wrappers and disposables
1 parent c9fdbb5 commit 6463d36

File tree

5 files changed

+127
-206
lines changed

5 files changed

+127
-206
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
using Google.Protobuf.Collections;
2+
using Ydb.Sdk.Ado.Internal;
3+
using Ydb.Sdk.Value;
4+
using Ydb.Table;
5+
using Ydb.Table.V1;
6+
7+
namespace Ydb.Sdk.Ado.BulkUpsert;
8+
9+
public sealed class BulkUpsertImporter : IBulkUpsertImporter
10+
{
11+
private readonly IDriver _driver;
12+
private readonly string _tablePath;
13+
private readonly IReadOnlyList<string> _columns;
14+
private readonly RepeatedField<Ydb.Value> _rows = new();
15+
private readonly CancellationToken _cancellationToken;
16+
private StructType? _structType;
17+
18+
public BulkUpsertImporter(
19+
IDriver driver,
20+
string tableName,
21+
IReadOnlyList<string> columns,
22+
CancellationToken cancellationToken = default)
23+
{
24+
_driver = driver;
25+
_tablePath = tableName;
26+
_columns = columns;
27+
_cancellationToken = cancellationToken;
28+
}
29+
30+
public async ValueTask AddRowsAsync(IEnumerable<object?[]> rows)
31+
{
32+
foreach (var values in rows)
33+
{
34+
if (values.Length != _columns.Count)
35+
throw new ArgumentException("Values count must match columns count", nameof(values));
36+
37+
var ydbValues = values.Select(v =>
38+
v as YdbValue ?? (v is YdbParameter param ? param.YdbValue :
39+
new YdbParameter { Value = v }.YdbValue)
40+
).ToArray();
41+
42+
var protoStruct = new Ydb.Value();
43+
foreach (var value in ydbValues)
44+
protoStruct.Items.Add(value.GetProto().Value);
45+
46+
_rows.Add(protoStruct);
47+
48+
_structType ??= new StructType
49+
{
50+
Members =
51+
{
52+
_columns.Select((col, i) => new StructMember
53+
{
54+
Name = col,
55+
Type = ydbValues[i].GetProto().Type
56+
})
57+
}
58+
};
59+
60+
await FlushAsync(_cancellationToken);
61+
}
62+
}
63+
64+
public async ValueTask FlushAsync(CancellationToken cancellationToken = default)
65+
{
66+
if (_rows.Count == 0) return;
67+
if (_structType == null)
68+
throw new InvalidOperationException("structType is undefined");
69+
70+
var typedValue = new TypedValue
71+
{
72+
Type = new Type { ListType = new ListType { Item = new Type { StructType = _structType } } },
73+
Value = new Ydb.Value { Items = { _rows } }
74+
};
75+
76+
var req = new BulkUpsertRequest { Table = _tablePath, Rows = typedValue };
77+
78+
var resp = await _driver.UnaryCall(
79+
TableService.BulkUpsertMethod,
80+
req,
81+
new GrpcRequestSettings { CancellationToken = cancellationToken }
82+
).ConfigureAwait(false);
83+
84+
var operation = resp.Operation;
85+
if (operation.Status.IsNotSuccess())
86+
throw YdbException.FromServer(operation.Status, operation.Issues);
87+
88+
_rows.Clear();
89+
}
90+
}
Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
using Ydb.Sdk.Value;
2-
31
namespace Ydb.Sdk.Ado.BulkUpsert;
42

5-
public interface IBulkUpsertImporter : IAsyncDisposable
3+
public interface IBulkUpsertImporter
64
{
7-
ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows);
85
ValueTask AddRowsAsync(IEnumerable<object?[]> rows);
9-
106
ValueTask FlushAsync(CancellationToken cancellationToken = default);
11-
}
7+
}

src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs

Lines changed: 0 additions & 121 deletions
This file was deleted.

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public IBulkUpsertImporter BeginBulkUpsertImport(
6666
var database = ConnectionStringBuilder.Database.TrimEnd('/');
6767
var tablePath = string.IsNullOrEmpty(database) ? name : $"{database}/{name}";
6868

69-
return new BulkUpsertImporter(this, tablePath, columns, cancellationToken);
69+
return new BulkUpsertImporter(Session.Driver, tablePath, columns, cancellationToken);
7070
}
7171

7272
protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)

0 commit comments

Comments
 (0)