-
Notifications
You must be signed in to change notification settings - Fork 28
add Bulk upsert #483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add Bulk upsert #483
Changes from all commits
d904ed8
b757bf4
eca3da5
e63dc1c
81c68a8
6ca42e8
9f29644
c9c2bae
3bf1220
52891c0
d4dda32
c07b88d
b426020
b94b27b
6013dee
4768078
dcb95c4
d6f9956
17ad798
891c408
1e23c76
1241e8f
ae57645
b574624
270c446
c2a9a81
9b007fa
7597068
0214e9a
42e0018
5740614
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| namespace Ydb.Sdk.Ado.BulkUpsert | ||
| { | ||
| public sealed record BulkUpsertOptions( | ||
| TimeSpan? Timeout = null, | ||
| CancellationToken CancellationToken = default | ||
| ); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| using System.Reflection; | ||
| using Ydb.Sdk.Value; | ||
|
|
||
| namespace Ydb.Sdk.Ado.BulkUpsert; | ||
|
|
||
| internal static class TypedValueFactory | ||
| { | ||
| public static TypedValue FromObjects<T>(IReadOnlyCollection<T> rows) | ||
| { | ||
| if (rows.Count == 0) | ||
| throw new ArgumentException("Rows collection is empty.", nameof(rows)); | ||
|
|
||
| var props = typeof(T).GetProperties(BindingFlags.Instance | BindingFlags.Public) | ||
| .Where(p => p.CanRead).ToArray(); | ||
|
|
||
| var structs = new List<YdbValue>(rows.Count); | ||
|
|
||
| foreach (var row in rows) | ||
| { | ||
| var members = new Dictionary<string, YdbValue>(props.Length); | ||
| foreach (var p in props) | ||
| members[p.Name] = ToYdbValue(p.GetValue(row), p.PropertyType); | ||
|
|
||
| structs.Add(YdbValue.MakeStruct(members)); | ||
| } | ||
|
|
||
| var list = YdbValue.MakeList(structs); | ||
| return list.GetProto(); | ||
| } | ||
|
|
||
| private static YdbValue ToYdbValue(object? value, global::System.Type clr) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. зачем писать global:: ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. У него какая-то очень странная коллизия имён была, ну можно убрать, вдруг пропала |
||
| { | ||
| if (value is null) return MakeOptional(clr); | ||
|
|
||
| if (clr == typeof(bool)) return (YdbValue)(bool)value; | ||
| if (clr == typeof(sbyte)) return (YdbValue)(sbyte)value; | ||
| if (clr == typeof(short)) return (YdbValue)(short)value; | ||
| if (clr == typeof(int)) return (YdbValue)(int)value; | ||
| if (clr == typeof(long)) return (YdbValue)(long)value; | ||
| if (clr == typeof(byte)) return (YdbValue)(byte)value; | ||
| if (clr == typeof(ushort)) return (YdbValue)(ushort)value; | ||
| if (clr == typeof(uint)) return (YdbValue)(uint)value; | ||
| if (clr == typeof(ulong)) return (YdbValue)(ulong)value; | ||
| if (clr == typeof(float)) return (YdbValue)(float)value; | ||
| if (clr == typeof(double)) return (YdbValue)(double)value; | ||
| if (clr == typeof(decimal)) return (YdbValue)(decimal)value; | ||
| if (clr == typeof(DateTime)) return YdbValue.MakeTimestamp((DateTime)value); | ||
| if (clr == typeof(TimeSpan)) return (YdbValue)(TimeSpan)value; | ||
| if (clr == typeof(Guid)) return YdbValue.MakeUuid((Guid)value); | ||
| if (clr == typeof(string)) return YdbValue.MakeUtf8((string)value); | ||
| if (clr == typeof(byte[])) return YdbValue.MakeString((byte[])value); | ||
|
|
||
| throw new NotSupportedException($"Type '{clr.FullName}' is not supported."); | ||
| } | ||
|
|
||
| private static YdbValue MakeOptional(global::System.Type clr) | ||
| { | ||
| var t = Nullable.GetUnderlyingType(clr) ?? clr; | ||
|
|
||
| if (t == typeof(bool)) return YdbValue.MakeOptionalBool(); | ||
| if (t == typeof(sbyte)) return YdbValue.MakeOptionalInt8(); | ||
| if (t == typeof(short)) return YdbValue.MakeOptionalInt16(); | ||
| if (t == typeof(int)) return YdbValue.MakeOptionalInt32(); | ||
| if (t == typeof(long)) return YdbValue.MakeOptionalInt64(); | ||
| if (t == typeof(byte)) return YdbValue.MakeOptionalUint8(); | ||
| if (t == typeof(ushort)) return YdbValue.MakeOptionalUint16(); | ||
| if (t == typeof(uint)) return YdbValue.MakeOptionalUint32(); | ||
| if (t == typeof(ulong)) return YdbValue.MakeOptionalUint64(); | ||
| if (t == typeof(float)) return YdbValue.MakeOptionalFloat(); | ||
| if (t == typeof(double)) return YdbValue.MakeOptionalDouble(); | ||
| if (t == typeof(decimal)) return YdbValue.MakeOptionalDecimal(); | ||
| if (t == typeof(DateTime)) return YdbValue.MakeOptionalTimestamp(); | ||
| if (t == typeof(TimeSpan)) return YdbValue.MakeOptionalInterval(); | ||
| if (t == typeof(Guid)) return YdbValue.MakeOptionalUuid(); | ||
| if (t == typeof(string)) return YdbValue.MakeOptionalUtf8(); | ||
| if (t == typeof(byte[])) return YdbValue.MakeOptionalString(); | ||
|
|
||
| throw new NotSupportedException($"Null for '{t.FullName}' is not supported."); | ||
|
Comment on lines
+56
to
+78
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Это все написано в YdbParameter, давай попробуем переиспользовать существующий код |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Ydb.Sdk.Ado.BulkUpsert; | ||
|
|
||
| namespace Ydb.Sdk.Ado | ||
|
Check warning on line 7 in src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs
|
||
| { | ||
| public sealed class YdbBulkUpsertImporter<T> : IAsyncDisposable | ||
| { | ||
| private readonly YdbConnection _connection; | ||
| private readonly string _tablePath; | ||
| private readonly BulkUpsertOptions _options; | ||
| private readonly List<T> _buffer = new(); | ||
| private readonly int _maxBatchSizeBytes; | ||
| private int _bufferSizeBytes; | ||
| private bool _isCompleted; | ||
|
|
||
| public YdbBulkUpsertImporter( | ||
| YdbConnection connection, | ||
| string tablePath, | ||
| BulkUpsertOptions? options = null, | ||
| int maxBatchSizeBytes = 64 * 1024 * 1024) | ||
| { | ||
| _connection = connection ?? throw new ArgumentNullException(nameof(connection)); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Не вижу, чтобы закрывался connection |
||
| _tablePath = tablePath ?? throw new ArgumentNullException(nameof(tablePath)); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. зачем проверки на null, если параметр not nullable |
||
| _options = options ?? new BulkUpsertOptions(); | ||
| _maxBatchSizeBytes = maxBatchSizeBytes; | ||
| } | ||
|
|
||
| public async Task WriteRowAsync(T row, CancellationToken cancellationToken = default) | ||
| { | ||
| if (_isCompleted) | ||
| throw new InvalidOperationException("BulkUpsertImporter уже завершён."); | ||
|
|
||
| _buffer.Add(row); | ||
|
|
||
| var rowProto = TypedValueFactory.FromObjects(new[] { row }); | ||
| var rowSize = rowProto.CalculateSize(); | ||
|
|
||
| _bufferSizeBytes += rowSize; | ||
|
|
||
| if (_bufferSizeBytes >= _maxBatchSizeBytes) | ||
| { | ||
| await FlushAsync(cancellationToken).ConfigureAwait(false); | ||
| } | ||
| } | ||
|
|
||
| public async Task FlushAsync(CancellationToken cancellationToken = default) | ||
| { | ||
| if (_buffer.Count == 0) | ||
| return; | ||
|
|
||
| await _connection.BulkUpsertInternalAsync(_tablePath, _buffer, _options, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| _buffer.Clear(); | ||
| _bufferSizeBytes = 0; | ||
| } | ||
|
|
||
| public async Task CompleteAsync(CancellationToken cancellationToken = default) | ||
| { | ||
| if (_isCompleted) return; | ||
| await FlushAsync(cancellationToken).ConfigureAwait(false); | ||
| _isCompleted = true; | ||
| } | ||
|
|
||
| public async ValueTask DisposeAsync() | ||
| { | ||
| if (!_isCompleted) | ||
| await CompleteAsync(); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Это можно сделать один раз до цикла