Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b6bf0d5
Add BulkUpsert implementation
LiamHamsters Jul 23, 2025
261d342
hot fix
LiamHamsters Jul 23, 2025
8173dd0
hot fix 2
LiamHamsters Jul 23, 2025
8d170b8
fix issues
LiamHamsters Jul 24, 2025
5ac5127
feat: make BulkUpsert API more user-friendly
LiamHamsters Jul 24, 2025
3db706d
Implement simple BulkUpsert in YdbConnection and YdbDataSource (delet…
LiamHamsters Jul 25, 2025
a2aa86b
refactor: use array-based rows for BulkUpsert, remove dictionary
LiamHamsters Jul 25, 2025
bff5137
fix: format
LiamHamsters Jul 25, 2025
924da51
last hot fix
LiamHamsters Jul 25, 2025
5e52cc1
fix issues
LiamHamsters Jul 25, 2025
3c219ef
rebase + SessionImpl -> Session
LiamHamsters Jul 25, 2025
4af5c80
fast fix
LiamHamsters Jul 25, 2025
18060cd
BulkUpsert: forbid usage inside transaction, add test
LiamHamsters Jul 25, 2025
f207b69
fix + edit CHANGELOG.md
LiamHamsters Jul 25, 2025
816dc9c
add BulkUpsertProtoImporter
LiamHamsters Jul 29, 2025
7ffa2ee
hot fix
LiamHamsters Jul 29, 2025
1879bca
bulk upsert: add object[] support, simplify implementation
LiamHamsters Jul 29, 2025
14a2bfe
fix: handle exceptions in BulkUpsertImporter DisposeAsync, update tests
LiamHamsters Jul 29, 2025
5cee465
fix
LiamHamsters Jul 30, 2025
b203781
hot fix
LiamHamsters Jul 30, 2025
eec3d12
remove Type argument
LiamHamsters Jul 30, 2025
8c32732
hot fix
LiamHamsters Jul 30, 2025
c9fdbb5
fix inspections and autoformat
LiamHamsters Jul 30, 2025
6463d36
feat: switch BulkUpsertImporter to plain proto, remove extra wrappers…
LiamHamsters Jul 31, 2025
bfc6cf5
refactor: correct batch bytes check and flush logic in BulkUpsertImpo…
LiamHamsters Jul 31, 2025
1c28862
fix
LiamHamsters Jul 31, 2025
0bb791c
last fix
LiamHamsters Jul 31, 2025
beec0e0
Finally
LiamHamsters Jul 31, 2025
b08ebd5
change ct in Flush
LiamHamsters Jul 31, 2025
96fef4a
edit: tablePath calculation in YdbConnection
LiamHamsters Aug 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
using System.Data;
using System.Data.Common;
using System.Diagnostics.CodeAnalysis;
using Ydb.Operations;
using Ydb.Sdk.Ado.Session;
using Ydb.Sdk.Services.Query;
using Ydb.Sdk.Value;
using Ydb.Table;
using Ydb.Table.V1;
using static System.Data.IsolationLevel;

namespace Ydb.Sdk.Ado;
Expand Down Expand Up @@ -52,6 +56,52 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
ConnectionStringBuilder = connectionStringBuilder;
}

public async Task BulkUpsertAsync(
string tablePath,
IReadOnlyList<string> columns,
IReadOnlyList<IReadOnlyList<object?>> rows,
CancellationToken cancellationToken = default)
{
if (columns == null || columns.Count == 0)
throw new ArgumentException("Columns must not be empty", nameof(columns));
if (rows == null || rows.Count == 0)
throw new ArgumentException("Rows collection is empty", nameof(rows));

var structs = rows.Select(row =>
{
if (row.Count != columns.Count)
throw new ArgumentException("Each row must have the same number of elements as columns");
var members = columns
.Select((col, i) => new KeyValuePair<string, YdbValue>(col, new YdbParameter { Value = row[i] }.YdbValue))
.ToDictionary(x => x.Key, x => x.Value);
return YdbValue.MakeStruct(members);
}).ToList();

var list = YdbValue.MakeList(structs);

var req = new BulkUpsertRequest
{
Table = tablePath,
Rows = list.GetProto(),
OperationParams = new OperationParams()
};

if (Session is Services.Query.Session sessionImpl)
{
var resp = await sessionImpl.Driver.UnaryCall(
TableService.BulkUpsertMethod,
req,
new GrpcRequestSettings { CancellationToken = cancellationToken }
).ConfigureAwait(false);
var status = Status.FromProto(resp.Operation.Status, resp.Operation.Issues);
status.EnsureSuccess();
}
else
{
throw new InvalidOperationException("Underlying session does not support BulkUpsertAsync");
}
}

protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
{
ThrowIfConnectionClosed();
Expand Down
10 changes: 10 additions & 0 deletions src/Ydb.Sdk/src/Ado/YdbDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ protected override async ValueTask DisposeAsyncCore() =>
await PoolManager.ClearPool(_ydbConnectionStringBuilder.ConnectionString);

protected override void Dispose(bool disposing) => DisposeAsyncCore().AsTask().GetAwaiter().GetResult();

public async Task BulkUpsertAsync(
string tablePath,
IReadOnlyList<string> columns,
IReadOnlyList<IReadOnlyList<object?>> rows,
CancellationToken cancellationToken = default)
{
await using var conn = await OpenConnectionAsync(cancellationToken);
await conn.BulkUpsertAsync(tablePath, columns, rows, cancellationToken);
}
}

#endif
154 changes: 153 additions & 1 deletion src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,156 @@

protected override async Task OnDisposeAsync() =>
await YdbConnection.ClearPool(new YdbConnection(_connectionStringTls));
}

[Fact]
public async Task BulkUpsert_HappyPath_C()
{
var tableName = $"BulkTest_{Guid.NewGuid():N}";
var database = new YdbConnectionStringBuilder(_connectionStringTls).Database.TrimEnd('/');
var absTablePath = string.IsNullOrEmpty(database) ? tableName : $"{database}/{tableName}";

await using var conn = new YdbConnection(_connectionStringTls);
await conn.OpenAsync();

await using (var createCmd = conn.CreateCommand())
{
createCmd.CommandText = $@"
CREATE TABLE {tableName} (
Id Int32,
Name Utf8,
PRIMARY KEY (Id)
)";
await createCmd.ExecuteNonQueryAsync();
}

await Task.Delay(500);

var columns = new[] { "Id", "Name" };
var rows = Enumerable.Range(1, 10)
.Select(i => new object?[] { i, $"Name {i}" })
.ToList();

await conn.BulkUpsertAsync(absTablePath, columns, rows);

await using (var checkCmd = conn.CreateCommand())
{
checkCmd.CommandText = $"SELECT COUNT(*) FROM {tableName}";
var count = Convert.ToInt32(await checkCmd.ExecuteScalarAsync());
Assert.Equal(rows.Count, count);
}

await using (var dropCmd = conn.CreateCommand())
{
dropCmd.CommandText = $"DROP TABLE {tableName}";
await dropCmd.ExecuteNonQueryAsync();
}
}

