Skip to content

Commit bfc6cf5

Browse files
committed
refactor: correct batch bytes check and flush logic in BulkUpsertImporter
1 parent 6463d36 commit bfc6cf5

File tree

3 files changed

+51
-48
lines changed

3 files changed

+51
-48
lines changed

src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,54 +11,62 @@ public sealed class BulkUpsertImporter : IBulkUpsertImporter
1111
private readonly IDriver _driver;
1212
private readonly string _tablePath;
1313
private readonly IReadOnlyList<string> _columns;
14+
private readonly int _maxBytes;
1415
private readonly RepeatedField<Ydb.Value> _rows = new();
1516
private readonly CancellationToken _cancellationToken;
1617
private StructType? _structType;
18+
private int _currentBytes = 0;
1719

1820
public BulkUpsertImporter(
1921
IDriver driver,
2022
string tableName,
2123
IReadOnlyList<string> columns,
22-
CancellationToken cancellationToken = default)
24+
CancellationToken cancellationToken = default,
25+
int maxBytes = 64 * 1024 * 1024)
2326
{
2427
_driver = driver;
2528
_tablePath = tableName;
2629
_columns = columns;
30+
_maxBytes = maxBytes;
2731
_cancellationToken = cancellationToken;
32+
_maxBytes = maxBytes / 2;
2833
}
2934

30-
public async ValueTask AddRowsAsync(IEnumerable<object?[]> rows)
35+
public async ValueTask AddRowAsync(object?[] values)
3136
{
32-
foreach (var values in rows)
33-
{
34-
if (values.Length != _columns.Count)
35-
throw new ArgumentException("Values count must match columns count", nameof(values));
36-
37-
var ydbValues = values.Select(v =>
38-
v as YdbValue ?? (v is YdbParameter param ? param.YdbValue :
39-
new YdbParameter { Value = v }.YdbValue)
40-
).ToArray();
37+
if (values.Length != _columns.Count)
38+
throw new ArgumentException("Values count must match columns count", nameof(values));
4139

42-
var protoStruct = new Ydb.Value();
43-
foreach (var value in ydbValues)
44-
protoStruct.Items.Add(value.GetProto().Value);
40+
var ydbValues = values.Select(v =>
41+
v as YdbValue ?? (v is YdbParameter param ? param.YdbValue :
42+
new YdbParameter { Value = v }.YdbValue)
43+
).ToArray();
4544

46-
_rows.Add(protoStruct);
45+
var protoStruct = new Ydb.Value();
46+
foreach (var value in ydbValues)
47+
protoStruct.Items.Add(value.GetProto().Value);
4748

48-
_structType ??= new StructType
49-
{
50-
Members =
51-
{
52-
_columns.Select((col, i) => new StructMember
53-
{
54-
Name = col,
55-
Type = ydbValues[i].GetProto().Type
56-
})
57-
}
58-
};
49+
var rowSize = protoStruct.CalculateSize();
5950

51+
if (_currentBytes + rowSize > _maxBytes && _rows.Count > 0)
52+
{
6053
await FlushAsync(_cancellationToken);
6154
}
55+
56+
_rows.Add(protoStruct);
57+
_currentBytes += rowSize;
58+
59+
_structType ??= new StructType
60+
{
61+
Members =
62+
{
63+
_columns.Select((col, i) => new StructMember
64+
{
65+
Name = col,
66+
Type = ydbValues[i].GetProto().Type
67+
})
68+
}
69+
};
6270
}
6371

6472
public async ValueTask FlushAsync(CancellationToken cancellationToken = default)
@@ -86,5 +94,6 @@ public async ValueTask FlushAsync(CancellationToken cancellationToken = default)
8694
throw YdbException.FromServer(operation.Status, operation.Issues);
8795

8896
_rows.Clear();
97+
_currentBytes = 0;
8998
}
90-
}
99+
}

src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@ namespace Ydb.Sdk.Ado.BulkUpsert;
22

