Skip to content

Commit 1879bca

Browse files
committed
bulk upsert: add object[] support, simplify implementation
1 parent 7ffa2ee commit 1879bca

File tree

6 files changed

+284
-542
lines changed

6 files changed

+284
-542
lines changed
Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
1+
using Ydb.Sdk.Value;
2+
13
namespace Ydb.Sdk.Ado.BulkUpsert;
24

35
public interface IBulkUpsertImporter : IAsyncDisposable
46
{
5-
ValueTask AddRowAsync(params Ydb.Value[] values);
7+
ValueTask AddRowAsync(params YdbValue[] values);
8+
ValueTask AddRowAsync(params object?[] values);
9+
10+
ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows, CancellationToken cancellationToken = default);
11+
ValueTask AddRowsAsync(IEnumerable<object?[]> rows, CancellationToken cancellationToken = default);
12+
613
ValueTask FlushAsync(CancellationToken cancellationToken = default);
7-
}
14+
IReadOnlyList<Ydb.Value> GetBufferedRows();
15+
}

src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertProtoImporter.cs renamed to src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs

Lines changed: 54 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,17 @@
22

33
namespace Ydb.Sdk.Ado.BulkUpsert;
44

5-
public sealed class YdbBulkUpsertProtoImporter : IAsyncDisposable
5+
public sealed class BulkUpsertImporter : IBulkUpsertImporter
66
{
77
private readonly YdbConnection _connection;
88
private readonly string _tablePath;
99
private readonly List<string> _columns;
1010
private readonly List<Type> _types;
1111
private readonly int _maxBytes;
12-
1312
private readonly List<Ydb.Value> _rows = new();
1413
private bool _disposed;
1514

16-
public YdbBulkUpsertProtoImporter(
15+
public BulkUpsertImporter(
1716
YdbConnection connection,
1817
string tablePath,
1918
IReadOnlyList<string> columns,
@@ -22,15 +21,14 @@ public YdbBulkUpsertProtoImporter(
2221
{
2322
_connection = connection;
2423
_tablePath = tablePath;
25-
_columns = columns is List<string> colList ? colList : new List<string>(columns);
26-
_types = types is List<Type> typList ? typList : new List<Type>(types);
24+
_columns = columns.ToList();
25+
_types = types.ToList();
2726
_maxBytes = maxBytes;
2827
}
2928

3029
public async ValueTask AddRowAsync(params YdbValue[] values)
3130
{
3231
ThrowIfDisposed();
33-
3432
if (values.Length != _columns.Count)
3533
throw new ArgumentException("Values count must match columns count", nameof(values));
3634

@@ -41,76 +39,88 @@ public async ValueTask AddRowAsync(params YdbValue[] values)
4139
_rows.Add(structValue);
4240

4341
var totalSize = _rows.Sum(r => r.CalculateSize());
44-
4542
if (totalSize >= _maxBytes)
4643
await FlushAsync();
4744
}
4845

49-
public async ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows, CancellationToken cancellationToken = default)
46+
public async ValueTask AddRowAsync(params object?[] values)
5047
{
5148
ThrowIfDisposed();
49+
if (values.Length != _columns.Count)
50+
throw new ArgumentException("Values count must match columns count", nameof(values));
5251

53-
foreach (var values in rows)
52+
var ydbValues = new YdbValue[values.Length];
53+
for (int i = 0; i < values.Length; i++)
5454
{
55-
if (values.Length != _columns.Count)
56-
throw new ArgumentException("Values count must match columns count", nameof(values));
57-
58-
var dict = _columns.Zip(values, (name, value) => new KeyValuePair<string, YdbValue>(name, value))
59-
.ToDictionary(x => x.Key, x => x.Value);
60-
61-
var structValue = YdbValue.MakeStruct(dict).GetProto().Value;
62-
_rows.Add(structValue);
55+
ydbValues[i] = YdbValueFromObject(values[i], _types[i]);
56+
}
57+
await AddRowAsync(ydbValues);
58+
}
6359

64-
var totalSize = _rows.Sum(r => r.CalculateSize());
60+
public async ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows, CancellationToken cancellationToken = default)
61+
{
62+
ThrowIfDisposed();
6563

66-
if (totalSize >= _maxBytes)
67-
await FlushAsync(cancellationToken);
68-
}
64+
foreach (var values in rows)
65+
await AddRowAsync(values);
6966
}
7067

71-
public async ValueTask DisposeAsync()
68+
public async ValueTask AddRowsAsync(IEnumerable<object?[]> rows, CancellationToken cancellationToken = default)
7269
{
73-
if (_disposed) return;
74-
await FlushAsync();
75-
_disposed = true;
70+
ThrowIfDisposed();
71+
72+
foreach (var values in rows)
73+
await AddRowAsync(values);
7674
}
7775

7876
public async ValueTask FlushAsync(CancellationToken cancellationToken = default)
7977
{
8078
ThrowIfDisposed();
79+
if (_rows.Count == 0) return;
8180

82-
if (_rows.Count == 0)
83-
return;
81+
await _connection.BulkUpsertProtoAsync(_tablePath, GetStructType(), _rows.ToList(), cancellationToken);
82+
_rows.Clear();
83+
}
8484

85-
await _connection.BulkUpsertProtoAsync(
86-
_tablePath,
87-
GetStructType(),
88-
new List<Ydb.Value>(_rows),
89-
cancellationToken);
85+
public IReadOnlyList<Ydb.Value> GetBufferedRows() => _rows;
9086

91-
_rows.Clear();
87+
public async ValueTask DisposeAsync()
88+
{
89+
if (_disposed) return;
90+
await FlushAsync();
91+
_disposed = true;
9292
}
9393

9494
private Type GetStructType()
9595
{
9696
var structType = new Type { StructType = new StructType() };
9797
for (var i = 0; i < _columns.Count; i++)
98-
{
99-
structType.StructType.Members.Add(new StructMember
100-
{
101-
Name = _columns[i],
102-
Type = _types[i]
103-
});
104-
}
105-
98+
structType.StructType.Members.Add(new StructMember { Name = _columns[i], Type = _types[i] });
10699
return structType;
107100
}
108101

109102
private void ThrowIfDisposed()
110103
{
111104
if (_disposed)
112-
throw new ObjectDisposedException(nameof(YdbBulkUpsertProtoImporter));
105+
throw new ObjectDisposedException(nameof(BulkUpsertImporter));
113106
}
114107

115-
public IReadOnlyList<Ydb.Value> GetBufferedRows() => _rows;
116-
}
108+
private static YdbValue YdbValueFromObject(object? value, Type columnType)
109+
{
110+
switch (value)
111+
{
112+
case YdbValue ydbValue:
113+
return ydbValue;
114+
default:
115+
switch (columnType.TypeId)
116+
{
117+
case Type.Types.PrimitiveTypeId.Int32:
118+
return YdbValue.MakeInt32(Convert.ToInt32(value));
119+
case Type.Types.PrimitiveTypeId.Utf8:
120+
return YdbValue.MakeUtf8(value?.ToString()!);
121+
default:
122+
throw new NotSupportedException($"Type '{columnType.TypeId}' not supported in YdbValueFromObject");
123+
}
124+
}
125+
}
126+
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 18 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
using System.Data;
22
using System.Data.Common;
33
using System.Diagnostics.CodeAnalysis;
4+
using Ydb.Sdk.Ado.BulkUpsert;
45
using Ydb.Sdk.Ado.Internal;
56
using Ydb.Sdk.Ado.Session;
67
using Ydb.Sdk.Services.Query;
7-
using Ydb.Sdk.Value;
88
using Ydb.Table;
99
using Ydb.Table.V1;
1010
using static System.Data.IsolationLevel;
@@ -56,38 +56,20 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
5656
ConnectionStringBuilder = connectionStringBuilder;
5757
}
5858

