From cda7380b86119c952189d4164a708710843f39dc Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 28 Aug 2025 09:49:20 +0000 Subject: [PATCH 1/6] Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer --- .../Program.cs | 318 ++++++++++++++++++ ...dk.AdoNet.OpenSearch.ToYDB.Transfer.csproj | 24 ++ .../nlog.config | 15 + examples/YdbExamples.sln | 6 + 4 files changed, 363 insertions(+) create mode 100644 examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs create mode 100644 examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer.csproj create mode 100644 examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/nlog.config diff --git a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs new file mode 100644 index 00000000..bde4bc5a --- /dev/null +++ b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs @@ -0,0 +1,318 @@ +using Microsoft.Extensions.Logging; +using Ydb.Sdk.Ado; +using Ydb.Sdk.Ado.YdbType; +using Ydb.Sdk.Yc; +using OpenSearch.Client; +using System.Text.Json.Serialization; +using NLog.Extensions.Logging; + +if (args.Length != 3) +{ + Console.WriteLine("Usage: Program.exe "); + + return 1; +} + +const int workerCount = 20; +const int batchSize = 1_000; + +var loggerFactory = LoggerFactory.Create(builder => builder.AddNLog()); +var logger = loggerFactory.CreateLogger(); + +var builder = new YdbConnectionStringBuilder(args[0]) +{ + CredentialsProvider = new MetadataProvider(loggerFactory: loggerFactory), + ServerCertificates = YcCerts.GetYcServerCertificates(), + MaxSessionPool = workerCount +}; + +await using var ydbDataSource = new YdbDataSource(builder); +await using (var ydbCommand = ydbDataSource.CreateCommand()) +{ + ydbCommand.CommandText = """ + CREATE TABLE IF NOT EXISTS `doc_25_08_28__12_00` ( + indexId Text NOT NULL, + chunkId Text NOT NULL, + fileId Text NOT NULL, + folderId Text NOT NULL, + chunkText Text FAMILY family_chunkText, + chunkVector Bytes, + createdAt Timestamp NOT NULL, + createdBy Text NOT NULL, + updatedAt Timestamp NOT NULL, + updatedBy Text NOT NULL, + PRIMARY KEY (indexId, chunkId, fileId, folderId), + FAMILY family_chunkText ( + DATA = "ssd", + COMPRESSION = "lz4" + ), + ) WITH ( + AUTO_PARTITIONING_BY_SIZE = ENABLED, + AUTO_PARTITIONING_BY_LOAD = ENABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 50, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 100 + ) + """; + await ydbCommand.ExecuteNonQueryAsync(); +} + +var openSearchUri = new Uri(args[1]); +var openSearchSettings = new ConnectionSettings(openSearchUri) + .BasicAuthentication("admin", args[2]) + .ServerCertificateValidationCallback((_, _, _, _) => true) + .MaximumRetries(10) + .ConnectionLimit(100) + .EnableDebugMode() + .RequestTimeout(TimeSpan.FromSeconds(30)); + +var openSearchClient = new OpenSearchClient(openSearchSettings); +var indicesResponse = await openSearchClient.Indices.GetAsync("*"); + +logger.LogInformation("Found count of indices: {Count}", indicesResponse.Indices.Count); + +var filteredIndices = indicesResponse.Indices.Where(index => !index.Key.Name.StartsWith('.')).Select(index => index.Key) + .ToArray(); + +var startTime = DateTime.UtcNow; +logger.LogInformation("Started at: {StartTime:yyyy-MM-dd HH:mm:ss} UTC", startTime); +logger.LogInformation("Total indices to process: {Count}", filteredIndices.Length); + +var iterator = filteredIndices.Length; +var workers = new List(); + +for (var i = 0; i < workerCount; i++) +{ + workers.Add(Task.Run(async () => + { + while (true) + { + var index = Interlocked.Decrement(ref iterator); + + await using var ydbConnection = await ydbDataSource.OpenConnectionAsync(); + + if (index < 0) + break; + + await WorkerJobSingleIndex(filteredIndices[index], ydbConnection); + } + })); +} + +await Task.WhenAll(workers); + +var endTime = DateTime.UtcNow; +var duration = endTime - startTime; +logger.LogInformation("Completed at: {EndTime:yyyy-MM-dd HH:mm:ss} UTC", endTime); +logger.LogInformation(@"Total duration: {Duration:hh\:mm\:ss}", duration); +logger.LogInformation("All workers completed at {EndTime}. Total duration: {Duration}", endTime, duration); +return 0; + +async Task WorkerJobSingleIndex(IndexName indexName, YdbConnection ydbConnection) +{ + var countResponse = await openSearchClient.CountAsync(c => c.Index(indexName)); + var totalDocuments = countResponse.Count; + + var bulkUpsertImporter = ydbConnection.BeginBulkUpsertImport("doc_25_08_28__12_00", + [ + "indexId", "chunkId", "fileId", "folderId", "chunkText", "chunkVector", "createdAt", "createdBy", "updatedAt", + "updatedBy" + ]); + logger.LogInformation("Index {IndexName}: Total documents {TotalCount}", indexName, totalDocuments); + + var scrollResponse = await openSearchClient.SearchAsync(s => s + .Index(indexName) + .Size(batchSize) + .Scroll("5m")); + + var scrollId = scrollResponse.ScrollId; + var totalProcessed = 0; + + try + { + while (scrollResponse.Documents.Count != 0) + { + foreach (var doc in scrollResponse.Documents) + { + for (var attempt = 0; attempt < 10; attempt++) + { + try + { + await bulkUpsertImporter.AddRowAsync( + indexName.Name, + doc.ChunkMetadata.ChunkId, + doc.ChunkMetadata.FileId, + doc.ChunkMetadata.FolderId, + doc.ChunkText, + new YdbParameter + { + YdbDbType = YdbDbType.Bytes, + Value = ConvertVectorToBytes(doc.ChunkVector) + }, + doc.RecordMetadata.CreatedAt, + doc.RecordMetadata.CreatedBy, + doc.RecordMetadata.UpdatedAt, + doc.RecordMetadata.UpdatedBy + ); + + totalProcessed++; + break; + } + catch (YdbException e) when (e.IsTransient) + { + await Task.Delay(attempt * 1000); + logger.LogInformation(e, "Transient error during add row, attempt {Attempt}", attempt); + + if (attempt == 9) + { + throw; + } + } + } + } + + logger.LogInformation("Index {IndexName}: processed {TotalProcessed}/{TotalDocuments} documents", indexName, + totalProcessed, totalDocuments); + + for (var attempt = 0; attempt < 10; attempt++) + { + scrollResponse = await openSearchClient.ScrollAsync("5m", scrollId); + if (!scrollResponse.IsValid) + { + logger.LogError(scrollResponse.OriginalException, "Failed to scroll"); + + if (attempt == 9) + { + throw new Exception("Failed to scroll", scrollResponse.OriginalException); + } + + continue; + } + + if (scrollResponse.Documents.Count == 0) + return; + + break; + } + + if (scrollResponse.Documents.Count != 0) + continue; + + logger.LogInformation( + "Index {IndexName}: Scroll completed - no more documents, processed {TotalProcessed}/{TotalDocuments}", + indexName, totalProcessed, totalDocuments); + + break; + } + + for (var attempt = 0; attempt < 10; attempt++) + { + try + { + await bulkUpsertImporter.FlushAsync(); + break; + } + catch (YdbException e) when (e.IsTransientWhenIdempotent) + { + await Task.Delay(attempt * 1000); + logger.LogInformation(e, "Transient error during flush, attempt {Attempt}", attempt); + + if (attempt == 9) + { + throw; + } + } + } + } + catch (Exception ex) + { + logger.LogError(ex, + "Index {IndexName}: Error during processing, processed {TotalProcessed}/{TotalDocuments} documents", + indexName, totalProcessed, totalDocuments); + throw; + } + finally + { + if (!string.IsNullOrEmpty(scrollId)) + { + try + { + await openSearchClient.ClearScrollAsync(c => c.ScrollId(scrollId)); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Index {IndexName}: Failed to clear scroll", indexName); + } + } + } +} + +byte[]? ConvertVectorToBytes(float[]? vector) +{ + if (vector == null) + { + return null; + } + + const int floatSize = sizeof(float); + var result = new byte[vector.Length * floatSize + 1]; + + for (var i = 0; i < vector.Length; i++) + { + var bytes = BitConverter.GetBytes(vector[i]); + Array.Copy(bytes, 0, result, i * floatSize, floatSize); + } + + result[^1] = 0x01; + return result; +} + +internal class Document +{ + [JsonRequired] + [JsonPropertyName("chunkMetadata")] + public ChunkMetadata ChunkMetadata { get; set; } = null!; + + [JsonRequired] + [JsonPropertyName("chunkText")] + public string ChunkText { get; set; } = null!; + + [JsonPropertyName("chunkVector")] public float[]? ChunkVector { get; set; } = null; + + [JsonRequired] + [JsonPropertyName("recordMetadata")] + public RecordMetadata RecordMetadata { get; set; } = null!; +} + +internal class ChunkMetadata +{ + [JsonRequired] + [JsonPropertyName("chunkId")] + public string ChunkId { get; set; } = null!; + + [JsonRequired] + [JsonPropertyName("fileId")] + public string FileId { get; set; } = null!; + + [JsonRequired] + [JsonPropertyName("folderId")] + public string FolderId { get; set; } = null!; +} + +internal class RecordMetadata +{ + [JsonRequired] + [JsonPropertyName("createdAt")] + public DateTime CreatedAt { get; set; } + + [JsonRequired] + [JsonPropertyName("createdBy")] + public string CreatedBy { get; set; } = null!; + + [JsonRequired] + [JsonPropertyName("updatedAt")] + public DateTime UpdatedAt { get; set; } + + [JsonRequired] + [JsonPropertyName("updatedBy")] + public string UpdatedBy { get; set; } = null!; +} diff --git a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer.csproj b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer.csproj new file mode 100644 index 00000000..242c8478 --- /dev/null +++ b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer.csproj @@ -0,0 +1,24 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + + + + + PreserveNewest + + + diff --git a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/nlog.config b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/nlog.config new file mode 100644 index 00000000..b3aed425 --- /dev/null +++ b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/nlog.config @@ -0,0 +1,15 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/examples/YdbExamples.sln b/examples/YdbExamples.sln index 4e37d6b8..a109fec8 100644 --- a/examples/YdbExamples.sln +++ b/examples/YdbExamples.sln @@ -29,6 +29,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Database.Operations.Tutoria EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Ydb.Sdk.AdoNet.Yandex.Cloud.Serverless.Container", "Ydb.Sdk.AdoNet.Yandex.Cloud.Serverless.Container\Ydb.Sdk.AdoNet.Yandex.Cloud.Serverless.Container.csproj", "{77625697-498B-4879-BABA-046EE93E7AF7}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer", "Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer\Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer.csproj", "{99716DB3-23BD-4969-9A4C-B25B80453793}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -87,6 +89,10 @@ Global {77625697-498B-4879-BABA-046EE93E7AF7}.Debug|Any CPU.Build.0 = Debug|Any CPU {77625697-498B-4879-BABA-046EE93E7AF7}.Release|Any CPU.ActiveCfg = Release|Any CPU {77625697-498B-4879-BABA-046EE93E7AF7}.Release|Any CPU.Build.0 = Release|Any CPU + {99716DB3-23BD-4969-9A4C-B25B80453793}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {99716DB3-23BD-4969-9A4C-B25B80453793}.Debug|Any CPU.Build.0 = Debug|Any CPU + {99716DB3-23BD-4969-9A4C-B25B80453793}.Release|Any CPU.ActiveCfg = Release|Any CPU + {99716DB3-23BD-4969-9A4C-B25B80453793}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE From 729914ccc92698937890b3d448243c74cc58c4ce Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 28 Aug 2025 12:14:51 +0000 Subject: [PATCH 2/6] NOT NULL --- examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs index bde4bc5a..4e206e7c 100644 --- a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs +++ b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs @@ -35,7 +35,7 @@ chunkId Text NOT NULL, fileId Text NOT NULL, folderId Text NOT NULL, - chunkText Text FAMILY family_chunkText, + chunkText Text FAMILY family_chunkText NOT NULL, chunkVector Bytes, createdAt Timestamp NOT NULL, createdBy Text NOT NULL, From 6406f5f0850c92fcf6619bf2c9450a2f7f8e5f54 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 28 Aug 2025 12:58:05 +0000 Subject: [PATCH 3/6] commit --- .../Program.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs index 4e206e7c..3bb7c207 100644 --- a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs +++ b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs @@ -6,9 +6,9 @@ using System.Text.Json.Serialization; using NLog.Extensions.Logging; -if (args.Length != 3) +if (args.Length != 4) { - Console.WriteLine("Usage: Program.exe "); + Console.WriteLine("Usage: Program.exe "); return 1; } @@ -29,8 +29,8 @@ await using var ydbDataSource = new YdbDataSource(builder); await using (var ydbCommand = ydbDataSource.CreateCommand()) { - ydbCommand.CommandText = """ - CREATE TABLE IF NOT EXISTS `doc_25_08_28__12_00` ( + ydbCommand.CommandText = $""" + CREATE TABLE IF NOT EXISTS `{args[3]}` ( indexId Text NOT NULL, chunkId Text NOT NULL, fileId Text NOT NULL, @@ -112,7 +112,7 @@ async Task WorkerJobSingleIndex(IndexName indexName, YdbConnection ydbConnection var countResponse = await openSearchClient.CountAsync(c => c.Index(indexName)); var totalDocuments = countResponse.Count; - var bulkUpsertImporter = ydbConnection.BeginBulkUpsertImport("doc_25_08_28__12_00", + var bulkUpsertImporter = ydbConnection.BeginBulkUpsertImport(args[3], [ "indexId", "chunkId", "fileId", "folderId", "chunkText", "chunkVector", "createdAt", "createdBy", "updatedAt", "updatedBy" From a471e69b5cd6996475d1abfa29fb3fd34131858e Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 28 Aug 2025 20:03:49 +0700 Subject: [PATCH 4/6] fix linter --- .../Program.cs | 55 ++++++++++--------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs index 3bb7c207..48ae2e95 100644 --- a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs +++ b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs @@ -8,7 +8,8 @@ if (args.Length != 4) { - Console.WriteLine("Usage: Program.exe "); + Console.WriteLine( + "Usage: Program.exe "); return 1; } @@ -30,29 +31,29 @@ await using (var ydbCommand = ydbDataSource.CreateCommand()) { ydbCommand.CommandText = $""" - CREATE TABLE IF NOT EXISTS `{args[3]}` ( - indexId Text NOT NULL, - chunkId Text NOT NULL, - fileId Text NOT NULL, - folderId Text NOT NULL, - chunkText Text FAMILY family_chunkText NOT NULL, - chunkVector Bytes, - createdAt Timestamp NOT NULL, - createdBy Text NOT NULL, - updatedAt Timestamp NOT NULL, - updatedBy Text NOT NULL, - PRIMARY KEY (indexId, chunkId, fileId, folderId), - FAMILY family_chunkText ( - DATA = "ssd", - COMPRESSION = "lz4" - ), - ) WITH ( - AUTO_PARTITIONING_BY_SIZE = ENABLED, - AUTO_PARTITIONING_BY_LOAD = ENABLED, - AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 50, - AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 100 - ) - """; + CREATE TABLE IF NOT EXISTS `{args[3]}` ( + indexId Text NOT NULL, + chunkId Text NOT NULL, + fileId Text NOT NULL, + folderId Text NOT NULL, + chunkText Text FAMILY family_chunkText NOT NULL, + chunkVector Bytes, + createdAt Timestamp NOT NULL, + createdBy Text NOT NULL, + updatedAt Timestamp NOT NULL, + updatedBy Text NOT NULL, + PRIMARY KEY (indexId, chunkId, fileId, folderId), + FAMILY family_chunkText ( + DATA = "ssd", + COMPRESSION = "lz4" + ), + ) WITH ( + AUTO_PARTITIONING_BY_SIZE = ENABLED, + AUTO_PARTITIONING_BY_LOAD = ENABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 50, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 100 + ) + """; await ydbCommand.ExecuteNonQueryAsync(); } @@ -194,9 +195,9 @@ await bulkUpsertImporter.AddRowAsync( break; } - if (scrollResponse.Documents.Count != 0) + if (scrollResponse.Documents.Count != 0) continue; - + logger.LogInformation( "Index {IndexName}: Scroll completed - no more documents, processed {TotalProcessed}/{TotalDocuments}", indexName, totalProcessed, totalDocuments); @@ -315,4 +316,4 @@ internal class RecordMetadata [JsonRequired] [JsonPropertyName("updatedBy")] public string UpdatedBy { get; set; } = null!; -} +} \ No newline at end of file From 9bdaaeb43abf745a27ab7730eaf28f22f20d3693 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 28 Aug 2025 20:12:55 +0700 Subject: [PATCH 5/6] disable --- examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs index 48ae2e95..bb390d45 100644 --- a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs +++ b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs @@ -89,6 +89,7 @@ FAMILY family_chunkText ( { var index = Interlocked.Decrement(ref iterator); + // ReSharper disable once AccessToDisposedClosure await using var ydbConnection = await ydbDataSource.OpenConnectionAsync(); if (index < 0) From 56446ee9dd7b376a3940641c2aa2af52c5d74780 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 28 Aug 2025 20:14:51 +0700 Subject: [PATCH 6/6] fix linter --- .../Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs index bb390d45..392d9327 100644 --- a/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs +++ b/examples/Ydb.Sdk.AdoNet.OpenSearch.ToYDB.Transfer/Program.cs @@ -1,10 +1,10 @@ -using Microsoft.Extensions.Logging; +using System.Text.Json.Serialization; +using Microsoft.Extensions.Logging; +using NLog.Extensions.Logging; +using OpenSearch.Client; using Ydb.Sdk.Ado; using Ydb.Sdk.Ado.YdbType; using Ydb.Sdk.Yc; -using OpenSearch.Client; -using System.Text.Json.Serialization; -using NLog.Extensions.Logging; if (args.Length != 4) {