Skip to content

Commit eec3d12

Browse files
committed
remove Type argument
1 parent b203781 commit eec3d12

File tree

4 files changed

+79
-83
lines changed

4 files changed

+79
-83
lines changed

src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,8 @@ namespace Ydb.Sdk.Ado.BulkUpsert;
44

55
public interface IBulkUpsertImporter : IAsyncDisposable
66
{
7-
ValueTask AddRowAsync(params YdbValue[] values);
8-
ValueTask AddRowAsync(params object?[] values);
9-
10-
ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows, CancellationToken cancellationToken = default);
11-
ValueTask AddRowsAsync(IEnumerable<object?[]> rows, CancellationToken cancellationToken = default);
7+
ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows);
8+
ValueTask AddRowsAsync(IEnumerable<object?[]> rows);
129

1310
ValueTask FlushAsync(CancellationToken cancellationToken = default);
14-
IReadOnlyList<Ydb.Value> GetBufferedRows();
1511
}

src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs

Lines changed: 48 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
using Ydb.Sdk.Ado.Internal;
21
using Ydb.Sdk.Value;
2+
using Ydb.Sdk.Ado.Internal;
33
using Ydb.Table;
44
using Ydb.Table.V1;
55

@@ -10,82 +10,87 @@ public sealed class BulkUpsertImporter : IBulkUpsertImporter
1010
private readonly YdbConnection _connection;
1111
private readonly string _tablePath;
1212
private readonly IReadOnlyList<string> _columns;
13-
private readonly IReadOnlyList<Type> _types;
1413
private readonly int _maxBytes;
15-
private readonly List<Ydb.Value> _rows = new();
14+
private readonly List<YdbValue> _rows = new();
15+
private readonly CancellationToken _cancellationToken;
1616
private bool _disposed;
1717

1818
public BulkUpsertImporter(
1919
YdbConnection connection,
20-
string tablePath,
20+
string tableName,
2121
IReadOnlyList<string> columns,
22-
IReadOnlyList<Type> types,
23-
int maxBytes = 64 * 1024 * 1024
24-
)
22+
CancellationToken cancellationToken = default,
23+
int maxBytes = 64 * 1024 * 1024)
2524
{
2625
_connection = connection;
27-
_tablePath = tablePath;
26+
_tablePath = tableName;
2827
_columns = columns;
29-
_types = types;
3028
_maxBytes = maxBytes;
29+
_cancellationToken = cancellationToken;
3130
}
3231

33-
public async ValueTask AddRowAsync(params object?[] values)
32+
public async ValueTask AddRowsAsync(IEnumerable<object?[]> rows)
3433
{
3534
ThrowIfDisposed();
36-
if (values.Length != _columns.Count)
37-
throw new ArgumentException("Values count must match columns count", nameof(values));
38-
39-
var ydbValues = values.Select(v =>
40-
v is YdbValue yv ? yv :
41-
v is YdbParameter param ? param.YdbValue :
42-
throw new ArgumentException("All values must be either YdbValue or YdbParameter")).ToArray();
43-
44-
await AddRowAsync(ydbValues);
45-
}
35+
foreach (var values in rows)
36+
{
37+
if (values.Length != _columns.Count)
38+
throw new ArgumentException("Values count must match columns count", nameof(values));
4639

47-
public async ValueTask AddRowAsync(params YdbValue[] values)
48-
{
49-
ThrowIfDisposed();
50-
if (values.Length != _columns.Count)
51-
throw new ArgumentException("Values count must match columns count", nameof(values));
40+
var ydbValues = values.Select(v =>
41+
v is YdbValue yv ? yv :
42+
v is YdbParameter param ? param.YdbValue :
43+
throw new ArgumentException("All values must be either YdbValue or YdbParameter")).ToArray();
5244

53-
var dict = _columns.Zip(values, (name, value) => new KeyValuePair<string, YdbValue>(name, value))
54-
.ToDictionary(x => x.Key, x => x.Value);
45+
var dict = _columns.Zip(ydbValues, (name, value) => new KeyValuePair<string, YdbValue>(name, value))
46+
.ToDictionary(x => x.Key, x => x.Value);
5547

56-
var structValue = YdbValue.MakeStruct(dict).GetProto().Value;
57-
_rows.Add(structValue);
48+
var structRow = YdbValue.MakeStruct(dict);
49+
_rows.Add(structRow);
5850

59-
var totalSize = _rows.Sum(r => r.CalculateSize());
60-
if (totalSize >= _maxBytes)
61-
await FlushAsync();
51+
var totalSize = _rows.Sum(r => r.GetProto().Value.CalculateSize());
52+
if (totalSize >= _maxBytes)
53+
await FlushAsync(_cancellationToken);
54+
}
6255
}
6356

