Skip to content

Commit 8d170b8

Browse files
committed
fix issues
1 parent 8173dd0 commit 8d170b8

File tree

9 files changed

+173
-217
lines changed

9 files changed

+173
-217
lines changed
Lines changed: 12 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,80 +1,34 @@
1-
using System.Reflection;
21
using Ydb.Sdk.Value;
32

43
namespace Ydb.Sdk.Ado.BulkUpsert;
54

65
internal static class TypedValueFactory
76
{
8-
public static TypedValue FromObjects<T>(IReadOnlyCollection<T> rows)
7+
public static TypedValue FromObjects<T>(
8+
IReadOnlyCollection<T> rows,
9+
IReadOnlyList<string> columnNames,
10+
IReadOnlyList<Func<T, YdbValue>> columnSelectors)
911
{
1012
if (rows.Count == 0)
1113
throw new ArgumentException("Rows collection is empty.", nameof(rows));
12-
13-
var props = typeof(T).GetProperties(BindingFlags.Instance | BindingFlags.Public)
14-
.Where(p => p.CanRead).ToArray();
14+
if (columnNames.Count != columnSelectors.Count)
15+
throw new ArgumentException("Column names count must match selectors count.");
1516

1617
var structs = new List<YdbValue>(rows.Count);
1718

1819
foreach (var row in rows)
1920
{
20-
var members = new Dictionary<string, YdbValue>(props.Length);
21-
foreach (var p in props)
22-
members[p.Name] = ToYdbValue(p.GetValue(row), p.PropertyType);
21+
var members = new Dictionary<string, YdbValue>(columnNames.Count);
22+
for (int i = 0; i < columnNames.Count; i++)
23+
{
24+
var value = columnSelectors[i](row);
25+
members[columnNames[i]] = value;
26+
}
2327

2428
structs.Add(YdbValue.MakeStruct(members));
2529
}
2630

2731
var list = YdbValue.MakeList(structs);
2832
return list.GetProto();
2933
}
30-
31-
private static YdbValue ToYdbValue(object? value, System.Type clr)
32-
{
33-
if (value is null) return MakeOptional(clr);
34-
35-
if (clr == typeof(bool)) return (YdbValue)(bool)value;
36-
if (clr == typeof(sbyte)) return (YdbValue)(sbyte)value;
37-
if (clr == typeof(short)) return (YdbValue)(short)value;
38-
if (clr == typeof(int)) return (YdbValue)(int)value;
39-
if (clr == typeof(long)) return (YdbValue)(long)value;
40-
if (clr == typeof(byte)) return (YdbValue)(byte)value;
41-
if (clr == typeof(ushort)) return (YdbValue)(ushort)value;
42-
if (clr == typeof(uint)) return (YdbValue)(uint)value;
43-
if (clr == typeof(ulong)) return (YdbValue)(ulong)value;
44-
if (clr == typeof(float)) return (YdbValue)(float)value;
45-
if (clr == typeof(double)) return (YdbValue)(double)value;
46-
if (clr == typeof(decimal)) return (YdbValue)(decimal)value;
47-
if (clr == typeof(DateTime)) return YdbValue.MakeTimestamp((DateTime)value);
48-
if (clr == typeof(TimeSpan)) return (YdbValue)(TimeSpan)value;
49-
if (clr == typeof(Guid)) return YdbValue.MakeUuid((Guid)value);
50-
if (clr == typeof(string)) return YdbValue.MakeUtf8((string)value);
51-
if (clr == typeof(byte[])) return YdbValue.MakeString((byte[])value);
52-
53-
throw new NotSupportedException($"Type '{clr.FullName}' is not supported.");
54-
}
55-
56-
private static YdbValue MakeOptional(System.Type clr)
57-
{
58-
var t = Nullable.GetUnderlyingType(clr) ?? clr;
59-
60-
if (t == typeof(bool)) return YdbValue.MakeOptionalBool();
61-
if (t == typeof(sbyte)) return YdbValue.MakeOptionalInt8();
62-
if (t == typeof(short)) return YdbValue.MakeOptionalInt16();
63-
if (t == typeof(int)) return YdbValue.MakeOptionalInt32();
64-
if (t == typeof(long)) return YdbValue.MakeOptionalInt64();
65-
if (t == typeof(byte)) return YdbValue.MakeOptionalUint8();
66-
if (t == typeof(ushort)) return YdbValue.MakeOptionalUint16();
67-
if (t == typeof(uint)) return YdbValue.MakeOptionalUint32();
68-
if (t == typeof(ulong)) return YdbValue.MakeOptionalUint64();
69-
if (t == typeof(float)) return YdbValue.MakeOptionalFloat();
70-
if (t == typeof(double)) return YdbValue.MakeOptionalDouble();
71-
if (t == typeof(decimal)) return YdbValue.MakeOptionalDecimal();
72-
if (t == typeof(DateTime)) return YdbValue.MakeOptionalTimestamp();
73-
if (t == typeof(TimeSpan)) return YdbValue.MakeOptionalInterval();
74-
if (t == typeof(Guid)) return YdbValue.MakeOptionalUuid();
75-
if (t == typeof(string)) return YdbValue.MakeOptionalUtf8();
76-
if (t == typeof(byte[])) return YdbValue.MakeOptionalString();
77-
78-
throw new NotSupportedException($"Null for '{t.FullName}' is not supported.");
79-
}
8034
}

