diff --git a/playground/php/src/search.php b/playground/php/src/search.php index 241549d66f9..ff6b05523c2 100644 --- a/playground/php/src/search.php +++ b/playground/php/src/search.php @@ -89,8 +89,8 @@ // } // var_dump($rules); -$configForIngestion = $config->setFullHosts(['http://localhost:6689'])->setTransformationRegion('eu'); +$configForIngestion = $config->setTransformationRegion('eu'); $clientWithTransformation = SearchClient::createWithConfig($configForIngestion); -var_dump($clientWithTransformation->saveObjectsWithTransformation('boyd', [['objectID' => '1', 'name' => 'Michel']], true)); +var_dump($clientWithTransformation->replaceAllObjectsWithTransformation('boyd', [['objectID' => '1', 'name' => 'Michel']], true)); diff --git a/playground/python/app/ingestion.py b/playground/python/app/ingestion.py index 30dfe7286c5..e432f28c550 100644 --- a/playground/python/app/ingestion.py +++ b/playground/python/app/ingestion.py @@ -1,13 +1,16 @@ +from os import environ from asyncio import run from algoliasearch.ingestion import __version__ from algoliasearch.ingestion.client import IngestionClient +from dotenv import load_dotenv +load_dotenv("../.env") async def main(): print("IngestionClient version", __version__) - client = IngestionClient("FOO", "BAR") + client = IngestionClient(environ.get("ALGOLIA_APPLICATION_ID"), environ.get("ALGOLIA_ADMIN_KEY"), "eu") print("client initialized", client) diff --git a/playground/python/app/search.py b/playground/python/app/search.py index e708f0e1b4e..4945d1dec40 100644 --- a/playground/python/app/search.py +++ b/playground/python/app/search.py @@ -11,26 +11,26 @@ def main(): print("SearchClient version", __version__) - client = SearchClientSync( - environ.get("ALGOLIA_APPLICATION_ID"), environ.get("ALGOLIA_ADMIN_KEY") - ) - client.add_user_agent("playground") - client.add_user_agent("bar", "baz") - - print("user_agent", client._config._user_agent.get()) - print("client initialized", client) - - try: - resp = client.search_synonyms("foo") - print(resp) - client.browse_synonyms("foo", lambda _resp: print(_resp)) - finally: - client.close() - - print("client closed") - - print("with transformations") - + # client = SearchClientSync( + # environ.get("ALGOLIA_APPLICATION_ID"), environ.get("ALGOLIA_ADMIN_KEY") + # ) + # client.add_user_agent("playground") + # client.add_user_agent("bar", "baz") + # + # print("user_agent", client._config._user_agent.get()) + # print("client initialized", client) + # + # try: + # resp = client.search_synonyms("foo") + # print(resp) + # client.browse_synonyms("foo", lambda _resp: print(_resp)) + # finally: + # client.close() + # + # print("client closed") + # + # print("with transformations") + # config = SearchConfig( environ.get("ALGOLIA_APPLICATION_ID"), environ.get("ALGOLIA_ADMIN_KEY") ) @@ -46,10 +46,12 @@ def main(): print("user_agent", client._config._user_agent.get()) try: - resp = client.save_objects_with_transformation( - "foo", [{"objectID": "bar"}], wait_for_tasks=True + resp = client.replace_all_objects_with_transformation( + "boyd", [{"objectID": "bar"},{"objectID": "bar"},{"objectID": "bar"},{"objectID": "bar"},{"objectID": "bar"}], 2 ) print(resp) + except Exception as e: + print(e) finally: client.close() diff --git a/playground/python/poetry.lock b/playground/python/poetry.lock index 8cbaed43417..144c811ab7a 100644 --- a/playground/python/poetry.lock +++ b/playground/python/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.0.0 and should not be changed by hand. [[package]] name = "aiohappyeyeballs" @@ -123,7 +123,7 @@ multidict = ">=4.5,<7.0" yarl = ">=1.12.0,<2.0" [package.extras] -speedups = ["Brotli ; platform_python_implementation == \"CPython\"", "aiodns (>=3.2.0) ; sys_platform == \"linux\" or sys_platform == \"darwin\"", "brotlicffi ; platform_python_implementation != \"CPython\""] +speedups = ["Brotli", "aiodns (>=3.2.0)", "brotlicffi"] [[package]] name = "aiosignal" @@ -142,7 +142,7 @@ frozenlist = ">=1.1.0" [[package]] name = "algoliasearch" -version = "4.17.0" +version = "4.20.0" description = "A fully-featured and blazing-fast Python API client to interact with Algolia." optional = false python-versions = ">= 3.8.1" @@ -202,12 +202,12 @@ files = [ ] [package.extras] -benchmark = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -cov = ["cloudpickle ; platform_python_implementation == \"CPython\"", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -dev = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] +benchmark = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +cov = ["cloudpickle", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +dev = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier"] -tests = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\""] +tests = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"] [[package]] name = "certifi" @@ -684,7 +684,7 @@ typing-extensions = ">=4.12.2" [package.extras] email = ["email-validator (>=2.0.0)"] -timezone = ["tzdata ; python_version >= \"3.9\" and platform_system == \"Windows\""] +timezone = ["tzdata"] [[package]] name = "pydantic-core" @@ -937,7 +937,7 @@ files = [ ] [package.extras] -brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""] +brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] diff --git a/scripts/cts/runCts.ts b/scripts/cts/runCts.ts index 7885e47506f..a38921927cc 100644 --- a/scripts/cts/runCts.ts +++ b/scripts/cts/runCts.ts @@ -155,7 +155,9 @@ export async function runCts( assertValidTimeouts(languages.length); assertChunkWrapperValid(languages.length - skip('dart')); assertValidReplaceAllObjects(languages.length - skip('dart')); - assertValidReplaceAllObjectsWithTransformation(only('javascript')); + assertValidReplaceAllObjectsWithTransformation( + only('javascript') + only('go') + only('python') + only('java') + only('php'), + ); assertValidAccountCopyIndex(only('javascript')); assertValidReplaceAllObjectsFailed(languages.length - skip('dart')); assertValidReplaceAllObjectsScopes(languages.length - skip('dart')); diff --git a/scripts/cts/testServer/replaceAllObjectsWithTransformation.ts b/scripts/cts/testServer/replaceAllObjectsWithTransformation.ts index 3a57e129f21..29b83a57aeb 100644 --- a/scripts/cts/testServer/replaceAllObjectsWithTransformation.ts +++ b/scripts/cts/testServer/replaceAllObjectsWithTransformation.ts @@ -92,6 +92,7 @@ function addRoutes(app: Express): void { )?.[1] as string; expect(raowtState).to.include.keys(lang); expect(req.body.action === 'addObject').to.equal(true); + expect(req.query.referenceIndexName === `cts_e2e_replace_all_objects_with_transformation_${lang}`).to.equal(true); raowtState[lang].pushCount += req.body.records.length; @@ -104,7 +105,14 @@ function addRoutes(app: Express): void { }); app.get('/1/runs/:runID/events/:eventID', (req, res) => { - res.json({ status: 'finished' }); + res.json({ + status: 'succeeded', + eventID: '113b2068-6337-4c85-b5c2-e7b213d82921', + runID: 'b1b7a982-524c-40d2-bb7f-48aab075abda', + type: 'fetch', + batchSize: 1, + publishedAt: '2022-05-12T06:24:30.049Z', + }); }); app.get('/1/indexes/:indexName/task/:taskID', (req, res) => { diff --git a/specs/search/helpers/replaceAllObjectsWithTransformation.yml b/specs/search/helpers/replaceAllObjectsWithTransformation.yml index 2521366622a..5bcaee7546d 100644 --- a/specs/search/helpers/replaceAllObjectsWithTransformation.yml +++ b/specs/search/helpers/replaceAllObjectsWithTransformation.yml @@ -5,6 +5,10 @@ method: - Records x-available-languages: - javascript + - go + - java + - php + - python operationId: replaceAllObjectsWithTransformation summary: Replace all records in an index description: | diff --git a/templates/go/api.mustache b/templates/go/api.mustache index e82cd9db638..bec79423feb 100644 --- a/templates/go/api.mustache +++ b/templates/go/api.mustache @@ -227,16 +227,23 @@ func toRequestOptions[T RequestOption](opts []T) []RequestOption { return requestOpts } -func toIngestionRequestOptions(opts []RequestOption) []ingestion.RequestOption { - requestOpts := make([]ingestion.RequestOption, 0, len(opts)) +// toIngestionChunkedBatchOptions converts the current chunked batch opts to ingestion ones. +func toIngestionChunkedBatchOptions(opts []ChunkedBatchOption) []ingestion.ChunkedBatchOption { + conf := config{} for _, opt := range opts { - if opt, ok := opt.(ingestion.RequestOption); ok { - requestOpts = append(requestOpts, opt) - } + opt.apply(&conf) } - return requestOpts + ingestionOpts := make([]ingestion.ChunkedBatchOption, 0, len(opts)) + + if conf.batchSize > 0 { + ingestionOpts = append(ingestionOpts, ingestion.WithBatchSize(conf.batchSize)) + } + + ingestionOpts = append(ingestionOpts, ingestion.WithWaitForTasks(conf.waitForTasks)) + + return ingestionOpts } func toIterableOptions(opts []ChunkedBatchOption) []IterableOption { diff --git a/templates/go/ingestion_helpers.mustache b/templates/go/ingestion_helpers.mustache index c1cd379ab96..2e2229799cb 100644 --- a/templates/go/ingestion_helpers.mustache +++ b/templates/go/ingestion_helpers.mustache @@ -9,7 +9,7 @@ ChunkedPush Chunks the given `objects` list in subset of 1000 elements max in or @return []WatchResponse - List of push responses. @return error - Error if any. */ -func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, action Action, referenceIndexName *string, opts ...RequestOption) ([]WatchResponse, error) { +func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, action Action, referenceIndexName *string, opts ...ChunkedBatchOption) ([]WatchResponse, error) { conf := config{ headerParams: map[string]string{}, waitForTasks: false, diff --git a/templates/go/search_helpers.mustache b/templates/go/search_helpers.mustache index 2df49c6712d..7391f0867a3 100644 --- a/templates/go/search_helpers.mustache +++ b/templates/go/search_helpers.mustache @@ -599,6 +599,100 @@ func (c *APIClient) ChunkedBatch(indexName string, objects []map[string]any, act return responses, nil } +/* +ReplaceAllObjectsWithTransformation is similar to the `replaceAllObjects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must have been passed to the client instantiation method. +See https://api-clients-automation.netlify.app/docs/add-new-api-client#5-helpers for implementation details. + + @param indexName string - the index name to replace objects into. + @param objects []map[string]any - List of objects to replace. + @param opts ...ReplaceAllObjectsOption - Optional parameters for the request. + @return *ReplaceAllObjectsResponse - The response of the replace all objects operation. + @return error - Error if any. +*/ +func (c *APIClient) ReplaceAllObjectsWithTransformation(indexName string, objects []map[string]any, opts ...ReplaceAllObjectsOption) (*ReplaceAllObjectsWithTransformationResponse, error) { + if c.ingestionTransporter == nil { + return nil, reportError("`region` must be provided at client instantiation before calling this method.") + } + + tmpIndexName := fmt.Sprintf("%s_tmp_%d", indexName, time.Now().UnixNano()) + + conf := config{ + headerParams: map[string]string{}, + scopes: []ScopeType{SCOPE_TYPE_SETTINGS, SCOPE_TYPE_RULES, SCOPE_TYPE_SYNONYMS}, + } + + for _, opt := range opts { + opt.apply(&conf) + } + + opts = append(opts, WithWaitForTasks(true)) + + copyResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(indexName, NewOperationIndexParams(OPERATION_TYPE_COPY, tmpIndexName, WithOperationIndexParamsScope(conf.scopes))), toRequestOptions(opts)...) + if err != nil { + return nil, err + } + + watchResp, err := c.ingestionTransporter.ChunkedPush(tmpIndexName, objects, ingestion.Action(ACTION_ADD_OBJECT), &indexName, toIngestionChunkedBatchOptions(replaceAllObjectsToChunkBactchOptions(opts))...) + if err != nil { + _, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName)) + + return nil, err //nolint:wrapcheck + } + + _, err = c.WaitForTask(tmpIndexName, copyResp.TaskID, replaceAllObjectsToIterableOptions(opts)...) + if err != nil { + _, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName)) + + return nil, err + } + + copyResp, err = c.OperationIndex(c.NewApiOperationIndexRequest(indexName, NewOperationIndexParams(OPERATION_TYPE_COPY, tmpIndexName, WithOperationIndexParamsScope(conf.scopes))), toRequestOptions(opts)...) + if err != nil { + _, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName)) + + return nil, err + } + + _, err = c.WaitForTask(tmpIndexName, copyResp.TaskID, replaceAllObjectsToIterableOptions(opts)...) + if err != nil { + _, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName)) + + return nil, err + } + + moveResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(tmpIndexName, NewOperationIndexParams(OPERATION_TYPE_MOVE, indexName)), toRequestOptions(opts)...) + if err != nil { + _, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName)) + + return nil, err + } + + _, err = c.WaitForTask(tmpIndexName, moveResp.TaskID, replaceAllObjectsToIterableOptions(opts)...) + if err != nil { + _, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName)) + + return nil, err + } + + var searchWatchResp []WatchResponse + + rawResp, err := json.Marshal(watchResp) + if err != nil { + return nil, fmt.Errorf("unable to convert the ingestion WatchResponse to search WatchResponse: %w", err) + } + + err = json.Unmarshal(rawResp, &searchWatchResp) + if err != nil { + return nil, fmt.Errorf("unable to convert the ingestion WatchResponse to search WatchResponse: %w", err) + } + + return &ReplaceAllObjectsWithTransformationResponse{ + CopyOperationResponse: *copyResp, + WatchResponses: searchWatchResp, + MoveOperationResponse: *moveResp, + }, nil +} + /* ReplaceAllObjects replaces all objects (records) in the given `indexName` with the given `objects`. A temporary index is created during this process in order to backup your data. See https://api-clients-automation.netlify.app/docs/add-new-api-client#5-helpers for implementation details. @@ -708,7 +802,7 @@ func (c *APIClient) SaveObjectsWithTransformation(indexName string, objects []ma return nil, reportError("`region` must be provided at client instantiation before calling this method.") } - return c.ingestionTransporter.ChunkedPush(indexName, objects, ingestion.Action(ACTION_ADD_OBJECT), nil, toIngestionRequestOptions(toRequestOptions(opts))...) //nolint:wrapcheck + return c.ingestionTransporter.ChunkedPush(indexName, objects, ingestion.Action(ACTION_ADD_OBJECT), nil, toIngestionChunkedBatchOptions(opts)...) //nolint:wrapcheck } /* @@ -725,22 +819,22 @@ func (c *APIClient) PartialUpdateObjectsWithTransformation(indexName string, obj return nil, reportError("`region` must be provided at client instantiation before calling this method.") } - conf := config{ - headerParams: map[string]string{}, - createIfNotExists: true, - } + conf := config{ + headerParams: map[string]string{}, + createIfNotExists: true, + } - for _, opt := range opts { - opt.apply(&conf) - } + for _, opt := range opts { + opt.apply(&conf) + } - var action Action + var action Action - if conf.createIfNotExists { - action = ACTION_PARTIAL_UPDATE_OBJECT - } else { - action = ACTION_PARTIAL_UPDATE_OBJECT_NO_CREATE - } + if conf.createIfNotExists { + action = ACTION_PARTIAL_UPDATE_OBJECT + } else { + action = ACTION_PARTIAL_UPDATE_OBJECT_NO_CREATE + } - return c.ingestionTransporter.ChunkedPush(indexName, objects, ingestion.Action(action), nil, toIngestionRequestOptions(toRequestOptions(opts))...) //nolint:wrapcheck + return c.ingestionTransporter.ChunkedPush(indexName, objects, ingestion.Action(action), nil, toIngestionChunkedBatchOptions(partialUpdateObjectsToChunkedBatchOptions(opts))...) //nolint:wrapcheck } \ No newline at end of file diff --git a/templates/java/api_helpers.mustache b/templates/java/api_helpers.mustache index 294d7662bdc..4445d8238cb 100644 --- a/templates/java/api_helpers.mustache +++ b/templates/java/api_helpers.mustache @@ -1304,6 +1304,166 @@ public ReplaceAllObjectsResponse replaceAllObjects( } } +/** + * Helper: Similar to the `saveObjects` method but requires a Push connector + * (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) + * to be created first, in order to transform records before indexing them to Algolia. The + * `region` must have been passed to the client instantiation method. + * + * @param indexName The `indexName` to replace `objects` in. + * @param objects The array of `objects` to store in the given Algolia `indexName`. + * @throws AlgoliaRetryException When the retry has failed on all hosts + * @throws AlgoliaApiException When the API sends an http error code + * @throws AlgoliaRuntimeException When an error occurred during the serialization + */ +public ReplaceAllObjectsWithTransformationResponse replaceAllObjectsWithTransformation(String indexName, Iterable objects) { + return replaceAllObjectsWithTransformation(indexName, objects, -1); +} + +/** + * Helper: Similar to the `saveObjects` method but requires a Push connector + * (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) + * to be created first, in order to transform records before indexing them to Algolia. The + * `region` must have been passed to the client instantiation method. + * + * @param indexName The `indexName` to replace `objects` in. + * @param objects The array of `objects` to store in the given Algolia `indexName`. + * @param batchSize The size of the chunk of `objects`. The number of `batch` calls will be equal + * to `length(objects) / batchSize`. + * @throws AlgoliaRetryException When the retry has failed on all hosts + * @throws AlgoliaApiException When the API sends an http error code + * @throws AlgoliaRuntimeException When an error occurred during the serialization + */ +public ReplaceAllObjectsWithTransformationResponse replaceAllObjectsWithTransformation(String indexName, Iterable objects, int batchSize) { + return replaceAllObjectsWithTransformation(indexName, objects, batchSize, null, null); +} + +/** + * Helper: Similar to the `saveObjects` method but requires a Push connector + * (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) + * to be created first, in order to transform records before indexing them to Algolia. The + * `region` must have been passed to the client instantiation method. + * + * @param indexName The `indexName` to replace `objects` in. + * @param objects The array of `objects` to store in the given Algolia `indexName`. + * @param batchSize The size of the chunk of `objects`. The number of `batch` calls will be equal + * to `length(objects) / batchSize`. + * @param scopes The `scopes` to keep from the index. Defaults to ['settings', 'rules', + * 'synonyms']. + * @throws AlgoliaRetryException When the retry has failed on all hosts + * @throws AlgoliaApiException When the API sends an http error code + * @throws AlgoliaRuntimeException When an error occurred during the serialization + */ +public ReplaceAllObjectsWithTransformationResponse replaceAllObjectsWithTransformation(String indexName, Iterable objects, int batchSize, List scopes) { + return replaceAllObjectsWithTransformation(indexName, objects, batchSize, scopes, null); +} + +/** + * Helper: Similar to the `saveObjects` method but requires a Push connector + * (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) + * to be created first, in order to transform records before indexing them to Algolia. The + * `region` must have been passed to the client instantiation method. + * + * @param indexName The `indexName` to replace `objects` in. + * @param objects The array of `objects` to store in the given Algolia `indexName`. + * @param batchSize The size of the chunk of `objects`. The number of `batch` calls will be equal + * to `length(objects) / batchSize`. + * @param scopes The `scopes` to keep from the index. Defaults to ['settings', 'rules', + * 'synonyms']. + * @param requestOptions The requestOptions to send along with the query, they will be merged with + * the transporter requestOptions. (optional) + * @throws AlgoliaRetryException When the retry has failed on all hosts + * @throws AlgoliaApiException When the API sends an http error code + * @throws AlgoliaRuntimeException When an error occurred during the serialization + */ +public ReplaceAllObjectsWithTransformationResponse replaceAllObjectsWithTransformation( + String indexName, + Iterable objects, + int batchSize, + List scopes, + RequestOptions requestOptions +) { + if (this.ingestionTransporter == null) { + throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method."); + } + + Random rnd = new Random(); + String tmpIndexName = indexName + "_tmp_" + rnd.nextInt(100); + + if (batchSize == -1) { + batchSize = 1000; + } + + if (scopes == null) { + scopes = new ArrayList() { + { + add(ScopeType.SETTINGS); + add(ScopeType.RULES); + add(ScopeType.SYNONYMS); + } + }; + } + + try { + // Copy settings, synonyms and rules + UpdatedAtResponse copyOperationResponse = operationIndex( + indexName, + new OperationIndexParams().setOperation(OperationType.COPY).setDestination(tmpIndexName).setScope(scopes), + requestOptions + ); + + // Save new objects + List watchResponses = + this.ingestionTransporter.chunkedPush( + tmpIndexName, + objects, + com.algolia.model.ingestion.Action.ADD_OBJECT, + true, + batchSize, + indexName, + requestOptions + ); + + waitForTask(tmpIndexName, copyOperationResponse.getTaskID(), requestOptions); + + copyOperationResponse = operationIndex( + indexName, + new OperationIndexParams().setOperation(OperationType.COPY).setDestination(tmpIndexName).setScope(scopes), + requestOptions + ); + waitForTask(tmpIndexName, copyOperationResponse.getTaskID(), requestOptions); + + // Move temporary index to source index + UpdatedAtResponse moveOperationResponse = operationIndex( + tmpIndexName, + new OperationIndexParams().setOperation(OperationType.MOVE).setDestination(indexName), + requestOptions + ); + waitForTask(tmpIndexName, moveOperationResponse.getTaskID(), requestOptions); + + return new ReplaceAllObjectsWithTransformationResponse() + .setCopyOperationResponse(copyOperationResponse) + .setWatchResponses(ingestionResponseToSearchResponse(watchResponses)) + .setMoveOperationResponse(moveOperationResponse); + } catch (Exception e) { + deleteIndex(tmpIndexName); + + throw e; + } +} + +private List ingestionResponseToSearchResponse(List responses) { + try { + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(responses); + + return mapper.readValue(json, new TypeReference>() {}); + } catch (Exception e) { + throw new AlgoliaRuntimeException("ingestion WatchResponse cannot be converted to a search WatchResponse"); + } +} + + /** * Helper: Generates a secured API key based on the given `parent_api_key` and given * `restrictions`. diff --git a/templates/php/api.mustache b/templates/php/api.mustache index a5a253e59fd..f973dbe9b0a 100644 --- a/templates/php/api.mustache +++ b/templates/php/api.mustache @@ -330,8 +330,8 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; $objects, $action = 'addObject', $waitForTasks = true, - $referenceIndexName = null, $batchSize = 1000, + $referenceIndexName = null, $requestOptions = [] ) { $responses = []; @@ -340,6 +340,7 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; foreach ($objects as $object) { $records[] = $object; + $ok = false; if (sizeof($records) === $batchSize || $count === sizeof($objects) - 1) { $responses[] = $this->push($indexName, ['action' => $action, 'records' => $records], false, $referenceIndexName, $requestOptions); @@ -357,18 +358,17 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; $timeoutCalculation = 'Algolia\AlgoliaSearch\Support\Helpers::linearTimeout'; foreach ($responses as $response) { - $this->waitForTask($indexName, $response['taskID']); $retry = 0; while ($retry < 50) { try { - $resp = $this->getEvent($response->runID, $response->eventID); + $this->getEvent($response['runID'], $response['eventID']); + + $ok = true; break; } catch (NotFoundException $e) { - if (404 === $e->getCode()) { - return null; - } + // just retry } ++$retry; @@ -377,7 +377,9 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; ); } - throw new ExceededRetriesException('Maximum number of retries (50) exceeded.'); + if (false === $ok) { + throw new ExceededRetriesException('Maximum number of retries (50) exceeded.'); + } } } @@ -537,6 +539,72 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; return new SynonymIterator($indexName, $this, $requestOptions); } + /** + * Helper: Similar to the `replaceAllObjects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must have been passed to the client instantiation method. + * + * @param string $indexName The `indexName` to replace `objects` in. + * @param array $objects The array of `objects` to store in the given Algolia `indexName`. + * @param array $batchSize The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + * @param array $requestOptions Request options + */ + public function replaceAllObjectsWithTransformation($indexName, $objects, $batchSize = 1000, $scopes = ['settings', 'rules', 'synonyms'], $requestOptions = []) + { + if (null == $this->ingestionTransporter) { + throw new \InvalidArgumentException('`setTransformationRegion` must have been called before calling this method.'); + } + + $tmpIndexName = $indexName.'_tmp_'.rand(10000000, 99999999); + + try { + $copyOperationResponse = $this->operationIndex( + $indexName, + [ + 'operation' => 'copy', + 'destination' => $tmpIndexName, + 'scope' => $scopes, + ], + $requestOptions + ); + + $watchResponses = $this->ingestionTransporter->chunkedPush($tmpIndexName, $objects, 'addObject', true, $batchSize, $indexName, $requestOptions); + + $this->waitForTask($tmpIndexName, $copyOperationResponse['taskID']); + + $copyOperationResponse = $this->operationIndex( + $indexName, + [ + 'operation' => 'copy', + 'destination' => $tmpIndexName, + 'scope' => $scopes, + ], + $requestOptions + ); + + $this->waitForTask($tmpIndexName, $copyOperationResponse['taskID']); + + $moveOperationResponse = $this->operationIndex( + $tmpIndexName, + [ + 'operation' => 'move', + 'destination' => $indexName, + ], + $requestOptions + ); + + $this->waitForTask($tmpIndexName, $moveOperationResponse['taskID']); + + return [ + 'copyOperationResponse' => $copyOperationResponse, + 'watchResponses' => $watchResponses, + 'moveOperationResponse' => $moveOperationResponse, + ]; + } catch (\Throwable $e) { + $this->deleteIndex($tmpIndexName); + + throw $e; + } + } + /** * Helper: Replace all objects in an index using a temporary one. * See https://api-clients-automation.netlify.app/docs/add-new-api-client#5-helpers for implementation details. diff --git a/templates/python/__init_model__.mustache b/templates/python/__init_model__.mustache index ca292c0ad88..b23ec473779 100644 --- a/templates/python/__init_model__.mustache +++ b/templates/python/__init_model__.mustache @@ -10,12 +10,14 @@ from .{{classFilename}} import {{classname}} {{#isSearchClient}} from .replace_all_objects_response import ReplaceAllObjectsResponse +from .replace_all_objects_with_transformation_response import ReplaceAllObjectsWithTransformationResponse from .secured_api_key_restrictions import SecuredApiKeyRestrictions {{/isSearchClient}} __all__ = ( {{#isSearchClient}} "ReplaceAllObjectsResponse", + "ReplaceAllObjectsWithTransformationResponse", "SecuredApiKeyRestrictions", {{/isSearchClient}} {{#models}} diff --git a/templates/python/api.mustache b/templates/python/api.mustache index 7b77864e1e8..64058fa9a48 100644 --- a/templates/python/api.mustache +++ b/templates/python/api.mustache @@ -8,11 +8,13 @@ from algoliasearch.search.models import ( BrowseParamsObject, OperationType, ReplaceAllObjectsResponse, + ReplaceAllObjectsWithTransformationResponse, ScopeType, SecuredApiKeyRestrictions, + WatchResponse, ) -from algoliasearch.ingestion.models import (WatchResponse) +from algoliasearch.ingestion.models import WatchResponse as IngestionWatchResponse from algoliasearch.ingestion.config import IngestionConfig from algoliasearch.ingestion.client import (IngestionClient, IngestionClientSync) from algoliasearch.ingestion.models import Action as IngestionAction @@ -59,6 +61,8 @@ class {{classname}}{{#isSyncClient}}Sync{{/isSyncClient}}: elif config is None: config = {{#lambda.pascalcase}}{{client}}Config{{/lambda.pascalcase}}(app_id, api_key{{#hasRegionalHost}}, region{{/hasRegionalHost}}) + config.set_default_hosts() + self._config = config self._request_options = RequestOptions(config) @@ -84,18 +88,25 @@ class {{classname}}{{#isSyncClient}}Sync{{/isSyncClient}}: if transporter is None: transporter = Transporter{{#isSyncClient}}Sync{{/isSyncClient}}(config) - client = {{classname}}{{#isSyncClient}}Sync{{/isSyncClient}}(app_id=config.app_id, api_key=config.api_key, {{#hasRegionalHost}}region=config.region, {{/hasRegionalHost}}transporter=transporter, config=config) - {{#isSearchClient}} + _ingestion_transporter: Optional[IngestionClient{{#isSyncClient}}Sync{{/isSyncClient}}] = None + if config.region is not None: ingestion_config = IngestionConfig(config.app_id, config.api_key, config.region) if config.hosts is not None: ingestion_config.hosts = config.hosts - client._ingestion_transporter = IngestionClient{{#isSyncClient}}Sync{{/isSyncClient}}.create_with_config(ingestion_config) + _ingestion_transporter = IngestionClient{{#isSyncClient}}Sync{{/isSyncClient}}.create_with_config(ingestion_config) {{/isSearchClient}} + client = {{classname}}{{#isSyncClient}}Sync{{/isSyncClient}}(app_id=config.app_id, api_key=config.api_key, {{#hasRegionalHost}}region=config.region, {{/hasRegionalHost}}transporter=transporter, config=config) + + {{#isSearchClient}} + if _ingestion_transporter is not None: + client._ingestion_transporter = _ingestion_transporter + {{/isSearchClient}} + return client {{^isSyncClient}} diff --git a/templates/python/config.mustache b/templates/python/config.mustache index d6fc19b8c9e..30f35268696 100644 --- a/templates/python/config.mustache +++ b/templates/python/config.mustache @@ -46,14 +46,30 @@ class {{#lambda.pascalcase}}{{client}}{{/lambda.pascalcase}}Config(BaseConfig): if https_proxy is not None: self.proxies["https"] = https_proxy + {{#hasRegionalHost}} self.region = region + {{/hasRegionalHost}} + + {{#isSearchClient}} + self.region = None + + def set_transformation_region(self, region: str = ""): + "This method is required to be called with the appropriate region of your Algolia application if you wish to leverage the *_with_transformation methods." + self.region = region + {{/isSearchClient}} + + def set_default_hosts(self): + if self.hosts is not None: + return + + {{#hasRegionalHost}} _regions = [{{#allowedRegions}}'{{.}}'{{^-last}},{{/-last}}{{/allowedRegions}}] - if {{^fallbackToAliasHost}}not region or {{/fallbackToAliasHost}}(region is not None and region not in _regions): + if {{^fallbackToAliasHost}}not self.region or {{/fallbackToAliasHost}}(self.region is not None and self.region not in _regions): raise ValueError(f"`region` {{^fallbackToAliasHost}}is required and {{/fallbackToAliasHost}}must be one of the following: {', '.join(_regions)}") - self.hosts = HostsCollection([Host({{#fallbackToAliasHost}}"{{{hostWithFallback}}}" if region is None else {{/fallbackToAliasHost}} "{{{regionalHost}}}".replace("{region}", region or ""))]) + self.hosts = HostsCollection([Host({{#fallbackToAliasHost}}"{{{hostWithFallback}}}" if self.region is None else {{/fallbackToAliasHost}} "{{{regionalHost}}}".replace("{region}", self.region or ""))]) {{/hasRegionalHost}} {{^hasRegionalHost}} @@ -76,12 +92,4 @@ class {{#lambda.pascalcase}}{{client}}{{/lambda.pascalcase}}Config(BaseConfig): reorder_hosts=True, {{/hostWithAppID}} ) - {{/hasRegionalHost}} - - {{#isSearchClient}} - self.region = None - - def set_transformation_region(self, region: str = ""): - "This method is required to be called with the appropriate region of your Algolia application if you wish to leverage the *_with_transformation methods." - self.region = region - {{/isSearchClient}} \ No newline at end of file + {{/hasRegionalHost}} \ No newline at end of file diff --git a/templates/python/ingestion_helpers.mustache b/templates/python/ingestion_helpers.mustache index 9ce9b4a4594..d2e9662d8ce 100644 --- a/templates/python/ingestion_helpers.mustache +++ b/templates/python/ingestion_helpers.mustache @@ -27,7 +27,7 @@ request_options=request_options, ) ) - requests = [] + records = [] if wait_for_tasks: for response in responses: {{^isSyncClient}}async {{/isSyncClient}}def _func(_: Optional[Event]) -> Event: @@ -61,4 +61,4 @@ error_validate=lambda _: _retry_count >= 50, error_message=lambda _: f"The maximum number of retries exceeded. (${_retry_count}/${50})", ) - return responses + return responses \ No newline at end of file diff --git a/templates/python/search_helpers.mustache b/templates/python/search_helpers.mustache index eb4a7b65382..635f0caaf4b 100644 --- a/templates/python/search_helpers.mustache +++ b/templates/python/search_helpers.mustache @@ -299,7 +299,7 @@ wait_for_tasks: bool = False, batch_size: int = 1000, request_options: Optional[Union[dict, RequestOptions]] = None, - ) -> List[WatchResponse]: + ) -> List[IngestionWatchResponse]: """ Helper: Similar to the `save_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client's config at instantiation. """ @@ -342,7 +342,7 @@ wait_for_tasks: bool = False, batch_size: int = 1000, request_options: Optional[Union[dict, RequestOptions]] = None, - ) -> List[WatchResponse]: + ) -> List[IngestionWatchResponse]: """ Helper: Similar to the `partial_update_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client instantiation method. """ @@ -382,6 +382,79 @@ ) return responses + {{^isSyncClient}}async {{/isSyncClient}}def replace_all_objects_with_transformation( + self, + index_name: str, + objects: List[Dict[str, Any]], + batch_size: int = 1000, + scopes = ["settings", "rules", "synonyms"], + request_options: Optional[Union[dict, RequestOptions]] = None, + ) -> ReplaceAllObjectsWithTransformationResponse: + """ + Helper: Similar to the `replaceAllObjects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must have been passed to the client instantiation method. + + See https://api-clients-automation.netlify.app/docs/add-new-api-client#5-helpers for implementation details. + """ + if self._ingestion_transporter is None: + raise ValueError("`region` must be provided at client instantiation before calling this method.") + tmp_index_name = self.create_temporary_name(index_name) + + try: + {{^isSyncClient}}async {{/isSyncClient}}def _copy() -> UpdatedAtResponse: + return {{^isSyncClient}}await {{/isSyncClient}}self.operation_index( + index_name=index_name, + operation_index_params=OperationIndexParams( + operation=OperationType.COPY, + destination=tmp_index_name, + scope=scopes, + ), + request_options=request_options, + ) + + copy_operation_response = {{^isSyncClient}}await {{/isSyncClient}}_copy() + + watch_responses = {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.chunked_push( + index_name=tmp_index_name, + objects=objects, + wait_for_tasks=True, + batch_size=batch_size, + reference_index_name=index_name, + request_options=request_options, + ) + + {{^isSyncClient}}await {{/isSyncClient}}self.wait_for_task( + index_name=tmp_index_name, task_id=copy_operation_response.task_id + ) + + copy_operation_response = {{^isSyncClient}}await {{/isSyncClient}}_copy() + {{^isSyncClient}}await {{/isSyncClient}}self.wait_for_task( + index_name=tmp_index_name, task_id=copy_operation_response.task_id + ) + + move_operation_response = {{^isSyncClient}}await {{/isSyncClient}}self.operation_index( + index_name=tmp_index_name, + operation_index_params=OperationIndexParams( + operation=OperationType.MOVE, + destination=index_name, + ), + request_options=request_options, + ) + {{^isSyncClient}}await {{/isSyncClient}}self.wait_for_task( + index_name=tmp_index_name, task_id=move_operation_response.task_id + ) + + search_watch_responses: List[WatchResponse] = [WatchResponse.model_validate(wr.model_dump()) for wr in watch_responses] + + return ReplaceAllObjectsWithTransformationResponse( + copy_operation_response=copy_operation_response, + watch_responses=search_watch_responses, + move_operation_response=move_operation_response, + ) + except Exception as e: + {{^isSyncClient}}await {{/isSyncClient}}self.delete_index(tmp_index_name) + + raise e + {{^isSyncClient}}async {{/isSyncClient}}def replace_all_objects( self, index_name: str,