@@ -8,7 +8,7 @@ import { {{{dependencyName}}}Client } from '{{{dependencyPackage}}}';
88import type { {{#lambda.titlecase} }{ {{dependencyName} }}{ {/lambda.titlecase} }Client } from '{ {{dependencyPackage} }}';
99{ {/dependencies} }
1010
11- import type { ChunkedBatchOptions, ReplaceAllObjectsOptions, ReplaceAllObjectsWithTransformationResponse } from '@algolia/client-search';
11+ import type { ReplaceAllObjectsOptions, ReplaceAllObjectsWithTransformationResponse } from '@algolia/client-search';
1212import type { PartialUpdateObjectsOptions, SaveObjectsOptions } from '@algolia/client-search';
1313import type { PushTaskRecords, WatchResponse } from '@algolia/ingestion';
1414
@@ -75,20 +75,6 @@ export type Algoliasearch = SearchClient & {
7575 options: ReplaceAllObjectsOptions,
7676 requestOptions?: RequestOptions | undefined,
7777 ) => Promise<ReplaceAllObjectsWithTransformationResponse >;
78-
79- /**
80- * Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
81- *
82- * @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests.
83- * @param chunkedPush - The `chunkedPush` object.
84- * @param chunkedPush.indexName - The `indexName` to replace `objects` in.
85- * @param chunkedPush.objects - The array of `objects` to store in the given Algolia `indexName`.
86- * @param chunkedPush.action - The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`.
87- * @param chunkedPush.waitForTasks - Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable.
88- * @param chunkedPush.batchSize - The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
89- * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `getTask` method and merged with the transporter requestOptions.
90- */
91- chunkedPush: (options: ChunkedBatchOptions, requestOptions?: RequestOptions) => Promise<Array <WatchResponse >>;
9278};
9379
9480export type TransformationOptions = {
@@ -174,71 +160,6 @@ export function algoliasearch(
174160 );
175161 },
176162
177- async chunkedPush(
178- { indexName, objects, action = ' addObject' , waitForTasks, batchSize = 1000 } : ChunkedBatchOptions,
179- requestOptions?: RequestOptions,
180- ): Promise<Array <WatchResponse >> {
181- if (! ingestionTransporter) {
182- throw new Error(' `transformation.region` must be provided at client instantiation before calling this method.' );
183- }
184-
185- if (!options?.transformation?.region) {
186- throw new Error(' `region` must be provided when leveraging the transformation pipeline' );
187- }
188-
189- let records: Array<PushTaskRecords > = [];
190- const responses: Array<WatchResponse > = [];
191-
192- const objectEntries = objects.entries();
193- for (const [i, obj] of objectEntries) {
194- records.push(obj as PushTaskRecords);
195- if (records.length === batchSize || i === objects.length - 1) {
196- responses.push(
197- await ingestionTransporter.push(
198- { indexName, pushTaskPayload: { action, records } , watch: waitForTasks },
199- requestOptions,
200- ),
201- );
202- records = [];
203- }
204- }
205-
206- let retryCount = 0;
207-
208- if (waitForTasks) {
209- for (const resp of responses) {
210- if (resp.eventID === undefined || ! resp.eventID) {
211- throw new Error(' received unexpected response from the push endpoint, eventID must not be undefined' );
212- }
213-
214- await createIterablePromise({
215- func: async () => {
216- if (resp.eventID === undefined || ! resp.eventID) {
217- throw new Error(' received unexpected response from the push endpoint, eventID must not be undefined' );
218- }
219-
220- return ingestionTransporter.getEvent({ runID: resp.runID, eventID: resp.eventID } ).catch((error: ApiError) => {
221- if (error.status === 404) {
222- return undefined;
223- }
224-
225- throw error;
226- })
227- },
228- validate: (response) => response !== undefined,
229- aggregator: () => (retryCount += 1),
230- error: {
231- validate: () => retryCount >= 50,
232- message: () => `The maximum number of retries exceeded. (${retryCount} /${ 50} )`,
233- },
234- timeout: (): number => Math.min(retryCount * 200, 5000),
235- });
236- }
237- }
238-
239- return responses;
240- },
241-
242163 async replaceAllObjectsWithTransformation(
243164 { indexName, objects, batchSize, scopes } : ReplaceAllObjectsOptions,
244165 requestOptions?: RequestOptions | undefined,
@@ -271,10 +192,59 @@ export function algoliasearch(
271192 requestOptions,
272193 );
273194
274- const watchResponses = await this.chunkedPush(
275- { indexName: tmpIndexName, objects, waitForTasks: true , batchSize } ,
276- requestOptions,
277- );
195+ let records: Array<PushTaskRecords > = [];
196+ const watchResponses: Array<WatchResponse > = [];
197+
198+ const objectEntries = objects.entries();
199+ for (const [i, obj] of objectEntries) {
200+ records.push(obj as PushTaskRecords);
201+ if (records.length === batchSize || i === objects.length - 1) {
202+ watchResponses.push(
203+ await ingestionTransporter.push(
204+ {
205+ indexName: tmpIndexName,
206+ pushTaskPayload: { action: ' addObject' , records } ,
207+ referenceIndexName: indexName,
208+ },
209+ requestOptions,
210+ ),
211+ );
212+ records = [];
213+ }
214+ }
215+
216+ let retryCount = 0;
217+
218+ for (const resp of watchResponses) {
219+ if (resp.eventID === undefined || ! resp.eventID) {
220+ throw new Error(' received unexpected response from the push endpoint, eventID must not be undefined' );
221+ }
222+
223+ await createIterablePromise({
224+ func: async () => {
225+ if (resp.eventID === undefined || ! resp.eventID) {
226+ throw new Error(' received unexpected response from the push endpoint, eventID must not be undefined' );
227+ }
228+
229+ return ingestionTransporter
230+ .getEvent({ runID: resp.runID, eventID: resp.eventID } )
231+ .catch((error: ApiError) => {
232+ if (error.status === 404) {
233+ return undefined;
234+ }
235+
236+ throw error;
237+ });
238+ },
239+ validate: (response) => response !== undefined,
240+ aggregator: () => (retryCount += 1),
241+ error: {
242+ validate: () => retryCount >= 50,
243+ message: () => `The maximum number of retries exceeded. (${retryCount} /${ 50} )`,
244+ },
245+ timeout: (): number => Math.min(retryCount * 200, 5000),
246+ });
247+ }
278248
279249 await this.waitForTask({
280250 indexName: tmpIndexName,
0 commit comments