src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,29 @@
11
using Ydb.Sdk.Services.Table;
2+
using Ydb.Sdk.Value;
23

34
namespace Ydb.Sdk.Ado.BulkUpsert;
45

56
public sealed class YdbBulkUpsertImporter<T> : IAsyncDisposable
67
{
7-
private readonly TableClient _tableClient;
8+
private readonly YdbConnection _connection;
89
private readonly string _tablePath;
9-
private readonly BulkUpsertOptions _options;
10-
private readonly RetrySettings? _retrySettings;
10+
private readonly IReadOnlyList<string> _columns;
11+
private readonly IReadOnlyList<Func<T, YdbValue>> _selectors;
1112
private readonly int _maxRowsInBatch;
1213
private readonly List<T> _buffer = new();
1314
private bool _isCompleted;
1415

1516
public YdbBulkUpsertImporter(
16-
TableClient tableClient,
17+
YdbConnection connection,
1718
string tablePath,
18-
BulkUpsertOptions? options = null,
19-
RetrySettings? retrySettings = null,
19+
IReadOnlyList<string> columns,
20+
IReadOnlyList<Func<T, YdbValue>> selectors,
2021
int maxRowsInBatch = 1000)
2122
{
22-
_tableClient = tableClient ?? throw new ArgumentNullException(nameof(tableClient));
23+
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
2324
_tablePath = tablePath ?? throw new ArgumentNullException(nameof(tablePath));
24-
_options = options ?? new BulkUpsertOptions();
25-
_retrySettings = retrySettings;
25+
_columns = columns ?? throw new ArgumentNullException(nameof(columns));
26+
_selectors = selectors ?? throw new ArgumentNullException(nameof(selectors));
2627
_maxRowsInBatch = maxRowsInBatch;
2728
}
2829

@@ -44,10 +45,12 @@ public async Task FlushAsync(CancellationToken cancellationToken = default)
4445
if (_buffer.Count == 0)
4546
return;
4647

47-
await _tableClient.BulkUpsertWithRetry(
48+
await _connection.BulkUpsertWithRetry(
4849
_tablePath,
4950
_buffer,
50-
_retrySettings
51+
_columns,
52+
_selectors,
53+
cancellationToken
5154
).ConfigureAwait(false);
5255

5356
_buffer.Clear();

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
using Ydb.Sdk.Ado.BulkUpsert;
66
using Ydb.Sdk.Ado.Session;
77
using Ydb.Sdk.Services.Query;
8-
using Ydb.Sdk.Services.Table;
8+
using Ydb.Sdk.Value;
99
using Ydb.Table;
10+
using Ydb.Table.V1;
1011
using static System.Data.IsolationLevel;
1112

1213
namespace Ydb.Sdk.Ado;
@@ -59,7 +60,8 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
5960
internal async Task BulkUpsertInternalAsync<T>(
6061
string tablePath,
6162
IReadOnlyCollection<T> rows,
62-
BulkUpsertOptions options,
63+
IReadOnlyList<string> columns,
64+
IReadOnlyList<Func<T, YdbValue>> selectors,
6365
CancellationToken cancellationToken)
6466
{
6567
if (CurrentTransaction is { Completed: false })
@@ -68,12 +70,16 @@ internal async Task BulkUpsertInternalAsync<T>(
6870
{
6971
Table = tablePath,
7072
OperationParams = new OperationParams(),
71-
Rows = TypedValueFactory.FromObjects(rows)
73+
Rows = TypedValueFactory.FromObjects(rows, columns, selectors)
7274
};
7375

7476
if (Session is Services.Query.Session sessionImpl)
7577
{
76-
var resp = await sessionImpl.BulkUpsertAsync(req, cancellationToken).ConfigureAwait(false);
78+
var resp = await sessionImpl.Driver.UnaryCall(
79+
TableService.BulkUpsertMethod,
80+
req,
81+
new GrpcRequestSettings { CancellationToken = cancellationToken }
82+
).ConfigureAwait(false);
7783
var status = Status.FromProto(resp.Operation.Status, resp.Operation.Issues);
7884
status.EnsureSuccess();
7985
}
@@ -83,26 +89,6 @@ internal async Task BulkUpsertInternalAsync<T>(
8389
}
8490
}
8591

86-
public YdbBulkUpsertImporter<T> BeginBulkUpsert<T>(
87-
string tablePath,
88-
BulkUpsertOptions? options = null,
89-
RetrySettings? retrySettings = null,
90-
int maxRowsInBatch = 1000)
91-
{
92-
ThrowIfConnectionClosed();
93-
if (CurrentTransaction is { Completed: false })
94-
throw new InvalidOperationException("BulkUpsert does not support working within a transaction");
95-
96-
var realSession = Session as Services.Query.Session
97-
?? throw new InvalidOperationException("Underlying session does not support bulk upsert");
98-
99-
var driver = realSession.Driver as Driver
100-
?? throw new InvalidOperationException("Session driver is not of expected type 'Ydb.Sdk.Driver'");
101-
102-
var tableClient = new TableClient(driver);
103-
return new YdbBulkUpsertImporter<T>(tableClient, tablePath, options, retrySettings, maxRowsInBatch);
104-
}
105-
10692
protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
10793
{
10894
ThrowIfConnectionClosed();

src/Ydb.Sdk/src/Ado/YdbDataSource.cs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#if NET7_0_OR_GREATER
22
using System.Data.Common;
33
using Ydb.Sdk.Ado.BulkUpsert;
4-
using Ydb.Sdk.Services.Table;
4+
using Ydb.Sdk.Value;
55

66
namespace Ydb.Sdk.Ado;
77

@@ -70,22 +70,20 @@ protected override async ValueTask DisposeAsyncCore() =>
7070

7171
public async Task<YdbBulkUpsertImporter<T>> BeginBulkUpsertAsync<T>(
7272
string tablePath,
73-
BulkUpsertOptions? options = null,
74-
RetrySettings? retrySettings = null,
75-
int maxBatchSizeBytes = 64 * 1024 * 1024,
73+
IReadOnlyList<string> columns,
74+
IReadOnlyList<Func<T, YdbValue>> selectors,
75+
int maxRowsInBatch = 1000,
7676
CancellationToken cancellationToken = default)
7777
{
7878
var conn = await OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
7979

80-
var realSession = conn.Session as Services.Query.Session
81-
?? throw new InvalidOperationException("Underlying session does not support bulk upsert");
82-
83-
var driver = realSession.Driver as Driver
84-
?? throw new InvalidOperationException("Session driver is not of expected type 'Ydb.Sdk.Driver'");
85-
86-
var tableClient = new TableClient(driver);
87-
88-
return new YdbBulkUpsertImporter<T>(tableClient, tablePath, options, retrySettings, maxBatchSizeBytes);
80+
return new YdbBulkUpsertImporter<T>(
81+
conn,
82+
tablePath,
83+
columns,
84+
selectors,
85+
maxRowsInBatch
86+
);
8987
}
9088
}
9189

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,22 @@
1-
using Ydb.Operations;
2-
using Ydb.Sdk.Ado.BulkUpsert;
3-
using Ydb.Sdk.Client;
4-
using Ydb.Table;
1+
using Ydb.Sdk.Ado;
2+
using Ydb.Sdk.Value;
53

64
namespace Ydb.Sdk.Services.Table;
75

86
public static class BulkUpsertExtensions
97
{
10-
public static async Task<IResponse> BulkUpsertWithRetry<T>(
11-
this TableClient tableClient,
8+
public static async Task BulkUpsertWithRetry<T>(
9+
this YdbConnection connection,
1210
string tablePath,
1311
IReadOnlyCollection<T> rows,
14-
RetrySettings? retrySettings = null) =>
15-
await tableClient.SessionExec(
16-
async session =>
17-
{
18-
var req = new BulkUpsertRequest
19-
{
20-
Table = tablePath,
21-
OperationParams = new OperationParams(),
22-
Rows = TypedValueFactory.FromObjects(rows)
23-
};
24-
var resp = await session.BulkUpsertAsync(req);
25-
return new BulkUpsertResponseAdapter(resp);
26-
},
27-
retrySettings
12+
IReadOnlyList<string> columns,
13+
IReadOnlyList<Func<T, YdbValue>> selectors,
14+
CancellationToken cancellationToken = default) =>
15+
await connection.BulkUpsertInternalAsync(
16+
tablePath,
17+
rows,
18+
columns,
19+
selectors,
20+
cancellationToken
2821
);
2922
}

src/Ydb.Sdk/src/Services/Table/BulkUpsertResponseAdapter.cs

Lines changed: 0 additions & 16 deletions
This file was deleted.

src/Ydb.Sdk/src/Services/Table/Session.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
using Grpc.Core;
22
using Microsoft.Extensions.Logging;
33
using Ydb.Sdk.Services.Sessions;
4-
using Ydb.Table;
5-
using Ydb.Table.V1;
64

75
namespace Ydb.Sdk.Services.Table;
86

@@ -82,7 +80,4 @@ private async Task<TResponse> UnaryCall<TRequest, TResponse>(
8280
settings: settings
8381
);
8482
}
85-
86-
public Task<BulkUpsertResponse> BulkUpsertAsync(BulkUpsertRequest req, CancellationToken ct = default)
87-
=> Driver.UnaryCall(TableService.BulkUpsertMethod, req, new GrpcRequestSettings { CancellationToken = ct });
8883
}

0 commit comments

Comments
 (0)