Skip to content

Commit 14a2bfe

Browse files
committed
fix: handle exceptions in BulkUpsertImporter DisposeAsync, update tests
1 parent 1879bca commit 14a2bfe

File tree

4 files changed

+76
-132
lines changed

4 files changed

+76
-132
lines changed

src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertFormat.cs

Lines changed: 0 additions & 7 deletions
This file was deleted.
Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1+
using Ydb.Sdk.Ado.Internal;
12
using Ydb.Sdk.Value;
3+
using Ydb.Table;
4+
using Ydb.Table.V1;
25

36
namespace Ydb.Sdk.Ado.BulkUpsert;
47

58
public sealed class BulkUpsertImporter : IBulkUpsertImporter
69
{
710
private readonly YdbConnection _connection;
811
private readonly string _tablePath;
9-
private readonly List<string> _columns;
10-
private readonly List<Type> _types;
12+
private readonly IReadOnlyList<string> _columns;
13+
private readonly IReadOnlyList<Type> _types;
1114
private readonly int _maxBytes;
1215
private readonly List<Ydb.Value> _rows = new();
1316
private bool _disposed;
@@ -17,15 +20,30 @@ public BulkUpsertImporter(
1720
string tablePath,
1821
IReadOnlyList<string> columns,
1922
IReadOnlyList<Type> types,
20-
int maxBytes = 1024 * 1024)
23+
int maxBytes = 64 * 1024 * 1024
24+
)
2125
{
2226
_connection = connection;
2327
_tablePath = tablePath;
24-
_columns = columns.ToList();
25-
_types = types.ToList();
28+
_columns = columns;
29+
_types = types;
2630
_maxBytes = maxBytes;
2731
}
2832

33+
public async ValueTask AddRowAsync(params object?[] values)
34+
{
35+
ThrowIfDisposed();
36+
if (values.Length != _columns.Count)
37+
throw new ArgumentException("Values count must match columns count", nameof(values));
38+
39+
var ydbValues = values.Select(v =>
40+
v is YdbValue yv ? yv :
41+
v is YdbParameter param ? param.YdbValue :
42+
throw new ArgumentException("All values must be either YdbValue or YdbParameter")).ToArray();
43+
44+
await AddRowAsync(ydbValues);
45+
}
46+
2947
public async ValueTask AddRowAsync(params YdbValue[] values)
3048
{
3149
ThrowIfDisposed();
@@ -43,29 +61,15 @@ public async ValueTask AddRowAsync(params YdbValue[] values)
4361
await FlushAsync();
4462
}
4563

46-
public async ValueTask AddRowAsync(params object?[] values)
47-
{
48-
ThrowIfDisposed();
49-
if (values.Length != _columns.Count)
50-
throw new ArgumentException("Values count must match columns count", nameof(values));
51-
52-
var ydbValues = new YdbValue[values.Length];
53-
for (int i = 0; i < values.Length; i++)
54-
{
55-
ydbValues[i] = YdbValueFromObject(values[i], _types[i]);
56-
}
57-
await AddRowAsync(ydbValues);
58-
}
59-
60-
public async ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows, CancellationToken cancellationToken = default)
64+
public async ValueTask AddRowsAsync(IEnumerable<object?[]> rows, CancellationToken cancellationToken = default)
6165
{
6266
ThrowIfDisposed();
6367

6468
foreach (var values in rows)
6569
await AddRowAsync(values);
6670
}
6771

