Skip to content

Commit b6bf0d5

Browse files
committed
Add BulkUpsert implementation
1 parent 30a4fa2 commit b6bf0d5

File tree

11 files changed

+646
-2
lines changed

11 files changed

+646
-2
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace Ydb.Sdk.Ado.BulkUpsert
2+
{
3+
public sealed record BulkUpsertOptions(
4+
TimeSpan? Timeout = null,
5+
CancellationToken CancellationToken = default
6+
);
7+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
using System.Reflection;
2+
using Ydb.Sdk.Value;
3+
4+
namespace Ydb.Sdk.Ado.BulkUpsert;
5+
6+
internal static class TypedValueFactory
7+
{
8+
public static TypedValue FromObjects<T>(IReadOnlyCollection<T> rows)
9+
{
10+
if (rows.Count == 0)
11+
throw new ArgumentException("Rows collection is empty.", nameof(rows));
12+
13+
var props = typeof(T).GetProperties(BindingFlags.Instance | BindingFlags.Public)
14+
.Where(p => p.CanRead).ToArray();
15+
16+
var structs = new List<YdbValue>(rows.Count);
17+
18+
foreach (var row in rows)
19+
{
20+
var members = new Dictionary<string, YdbValue>(props.Length);
21+
foreach (var p in props)
22+
members[p.Name] = ToYdbValue(p.GetValue(row), p.PropertyType);
23+
24+
structs.Add(YdbValue.MakeStruct(members));
25+
}
26+
27+
var list = YdbValue.MakeList(structs);
28+
return list.GetProto();
29+
}
30+
31+
private static YdbValue ToYdbValue(object? value, global::System.Type clr)
32+
{
33+
if (value is null) return MakeOptional(clr);
34+
35+
if (clr == typeof(bool)) return (YdbValue)(bool)value;
36+
if (clr == typeof(sbyte)) return (YdbValue)(sbyte)value;
37+
if (clr == typeof(short)) return (YdbValue)(short)value;
38+
if (clr == typeof(int)) return (YdbValue)(int)value;
39+
if (clr == typeof(long)) return (YdbValue)(long)value;
40+
if (clr == typeof(byte)) return (YdbValue)(byte)value;
41+
if (clr == typeof(ushort)) return (YdbValue)(ushort)value;
42+
if (clr == typeof(uint)) return (YdbValue)(uint)value;
43+
if (clr == typeof(ulong)) return (YdbValue)(ulong)value;
44+
if (clr == typeof(float)) return (YdbValue)(float)value;
45+
if (clr == typeof(double)) return (YdbValue)(double)value;
46+
if (clr == typeof(decimal)) return (YdbValue)(decimal)value;
47+
if (clr == typeof(DateTime)) return YdbValue.MakeTimestamp((DateTime)value);
48+
if (clr == typeof(TimeSpan)) return (YdbValue)(TimeSpan)value;
49+
if (clr == typeof(Guid)) return YdbValue.MakeUuid((Guid)value);
50+
if (clr == typeof(string)) return YdbValue.MakeUtf8((string)value);
51+
if (clr == typeof(byte[])) return YdbValue.MakeString((byte[])value);
52+
53+
throw new NotSupportedException($"Type '{clr.FullName}' is not supported.");
54+
}
55+
56+
private static YdbValue MakeOptional(global::System.Type clr)
57+
{
58+
var t = Nullable.GetUnderlyingType(clr) ?? clr;
59+
60+
if (t == typeof(bool)) return YdbValue.MakeOptionalBool();
61+
if (t == typeof(sbyte)) return YdbValue.MakeOptionalInt8();
62+
if (t == typeof(short)) return YdbValue.MakeOptionalInt16();
63+
if (t == typeof(int)) return YdbValue.MakeOptionalInt32();
64+
if (t == typeof(long)) return YdbValue.MakeOptionalInt64();
65+
if (t == typeof(byte)) return YdbValue.MakeOptionalUint8();
66+
if (t == typeof(ushort)) return YdbValue.MakeOptionalUint16();
67+
if (t == typeof(uint)) return YdbValue.MakeOptionalUint32();
68+
if (t == typeof(ulong)) return YdbValue.MakeOptionalUint64();
69+
if (t == typeof(float)) return YdbValue.MakeOptionalFloat();
70+
if (t == typeof(double)) return YdbValue.MakeOptionalDouble();
71+
if (t == typeof(decimal)) return YdbValue.MakeOptionalDecimal();
72+
if (t == typeof(DateTime)) return YdbValue.MakeOptionalTimestamp();
73+
if (t == typeof(TimeSpan)) return YdbValue.MakeOptionalInterval();
74+
if (t == typeof(Guid)) return YdbValue.MakeOptionalUuid();
75+
if (t == typeof(string)) return YdbValue.MakeOptionalUtf8();
76+
if (t == typeof(byte[])) return YdbValue.MakeOptionalString();
77+
78+
throw new NotSupportedException($"Null for '{t.FullName}' is not supported.");
79+
}
80+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Ydb.Sdk.Client;
6+
using Ydb.Sdk.Services.Table;
7+
8+
namespace Ydb.Sdk.Ado.BulkUpsert;
9+
10+
public sealed class YdbBulkUpsertImporter<T> : IAsyncDisposable
11+
{
12+
private readonly TableClient _tableClient;
13+
private readonly string _tablePath;
14+
private readonly BulkUpsertOptions _options;
15+
private readonly RetrySettings? _retrySettings;
16+
private readonly int _maxRowsInBatch;
17+
private readonly List<T> _buffer = new();
18+
private bool _isCompleted;
19+
20+
public YdbBulkUpsertImporter(
21+
TableClient tableClient,
22+
string tablePath,
23+
BulkUpsertOptions? options = null,
24+
RetrySettings? retrySettings = null,
25+
int maxRowsInBatch = 1000)
26+
{
27+
_tableClient = tableClient ?? throw new ArgumentNullException(nameof(tableClient));
28+
_tablePath = tablePath ?? throw new ArgumentNullException(nameof(tablePath));
29+
_options = options ?? new BulkUpsertOptions();
30+
_retrySettings = retrySettings;
31+
_maxRowsInBatch = maxRowsInBatch;
32+
}
33+
34+
public async Task WriteRowAsync(T row, CancellationToken cancellationToken = default)
35+
{
36+
if (_isCompleted)
37+
throw new InvalidOperationException("BulkUpsertImporter уже завершён.");
38+
39+
_buffer.Add(row);
40+
41+
if (_buffer.Count >= _maxRowsInBatch)
42+
{
43+
await FlushAsync(cancellationToken).ConfigureAwait(false);
44+
}
45+
}
46+
47+
public async Task FlushAsync(CancellationToken cancellationToken = default)
48+
{
49+
if (_buffer.Count == 0)
50+
return;
51+
52+
await _tableClient.BulkUpsertWithRetry(
53+
_tablePath,
54+
_buffer,
55+
_retrySettings
56+
).ConfigureAwait(false);
57+
58+
_buffer.Clear();
59+
}
60+
61+
public async Task CompleteAsync(CancellationToken cancellationToken = default)
62+
{
63+
if (_isCompleted) return;
64+
await FlushAsync(cancellationToken).ConfigureAwait(false);
65+
_isCompleted = true;
66+
}
67+
68+
public async ValueTask DisposeAsync()
69+
{
70+
if (!_isCompleted)
71+
await CompleteAsync();
72+
}
73+
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
using System.Data;
22
using System.Data.Common;
33
using System.Diagnostics.CodeAnalysis;
4+
using Ydb.Operations;
5+
using Ydb.Sdk.Ado.BulkUpsert;
46
using Ydb.Sdk.Ado.Session;
57
using Ydb.Sdk.Services.Query;
8+
using Ydb.Sdk.Services.Table;
9+
using Ydb.Table;
610
using static System.Data.IsolationLevel;
711

812
namespace Ydb.Sdk.Ado;
@@ -52,6 +56,53 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
5256
ConnectionStringBuilder = connectionStringBuilder;
5357
}
5458

59+
internal async Task BulkUpsertInternalAsync<T>(
60+
string tablePath,
61+
IReadOnlyCollection<T> rows,
62+
BulkUpsertOptions options,
63+
CancellationToken cancellationToken)
64+
{
65+
if (CurrentTransaction is { Completed: false })
66+
throw new InvalidOperationException("BulkUpsert does not support working within a transaction");
67+
var req = new BulkUpsertRequest
68+
{
69+
Table = tablePath,
70+
OperationParams = new OperationParams(),
71+
Rows = TypedValueFactory.FromObjects(rows)
72+
};
73+
74+
if (Session is Ydb.Sdk.Services.Query.Session sessionImpl)
75+
{
76+
var resp = await sessionImpl.BulkUpsertAsync(req, cancellationToken).ConfigureAwait(false);
77+
var status = Status.FromProto(resp.Operation.Status, resp.Operation.Issues);
78+
status.EnsureSuccess();
79+
}
80+
else
81+
{
82+
throw new InvalidOperationException("Underlying session does not support BulkUpsertAsync");
83+
}
84+
}
85+
86+
public YdbBulkUpsertImporter<T> BeginBulkUpsert<T>(
87+
string tablePath,
88+
BulkUpsertOptions? options = null,
89+
RetrySettings? retrySettings = null,
90+
int maxRowsInBatch = 1000)
91+
{
92+
ThrowIfConnectionClosed();
93+
if (CurrentTransaction is { Completed: false })
94+
throw new InvalidOperationException("BulkUpsert does not support working within a transaction");
95+
96+
var realSession = Session as Ydb.Sdk.Services.Query.Session
97+
?? throw new InvalidOperationException("Underlying session does not support bulk upsert");
98+
99+
var driver = realSession.Driver as Ydb.Sdk.Driver
100+
?? throw new InvalidOperationException("Session driver is not of expected type 'Ydb.Sdk.Driver'");
101+
102+
var tableClient = new TableClient(driver);
103+
return new YdbBulkUpsertImporter<T>(tableClient, tablePath, options, retrySettings, maxRowsInBatch);
104+
}
105+
55106
protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
56107
{
57108
ThrowIfConnectionClosed();

src/Ydb.Sdk/src/Ado/YdbDataSource.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#if NET7_0_OR_GREATER
22
using System.Data.Common;
3+
using Ydb.Sdk.Ado.BulkUpsert;
4+
using Ydb.Sdk.Services.Table;
35

46
namespace Ydb.Sdk.Ado;
57

@@ -65,6 +67,26 @@ protected override async ValueTask DisposeAsyncCore() =>
6567
await PoolManager.ClearPool(_ydbConnectionStringBuilder.ConnectionString);
6668

6769
protected override void Dispose(bool disposing) => DisposeAsyncCore().AsTask().GetAwaiter().GetResult();
70+
71+
public async Task<YdbBulkUpsertImporter<T>> BeginBulkUpsertAsync<T>(
72+
string tablePath,
73+
BulkUpsertOptions? options = null,
74+
RetrySettings? retrySettings = null,
75+
int maxBatchSizeBytes = 64 * 1024 * 1024,
76+
CancellationToken cancellationToken = default)
77+
{
78+
var conn = await OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
79+
80+
var realSession = conn.Session as Ydb.Sdk.Services.Query.Session
81+
?? throw new InvalidOperationException("Underlying session does not support bulk upsert");
82+
83+
var driver = realSession.Driver as Ydb.Sdk.Driver
84+
?? throw new InvalidOperationException("Session driver is not of expected type 'Ydb.Sdk.Driver'");
85+
86+
var tableClient = new TableClient(driver);
87+
88+
return new YdbBulkUpsertImporter<T>(tableClient, tablePath, options, retrySettings, maxBatchSizeBytes);
89+
}
6890
}
6991

7092
#endif

src/Ydb.Sdk/src/Services/Query/SessionPool.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
using Ydb.Sdk.Ado.Session;
77
using Ydb.Sdk.Pool;
88
using Ydb.Sdk.Value;
9+
using Ydb.Table;
10+
using Ydb.Table.V1;
911
using CommitTransactionRequest = Ydb.Query.CommitTransactionRequest;
1012
using CreateSessionRequest = Ydb.Query.CreateSessionRequest;
1113
using DeleteSessionRequest = Ydb.Query.DeleteSessionRequest;
@@ -226,4 +228,10 @@ internal override async Task DeleteSession()
226228
throw YdbException.FromServer(deleteSessionResponse.Status, deleteSessionResponse.Issues);
227229
}
228230
}
231+
232+
internal async Task<BulkUpsertResponse> BulkUpsertAsync(BulkUpsertRequest req, CancellationToken ct = default)
233+
{
234+
var settings = MakeGrpcRequestSettings(new GrpcRequestSettings { CancellationToken = ct });
235+
return await Driver.UnaryCall(TableService.BulkUpsertMethod, req, settings);
236+
}
229237
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using Ydb.Sdk.Ado.BulkUpsert;
2+
using Ydb.Operations;
3+
using Ydb.Sdk.Client;
4+
using Ydb.Table;
5+
6+
namespace Ydb.Sdk.Services.Table
7+
{
8+
public static class BulkUpsertExtensions
9+
{
10+
public static async Task<IResponse> BulkUpsertWithRetry<T>(
11+
this TableClient tableClient,
12+
string tablePath,
13+
IReadOnlyCollection<T> rows,
14+
RetrySettings? retrySettings = null)
15+
{
16+
return await tableClient.SessionExec(
17+
async session =>
18+
{
19+
var req = new BulkUpsertRequest
20+
{
21+
Table = tablePath,
22+
OperationParams = new OperationParams(),
23+
Rows = TypedValueFactory.FromObjects(rows)
24+
};
25+
var resp = await session.BulkUpsertAsync(req);
26+
return new BulkUpsertResponseAdapter(resp);
27+
},
28+
retrySettings
29+
);
30+
}
31+
}
32+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using Ydb.Sdk.Client;
2+
using Ydb.Operations;
3+
using Ydb.Table;
4+
5+
namespace Ydb.Sdk.Services.Table
6+
{
7+
public class BulkUpsertResponseAdapter : IResponse
8+
{
9+
public Status Status { get; }
10+
public BulkUpsertResponse Response { get; }
11+
12+
public BulkUpsertResponseAdapter(BulkUpsertResponse response)
13+
{
14+
Response = response ?? throw new ArgumentNullException(nameof(response));
15+
Status = Status.FromProto(response.Operation.Status, response.Operation.Issues);
16+
}
17+
}
18+
}

src/Ydb.Sdk/src/Services/Table/Session.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using Grpc.Core;
22
using Microsoft.Extensions.Logging;
33
using Ydb.Sdk.Services.Sessions;
4+
using Ydb.Table;
5+
using Ydb.Table.V1;
46

57
namespace Ydb.Sdk.Services.Table;
68

@@ -80,4 +82,8 @@ private async Task<TResponse> UnaryCall<TRequest, TResponse>(
8082
settings: settings
8183
);
8284
}
85+
86+
public Task<BulkUpsertResponse> BulkUpsertAsync(BulkUpsertRequest req, CancellationToken ct = default)
87+
=> Driver.UnaryCall(TableService.BulkUpsertMethod, req, new GrpcRequestSettings { CancellationToken = ct });
88+
8389
}

0 commit comments

Comments
 (0)