diff --git a/clients/algoliasearch-client-csharp/algoliasearch/Utils/SearchClientExtensions.cs b/clients/algoliasearch-client-csharp/algoliasearch/Utils/SearchClientExtensions.cs index ea8a7738a46..e9ac88317d3 100644 --- a/clients/algoliasearch-client-csharp/algoliasearch/Utils/SearchClientExtensions.cs +++ b/clients/algoliasearch-client-csharp/algoliasearch/Utils/SearchClientExtensions.cs @@ -2,12 +2,14 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using System.Text.Json; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Algolia.Search.Exceptions; using Algolia.Search.Http; using Algolia.Search.Models.Search; +using Algolia.Search.Serializer; using Algolia.Search.Utils; using Action = Algolia.Search.Models.Search.Action; @@ -419,6 +421,108 @@ List PartialUpdateObjects( /// bool IndexExists(string indexName, CancellationToken cancellationToken = default); + + /// + /// Helper: Similar to the `SaveObjects` method but requires a Push connector to be created first, + /// in order to transform records before indexing them to Algolia. + /// The ingestion region must have been provided at client instantiation. + /// + /// The index in which to perform the request. + /// The list of `objects` to store in the given Algolia `indexName`. + /// Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. + /// The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + /// Add extra http header or query parameters to Algolia. + /// Cancellation Token to cancel the request. + Task> SaveObjectsWithTransformationAsync( + string indexName, + IEnumerable objects, + bool waitForTasks = false, + int batchSize = 1000, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class; + + /// + List SaveObjectsWithTransformation( + string indexName, + IEnumerable objects, + bool waitForTasks = false, + int batchSize = 1000, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class; + + /// + /// Helper: Similar to the `PartialUpdateObjects` method but requires a Push connector to be created first, + /// in order to transform records before indexing them to Algolia. + /// The ingestion region must have been provided at client instantiation. + /// + /// The index in which to perform the request. + /// The list of `objects` to update in the given Algolia `indexName`. + /// To be provided if non-existing objects are passed, otherwise, the call will fail. + /// Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. + /// The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + /// Add extra http header or query parameters to Algolia. + /// Cancellation Token to cancel the request. + Task< + List + > PartialUpdateObjectsWithTransformationAsync( + string indexName, + IEnumerable objects, + bool createIfNotExists = true, + bool waitForTasks = false, + int batchSize = 1000, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class; + + /// + List PartialUpdateObjectsWithTransformation( + string indexName, + IEnumerable objects, + bool createIfNotExists = true, + bool waitForTasks = false, + int batchSize = 1000, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class; + + /// + /// Helper: Similar to the `ReplaceAllObjects` method but requires a Push connector to be created first, + /// in order to transform records before indexing them to Algolia. + /// The ingestion region must have been provided at client instantiation. + /// A temporary index is created during this process in order to backup your data. + /// + /// The index in which to perform the request. + /// The list of `objects` to store in the given Algolia `indexName`. + /// The size of the chunk of `objects`. The number of `batch` calls will be equal to `objects.length / batchSize`. Defaults to 1000. + /// The `scopes` to keep from the index. Defaults to ['settings', 'rules', 'synonyms']. + /// Add extra http header or query parameters to Algolia. + /// Cancellation Token to cancel the request. + Task ReplaceAllObjectsWithTransformationAsync( + string indexName, + IEnumerable objects, + int batchSize = 1000, + List scopes = null, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class; + + /// + ReplaceAllObjectsWithTransformationResponse ReplaceAllObjectsWithTransformation( + string indexName, + IEnumerable objects, + int batchSize = 1000, + List scopes = null, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class; } public partial class SearchClient : ISearchClient @@ -1174,4 +1278,399 @@ public async Task IndexExistsAsync( /// public bool IndexExists(string indexName, CancellationToken cancellationToken = default) => AsyncHelper.RunSync(() => IndexExistsAsync(indexName, cancellationToken)); + + // ==================== WithTransformation Helper Methods ==================== + + /// + /// Private helper: Wait for an event to be processed with retry logic + /// + private static async Task WaitForEventAsync( + IIngestionClient ingestionTransporter, + string runID, + string eventID, + int maxRetries = 50, + CancellationToken cancellationToken = default + ) + { + int retryCount = 0; + + while (retryCount < maxRetries) + { + try + { + var eventResponse = await ingestionTransporter + .GetEventAsync(runID, eventID, cancellationToken: cancellationToken) + .ConfigureAwait(false); + + // Event found, we're done + return; + } + catch (AlgoliaApiException ex) when (ex.HttpErrorCode == 404) + { + // Event not found yet, retry + retryCount++; + var timeout = Math.Min(retryCount * 500, 5000); + await Task.Delay(timeout, cancellationToken).ConfigureAwait(false); + } + } + + throw new AlgoliaException( + $"The maximum number of retries exceeded. ({retryCount}/{maxRetries})" + ); + } + + /// + /// Private helper: Chunk and push objects through the transformation pipeline + /// + private static async Task> ChunkedPushAsync( + IIngestionClient ingestionTransporter, + string indexName, + IEnumerable objects, + Algolia.Search.Models.Ingestion.Action action, + bool waitForTasks, + int batchSize, + string referenceIndexName = null, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class + { + var objectsList = objects.ToList(); + var responses = new List(); + var waitBatchSize = Math.Max(batchSize / 10, 1); + var offset = 0; + + for (var i = 0; i < objectsList.Count; i += batchSize) + { + var chunk = objectsList.Skip(i).Take(batchSize); + var records = new List(); + + foreach (var obj in chunk) + { + // Serialize the object to JSON and then deserialize to PushTaskRecords to populate AdditionalProperties + var jsonString = JsonSerializer.Serialize(obj, JsonConfig.Options); + var record = JsonSerializer.Deserialize( + jsonString, + JsonConfig.Options + ); + records.Add(record); + } + + var payload = new Algolia.Search.Models.Ingestion.PushTaskPayload(action, records); + + var response = await ingestionTransporter + .PushAsync( + indexName, + payload, + watch: null, + referenceIndexName: referenceIndexName, + options: options, + cancellationToken: cancellationToken + ) + .ConfigureAwait(false); + + // Convert Ingestion.WatchResponse to Search.WatchResponse via JSON serialization + var responseJsonString = JsonSerializer.Serialize(response, JsonConfig.Options); + var searchWatchResponse = + JsonSerializer.Deserialize( + responseJsonString, + JsonConfig.Options + ); + + responses.Add(searchWatchResponse); + + // Wait logic (after every waitBatchSize batches OR at the end) + if ( + waitForTasks + && responses.Count > 0 + && (responses.Count % waitBatchSize == 0 || i + batchSize >= objectsList.Count) + ) + { + for (var j = offset; j < responses.Count; j++) + { + var resp = responses[j]; + if (string.IsNullOrEmpty(resp.EventID)) + { + throw new AlgoliaException( + "Received unexpected response from the push endpoint, eventID must not be null or empty" + ); + } + + // Retry logic to wait for event + await WaitForEventAsync( + ingestionTransporter, + resp.RunID, + resp.EventID, + maxRetries: 50, + cancellationToken: cancellationToken + ) + .ConfigureAwait(false); + } + offset = responses.Count; + } + } + + return responses; + } + + // ==================== SaveObjectsWithTransformation ==================== + + /// + public async Task< + List + > SaveObjectsWithTransformationAsync( + string indexName, + IEnumerable objects, + bool waitForTasks = false, + int batchSize = 1000, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class + { + if (_ingestionTransporter == null) + { + throw new AlgoliaException( + "`transformation.region` must be provided at client instantiation before calling this method." + ); + } + + return await ChunkedPushAsync( + _ingestionTransporter, + indexName, + objects, + Algolia.Search.Models.Ingestion.Action.AddObject, + waitForTasks, + batchSize, + referenceIndexName: null, + options, + cancellationToken + ) + .ConfigureAwait(false); + } + + /// + public List SaveObjectsWithTransformation( + string indexName, + IEnumerable objects, + bool waitForTasks = false, + int batchSize = 1000, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class => + AsyncHelper.RunSync(() => + SaveObjectsWithTransformationAsync( + indexName, + objects, + waitForTasks, + batchSize, + options, + cancellationToken + ) + ); + + // ==================== PartialUpdateObjectsWithTransformation ==================== + + /// + public async Task< + List + > PartialUpdateObjectsWithTransformationAsync( + string indexName, + IEnumerable objects, + bool createIfNotExists = true, + bool waitForTasks = false, + int batchSize = 1000, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class + { + if (_ingestionTransporter == null) + { + throw new AlgoliaException( + "`transformation.region` must be provided at client instantiation before calling this method." + ); + } + + var action = createIfNotExists + ? Algolia.Search.Models.Ingestion.Action.PartialUpdateObject + : Algolia.Search.Models.Ingestion.Action.PartialUpdateObjectNoCreate; + + return await ChunkedPushAsync( + _ingestionTransporter, + indexName, + objects, + action, + waitForTasks, + batchSize, + referenceIndexName: null, + options, + cancellationToken + ) + .ConfigureAwait(false); + } + + /// + public List PartialUpdateObjectsWithTransformation( + string indexName, + IEnumerable objects, + bool createIfNotExists = true, + bool waitForTasks = false, + int batchSize = 1000, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class => + AsyncHelper.RunSync(() => + PartialUpdateObjectsWithTransformationAsync( + indexName, + objects, + createIfNotExists, + waitForTasks, + batchSize, + options, + cancellationToken + ) + ); + + // ==================== ReplaceAllObjectsWithTransformation ==================== + + /// + public async Task ReplaceAllObjectsWithTransformationAsync( + string indexName, + IEnumerable objects, + int batchSize = 1000, + List scopes = null, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class + { + if (_ingestionTransporter == null) + { + throw new AlgoliaException( + "`transformation.region` must be provided at client instantiation before calling this method." + ); + } + + if (scopes == null) + { + scopes = new List { ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms }; + } + + var random = new Random(); + var randomSuffix = random.Next(100000, 1000000); + var tmpIndexName = $"{indexName}_tmp_{randomSuffix}"; + + try + { + // Step 1: Copy settings/rules/synonyms from source to temp + var copyOperationResponse = await OperationIndexAsync( + indexName, + new OperationIndexParams(OperationType.Copy, tmpIndexName) { Scope = scopes }, + options, + cancellationToken + ) + .ConfigureAwait(false); + + // Step 2: Push transformed objects to temp index (referencing original index for transformation) + var watchResponses = await ChunkedPushAsync( + _ingestionTransporter, + tmpIndexName, + objects, + Algolia.Search.Models.Ingestion.Action.AddObject, + waitForTasks: true, + batchSize, + referenceIndexName: indexName, // CRITICAL: Apply transformation from original index + options, + cancellationToken + ) + .ConfigureAwait(false); + + // Step 3: Wait for copy operation to complete + await WaitForTaskAsync( + tmpIndexName, + copyOperationResponse.TaskID, + requestOptions: options, + ct: cancellationToken + ) + .ConfigureAwait(false); + + // Step 4: Copy again to ensure latest settings/rules/synonyms + copyOperationResponse = await OperationIndexAsync( + indexName, + new OperationIndexParams(OperationType.Copy, tmpIndexName) { Scope = scopes }, + options, + cancellationToken + ) + .ConfigureAwait(false); + + await WaitForTaskAsync( + tmpIndexName, + copyOperationResponse.TaskID, + requestOptions: options, + ct: cancellationToken + ) + .ConfigureAwait(false); + + // Step 5: Move temp index to replace original + var moveOperationResponse = await OperationIndexAsync( + tmpIndexName, + new OperationIndexParams(OperationType.Move, indexName), + options, + cancellationToken + ) + .ConfigureAwait(false); + + await WaitForTaskAsync( + tmpIndexName, + moveOperationResponse.TaskID, + requestOptions: options, + ct: cancellationToken + ) + .ConfigureAwait(false); + + return new ReplaceAllObjectsWithTransformationResponse( + copyOperationResponse, + watchResponses, + moveOperationResponse + ); + } + catch + { + // Clean up temp index on error + try + { + await DeleteIndexAsync(tmpIndexName, cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + catch + { + // Ignore errors during cleanup + } + throw; + } + } + + /// + public ReplaceAllObjectsWithTransformationResponse ReplaceAllObjectsWithTransformation( + string indexName, + IEnumerable objects, + int batchSize = 1000, + List scopes = null, + RequestOptions options = null, + CancellationToken cancellationToken = default + ) + where T : class => + AsyncHelper.RunSync(() => + ReplaceAllObjectsWithTransformationAsync( + indexName, + objects, + batchSize, + scopes, + options, + cancellationToken + ) + ); } diff --git a/specs/search/helpers/partialUpdateObjectsWithTransformation.yml b/specs/search/helpers/partialUpdateObjectsWithTransformation.yml index eac22717a29..1159fc9d2eb 100644 --- a/specs/search/helpers/partialUpdateObjectsWithTransformation.yml +++ b/specs/search/helpers/partialUpdateObjectsWithTransformation.yml @@ -7,6 +7,7 @@ method: - javascript - php - python + - csharp tags: - Records operationId: partialUpdateObjectsWithTransformation diff --git a/specs/search/helpers/replaceAllObjectsWithTransformation.yml b/specs/search/helpers/replaceAllObjectsWithTransformation.yml index ea99f18f8bd..8abf31fbdb4 100644 --- a/specs/search/helpers/replaceAllObjectsWithTransformation.yml +++ b/specs/search/helpers/replaceAllObjectsWithTransformation.yml @@ -9,6 +9,7 @@ method: - java - php - python + - csharp operationId: replaceAllObjectsWithTransformation summary: Replace all records in an index description: | diff --git a/specs/search/helpers/saveObjectsWithTransformation.yml b/specs/search/helpers/saveObjectsWithTransformation.yml index 2f0977b0f82..7a1959f0296 100644 --- a/specs/search/helpers/saveObjectsWithTransformation.yml +++ b/specs/search/helpers/saveObjectsWithTransformation.yml @@ -7,6 +7,7 @@ method: - javascript - php - python + - csharp tags: - Records operationId: saveObjectsWithTransformation diff --git a/templates/csharp/api.mustache b/templates/csharp/api.mustache index 3c881667ced..323f29ba5a8 100644 --- a/templates/csharp/api.mustache +++ b/templates/csharp/api.mustache @@ -13,6 +13,9 @@ using Algolia.Search.Http; using Algolia.Search.Utils; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +{{#isSearchClient}} +using Algolia.Search.Clients; // For IngestionClient +{{/isSearchClient}} namespace Algolia.Search.Clients; @@ -84,6 +87,9 @@ namespace Algolia.Search.Clients; { internal HttpTransport _transport; private readonly ILogger<{{apiPackageName}}Client> _logger; + {{#isSearchClient}} + private IIngestionClient _ingestionTransporter; + {{/isSearchClient}} /// /// Create a new {{apiPackageName}} client for the given appID and apiKey. @@ -149,6 +155,35 @@ namespace Algolia.Search.Clients; { _transport._algoliaConfig.SetClientApiKey(apiKey); } + + {{#isSearchClient}} + /// + /// Sets the region of the transformation pipeline. This is required to be called + /// if you wish to leverage the transformation pipeline (via the *WithTransformation methods). + /// + /// The region ("us" or "eu") + /// Logger factory + public void SetTransformationRegion(string region, ILoggerFactory factory = null) + { + if (string.IsNullOrWhiteSpace(region)) + { + throw new ArgumentException("`region` must be provided when leveraging the transformation pipeline"); + } + + if (string.IsNullOrWhiteSpace(_transport._algoliaConfig.AppId) || + string.IsNullOrWhiteSpace(_transport._algoliaConfig.ApiKey)) + { + throw new ArgumentException("AppId and ApiKey are required for transformation pipeline"); + } + + _ingestionTransporter = new IngestionClient( + _transport._algoliaConfig.AppId, + _transport._algoliaConfig.ApiKey, + region, + factory + ); + } + {{/isSearchClient}} {{#operation}} diff --git a/templates/csharp/tests/client/createClient.mustache b/templates/csharp/tests/client/createClient.mustache index 41e5c9a449d..ac9d98c1c19 100644 --- a/templates/csharp/tests/client/createClient.mustache +++ b/templates/csharp/tests/client/createClient.mustache @@ -14,5 +14,6 @@ }; {{^autoCreateClient}}var client = {{/autoCreateClient}}new {{client}}(_config); {{/hasCustomHosts}} +{{#hasTransformationRegion}}client.SetTransformationRegion("{{{transformationRegion}}}");{{/hasTransformationRegion}} {{#useEchoRequester}} {{^autoCreateClient}}var client = {{/autoCreateClient}}new {{client}}(new {{clientPrefix}}Config("{{parametersWithDataTypeMap.appId.value}}","{{parametersWithDataTypeMap.apiKey.value}}"{{#hasRegionalHost}}{{#parametersWithDataTypeMap.region}},"{{parametersWithDataTypeMap.region.value}}"{{/parametersWithDataTypeMap.region}}{{/hasRegionalHost}}), _echo);{{/useEchoRequester}} \ No newline at end of file