diff --git a/playground/javascript/node/algoliasearch.ts b/playground/javascript/node/algoliasearch.ts index f61193fcb9d..2853bbaa08f 100644 --- a/playground/javascript/node/algoliasearch.ts +++ b/playground/javascript/node/algoliasearch.ts @@ -136,18 +136,24 @@ async function testAlgoliasearchBridgeIngestion() { // Init client with appId and apiKey const client = algoliasearch(appId, adminApiKey, { transformation: { region: 'eu' } }); - await client.saveObjectsWithTransformation({ - indexName: 'foo', - objects: [{ objectID: 'foo', data: { baz: 'baz', win: 42 } }], - waitForTasks: true, - }); - - await client.partialUpdateObjectsWithTransformation({ - indexName: 'foo', - objects: [{ objectID: 'foo', data: { baz: 'baz', win: 42 } }], - waitForTasks: true, - createIfNotExists: false, - }); + // console.log('saveObjectsWithTransformation', await client.saveObjectsWithTransformation({ + // indexName: 'foo', + // objects: [{ objectID: 'foo', data: { baz: 'baz', win: 42 } }], + // waitForTasks: true, + // })); + // + // console.log('partialUpdateObjectsWithTransformation', await client.partialUpdateObjectsWithTransformation({ + // indexName: 'foo', + // objects: [{ objectID: 'foo', data: { baz: 'baz', win: 42 } }], + // waitForTasks: true, + // createIfNotExists: false, + // })); + + console.log('replaceAllObjectsWithTransformation', await client.replaceAllObjectsWithTransformation({ + indexName: 'boyd', + objects: [{ objectID: 'foo', data: { baz: 'baz', win: 42 } }, { objectID: 'bar', data: { baz: 'baz', win: 24 } }], + batchSize: 2 + })); } // testAlgoliasearch(); diff --git a/scripts/cts/runCts.ts b/scripts/cts/runCts.ts index 43308b2a557..7885e47506f 100644 --- a/scripts/cts/runCts.ts +++ b/scripts/cts/runCts.ts @@ -13,6 +13,7 @@ import { assertPushMockValid } from './testServer/pushMock.ts'; import { assertValidReplaceAllObjects } from './testServer/replaceAllObjects.ts'; import { assertValidReplaceAllObjectsFailed } from './testServer/replaceAllObjectsFailed.ts'; import { assertValidReplaceAllObjectsScopes } from './testServer/replaceAllObjectsScopes.ts'; +import { assertValidReplaceAllObjectsWithTransformation } from './testServer/replaceAllObjectsWithTransformation.ts'; import { assertValidTimeouts } from './testServer/timeout.ts'; import { assertValidWaitForApiKey } from './testServer/waitFor.ts'; @@ -154,6 +155,7 @@ export async function runCts( assertValidTimeouts(languages.length); assertChunkWrapperValid(languages.length - skip('dart')); assertValidReplaceAllObjects(languages.length - skip('dart')); + assertValidReplaceAllObjectsWithTransformation(only('javascript')); assertValidAccountCopyIndex(only('javascript')); assertValidReplaceAllObjectsFailed(languages.length - skip('dart')); assertValidReplaceAllObjectsScopes(languages.length - skip('dart')); diff --git a/scripts/cts/testServer/index.ts b/scripts/cts/testServer/index.ts index 7bfe72eecba..448c69d630d 100644 --- a/scripts/cts/testServer/index.ts +++ b/scripts/cts/testServer/index.ts @@ -17,6 +17,7 @@ import { pushMockServer } from './pushMock.ts'; import { replaceAllObjectsServer } from './replaceAllObjects.ts'; import { replaceAllObjectsServerFailed } from './replaceAllObjectsFailed.ts'; import { replaceAllObjectsScopesServer } from './replaceAllObjectsScopes.ts'; +import { replaceAllObjectsWithTransformationServer } from './replaceAllObjectsWithTransformation.ts'; import { timeoutServer } from './timeout.ts'; import { timeoutServerBis } from './timeoutBis.ts'; import { waitForApiKeyServer } from './waitFor.ts'; @@ -37,6 +38,7 @@ export async function startTestServer(suites: Record): Promise apiKeyServer(), algoliaMockServer(), pushMockServer(), + replaceAllObjectsWithTransformationServer(), ); } if (suites.benchmark) { diff --git a/scripts/cts/testServer/replaceAllObjectsWithTransformation.ts b/scripts/cts/testServer/replaceAllObjectsWithTransformation.ts new file mode 100644 index 00000000000..3a57e129f21 --- /dev/null +++ b/scripts/cts/testServer/replaceAllObjectsWithTransformation.ts @@ -0,0 +1,132 @@ +import type { Server } from 'http'; + +import { expect } from 'chai'; +import type { Express } from 'express'; +import express from 'express'; + +import { setupServer } from './index.ts'; + +const raowtState: Record< + string, + { + copyCount: number; + pushCount: number; + tmpIndexName: string; + waitTaskCount: number; + waitingForFinalWaitTask: boolean; + successful: boolean; + } +> = {}; + +export function assertValidReplaceAllObjectsWithTransformation(expectedCount: number): void { + expect(Object.keys(raowtState)).to.have.length(expectedCount); + for (const lang in raowtState) { + expect(raowtState[lang].successful).to.equal(true); + } +} + +function addRoutes(app: Express): void { + app.use(express.urlencoded({ extended: true })); + app.use( + express.json({ + type: ['application/json', 'text/plain'], // the js client sends the body as text/plain + }), + ); + + app.post('/1/indexes/:indexName/operation', (req, res) => { + expect(req.params.indexName).to.match(/^cts_e2e_replace_all_objects_with_transformation_(.*)$/); + + switch (req.body.operation) { + case 'copy': { + expect(req.params.indexName).to.not.include('tmp'); + expect(req.body.destination).to.include('tmp'); + expect(req.body.scope).to.deep.equal(['settings', 'rules', 'synonyms']); + + const lang = req.params.indexName.replace('cts_e2e_replace_all_objects_with_transformation_', ''); + if (!raowtState[lang] || raowtState[lang].successful) { + raowtState[lang] = { + copyCount: 1, + pushCount: 0, + waitTaskCount: 0, + tmpIndexName: req.body.destination, + waitingForFinalWaitTask: false, + successful: false, + }; + } else { + raowtState[lang].copyCount++; + } + + res.json({ taskID: 123 + raowtState[lang].copyCount, updatedAt: '2021-01-01T00:00:00.000Z' }); + break; + } + case 'move': { + const lang = req.body.destination.replace('cts_e2e_replace_all_objects_with_transformation_', ''); + expect(raowtState).to.include.keys(lang); + expect(raowtState[lang]).to.deep.equal({ + copyCount: 2, + pushCount: 10, + waitTaskCount: 2, + tmpIndexName: req.params.indexName, + waitingForFinalWaitTask: false, + successful: false, + }); + + expect(req.body.scope).to.equal(undefined); + + raowtState[lang].waitingForFinalWaitTask = true; + + res.json({ taskID: 777, updatedAt: '2021-01-01T00:00:00.000Z' }); + + break; + } + default: + res.status(400).json({ + message: `invalid operation: ${req.body.operation}, body: ${JSON.stringify(req.body)}`, + }); + } + }); + + app.post('/1/push/:indexName', (req, res) => { + const lang = req.params.indexName.match( + /^cts_e2e_replace_all_objects_with_transformation_(.*)_tmp_\d+$/, + )?.[1] as string; + expect(raowtState).to.include.keys(lang); + expect(req.body.action === 'addObject').to.equal(true); + + raowtState[lang].pushCount += req.body.records.length; + + res.json({ + runID: 'b1b7a982-524c-40d2-bb7f-48aab075abda', + eventID: `113b2068-6337-4c85-b5c2-e7b213d8292${raowtState[lang].pushCount}`, + message: 'OK', + createdAt: '2022-05-12T06:24:30.049Z', + }); + }); + + app.get('/1/runs/:runID/events/:eventID', (req, res) => { + res.json({ status: 'finished' }); + }); + + app.get('/1/indexes/:indexName/task/:taskID', (req, res) => { + const lang = req.params.indexName.match( + /^cts_e2e_replace_all_objects_with_transformation_(.*)_tmp_\d+$/, + )?.[1] as string; + expect(raowtState).to.include.keys(lang); + + raowtState[lang].waitTaskCount++; + if (raowtState[lang].waitingForFinalWaitTask) { + expect(req.params.taskID).to.equal('777'); + expect(raowtState[lang].waitTaskCount).to.equal(3); + + raowtState[lang].successful = true; + } + + res.json({ status: 'published', updatedAt: '2021-01-01T00:00:00.000Z' }); + }); +} + +export function replaceAllObjectsWithTransformationServer(): Promise { + // this server is used to simulate the responses for the replaceAllObjectsWithTransformationServer method, + // and uses a state machine to determine if the logic is correct. + return setupServer('replaceAllObjectsWithTransformationServer', 6690, addRoutes); +} diff --git a/specs/search/helpers/chunkedPush.yml b/specs/search/helpers/chunkedPush.yml new file mode 100644 index 00000000000..b0a3c92f357 --- /dev/null +++ b/specs/search/helpers/chunkedPush.yml @@ -0,0 +1,55 @@ +method: + get: + x-helper: true + tags: + - Records + x-available-languages: + - javascript + operationId: chunkedPush + summary: Replace all records in an index + description: | + Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/). + parameters: + - in: query + name: indexName + description: The `indexName` to replace `objects` in. + required: true + schema: + type: string + - in: query + name: objects + description: List of objects to replace the current objects with. + required: true + schema: + type: array + items: + type: object + - in: query + name: action + description: The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`. + required: false + schema: + $ref: '../../common/schemas/Batch.yml#/action' + - in: query + name: waitForTasks + description: Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. + required: false + schema: + type: boolean + - in: query + name: batchSize + description: The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + required: false + schema: + type: integer + responses: + '200': + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '../../common/schemas/ingestion/WatchResponse.yml' + '400': + $ref: '../../common/responses/IndexNotFound.yml' diff --git a/specs/search/helpers/replaceAllObjectsWithTransformation.yml b/specs/search/helpers/replaceAllObjectsWithTransformation.yml new file mode 100644 index 00000000000..2521366622a --- /dev/null +++ b/specs/search/helpers/replaceAllObjectsWithTransformation.yml @@ -0,0 +1,82 @@ +method: + get: + x-helper: true + tags: + - Records + x-available-languages: + - javascript + operationId: replaceAllObjectsWithTransformation + summary: Replace all records in an index + description: | + Replace all records from your index with a new set of records by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/). + + This method lets you replace all records in your index without downtime. It performs these operations: + 1. Copy settings, synonyms, and rules from your original index to a temporary index. + 2. Add your new records to the temporary index. + 3. Replace your original index with the temporary index. + + Use the safe parameter to ensure that these (asynchronous) operations are performed in sequence. + If there's an error duing one of these steps, the temporary index won't be deleted. + This operation is rate-limited. + This method creates a temporary index: your record count is temporarily doubled. Algolia doesn't count the three days with the highest number of records towards your monthly usage. + If you're on a legacy plan (before July 2020), this method counts two operations towards your usage (in addition to the number of records): copySettings and moveIndex. + The API key you use for this operation must have access to the index YourIndex and the temporary index YourIndex_tmp. + parameters: + - in: query + name: indexName + description: The `indexName` to replace `objects` in. + required: true + schema: + type: string + - in: query + name: objects + description: List of objects to replace the current objects with. + required: true + schema: + type: array + items: + type: object + - in: query + name: batchSize + description: The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + required: false + schema: + type: integer + default: 1000 + - in: query + name: scopes + description: List of scopes to kepp in the index. Defaults to `settings`, `synonyms`, and `rules`. + required: false + schema: + type: array + items: + $ref: '../common/enums.yml#/scopeType' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/replaceAllObjectsWithTransformationResponse' + '400': + $ref: '../../common/responses/IndexNotFound.yml' + +replaceAllObjectsWithTransformationResponse: + type: object + additionalProperties: false + properties: + copyOperationResponse: + description: The response of the `operationIndex` request for the `copy` operation. + $ref: '../../common/responses/common.yml#/updatedAtResponse' + watchResponses: + type: array + description: The response of the `push` request(s). + items: + $ref: '../../common/schemas/ingestion/WatchResponse.yml' + moveOperationResponse: + description: The response of the `operationIndex` request for the `move` operation. + $ref: '../../common/responses/common.yml#/updatedAtResponse' + required: + - copyOperationResponse + - watchResponses + - moveOperationResponse diff --git a/specs/search/spec.yml b/specs/search/spec.yml index 0d861f445bc..dfa93fbcaa7 100644 --- a/specs/search/spec.yml +++ b/specs/search/spec.yml @@ -378,6 +378,9 @@ paths: /replaceAllObjects: $ref: 'helpers/replaceAllObjects.yml#/method' + /replaceAllObjectsWithTransformation: + $ref: 'helpers/replaceAllObjectsWithTransformation.yml#/method' + /chunkedBatch: $ref: 'helpers/chunkedBatch.yml#/method' diff --git a/templates/javascript/clients/algoliasearch/builds/definition.mustache b/templates/javascript/clients/algoliasearch/builds/definition.mustache index e3623f407db..d15b7ec5f57 100644 --- a/templates/javascript/clients/algoliasearch/builds/definition.mustache +++ b/templates/javascript/clients/algoliasearch/builds/definition.mustache @@ -1,12 +1,14 @@ // {{{generationBanner}}} -import type { ClientOptions, RequestOptions } from '@algolia/client-common'; +import { createIterablePromise } from '@algolia/client-common'; +import type { ApiError, ClientOptions, RequestOptions } from '@algolia/client-common'; {{#dependencies}} import { {{{dependencyName}}}Client } from '{{{dependencyPackage}}}'; import type { {{#lambda.titlecase}}{{{dependencyName}}}{{/lambda.titlecase}}Client } from '{{{dependencyPackage}}}'; {{/dependencies}} +import type { ChunkedBatchOptions, ReplaceAllObjectsOptions, ReplaceAllObjectsWithTransformationResponse } from '@algolia/client-search'; import type { PartialUpdateObjectsOptions, SaveObjectsOptions } from '@algolia/client-search'; import type { PushTaskRecords, WatchResponse } from '@algolia/ingestion'; @@ -57,6 +59,36 @@ export type Algoliasearch = SearchClient & { * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `getTask` method and merged with the transporter requestOptions. */ partialUpdateObjectsWithTransformation: (options: PartialUpdateObjectsOptions, requestOptions?: RequestOptions | undefined) => Promise; + + /** + * Helper: Similar to the `replaceAllObjects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must have been passed to the client instantiation method. + * + * @summary Helper: Replaces all objects (records) in the given `index_name` by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) with the given `objects`. A temporary index is created during this process in order to backup your data. + * @param replaceAllObjects - The `replaceAllObjects` object. + * @param replaceAllObjects.indexName - The `indexName` to replace `objects` in. + * @param replaceAllObjects.objects - The array of `objects` to store in the given Algolia `indexName`. + * @param replaceAllObjects.batchSize - The size of the chunk of `objects`. The number of `batch` calls will be equal to `objects.length / batchSize`. Defaults to 1000. + * @param replaceAllObjects.scopes - The `scopes` to keep from the index. Defaults to ['settings', 'rules', 'synonyms']. + * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `push`, `operationIndex` and `getEvent` method and merged with the transporter requestOptions. + */ + replaceAllObjectsWithTransformation: ( + options: ReplaceAllObjectsOptions, + requestOptions?: RequestOptions | undefined, + ) => Promise; + + /** + * Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/). + * + * @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests. + * @param chunkedPush - The `chunkedPush` object. + * @param chunkedPush.indexName - The `indexName` to replace `objects` in. + * @param chunkedPush.objects - The array of `objects` to store in the given Algolia `indexName`. + * @param chunkedPush.action - The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`. + * @param chunkedPush.waitForTasks - Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. + * @param chunkedPush.batchSize - The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `getEvent` method and merged with the transporter requestOptions. + */ + chunkedPush: (options: ChunkedBatchOptions, requestOptions?: RequestOptions) => Promise>; }; export type TransformationOptions = { @@ -142,6 +174,149 @@ export function algoliasearch( ); }, + async chunkedPush( + { indexName, objects, action = 'addObject', waitForTasks, batchSize = 1000 }: ChunkedBatchOptions, + requestOptions?: RequestOptions, + ): Promise> { + if (!ingestionTransporter) { + throw new Error('`transformation.region` must be provided at client instantiation before calling this method.'); + } + + if (!options?.transformation?.region) { + throw new Error('`region` must be provided when leveraging the transformation pipeline'); + } + + let records: Array = []; + const responses: Array = []; + + const objectEntries = objects.entries(); + for (const [i, obj] of objectEntries) { + records.push(obj as PushTaskRecords); + if (records.length === batchSize || i === objects.length - 1) { + responses.push( + await ingestionTransporter.push( + { indexName, pushTaskPayload: { action, records }, watch: waitForTasks }, + requestOptions, + ), + ); + records = []; + } + } + + let retryCount = 0; + + if (waitForTasks) { + for (const resp of responses) { + if (!resp.eventID) { + throw new Error('received unexpected response from the push endpoint, eventID must not be undefined'); + } + + await createIterablePromise({ + func: async () => { + if (resp.eventID === undefined || !resp.eventID) { + throw new Error('received unexpected response from the push endpoint, eventID must not be undefined'); + } + + return ingestionTransporter.getEvent({ runID: resp.runID, eventID: resp.eventID }).catch((error: ApiError) => { + if (error.status === 404) { + return undefined; + } + + throw error; + }) + }, + validate: (response) => response !== undefined, + aggregator: () => (retryCount += 1), + error: { + validate: () => retryCount >= 50, + message: () => `The maximum number of retries exceeded. (${retryCount}/${50})`, + }, + timeout: (): number => Math.min(retryCount * 500, 5000), + }); + } + } + + return responses; + }, + + async replaceAllObjectsWithTransformation( + { indexName, objects, batchSize, scopes }: ReplaceAllObjectsOptions, + requestOptions?: RequestOptions | undefined, + ): Promise { + if (!ingestionTransporter) { + throw new Error('`transformation.region` must be provided at client instantiation before calling this method.'); + } + + if (!options?.transformation?.region) { + throw new Error('`region` must be provided when leveraging the transformation pipeline'); + } + + const randomSuffix = Math.floor(Math.random() * 1000000) + 100000; + const tmpIndexName = `${indexName}_tmp_${randomSuffix}`; + + if (scopes === undefined) { + scopes = ['settings', 'rules', 'synonyms']; + } + + try { + let copyOperationResponse = await this.operationIndex( + { + indexName, + operationIndexParams: { + operation: 'copy', + destination: tmpIndexName, + scope: scopes, + }, + }, + requestOptions, + ); + + const watchResponses = await this.chunkedPush( + { indexName: tmpIndexName, objects, waitForTasks: true, batchSize }, + requestOptions, + ); + + await this.waitForTask({ + indexName: tmpIndexName, + taskID: copyOperationResponse.taskID, + }); + + copyOperationResponse = await this.operationIndex( + { + indexName, + operationIndexParams: { + operation: 'copy', + destination: tmpIndexName, + scope: scopes, + }, + }, + requestOptions, + ); + await this.waitForTask({ + indexName: tmpIndexName, + taskID: copyOperationResponse.taskID, + }); + + const moveOperationResponse = await this.operationIndex( + { + indexName: tmpIndexName, + operationIndexParams: { operation: 'move', destination: indexName }, + }, + requestOptions, + ); + await this.waitForTask({ + indexName: tmpIndexName, + taskID: moveOperationResponse.taskID, + }); + + return { copyOperationResponse, watchResponses, moveOperationResponse }; + } catch (error) { + await this.deleteIndex({ indexName: tmpIndexName }); + + throw error; + } + }, + /** * Get the value of the `algoliaAgent`, used by our libraries internally and telemetry system. */ diff --git a/tests/CTS/client/search/replaceAllObjectsWithTransformation.json b/tests/CTS/client/search/replaceAllObjectsWithTransformation.json new file mode 100644 index 00000000000..b28ee420dac --- /dev/null +++ b/tests/CTS/client/search/replaceAllObjectsWithTransformation.json @@ -0,0 +1,110 @@ +[ + { + "testName": "call replaceAllObjectsWithTransformation without error", + "autoCreateClient": false, + "steps": [ + { + "type": "createClient", + "parameters": { + "appId": "test-app-id", + "apiKey": "test-api-key", + "customHosts": [ + { + "port": 6690 + } + ], + "transformationRegion": "us" + } + }, + { + "type": "method", + "method": "replaceAllObjectsWithTransformation", + "parameters": { + "indexName": "cts_e2e_replace_all_objects_with_transformation_${{language}}", + "objects": [ + { + "objectID": "1", + "name": "Adam" + }, + { + "objectID": "2", + "name": "Benoit" + }, + { + "objectID": "3", + "name": "Cyril" + }, + { + "objectID": "4", + "name": "David" + }, + { + "objectID": "5", + "name": "Eva" + }, + { + "objectID": "6", + "name": "Fiona" + }, + { + "objectID": "7", + "name": "Gael" + }, + { + "objectID": "8", + "name": "Hugo" + }, + { + "objectID": "9", + "name": "Igor" + }, + { + "objectID": "10", + "name": "Julia" + } + ], + "batchSize": 3 + }, + "expected": { + "type": "response", + "match": { + "copyOperationResponse": { + "taskID": 125, + "updatedAt": "2021-01-01T00:00:00.000Z" + }, + "watchResponses": [ + { + "runID": "b1b7a982-524c-40d2-bb7f-48aab075abda", + "eventID": "113b2068-6337-4c85-b5c2-e7b213d82923", + "message": "OK", + "createdAt": "2022-05-12T06:24:30.049Z" + }, + { + "runID": "b1b7a982-524c-40d2-bb7f-48aab075abda", + "eventID": "113b2068-6337-4c85-b5c2-e7b213d82926", + "message": "OK", + "createdAt": "2022-05-12T06:24:30.049Z" + }, + { + "runID": "b1b7a982-524c-40d2-bb7f-48aab075abda", + "eventID": "113b2068-6337-4c85-b5c2-e7b213d82929", + "message": "OK", + "createdAt": "2022-05-12T06:24:30.049Z" + }, + { + "runID": "b1b7a982-524c-40d2-bb7f-48aab075abda", + "eventID": "113b2068-6337-4c85-b5c2-e7b213d829210", + "message": "OK", + "createdAt": "2022-05-12T06:24:30.049Z" + } + ], + "moveOperationResponse": { + "taskID": 777, + "updatedAt": "2021-01-01T00:00:00.000Z" + } + } + } + } + ] + } +]