-
Notifications
You must be signed in to change notification settings - Fork 28
Add: BulkUpsertImporter #485
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
KirillKurdyukov
merged 30 commits into
ydb-platform:main
from
LiamHamsters:add-bulk-upsert
Aug 1, 2025
Merged
Changes from 17 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
b6bf0d5
Add BulkUpsert implementation
LiamHamsters 261d342
hot fix
LiamHamsters 8173dd0
hot fix 2
LiamHamsters 8d170b8
fix issues
LiamHamsters 5ac5127
feat: make BulkUpsert API more user-friendly
LiamHamsters 3db706d
Implement simple BulkUpsert in YdbConnection and YdbDataSource (delet…
LiamHamsters a2aa86b
refactor: use array-based rows for BulkUpsert, remove dictionary
LiamHamsters bff5137
fix: format
LiamHamsters 924da51
last hot fix
LiamHamsters 5e52cc1
fix issues
LiamHamsters 3c219ef
rebase + SessionImpl -> Session
LiamHamsters 4af5c80
fast fix
LiamHamsters 18060cd
BulkUpsert: forbid usage inside transaction, add test
LiamHamsters f207b69
fix + edit CHANGELOG.md
LiamHamsters 816dc9c
add BulkUpsertProtoImporter
LiamHamsters 7ffa2ee
hot fix
LiamHamsters 1879bca
bulk upsert: add object[] support, simplify implementation
LiamHamsters 14a2bfe
fix: handle exceptions in BulkUpsertImporter DisposeAsync, update tests
LiamHamsters 5cee465
fix
LiamHamsters b203781
hot fix
LiamHamsters eec3d12
remove Type argument
LiamHamsters 8c32732
hot fix
LiamHamsters c9fdbb5
fix inspections and autoformat
LiamHamsters 6463d36
feat: switch BulkUpsertImporter to plain proto, remove extra wrappers…
LiamHamsters bfc6cf5
refactor: correct batch bytes check and flush logic in BulkUpsertImpo…
LiamHamsters 1c28862
fix
LiamHamsters 0bb791c
last fix
LiamHamsters beec0e0
Finally
LiamHamsters b08ebd5
change ct in Flush
LiamHamsters 96fef4a
edit: tablePath calculation in YdbConnection
LiamHamsters File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| namespace Ydb.Sdk.Ado.BulkUpsert; | ||
|
|
||
| public enum BulkUpsertFormat | ||
| { | ||
| Proto = 0 | ||
| // Arrow = 1 | ||
LiamHamsters marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| using Ydb.Sdk.Value; | ||
|
|
||
| namespace Ydb.Sdk.Ado.BulkUpsert; | ||
|
|
||
| public interface IBulkUpsertImporter : IAsyncDisposable | ||
| { | ||
LiamHamsters marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ValueTask AddRowAsync(params YdbValue[] values); | ||
| ValueTask AddRowAsync(params object?[] values); | ||
|
|
||
| ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows, CancellationToken cancellationToken = default); | ||
| ValueTask AddRowsAsync(IEnumerable<object?[]> rows, CancellationToken cancellationToken = default); | ||
|
|
||
| ValueTask FlushAsync(CancellationToken cancellationToken = default); | ||
| IReadOnlyList<Ydb.Value> GetBufferedRows(); | ||
| } | ||
126 changes: 126 additions & 0 deletions
126
src/Ydb.Sdk/src/Ado/BulkUpsert/YdbBulkUpsertImporter.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| using Ydb.Sdk.Value; | ||
|
|
||
| namespace Ydb.Sdk.Ado.BulkUpsert; | ||
|
|
||
| public sealed class BulkUpsertImporter : IBulkUpsertImporter | ||
| { | ||
| private readonly YdbConnection _connection; | ||
| private readonly string _tablePath; | ||
| private readonly List<string> _columns; | ||
| private readonly List<Type> _types; | ||
| private readonly int _maxBytes; | ||
| private readonly List<Ydb.Value> _rows = new(); | ||
| private bool _disposed; | ||
|
|
||
| public BulkUpsertImporter( | ||
| YdbConnection connection, | ||
| string tablePath, | ||
| IReadOnlyList<string> columns, | ||
| IReadOnlyList<Type> types, | ||
| int maxBytes = 1024 * 1024) | ||
| { | ||
| _connection = connection; | ||
| _tablePath = tablePath; | ||
| _columns = columns.ToList(); | ||
| _types = types.ToList(); | ||
| _maxBytes = maxBytes; | ||
| } | ||
|
|
||
| public async ValueTask AddRowAsync(params YdbValue[] values) | ||
| { | ||
| ThrowIfDisposed(); | ||
| if (values.Length != _columns.Count) | ||
| throw new ArgumentException("Values count must match columns count", nameof(values)); | ||
|
|
||
| var dict = _columns.Zip(values, (name, value) => new KeyValuePair<string, YdbValue>(name, value)) | ||
| .ToDictionary(x => x.Key, x => x.Value); | ||
|
|
||
| var structValue = YdbValue.MakeStruct(dict).GetProto().Value; | ||
| _rows.Add(structValue); | ||
|
|
||
| var totalSize = _rows.Sum(r => r.CalculateSize()); | ||
| if (totalSize >= _maxBytes) | ||
| await FlushAsync(); | ||
| } | ||
|
|
||
| public async ValueTask AddRowAsync(params object?[] values) | ||
| { | ||
| ThrowIfDisposed(); | ||
| if (values.Length != _columns.Count) | ||
| throw new ArgumentException("Values count must match columns count", nameof(values)); | ||
|
|
||
| var ydbValues = new YdbValue[values.Length]; | ||
| for (int i = 0; i < values.Length; i++) | ||
| { | ||
| ydbValues[i] = YdbValueFromObject(values[i], _types[i]); | ||
| } | ||
| await AddRowAsync(ydbValues); | ||
| } | ||
|
|
||
| public async ValueTask AddRowsAsync(IEnumerable<YdbValue[]> rows, CancellationToken cancellationToken = default) | ||
| { | ||
| ThrowIfDisposed(); | ||
|
|
||
| foreach (var values in rows) | ||
| await AddRowAsync(values); | ||
| } | ||
|
|
||
| public async ValueTask AddRowsAsync(IEnumerable<object?[]> rows, CancellationToken cancellationToken = default) | ||
| { | ||
| ThrowIfDisposed(); | ||
|
|
||
| foreach (var values in rows) | ||
| await AddRowAsync(values); | ||
| } | ||
|
|
||
| public async ValueTask FlushAsync(CancellationToken cancellationToken = default) | ||
| { | ||
| ThrowIfDisposed(); | ||
| if (_rows.Count == 0) return; | ||
|
|
||
| await _connection.BulkUpsertProtoAsync(_tablePath, GetStructType(), _rows.ToList(), cancellationToken); | ||
| _rows.Clear(); | ||
| } | ||
|
|
||
| public IReadOnlyList<Ydb.Value> GetBufferedRows() => _rows; | ||
|
|
||
| public async ValueTask DisposeAsync() | ||
| { | ||
| if (_disposed) return; | ||
| await FlushAsync(); | ||
| _disposed = true; | ||
| } | ||
|
|
||
| private Type GetStructType() | ||
| { | ||
| var structType = new Type { StructType = new StructType() }; | ||
| for (var i = 0; i < _columns.Count; i++) | ||
| structType.StructType.Members.Add(new StructMember { Name = _columns[i], Type = _types[i] }); | ||
| return structType; | ||
| } | ||
|
|
||
| private void ThrowIfDisposed() | ||
| { | ||
| if (_disposed) | ||
| throw new ObjectDisposedException(nameof(BulkUpsertImporter)); | ||
| } | ||
|
|
||
| private static YdbValue YdbValueFromObject(object? value, Type columnType) | ||
| { | ||
| switch (value) | ||
| { | ||
| case YdbValue ydbValue: | ||
| return ydbValue; | ||
| default: | ||
| switch (columnType.TypeId) | ||
| { | ||
| case Type.Types.PrimitiveTypeId.Int32: | ||
| return YdbValue.MakeInt32(Convert.ToInt32(value)); | ||
| case Type.Types.PrimitiveTypeId.Utf8: | ||
| return YdbValue.MakeUtf8(value?.ToString()!); | ||
| default: | ||
| throw new NotSupportedException($"Type '{columnType.TypeId}' not supported in YdbValueFromObject"); | ||
| } | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.