Skip to content

Commit 6eef87e

Browse files
committed
feat: java(script)
1 parent 2ebd1f9 commit 6eef87e

File tree

8 files changed

+244
-184
lines changed

8 files changed

+244
-184
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ method:
55
- Records
66
x-available-languages:
77
- javascript
8+
- go
9+
- java
10+
- php
11+
- python
812
operationId: chunkedPush
913
summary: Replace all records in an index
1014
description: |

specs/ingestion/spec.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,6 @@ paths:
202202
# ###############
203203
/setClientApiKey:
204204
$ref: '../common/helpers/setClientApiKey.yml#/method'
205+
206+
/chunkedPush:
207+
$ref: 'helpers/chunkedPush.yml#/method'

templates/java/api_helpers.mustache

Lines changed: 112 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,106 @@
1+
{{#isIngestionClient}}
2+
private <T> List<PushTaskRecords> objectsToPushTaskRecords(Iterable<T> objects) {
3+
try {
4+
ObjectMapper mapper = new ObjectMapper();
5+
String json = mapper.writeValueAsString(objects);
6+
7+
return mapper.readValue(json, new TypeReference<List<PushTaskRecords>>() {});
8+
} catch (Exception e) {
9+
throw new AlgoliaRuntimeException(
10+
"each object must have an `objectID` key in order to be indexed"
11+
);
12+
}
13+
}
14+
15+
/**
16+
* Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit
17+
* in `push` requests by leveraging the Transformation pipeline setup in the Push connector
18+
* (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
19+
*
20+
* @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to
21+
* make it fit in `batch` requests.
22+
* @param indexName - The `indexName` to replace `objects` in.
23+
* @param objects - The array of `objects` to store in the given Algolia `indexName`.
24+
* @param action - The `batch` `action` to perform on the given array of `objects`.
25+
* @param waitForTasks - Whether or not we should wait until every `batch` tasks has been
26+
* processed, this operation may slow the total execution time of this method but is more
27+
* reliable.
28+
* @param batchSize - The size of the chunk of `objects`. The number of `batch` calls will be
29+
* equal to `length(objects) / batchSize`. Defaults to 1000.
30+
* @param referenceIndexName - This is required when targeting an index that does not have a push
31+
* connector setup (e.g. a tmp index), but you wish to attach another index's transformation
32+
* to it (e.g. the source index name).
33+
* @param requestOptions - The requestOptions to send along with the query, they will be forwarded
34+
* to the `getTask` method and merged with the transporter requestOptions.
35+
*/
36+
public <T> List<WatchResponse> chunkedPush(
37+
String indexName,
38+
Iterable<T> objects,
39+
Action action,
40+
boolean waitForTasks,
41+
int batchSize,
42+
String referenceIndexName,
43+
RequestOptions requestOptions
44+
) {
45+
List<WatchResponse> responses = new ArrayList<>();
46+
List<T> records = new ArrayList<>();
47+
48+
for (T item : objects) {
49+
if (records.size() == batchSize) {
50+
WatchResponse watch =
51+
this.push(
52+
indexName,
53+
new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
54+
waitForTasks,
55+
referenceIndexName,
56+
requestOptions
57+
);
58+
responses.add(watch);
59+
records.clear();
60+
}
61+
62+
records.add(item);
63+
}
64+
65+
if (records.size() > 0) {
66+
WatchResponse watch =
67+
this.push(
68+
indexName,
69+
new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
70+
waitForTasks,
71+
referenceIndexName,
72+
requestOptions
73+
);
74+
responses.add(watch);
75+
}
76+
77+
if (waitForTasks) {
78+
responses.forEach(response -> {
79+
TaskUtils.retryUntil(
80+
() -> {
81+
try {
82+
return this.getEvent(response.getRunID(), response.getEventID());
83+
} catch (AlgoliaApiException e) {
84+
if (e.getStatusCode() == 404) {
85+
return null;
86+
}
87+
88+
throw e;
89+
}
90+
},
91+
(Event resp) -> {
92+
return resp != null;
93+
},
94+
50,
95+
null
96+
);
97+
}
98+
);
99+
}
100+
101+
return responses;
102+
}
103+
{{/isIngestionClient}}
1104
{{#isSearchClient}}
2105
/**
3106
* Helper: Wait for a task to complete with `indexName` and `taskID`.
@@ -585,99 +688,6 @@ public CompletableFuture<List<SearchForFacetValuesResponse>> searchForFacetsAsyn
585688
);
586689
}
587690
588-
/**
589-
* Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit
590-
* in `push` requests by leveraging the Transformation pipeline setup in the Push connector
591-
* (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
592-
*
593-
* @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to
594-
* make it fit in `batch` requests.
595-
* @param indexName - The `indexName` to replace `objects` in.
596-
* @param objects - The array of `objects` to store in the given Algolia `indexName`.
597-
* @param action - The `batch` `action` to perform on the given array of `objects`.
598-
* @param waitForTasks - Whether or not we should wait until every `batch` tasks has been
599-
* processed, this operation may slow the total execution time of this method but is more
600-
* reliable.
601-
* @param batchSize - The size of the chunk of `objects`. The number of `batch` calls will be
602-
* equal to `length(objects) / batchSize`. Defaults to 1000.
603-
* @param referenceIndexName - This is required when targeting an index that does not have a push
604-
* connector setup (e.g. a tmp index), but you wish to attach another index's transformation
605-
* to it (e.g. the source index name).
606-
* @param requestOptions - The requestOptions to send along with the query, they will be forwarded
607-
* to the `getTask` method and merged with the transporter requestOptions.
608-
*/
609-
public <T> List<WatchResponse> chunkedPush(
610-
String indexName,
611-
Iterable<T> objects,
612-
com.algolia.model.ingestion.Action action,
613-
boolean waitForTasks,
614-
int batchSize,
615-
String referenceIndexName,
616-
RequestOptions requestOptions
617-
) {
618-
if (this.ingestionTransporter == null) {
619-
throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method.");
620-
}
621-
622-
List<WatchResponse> responses = new ArrayList<>();
623-
List<T> records = new ArrayList<>();
624-
625-
for (T item : objects) {
626-
if (records.size() == batchSize) {
627-
WatchResponse watch =
628-
this.ingestionTransporter.push(
629-
indexName,
630-
new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
631-
waitForTasks,
632-
referenceIndexName,
633-
requestOptions
634-
);
635-
responses.add(watch);
636-
records.clear();
637-
}
638-
639-
records.add(item);
640-
}
641-
642-
if (records.size() > 0) {
643-
WatchResponse watch =
644-
this.ingestionTransporter.push(
645-
indexName,
646-
new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
647-
waitForTasks,
648-
referenceIndexName,
649-
requestOptions
650-
);
651-
responses.add(watch);
652-
}
653-
654-
if (waitForTasks) {
655-
responses.forEach(response -> {
656-
TaskUtils.retryUntil(
657-
() -> {
658-
try {
659-
return this.ingestionTransporter.getEvent(response.getRunID(), response.getEventID());
660-
} catch (AlgoliaApiException e) {
661-
if (e.getStatusCode() == 404) {
662-
return null;
663-
}
664-
665-
throw e;
666-
}
667-
},
668-
(Event resp) -> {
669-
return resp != null;
670-
},
671-
50,
672-
null
673-
);
674-
}
675-
);
676-
}
677-
678-
return responses;
679-
}
680-
681691
/**
682692
* Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit
683693
* in `batch` requests.
@@ -823,20 +833,11 @@ public <T> List<WatchResponse> saveObjectsWithTransformation(
823833
int batchSize,
824834
RequestOptions requestOptions
825835
) {
826-
return chunkedPush(indexName, objects, com.algolia.model.ingestion.Action.ADD_OBJECT, waitForTasks, batchSize, null, requestOptions);
827-
}
828-
829-
private <T> List<PushTaskRecords> objectsToPushTaskRecords(Iterable<T> objects) {
830-
try {
831-
ObjectMapper mapper = new ObjectMapper();
832-
String json = mapper.writeValueAsString(objects);
836+
if (this.ingestionTransporter == null) {
837+
throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method.");
838+
}
833839
834-
return mapper.readValue(json, new TypeReference<List<PushTaskRecords>>() {});
835-
} catch (Exception e) {
836-
throw new AlgoliaRuntimeException(
837-
"each object must have an `objectID` key in order to be used with the" + " WithTransformation methods"
838-
);
839-
}
840+
return this.ingestionTransporter.chunkedPush(indexName, objects, com.algolia.model.ingestion.Action.ADD_OBJECT, waitForTasks, batchSize, null, requestOptions);
840841
}
841842
842843
/**
@@ -1053,7 +1054,11 @@ public <T> List<WatchResponse> partialUpdateObjectsWithTransformation(
10531054
int batchSize,
10541055
RequestOptions requestOptions
10551056
) {
1056-
return chunkedPush(
1057+
if (this.ingestionTransporter == null) {
1058+
throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method.");
1059+
}
1060+
1061+
return this.ingestionTransporter.chunkedPush(
10571062
indexName,
10581063
objects,
10591064
createIfNotExists ? com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT : com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT_NO_CREATE,

templates/javascript/clients/algoliasearch/builds/definition.mustache

Lines changed: 12 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type { {{#lambda.titlecase}}{{{dependencyName}}}{{/lambda.titlecase}}Clie
1010

1111
import type { ChunkedBatchOptions, ReplaceAllObjectsOptions, ReplaceAllObjectsWithTransformationResponse } from '@algolia/client-search';
1212
import type { PartialUpdateObjectsOptions, SaveObjectsOptions } from '@algolia/client-search';
13-
import type { PushTaskRecords, WatchResponse } from '@algolia/ingestion';
13+
import type { WatchResponse } from '@algolia/ingestion';
1414

1515
import type {
1616
InitClientOptions,
@@ -75,21 +75,6 @@ export type Algoliasearch = SearchClient & {
7575
options: ReplaceAllObjectsOptions,
7676
requestOptions?: RequestOptions | undefined,
7777
) => Promise<ReplaceAllObjectsWithTransformationResponse>;
78-
79-
/**
80-
* Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
81-
*
82-
* @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests.
83-
* @param chunkedPush - The `chunkedPush` object.
84-
* @param chunkedPush.indexName - The `indexName` to replace `objects` in.
85-
* @param chunkedPush.objects - The array of `objects` to store in the given Algolia `indexName`.
86-
* @param chunkedPush.action - The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`.
87-
* @param chunkedPush.waitForTasks - Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable.
88-
* @param chunkedPush.batchSize - The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
89-
* @param chunkedPush.referenceIndexName - This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name).
90-
* @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `getEvent` method and merged with the transporter requestOptions.
91-
*/
92-
chunkedPush: (options: ChunkedBatchOptions & { referenceIndexName?: string }, requestOptions?: RequestOptions) => Promise<Array<WatchResponse>>;
9378
};
9479

9580
export type TransformationOptions = {
@@ -129,19 +114,20 @@ export function algoliasearch(
129114
...client,
130115
131116
async saveObjectsWithTransformation({ indexName, objects, waitForTasks }, requestOptions): Promise<Array<WatchResponse>> {
132-
return this.chunkedPush({ indexName, objects, action: 'addObject' });
117+
if (!ingestionTransporter) {
118+
throw new Error('`transformation.region` must be provided at client instantiation before calling this method.');
119+
}
120+
121+
if (!options?.transformation?.region) {
122+
throw new Error('`region` must be provided when leveraging the transformation pipeline');
123+
}
124+
125+
return ingestionTransporter.chunkedPush({ indexName, objects, action: 'addObject', waitForTasks }, requestOptions);
133126
},
134127

135128
async partialUpdateObjectsWithTransformation(
136129
{ indexName, objects, createIfNotExists, waitForTasks },
137130
requestOptions,
138-
): Promise<Array<WatchResponse>> {
139-
return this.chunkedPush({ indexName, objects, action: createIfNotExists ? 'partialUpdateObject' : 'partialUpdateObjectNoCreate' });
140-
},
141-
142-
async chunkedPush(
143-
{ indexName, objects, action = 'addObject', waitForTasks, batchSize = 1000, referenceIndexName }: ChunkedBatchOptions & { referenceIndexName?: string },
144-
requestOptions?: RequestOptions,
145131
): Promise<Array<WatchResponse>> {
146132
if (!ingestionTransporter) {
147133
throw new Error('`transformation.region` must be provided at client instantiation before calling this method.');
@@ -151,57 +137,7 @@ export function algoliasearch(
151137
throw new Error('`region` must be provided when leveraging the transformation pipeline');
152138
}
153139

154-
let records: Array<PushTaskRecords> = [];
155-
const responses: Array<WatchResponse> = [];
156-
157-
const objectEntries = objects.entries();
158-
for (const [i, obj] of objectEntries) {
159-
records.push(obj as PushTaskRecords);
160-
if (records.length === batchSize || i === objects.length - 1) {
161-
responses.push(
162-
await ingestionTransporter.push(
163-
{ indexName, pushTaskPayload: { action, records }, referenceIndexName },
164-
requestOptions,
165-
),
166-
);
167-
records = [];
168-
}
169-
}
170-
171-
let retryCount = 0;
172-
173-
if (waitForTasks) {
174-
for (const resp of responses) {
175-
if (!resp.eventID) {
176-
throw new Error('received unexpected response from the push endpoint, eventID must not be undefined');
177-
}
178-
179-
await createIterablePromise({
180-
func: async () => {
181-
if (resp.eventID === undefined || !resp.eventID) {
182-
throw new Error('received unexpected response from the push endpoint, eventID must not be undefined');
183-
}
184-
185-
return ingestionTransporter.getEvent({ runID: resp.runID, eventID: resp.eventID }).catch((error: ApiError) => {
186-
if (error.status === 404) {
187-
return undefined;
188-
}
189-
190-
throw error;
191-
})
192-
},
193-
validate: (response) => response !== undefined,
194-
aggregator: () => (retryCount += 1),
195-
error: {
196-
validate: () => retryCount >= 50,
197-
message: () => `The maximum number of retries exceeded. (${retryCount}/${50})`,
198-
},
199-
timeout: (): number => Math.min(retryCount * 500, 5000),
200-
});
201-
}
202-
}
203-
204-
return responses;
140+
return ingestionTransporter.chunkedPush({ indexName, objects, action: createIfNotExists ? 'partialUpdateObject' : 'partialUpdateObjectNoCreate', waitForTasks }, requestOptions);
205141
},
206142

207143
async replaceAllObjectsWithTransformation(
@@ -236,7 +172,7 @@ export function algoliasearch(
236172
requestOptions,
237173
);
238174

239-
const watchResponses = await this.chunkedPush(
175+
const watchResponses = await ingestionTransporter.chunkedPush(
240176
{ indexName: tmpIndexName, objects, waitForTasks: true, batchSize, referenceIndexName: indexName },
241177
requestOptions,
242178
);

0 commit comments

Comments
 (0)