64-
public async ValueTask AddRowsAsync(IEnumerable<object?[]> rows, CancellationToken cancellationToken = default)
57+
public async ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows)
6558
{
6659
ThrowIfDisposed();
67-
6860
foreach (var values in rows)
69-
await AddRowAsync(values);
70-
}
61+
{
62+
if (values.Length != _columns.Count)
63+
throw new ArgumentException("Values count must match columns count", nameof(values));
7164

72-
public async ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows, CancellationToken cancellationToken = default)
73-
{
74-
ThrowIfDisposed();
65+
var dict = _columns.Zip(values, (name, value) => new KeyValuePair<string, YdbValue>(name, value))
66+
.ToDictionary(x => x.Key, x => x.Value);
7567

76-
foreach (var values in rows)
77-
await AddRowAsync(values);
68+
var structRow = YdbValue.MakeStruct(dict);
69+
_rows.Add(structRow);
70+
71+
var totalSize = _rows.Sum(r => r.GetProto().Value.CalculateSize());
72+
if (totalSize >= _maxBytes)
73+
await FlushAsync(_cancellationToken);
74+
}
7875
}
7976

8077
public async ValueTask FlushAsync(CancellationToken cancellationToken = default)
8178
{
8279
ThrowIfDisposed();
8380
if (_rows.Count == 0) return;
8481

82+
var structType = _rows[0].GetProto().Type;
83+
8584
var listValue = new Ydb.Value();
86-
listValue.Items.AddRange(_rows);
85+
foreach (var row in _rows)
86+
listValue.Items.Add(row.GetProto().Value);
87+
88+
var typedValue = new TypedValue
89+
{
90+
Type = new Type { ListType = new ListType { Item = structType } },
91+
Value = listValue
92+
};
8793

88-
var typedValue = new TypedValue { Type = GetStructType(), Value = listValue };
8994
var req = new BulkUpsertRequest { Table = _tablePath, Rows = typedValue };
9095

9196
var resp = await _connection.Session.Driver.UnaryCall(
@@ -101,23 +106,13 @@ public async ValueTask FlushAsync(CancellationToken cancellationToken = default)
101106
_rows.Clear();
102107
}
103108

104-
public IReadOnlyList<Ydb.Value> GetBufferedRows() => _rows;
105-
106109
public async ValueTask DisposeAsync()
107110
{
108111
if (_disposed) return;
109112
await FlushAsync();
110113
_disposed = true;
111114
}
112115

113-
private Type GetStructType()
114-
{
115-
var structType = new Type { StructType = new StructType() };
116-
for (var i = 0; i < _columns.Count; i++)
117-
structType.StructType.Members.Add(new StructMember { Name = _columns[i], Type = _types[i] });
118-
return structType;
119-
}
120-
121116
private void ThrowIfDisposed()
122117
{
123118
if (_disposed)

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
5656
public IBulkUpsertImporter BeginBulkUpsertImport(
5757
string name,
5858
IReadOnlyList<string> columns,
59-
IReadOnlyList<Type> types)
59+
CancellationToken cancellationToken = default
60+
)
6061
{
6162
ThrowIfConnectionClosed();
6263
if (CurrentTransaction is { Completed: false })
@@ -65,7 +66,7 @@ public IBulkUpsertImporter BeginBulkUpsertImport(
6566
var database = ConnectionStringBuilder.Database.TrimEnd('/');
6667
var tablePath = string.IsNullOrEmpty(database) ? name : $"{database}/{name}";
6768

68-
return new BulkUpsertImporter(this, tablePath, columns, types);
69+
return new BulkUpsertImporter(this, tablePath, columns, cancellationToken);
6970
}
7071

7172
protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ protected override async Task OnDisposeAsync() =>
310310
await YdbConnection.ClearPool(new YdbConnection(_connectionStringTls));
311311

312312
[Fact]
313-
public async Task BulkUpsertImporter_HappyPath_AddRows_Flush_Dispose()
313+
public async Task BulkUpsertImporter_HappyPath_Add_Flush_Dispose()
314314
{
315315
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
316316

@@ -336,13 +336,14 @@ PRIMARY KEY (Id)
336336
new Type { TypeId = Type.Types.PrimitiveTypeId.Utf8 }
337337
};
338338

339-
await using (var importer = conn.BeginBulkUpsertImport(tableName, columns, types))
339+
await using (var importer = conn.BeginBulkUpsertImport(tableName, columns))
340340
{
341-
await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice"));
342-
await importer.AddRowAsync(YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob"));
343-
Assert.Equal(2, importer.GetBufferedRows().Count);
341+
await importer.AddRowsAsync(new[]
342+
{
343+
new[] { YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice") },
344+
new[] { YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob") }
345+
});
344346
await importer.FlushAsync();
345-
Assert.Empty(importer.GetBufferedRows());
346347
}
347348

348349
await using (var checkCmd = conn.CreateCommand())
@@ -352,14 +353,15 @@ PRIMARY KEY (Id)
352353
Assert.Equal(2, count);
353354
}
354355

355-
await using (var importer = conn.BeginBulkUpsertImport(tableName, columns, types))
356+
await using (var importer = conn.BeginBulkUpsertImport(tableName, columns))
356357
{
357358
var rows = new List<YdbValue[]>
358359
{
359360
new[] { YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie") },
360361
new[] { YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana") }
361362
};
362363
await importer.AddRowsAsync(rows);
364+
await importer.FlushAsync();
363365
}
364366

365367
await using (var checkCmd = conn.CreateCommand())
@@ -409,12 +411,10 @@ PRIMARY KEY (Id)
409411
new Type { TypeId = Type.Types.PrimitiveTypeId.Utf8 }
410412
};
411413

412-
await using (var importer = conn.BeginBulkUpsertImport(tableName, columns, types))
414+
await using (var importer = conn.BeginBulkUpsertImport(tableName, columns))
413415
{
414-
await Assert.ThrowsAsync<ArgumentException>(async () =>
415-
{
416-
await importer.AddRowAsync(YdbValue.MakeInt32(1));
417-
});
416+
var badRow = new[] { YdbValue.MakeInt32(1) };
417+
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowsAsync(new[] { badRow }));
418418

419419
var rows = new List<YdbValue[]>
420420
{
@@ -458,8 +458,8 @@ PRIMARY KEY (Id)
458458
new Type { TypeId = Type.Types.PrimitiveTypeId.Utf8 }
459459
};
460460

461-
var importer = conn.BeginBulkUpsertImport(tableName, columns, types);
462-
await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("A"));
461+
var importer = conn.BeginBulkUpsertImport(tableName, columns);
462+
await importer.AddRowsAsync(new[] { new[] { YdbValue.MakeInt32(1), YdbValue.MakeUtf8("A") } });
463463
await importer.DisposeAsync();
464464

465465
await importer.DisposeAsync();
@@ -503,15 +503,19 @@ PRIMARY KEY (Id)
503503
await Task.WhenAll(
504504
Task.Run(async () =>
505505
{
506-
await using var importer = conn.BeginBulkUpsertImport(table1, columns, types);
507-
for (var i = 0; i < 20; i++)
508-
await importer.AddRowAsync(YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}"));
506+
await using var importer = conn.BeginBulkUpsertImport(table1, columns);
507+
var rows = Enumerable.Range(0, 20)
508+
.Select(i => new[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}") })
509+
.ToList();
510+
await importer.AddRowsAsync(rows);
509511
}),
510512
Task.Run(async () =>
511513
{
512-
await using var importer = conn.BeginBulkUpsertImport(table2, columns, types);
513-
for (var i = 0; i < 20; i++)
514-
await importer.AddRowAsync(YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}"));
514+
await using var importer = conn.BeginBulkUpsertImport(table2, columns);
515+
var rows = Enumerable.Range(0, 20)
516+
.Select(i => new[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}") })
517+
.ToList();
518+
await importer.AddRowsAsync(rows);
515519
})
516520
);
517521

@@ -548,11 +552,11 @@ public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()
548552
new Type { TypeId = Type.Types.PrimitiveTypeId.Utf8 }
549553
};
550554

551-
var importer = conn.BeginBulkUpsertImport(tableName, columns, types);
555+
var importer = conn.BeginBulkUpsertImport(tableName, columns);
552556

553557
await Assert.ThrowsAsync<YdbException>(async () =>
554558
{
555-
await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists"));
559+
await importer.AddRowsAsync(new[] { new[] { YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists") } });
556560
await importer.FlushAsync();
557561
});
558562

0 commit comments

Comments
 (0)