Skip to content

Commit 3db706d

Browse files
committed
Implement simple BulkUpsert in YdbConnection and YdbDataSource (delete module)
1 parent 5ac5127 commit 3db706d

File tree

8 files changed

+115
-248
lines changed

8 files changed

+115
-248
lines changed

src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertOptions.cs

Lines changed: 0 additions & 6 deletions
This file was deleted.

src/Ydb.Sdk/src/Ado/BulkUpsert/TypedValueFactory.cs

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs

Lines changed: 0 additions & 71 deletions
This file was deleted.

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
using System.Data.Common;
33
using System.Diagnostics.CodeAnalysis;
44
using Ydb.Operations;
5-
using Ydb.Sdk.Ado.BulkUpsert;
65
using Ydb.Sdk.Ado.Session;
76
using Ydb.Sdk.Services.Query;
7+
using Ydb.Sdk.Value;
88
using Ydb.Table;
99
using Ydb.Table.V1;
1010
using static System.Data.IsolationLevel;
@@ -56,20 +56,26 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
5656
ConnectionStringBuilder = connectionStringBuilder;
5757
}
5858

59-
internal async Task BulkUpsertInternalAsync<T>(
59+
public async Task BulkUpsertAsync(
6060
string tablePath,
61-
IReadOnlyCollection<T> rows,
6261
IReadOnlyList<string> columns,
63-
IReadOnlyList<Func<T, object?>> selectors,
64-
CancellationToken cancellationToken)
62+
IReadOnlyList<IReadOnlyDictionary<string, object?>> rows,
63+
CancellationToken cancellationToken = default)
6564
{
66-
if (CurrentTransaction is { Completed: false })
67-
throw new InvalidOperationException("BulkUpsert does not support working within a transaction");
65+
var structs = rows.Select(dict =>
66+
YdbValue.MakeStruct(
67+
columns.ToDictionary(
68+
col => col,
69+
col => new YdbParameter { Value = dict[col] }.YdbValue
70+
))).ToList();
71+
72+
var list = YdbValue.MakeList(structs);
73+
6874
var req = new BulkUpsertRequest
6975
{
7076
Table = tablePath,
71-
OperationParams = new OperationParams(),
72-
Rows = TypedValueFactory.FromObjects(rows, columns, selectors)
77+
Rows = list.GetProto(),
78+
OperationParams = new OperationParams()
7379
};
7480

7581
if (Session is Services.Query.Session sessionImpl)

src/Ydb.Sdk/src/Ado/YdbDataSource.cs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#if NET7_0_OR_GREATER
22
using System.Data.Common;
3-
using Ydb.Sdk.Ado.BulkUpsert;
43
using Ydb.Sdk.Value;
54

65
namespace Ydb.Sdk.Ado;
@@ -68,21 +67,14 @@ protected override async ValueTask DisposeAsyncCore() =>
6867

6968
protected override void Dispose(bool disposing) => DisposeAsyncCore().AsTask().GetAwaiter().GetResult();
7069

71-
public async Task<YdbBulkUpsertImporter<T>> BeginBulkUpsertAsync<T>(
70+
public async Task BulkUpsertAsync(
7271
string tablePath,
7372
IReadOnlyList<string> columns,
74-
IReadOnlyList<Func<T, YdbValue>> selectors,
75-
int maxRowsInBatch = 1000,
73+
IReadOnlyList<IReadOnlyDictionary<string, object?>> rows,
7674
CancellationToken cancellationToken = default)
7775
{
78-
var conn = await OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
79-
80-
return new YdbBulkUpsertImporter<T>(
81-
conn,
82-
tablePath,
83-
columns,
84-
maxRowsInBatch
85-
);
76+
await using var conn = await OpenConnectionAsync(cancellationToken);
77+
await conn.BulkUpsertAsync(tablePath, columns, rows, cancellationToken);
8678
}
8779
}
8880

src/Ydb.Sdk/src/Services/Table/BulkUpsertExtensions.cs

Lines changed: 0 additions & 33 deletions
This file was deleted.

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,13 @@ private class TestEntity
316316
public string Name { get; set; }
317317
}
318318