[Fact]
public async Task BulkUpsert_InsertsNewRows_C()
{
var tableName = $"BulkTest_{Guid.NewGuid():N}";
var database = new YdbConnectionStringBuilder(_connectionStringTls).Database.TrimEnd('/');
var absTablePath = string.IsNullOrEmpty(database) ? tableName : $"{database}/{tableName}";

await using var conn = new YdbConnection(_connectionStringTls);
await conn.OpenAsync();

await using (var createCmd = conn.CreateCommand())
{
createCmd.CommandText = $@"
CREATE TABLE {tableName} (
Id Int32,
Name Utf8,
PRIMARY KEY (Id)
)";
await createCmd.ExecuteNonQueryAsync();
}

await Task.Delay(500);

var columns = new[] { "Id", "Name" };

var firstRows = new List<object[]>
{
new object?[] { 1, "Alice" },

Check warning on line 383 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (latest, 9.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 383 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 383 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 383 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (latest, 8.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 383 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (25.1, 9.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 383 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (25.1, 8.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 383 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / Inspection (./src/YdbSdk.sln)

"[CSharpWarnings::CS8620] Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'System.Collections.Generic.List<T>.Add' because of differences in the nullability of reference types" on /home/runner/work/ydb-dotnet-sdk/ydb-dotnet-sdk/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs(383,13)
new object?[] { 2, "Bob" }

Check warning on line 384 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (latest, 9.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 384 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 384 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 384 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (latest, 8.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 384 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (25.1, 9.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 384 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (25.1, 8.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 384 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / Inspection (./src/YdbSdk.sln)

"[CSharpWarnings::CS8620] Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'System.Collections.Generic.List<T>.Add' because of differences in the nullability of reference types" on /home/runner/work/ydb-dotnet-sdk/ydb-dotnet-sdk/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs(384,13)
};
await conn.BulkUpsertAsync(absTablePath, columns, firstRows);

var newRows = new List<object[]>
{
new object?[] { 3, "Charlie" },

Check warning on line 390 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (latest, 9.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 390 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 390 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (latest, 8.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 390 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (25.1, 9.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 390 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (25.1, 8.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 390 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / Inspection (./src/YdbSdk.sln)

"[CSharpWarnings::CS8620] Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'System.Collections.Generic.List<T>.Add' because of differences in the nullability of reference types" on /home/runner/work/ydb-dotnet-sdk/ydb-dotnet-sdk/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs(390,13)
new object?[] { 4, "Diana" }

Check warning on line 391 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (latest, 9.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 391 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 391 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (latest, 8.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 391 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (25.1, 9.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 391 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / ydb-sdk-tests (25.1, 8.0.x)

Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'void List<object[]>.Add(object[] item)' due to differences in the nullability of reference types.

Check warning on line 391 in src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

View workflow job for this annotation

GitHub Actions / Inspection (./src/YdbSdk.sln)

"[CSharpWarnings::CS8620] Argument of type 'object?[]' cannot be used for parameter 'item' of type 'object[]' in 'System.Collections.Generic.List<T>.Add' because of differences in the nullability of reference types" on /home/runner/work/ydb-dotnet-sdk/ydb-dotnet-sdk/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs(391,13)
};
await conn.BulkUpsertAsync(absTablePath, columns, newRows);

await using (var selectCmd = conn.CreateCommand())
{
selectCmd.CommandText = $"SELECT Id, Name FROM {tableName}";
var results = new Dictionary<int, string>();
await using var reader = await selectCmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
results[reader.GetInt32(0)] = reader.GetString(1);
}

Assert.Equal(4, results.Count);
Assert.Equal("Alice", results[1]);
Assert.Equal("Bob", results[2]);
Assert.Equal("Charlie", results[3]);
Assert.Equal("Diana", results[4]);
}

await using (var dropCmd = conn.CreateCommand())
{
dropCmd.CommandText = $"DROP TABLE {tableName};";
await dropCmd.ExecuteNonQueryAsync();
}
}

[Fact]
public async Task BulkUpsert_UpdatesExistingRows_C()
{
var tableName = $"BulkTest_{Guid.NewGuid():N}";
var database = new YdbConnectionStringBuilder(_connectionStringTls).Database.TrimEnd('/');
var absTablePath = string.IsNullOrEmpty(database) ? tableName : $"{database}/{tableName}";

await using var conn = new YdbConnection(_connectionStringTls);
await conn.OpenAsync();

await using (var createCmd = conn.CreateCommand())
{
createCmd.CommandText = $@"
CREATE TABLE {tableName} (
Id Int32,
Name Utf8,
PRIMARY KEY (Id)
)";
await createCmd.ExecuteNonQueryAsync();
}

await Task.Delay(500);

var columns = new[] { "Id", "Name" };

var row = new object?[] { 1, "Alice" };
await conn.BulkUpsertAsync(absTablePath, columns, new[] { row });

var updated = new object?[] { 1, "Alice Updated" };
await conn.BulkUpsertAsync(absTablePath, columns, new[] { updated });

await using (var selectCmd = conn.CreateCommand())
{
selectCmd.CommandText = $"SELECT Name FROM {tableName} WHERE Id = 1;";
var name = (string)(await selectCmd.ExecuteScalarAsync())!;
Assert.Equal("Alice Updated", name);
}

await using (var dropCmd = conn.CreateCommand())
{
dropCmd.CommandText = $"DROP TABLE {tableName};";
await dropCmd.ExecuteNonQueryAsync();
}
}
}
Loading
Loading