68-
public async ValueTask AddRowsAsync(IEnumerable<object?[]> rows, CancellationToken cancellationToken = default)
72+
public async ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows, CancellationToken cancellationToken = default)
6973
{
7074
ThrowIfDisposed();
7175

@@ -78,7 +82,22 @@ public async ValueTask FlushAsync(CancellationToken cancellationToken = default)
7882
ThrowIfDisposed();
7983
if (_rows.Count == 0) return;
8084

81-
await _connection.BulkUpsertProtoAsync(_tablePath, GetStructType(), _rows.ToList(), cancellationToken);
85+
var listValue = new Ydb.Value();
86+
listValue.Items.AddRange(_rows);
87+
88+
var typedValue = new TypedValue { Type = GetStructType(), Value = listValue };
89+
var req = new BulkUpsertRequest { Table = _tablePath, Rows = typedValue };
90+
91+
var resp = await _connection.Session.Driver.UnaryCall(
92+
TableService.BulkUpsertMethod,
93+
req,
94+
new GrpcRequestSettings { CancellationToken = cancellationToken }
95+
).ConfigureAwait(false);
96+
97+
var operation = resp.Operation;
98+
if (operation.Status.IsNotSuccess())
99+
throw YdbException.FromServer(operation.Status, operation.Issues);
100+
82101
_rows.Clear();
83102
}
84103

@@ -104,23 +123,4 @@ private void ThrowIfDisposed()
104123
if (_disposed)
105124
throw new ObjectDisposedException(nameof(BulkUpsertImporter));
106125
}
107-
108-
private static YdbValue YdbValueFromObject(object? value, Type columnType)
109-
{
110-
switch (value)
111-
{
112-
case YdbValue ydbValue:
113-
return ydbValue;
114-
default:
115-
switch (columnType.TypeId)
116-
{
117-
case Type.Types.PrimitiveTypeId.Int32:
118-
return YdbValue.MakeInt32(Convert.ToInt32(value));
119-
case Type.Types.PrimitiveTypeId.Utf8:
120-
return YdbValue.MakeUtf8(value?.ToString()!);
121-
default:
122-
throw new NotSupportedException($"Type '{columnType.TypeId}' not supported in YdbValueFromObject");
123-
}
124-
}
125-
}
126126
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ namespace Ydb.Sdk.Ado;
1313

1414
public sealed class YdbConnection : DbConnection
1515
{
16+
private const int MaxSendBufferSize = 64 * 1024 * 1024;
17+
1618
private static readonly StateChangeEventArgs ClosedToOpenEventArgs =
1719
new(ConnectionState.Closed, ConnectionState.Open);
1820

@@ -56,45 +58,20 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
5658
ConnectionStringBuilder = connectionStringBuilder;
5759
}
5860

59-
public async Task BulkUpsertProtoAsync(
60-
string tablePath,
61-
Type structType,
62-
IReadOnlyList<Ydb.Value> chunk,
63-
CancellationToken cancellationToken = default)
64-
{
65-
if (CurrentTransaction is { Completed: false })
66-
throw new InvalidOperationException("BulkUpsertProto cannot be used inside an active transaction.");
67-
68-
var listValue = new Ydb.Value();
69-
listValue.Items.AddRange(chunk);
70-
71-
var typedValue = new TypedValue { Type = structType, Value = listValue };
72-
var req = new BulkUpsertRequest { Table = tablePath, Rows = typedValue };
73-
74-
var resp = await Session.Driver.UnaryCall(
75-
TableService.BulkUpsertMethod,
76-
req,
77-
new GrpcRequestSettings { CancellationToken = cancellationToken }
78-
).ConfigureAwait(false);
79-
80-
var operation = resp.Operation;
81-
if (operation.Status.IsNotSuccess())
82-
throw YdbException.FromServer(operation.Status, operation.Issues);
83-
}
84-
8561
public IBulkUpsertImporter BeginBulkUpsertImport(
86-
string tablePath,
62+
string name,
8763
IReadOnlyList<string> columns,
88-
IReadOnlyList<Type> types,
89-
int maxBytes = 1024 * 1024)
64+
IReadOnlyList<Type> types)
9065
{
9166
ThrowIfConnectionClosed();
9267
if (CurrentTransaction is { Completed: false })
9368
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");
9469

95-
return new BulkUpsertImporter(this, tablePath, columns, types, maxBytes);
96-
}
70+
var database = (ConnectionStringBuilder.Database ?? "").TrimEnd('/');
71+
var tablePath = string.IsNullOrEmpty(database) ? name : $"{database}/{name}";
9772