319+
private static IReadOnlyList<IReadOnlyDictionary<string, object?>> ToDicts(IEnumerable<TestEntity> rows)
320+
=> rows.Select(x => (IReadOnlyDictionary<string, object?>)new Dictionary<string, object?>
321+
{
322+
["Id"] = x.Id,
323+
["Name"] = x.Name
324+
}).ToList();
325+
319326
[Fact]
320327
public async Task BulkUpsert_HappyPath_C()
321328
{
@@ -329,11 +336,11 @@ public async Task BulkUpsert_HappyPath_C()
329336
await using (var createCmd = conn.CreateCommand())
330337
{
331338
createCmd.CommandText = $@"
332-
CREATE TABLE {tableName} (
333-
Id Int32,
334-
Name Utf8,
335-
PRIMARY KEY (Id)
336-
)";
339+
CREATE TABLE {tableName} (
340+
Id Int32,
341+
Name Utf8,
342+
PRIMARY KEY (Id)
343+
)";
337344
await createCmd.ExecuteNonQueryAsync();
338345
}
339346

@@ -344,8 +351,7 @@ PRIMARY KEY (Id)
344351
.ToList();
345352

346353
var columns = new[] { "Id", "Name" };
347-
348-
await conn.BulkUpsertWithRetry(absTablePath, rows, columns, default);
354+
await conn.BulkUpsertAsync(absTablePath, columns, ToDicts(rows));
349355

350356
await using (var checkCmd = conn.CreateCommand())
351357
{
@@ -374,11 +380,11 @@ public async Task BulkUpsert_InsertsNewRows_С()
374380
await using (var createCmd = conn.CreateCommand())
375381
{
376382
createCmd.CommandText = $@"
377-
CREATE TABLE {tableName} (
378-
Id Int32,
379-
Name Utf8,
380-
PRIMARY KEY (Id)
381-
)";
383+
CREATE TABLE {tableName} (
384+
Id Int32,
385+
Name Utf8,
386+
PRIMARY KEY (Id)
387+
)";
382388
await createCmd.ExecuteNonQueryAsync();
383389
}
384390

@@ -391,14 +397,14 @@ PRIMARY KEY (Id)
391397
new() { Id = 1, Name = "Alice" },
392398
new() { Id = 2, Name = "Bob" }
393399
};
394-
await conn.BulkUpsertWithRetry(absTablePath, firstRows, columns, default);
400+
await conn.BulkUpsertAsync(absTablePath, columns, ToDicts(firstRows));
395401

396402
var newRows = new List<TestEntity>
397403
{
398404
new() { Id = 3, Name = "Charlie" },
399405
new() { Id = 4, Name = "Diana" }
400406
};
401-
await conn.BulkUpsertWithRetry(absTablePath, newRows, columns, default);
407+
await conn.BulkUpsertAsync(absTablePath, columns, ToDicts(newRows));
402408

403409
await using (var selectCmd = conn.CreateCommand())
404410
{
@@ -437,11 +443,11 @@ public async Task BulkUpsert_UpdatesExistingRows_С()
437443
await using (var createCmd = conn.CreateCommand())
438444
{
439445
createCmd.CommandText = $@"
440-
CREATE TABLE {tableName} (
441-
Id Int32,
442-
Name Utf8,
443-
PRIMARY KEY (Id)
444-
)";
446+
CREATE TABLE {tableName} (
447+
Id Int32,
448+
Name Utf8,
449+
PRIMARY KEY (Id)
450+
)";
445451
await createCmd.ExecuteNonQueryAsync();
446452
}
447453

@@ -450,15 +456,15 @@ PRIMARY KEY (Id)
450456
var columns = new[] { "Id", "Name" };
451457

452458
var row = new TestEntity { Id = 1, Name = "Alice" };
453-
await conn.BulkUpsertWithRetry(absTablePath, new[] { row }, columns, default);
459+
await conn.BulkUpsertAsync(absTablePath, columns, ToDicts([row]));
454460

455461
var updated = new TestEntity { Id = 1, Name = "Alice Updated" };
456-
await conn.BulkUpsertWithRetry(absTablePath, new[] { updated }, columns, default);
462+
await conn.BulkUpsertAsync(absTablePath, columns, ToDicts([updated]));
457463

458464
await using (var selectCmd = conn.CreateCommand())
459465
{
460466
selectCmd.CommandText = $"SELECT Name FROM {tableName} WHERE Id = 1;";
461-
var name = (string)(await selectCmd.ExecuteScalarAsync())!;
467+
var name = await selectCmd.ExecuteScalarAsync() as string;
462468
Assert.Equal("Alice Updated", name);
463469
}
464470

@@ -468,5 +474,4 @@ PRIMARY KEY (Id)
468474
await dropCmd.ExecuteNonQueryAsync();
469475
}
470476
}
471-
472477
}

0 commit comments

Comments
 (0)