59-
public async Task BulkUpsertAsync(
59+
public async Task BulkUpsertProtoAsync(
6060
string tablePath,
61-
IReadOnlyList<string> columns,
62-
IReadOnlyList<IReadOnlyList<object?>> rows,
61+
Type structType,
62+
IReadOnlyList<Ydb.Value> chunk,
6363
CancellationToken cancellationToken = default)
6464
{
6565
if (CurrentTransaction is { Completed: false })
66-
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");
66+
throw new InvalidOperationException("BulkUpsertProto cannot be used inside an active transaction.");
6767

68-
if (columns == null || columns.Count == 0)
69-
throw new ArgumentException("Columns must not be empty", nameof(columns));
70-
if (rows == null || rows.Count == 0)
71-
throw new ArgumentException("Rows collection is empty", nameof(rows));
68+
var listValue = new Ydb.Value();
69+
listValue.Items.AddRange(chunk);
7270

73-
var structs = rows.Select(row =>
74-
{
75-
if (row.Count != columns.Count)
76-
throw new ArgumentException("Each row must have the same number of elements as columns");
77-
var members = columns
78-
.Select((col, i) =>
79-
new KeyValuePair<string, YdbValue>(col, new YdbParameter { Value = row[i] }.YdbValue))
80-
.ToDictionary(x => x.Key, x => x.Value);
81-
return YdbValue.MakeStruct(members);
82-
}).ToList();
83-
84-
var list = YdbValue.MakeList(structs);
85-
86-
var req = new BulkUpsertRequest
87-
{
88-
Table = tablePath,
89-
Rows = list.GetProto()
90-
};
71+
var typedValue = new TypedValue { Type = structType, Value = listValue };
72+
var req = new BulkUpsertRequest { Table = tablePath, Rows = typedValue };
9173

9274
var resp = await Session.Driver.UnaryCall(
9375
TableService.BulkUpsertMethod,
@@ -97,51 +79,23 @@ public async Task BulkUpsertAsync(
9779

9880
var operation = resp.Operation;
9981
if (operation.Status.IsNotSuccess())
100-
{
10182
throw YdbException.FromServer(operation.Status, operation.Issues);
102-
}
10383
}
104-
105-
public async Task BulkUpsertProtoAsync(
84+
85+
public IBulkUpsertImporter BeginBulkUpsertImport(
10686
string tablePath,
107-
Type structType,
108-
IReadOnlyList<Ydb.Value> chunk,
109-
CancellationToken cancellationToken = default,
110-
int retryCount = 3)
87+
IReadOnlyList<string> columns,
88+
IReadOnlyList<Type> types,
89+
int maxBytes = 1024 * 1024)
11190
{
91+
ThrowIfConnectionClosed();
11292
if (CurrentTransaction is { Completed: false })
113-
throw new InvalidOperationException("BulkUpsertProto cannot be used inside an active transaction.");
114-
115-
var listValue = new Ydb.Value();
116-
listValue.Items.AddRange(chunk);
117-
118-
var typedValue = new TypedValue { Type = structType, Value = listValue };
119-
var req = new BulkUpsertRequest { Table = tablePath, Rows = typedValue };
93+
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");
12094

121-
var attempt = 0;
122-
while (true)
123-
{
124-
try
125-
{
126-
var resp = await Session.Driver.UnaryCall(
127-
TableService.BulkUpsertMethod,
128-
req,
129-
new GrpcRequestSettings { CancellationToken = cancellationToken }
130-
).ConfigureAwait(false);
131-
132-
var operation = resp.Operation;
133-
if (operation.Status.IsNotSuccess())
134-
throw YdbException.FromServer(operation.Status, operation.Issues);
135-
return;
136-
}
137-
catch (Exception) when (attempt < retryCount)
138-
{
139-
attempt++;
140-
await Task.Delay(100 * attempt, cancellationToken);
141-
}
142-
}
95+
return new BulkUpsertImporter(this, tablePath, columns, types, maxBytes);
14396
}
14497

98+
14599
protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
146100
{
147101
ThrowIfConnectionClosed();

src/Ydb.Sdk/src/Ado/YdbDataSource.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,6 @@ protected override async ValueTask DisposeAsyncCore() =>
6565
await PoolManager.ClearPool(_ydbConnectionStringBuilder.ConnectionString);
6666

6767
protected override void Dispose(bool disposing) => DisposeAsyncCore().AsTask().GetAwaiter().GetResult();
68-
69-
public async Task BulkUpsertAsync(
70-
string tablePath,
71-
IReadOnlyList<string> columns,
72-
IReadOnlyList<IReadOnlyList<object?>> rows,
73-
CancellationToken cancellationToken = default)
74-
{
75-
await using var conn = await OpenConnectionAsync(cancellationToken);
76-
await conn.BulkUpsertAsync(tablePath, columns, rows, cancellationToken);
77-
}
7868
}
7969

8070
#endif

0 commit comments

Comments
 (0)