73+
return new BulkUpsertImporter(this, tablePath, columns, types, MaxSendBufferSize);
74+
}
9875

9976
protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
10077
{

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 27 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,6 @@ protected override async Task OnDisposeAsync() =>
313313
public async Task BulkUpsertImporter_HappyPath_AddRows_Flush_Dispose()
314314
{
315315
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
316-
var database = new YdbConnectionStringBuilder(_connectionStringTls).Database.TrimEnd('/');
317-
var absTablePath = string.IsNullOrEmpty(database) ? tableName : $"{database}/{tableName}";
318316

319317
var conn = new YdbConnection(_connectionStringTls);
320318
await conn.OpenAsync();
@@ -337,7 +335,7 @@ PRIMARY KEY (Id)
337335
new Type { TypeId = Type.Types.PrimitiveTypeId.Utf8 }
338336
};
339337

340-
await using (var importer = conn.BeginBulkUpsertImport(absTablePath, columns, types, maxBytes: 1024))
338+
await using (var importer = conn.BeginBulkUpsertImport(tableName, columns, types))
341339
{
342340
await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice"));
343341
await importer.AddRowAsync(YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob"));
@@ -353,7 +351,7 @@ PRIMARY KEY (Id)
353351
Assert.Equal(2, count);
354352
}
355353

356-
await using (var importer = conn.BeginBulkUpsertImport(absTablePath, columns, types, maxBytes: 1))
354+
await using (var importer = conn.BeginBulkUpsertImport(tableName, columns, types))
357355
{
358356
var rows = new List<YdbValue[]>
359357
{
@@ -409,7 +407,7 @@ PRIMARY KEY (Id)
409407
new Type { TypeId = Type.Types.PrimitiveTypeId.Utf8 }
410408
};
411409

412-
await using (var importer = conn.BeginBulkUpsertImport(tableName, columns, types, maxBytes: 1024))
410+
await using (var importer = conn.BeginBulkUpsertImport(tableName, columns, types))
413411
{
414412
await Assert.ThrowsAsync<ArgumentException>(async () =>
415413
{
@@ -438,8 +436,6 @@ await Assert.ThrowsAsync<ArgumentException>(async () =>
438436
public async Task BulkUpsertImporter_DisposeIsIdempotent()
439437
{
440438
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
441-
var database = new YdbConnectionStringBuilder(_connectionStringTls).Database.TrimEnd('/');
442-
var absTablePath = string.IsNullOrEmpty(database) ? tableName : $"{database}/{tableName}";
443439

444440
var conn = new YdbConnection(_connectionStringTls);
445441
await conn.OpenAsync();
@@ -462,7 +458,7 @@ PRIMARY KEY (Id)
462458
new Type { TypeId = Type.Types.PrimitiveTypeId.Utf8 }
463459
};
464460

465-
var importer = conn.BeginBulkUpsertImport(absTablePath, columns, types, maxBytes: 1024);
461+
var importer = conn.BeginBulkUpsertImport(tableName, columns, types);
466462
await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("A"));
467463
await importer.DisposeAsync();
468464

@@ -475,15 +471,12 @@ PRIMARY KEY (Id)
475471
await dropCmd.ExecuteNonQueryAsync();
476472
}
477473
}
478-
474+
479475
[Fact]
480476
public async Task BulkUpsertImporter_MultipleImporters_Parallel()
481477
{
482478
var table1 = $"BulkImporter_{Guid.NewGuid():N}_1";
483479
var table2 = $"BulkImporter_{Guid.NewGuid():N}_2";
484-
var database = new YdbConnectionStringBuilder(_connectionStringTls).Database.TrimEnd('/');
485-
var absTablePath1 = string.IsNullOrEmpty(database) ? table1 : $"{database}/{table1}";
486-
var absTablePath2 = string.IsNullOrEmpty(database) ? table2 : $"{database}/{table2}";
487480

488481
var conn = new YdbConnection(_connectionStringTls);
489482
await conn.OpenAsync();
@@ -510,13 +503,13 @@ PRIMARY KEY (Id)
510503
await Task.WhenAll(
511504
Task.Run(async () =>
512505
{
513-
await using var importer = conn.BeginBulkUpsertImport(absTablePath1, columns, types, maxBytes: 1024);
506+
await using var importer = conn.BeginBulkUpsertImport(table1, columns, types);
514507
for (int i = 0; i < 20; i++)
515508
await importer.AddRowAsync(YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}"));
516509
}),
517510
Task.Run(async () =>
518511
{
519-
await using var importer = conn.BeginBulkUpsertImport(absTablePath2, columns, types, maxBytes: 1024);
512+
await using var importer = conn.BeginBulkUpsertImport(table2, columns, types);
520513
for (int i = 0; i < 20; i++)
521514
await importer.AddRowAsync(YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}"));
522515
})
@@ -540,55 +533,36 @@ await Task.WhenAll(
540533
}
541534
}
542535
}
543-
536+
544537
[Fact]
545-
public async Task BulkUpsertImporter_TinyMaxBytes_FlushesEveryRow()
538+
public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()
546539
{
547-
var tableName = $"BulkImporter_{Guid.NewGuid():N}";
548-
var database = new YdbConnectionStringBuilder(_connectionStringTls).Database.TrimEnd('/');
549-
var absTablePath = string.IsNullOrEmpty(database) ? tableName : $"{database}/{tableName}";
550-
540+
var tableName = $"Nonexistent_{Guid.NewGuid():N}";
551541
var conn = new YdbConnection(_connectionStringTls);
552542
await conn.OpenAsync();
553-
try
543+
544+
var columns = new[] { "Id", "Name" };
545+
var types = new[]
554546
{
555-
await using (var createCmd = conn.CreateCommand())
556-
{
557-
createCmd.CommandText = $@"
558-
CREATE TABLE {tableName} (
559-
Id Int32,
560-
Name Utf8,
561-
PRIMARY KEY (Id)
562-
)";
563-
await createCmd.ExecuteNonQueryAsync();
564-
}
547+
new Type { TypeId = Type.Types.PrimitiveTypeId.Int32 },
548+
new Type { TypeId = Type.Types.PrimitiveTypeId.Utf8 }
549+
};
565550

566-
var columns = new[] { "Id", "Name" };
567-
var types = new[]
568-
{
569-
new Type { TypeId = Type.Types.PrimitiveTypeId.Int32 },
570-
new Type { TypeId = Type.Types.PrimitiveTypeId.Utf8 }
571-
};
551+
var importer = conn.BeginBulkUpsertImport(tableName, columns, types);
572552

573-
await using (var importer = conn.BeginBulkUpsertImport(absTablePath, columns, types, maxBytes: 1))
574-
{
575-
for (int i = 0; i < 10; i++)
576-
await importer.AddRowAsync(YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"Name{i}"));
577-
Assert.Empty(importer.GetBufferedRows());
578-
}
553+
await Assert.ThrowsAsync<YdbException>(async () =>
554+
{
555+
await importer.AddRowAsync(YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists"));
556+
await importer.FlushAsync();
557+
});
579558

580-
await using (var checkCmd = conn.CreateCommand())
581-
{
582-
checkCmd.CommandText = $"SELECT COUNT(*) FROM {tableName}";
583-
var count = Convert.ToInt32(await checkCmd.ExecuteScalarAsync());
584-
Assert.Equal(10, count);
585-
}
559+
try
560+
{
561+
await importer.DisposeAsync();
586562
}
587-
finally
563+
catch (YdbException)
588564
{
589-
await using var dropCmd = conn.CreateCommand();
590-
dropCmd.CommandText = $"DROP TABLE {tableName}";
591-
await dropCmd.ExecuteNonQueryAsync();
565+
// success
592566
}
593567
}
594568
}

0 commit comments

Comments
 (0)