33
public interface IBulkUpsertImporter
44
{
5-
ValueTask AddRowsAsync(IEnumerable<object?[]> rows);
5+
ValueTask AddRowAsync(object?[] rows);
66
ValueTask FlushAsync(CancellationToken cancellationToken = default);
77
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -332,11 +332,9 @@ PRIMARY KEY (Id)
332332
var columns = new[] { "Id", "Name" };
333333

334334
var importer = conn.BeginBulkUpsertImport(tableName, columns);
335-
await importer.AddRowsAsync(new object?[][]
336-
{
337-
new object?[] { YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice") },
338-
new object?[] { YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob") }
339-
});
335+
336+
await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("Alice")]);
337+
await importer.AddRowAsync([YdbValue.MakeInt32(2), YdbValue.MakeUtf8("Bob")]);
340338
await importer.FlushAsync();
341339

342340
await using (var checkCmd = conn.CreateCommand())
@@ -347,11 +345,8 @@ await importer.AddRowsAsync(new object?[][]
347345
}
348346

349347
importer = conn.BeginBulkUpsertImport(tableName, columns);
350-
await importer.AddRowsAsync(new object?[][]
351-
{
352-
new object?[] { YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie") },
353-
new object?[] { YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana") }
354-
});
348+
await importer.AddRowAsync([YdbValue.MakeInt32(3), YdbValue.MakeUtf8("Charlie")]);
349+
await importer.AddRowAsync([YdbValue.MakeInt32(4), YdbValue.MakeUtf8("Diana")]);
355350
await importer.FlushAsync();
356351

357352
await using (var checkCmd = conn.CreateCommand())
@@ -399,14 +394,13 @@ PRIMARY KEY (Id)
399394
var importer = conn.BeginBulkUpsertImport(tableName, columns);
400395

401396
var badRow = new object?[] { YdbValue.MakeInt32(1) };
402-
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowsAsync(new object?[][] { badRow }));
397+
await Assert.ThrowsAsync<ArgumentException>(async () => await importer.AddRowAsync([badRow]));
403398

404399
await Assert.ThrowsAsync<ArgumentException>(async () =>
405400
{
406-
await importer.AddRowsAsync(new object?[][]
407-
{
401+
await importer.AddRowAsync([
408402
new object?[] { YdbValue.MakeInt32(2) }
409-
});
403+
]);
410404
});
411405
}
412406
finally
@@ -447,7 +441,8 @@ await Task.WhenAll(
447441
var rows = Enumerable.Range(0, 20)
448442
.Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"A{i}") })
449443
.ToArray();
450-
await importer.AddRowsAsync(rows);
444+
foreach (var row in rows)
445+
await importer.AddRowAsync(row);
451446
await importer.FlushAsync();
452447
}),
453448
Task.Run(async () =>
@@ -456,7 +451,8 @@ await Task.WhenAll(
456451
var rows = Enumerable.Range(0, 20)
457452
.Select(i => new object?[] { YdbValue.MakeInt32(i), YdbValue.MakeUtf8($"B{i}") })
458453
.ToArray();
459-
await importer.AddRowsAsync(rows);
454+
foreach (var row in rows)
455+
await importer.AddRowAsync(row);
460456
await importer.FlushAsync();
461457
})
462458
);
@@ -491,12 +487,10 @@ public async Task BulkUpsertImporter_ThrowsOnNonexistentTable()
491487

492488
var importer = conn.BeginBulkUpsertImport(tableName, columns);
493489

490+
await importer.AddRowAsync([YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists")]);
491+
494492
await Assert.ThrowsAsync<YdbException>(async () =>
495493
{
496-
await importer.AddRowsAsync(new object?[][]
497-
{
498-
new object?[] { YdbValue.MakeInt32(1), YdbValue.MakeUtf8("NotExists") }
499-
});
500494
await importer.FlushAsync();
501495
});
502496
}

0 commit comments

Comments
 (0)