Skip to content

Commit 50a4027

Browse files
fix(clients): allow chunked requests on WithTransformation methods (generated)
algolia/api-clients-automation#5011 Co-authored-by: algolia-bot <[email protected]> Co-authored-by: Clément Vannicatte <[email protected]>
1 parent 7508aea commit 50a4027

File tree

2 files changed

+118
-34
lines changed

2 files changed

+118
-34
lines changed

algoliasearch/src/main/java/com/algolia/api/IngestionClient.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.algolia.model.ingestion.*;
1111
import com.algolia.utils.*;
1212
import com.fasterxml.jackson.core.type.TypeReference;
13+
import com.fasterxml.jackson.databind.ObjectMapper;
1314
import java.time.Duration;
1415
import java.util.ArrayList;
1516
import java.util.EnumSet;
@@ -4845,4 +4846,103 @@ public CompletableFuture<WatchResponse> validateSourceBeforeUpdateAsync(@Nonnull
48454846
throws AlgoliaRuntimeException {
48464847
return this.validateSourceBeforeUpdateAsync(sourceID, sourceUpdate, null);
48474848
}
4849+
4850+
private <T> List<PushTaskRecords> objectsToPushTaskRecords(Iterable<T> objects) {
4851+
try {
4852+
ObjectMapper mapper = new ObjectMapper();
4853+
String json = mapper.writeValueAsString(objects);
4854+
4855+
return mapper.readValue(json, new TypeReference<List<PushTaskRecords>>() {});
4856+
} catch (Exception e) {
4857+
throw new AlgoliaRuntimeException("each object must have an `objectID` key in order to be indexed");
4858+
}
4859+
}
4860+
4861+
/**
4862+
* Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit
4863+
* in `push` requests by leveraging the Transformation pipeline setup in the Push connector
4864+
* (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
4865+
*
4866+
* @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to
4867+
* make it fit in `batch` requests.
4868+
* @param indexName - The `indexName` to replace `objects` in.
4869+
* @param objects - The array of `objects` to store in the given Algolia `indexName`.
4870+
* @param action - The `batch` `action` to perform on the given array of `objects`.
4871+
* @param waitForTasks - Whether or not we should wait until every `batch` tasks has been
4872+
* processed, this operation may slow the total execution time of this method but is more
4873+
* reliable.
4874+
* @param batchSize - The size of the chunk of `objects`. The number of `batch` calls will be
4875+
* equal to `length(objects) / batchSize`. Defaults to 1000.
4876+
* @param referenceIndexName - This is required when targeting an index that does not have a push
4877+
* connector setup (e.g. a tmp index), but you wish to attach another index's transformation
4878+
* to it (e.g. the source index name).
4879+
* @param requestOptions - The requestOptions to send along with the query, they will be forwarded
4880+
* to the `getEvent` method and merged with the transporter requestOptions.
4881+
*/
4882+
public <T> List<WatchResponse> chunkedPush(
4883+
String indexName,
4884+
Iterable<T> objects,
4885+
Action action,
4886+
boolean waitForTasks,
4887+
int batchSize,
4888+
String referenceIndexName,
4889+
RequestOptions requestOptions
4890+
) {
4891+
List<WatchResponse> responses = new ArrayList<>();
4892+
List<T> records = new ArrayList<>();
4893+
4894+
for (T item : objects) {
4895+
if (records.size() == batchSize) {
4896+
WatchResponse watch =
4897+
this.push(
4898+
indexName,
4899+
new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
4900+
waitForTasks,
4901+
referenceIndexName,
4902+
requestOptions
4903+
);
4904+
responses.add(watch);
4905+
records.clear();
4906+
}
4907+
4908+
records.add(item);
4909+
}
4910+
4911+
if (records.size() > 0) {
4912+
WatchResponse watch =
4913+
this.push(
4914+
indexName,
4915+
new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
4916+
waitForTasks,
4917+
referenceIndexName,
4918+
requestOptions
4919+
);
4920+
responses.add(watch);
4921+
}
4922+
4923+
if (waitForTasks) {
4924+
responses.forEach(response -> {
4925+
TaskUtils.retryUntil(
4926+
() -> {
4927+
try {
4928+
return this.getEvent(response.getRunID(), response.getEventID());
4929+
} catch (AlgoliaApiException e) {
4930+
if (e.getStatusCode() == 404) {
4931+
return null;
4932+
}
4933+
4934+
throw e;
4935+
}
4936+
},
4937+
(Event resp) -> {
4938+
return resp != null;
4939+
},
4940+
50,
4941+
null
4942+
);
4943+
});
4944+
}
4945+
4946+
return responses;
4947+
}
48484948
}

algoliasearch/src/main/java/com/algolia/api/SearchClient.java

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,10 @@
88
import com.algolia.config.ClientOptions;
99
import com.algolia.exceptions.*;
1010
import com.algolia.internal.JsonSerializer;
11-
import com.algolia.model.ingestion.PushTaskPayload;
12-
import com.algolia.model.ingestion.PushTaskRecords;
1311
import com.algolia.model.ingestion.WatchResponse;
1412
import com.algolia.model.search.*;
1513
import com.algolia.utils.*;
1614
import com.fasterxml.jackson.core.type.TypeReference;
17-
import com.fasterxml.jackson.databind.ObjectMapper;
1815
import java.nio.charset.Charset;
1916
import java.security.InvalidKeyException;
2017
import java.security.NoSuchAlgorithmException;
@@ -6765,7 +6762,7 @@ public <T> List<BatchResponse> chunkedBatch(
67656762
* @throws AlgoliaApiException When the API sends an http error code
67666763
* @throws AlgoliaRuntimeException When an error occurred during the serialization
67676764
*/
6768-
public <T> WatchResponse saveObjectsWithTransformation(String indexName, Iterable<T> objects) {
6765+
public <T> List<WatchResponse> saveObjectsWithTransformation(String indexName, Iterable<T> objects) {
67696766
return saveObjectsWithTransformation(indexName, objects, null);
67706767
}
67716768

@@ -6780,7 +6777,7 @@ public <T> WatchResponse saveObjectsWithTransformation(String indexName, Iterabl
67806777
* @param requestOptions The requestOptions to send along with the query, they will be merged with
67816778
* the transporter requestOptions. (optional)
67826779
*/
6783-
public <T> WatchResponse saveObjectsWithTransformation(String indexName, Iterable<T> objects, RequestOptions requestOptions) {
6780+
public <T> List<WatchResponse> saveObjectsWithTransformation(String indexName, Iterable<T> objects, RequestOptions requestOptions) {
67846781
return saveObjectsWithTransformation(indexName, objects, false, requestOptions);
67856782
}
67866783

@@ -6798,7 +6795,7 @@ public <T> WatchResponse saveObjectsWithTransformation(String indexName, Iterabl
67986795
* @param requestOptions The requestOptions to send along with the query, they will be merged with
67996796
* the transporter requestOptions. (optional)
68006797
*/
6801-
public <T> WatchResponse saveObjectsWithTransformation(
6798+
public <T> List<WatchResponse> saveObjectsWithTransformation(
68026799
String indexName,
68036800
Iterable<T> objects,
68046801
boolean waitForTasks,
@@ -6823,7 +6820,7 @@ public <T> WatchResponse saveObjectsWithTransformation(
68236820
* @param requestOptions The requestOptions to send along with the query, they will be merged with
68246821
* the transporter requestOptions. (optional)
68256822
*/
6826-
public <T> WatchResponse saveObjectsWithTransformation(
6823+
public <T> List<WatchResponse> saveObjectsWithTransformation(
68276824
String indexName,
68286825
Iterable<T> objects,
68296826
boolean waitForTasks,
@@ -6834,28 +6831,17 @@ public <T> WatchResponse saveObjectsWithTransformation(
68346831
throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method.");
68356832
}
68366833

6837-
return this.ingestionTransporter.push(
6834+
return this.ingestionTransporter.chunkedPush(
68386835
indexName,
6839-
new PushTaskPayload().setAction(com.algolia.model.ingestion.Action.ADD_OBJECT).setRecords(this.objectsToPushTaskRecords(objects)),
6836+
objects,
6837+
com.algolia.model.ingestion.Action.ADD_OBJECT,
68406838
waitForTasks,
6839+
batchSize,
68416840
null,
68426841
requestOptions
68436842
);
68446843
}
68456844

6846-
private <T> List<PushTaskRecords> objectsToPushTaskRecords(Iterable<T> objects) {
6847-
try {
6848-
ObjectMapper mapper = new ObjectMapper();
6849-
String json = mapper.writeValueAsString(objects);
6850-
6851-
return mapper.readValue(json, new TypeReference<List<PushTaskRecords>>() {});
6852-
} catch (Exception e) {
6853-
throw new AlgoliaRuntimeException(
6854-
"each object must have an `objectID` key in order to be used with the" + " WithTransformation methods"
6855-
);
6856-
}
6857-
}
6858-
68596845
/**
68606846
* Helper: Saves the given array of objects in the given index. The `chunkedBatch` helper is used
68616847
* under the hood, which creates a `batch` requests with at most 1000 objects in it.
@@ -7003,7 +6989,7 @@ public List<BatchResponse> deleteObjects(
70036989
* @param createIfNotExists To be provided if non-existing objects are passed, otherwise, the call
70046990
* will fail.
70056991
*/
7006-
public <T> WatchResponse partialUpdateObjectsWithTransformation(String indexName, Iterable<T> objects, boolean createIfNotExists) {
6992+
public <T> List<WatchResponse> partialUpdateObjectsWithTransformation(String indexName, Iterable<T> objects, boolean createIfNotExists) {
70076993
return partialUpdateObjectsWithTransformation(indexName, objects, createIfNotExists, false, null);
70086994
}
70096995

@@ -7021,7 +7007,7 @@ public <T> WatchResponse partialUpdateObjectsWithTransformation(String indexName
70217007
* processed, this operation may slow the total execution time of this method but is more
70227008
* reliable.
70237009
*/
7024-
public <T> WatchResponse partialUpdateObjectsWithTransformation(
7010+
public <T> List<WatchResponse> partialUpdateObjectsWithTransformation(
70257011
String indexName,
70267012
Iterable<T> objects,
70277013
boolean createIfNotExists,
@@ -7046,7 +7032,7 @@ public <T> WatchResponse partialUpdateObjectsWithTransformation(
70467032
* @param requestOptions The requestOptions to send along with the query, they will be merged with
70477033
* the transporter requestOptions. (optional)
70487034
*/
7049-
public <T> WatchResponse partialUpdateObjectsWithTransformation(
7035+
public <T> List<WatchResponse> partialUpdateObjectsWithTransformation(
70507036
String indexName,
70517037
Iterable<T> objects,
70527038
boolean createIfNotExists,
@@ -7074,7 +7060,7 @@ public <T> WatchResponse partialUpdateObjectsWithTransformation(
70747060
* @param requestOptions The requestOptions to send along with the query, they will be merged with
70757061
* the transporter requestOptions. (optional)
70767062
*/
7077-
public <T> WatchResponse partialUpdateObjectsWithTransformation(
7063+
public <T> List<WatchResponse> partialUpdateObjectsWithTransformation(
70787064
String indexName,
70797065
Iterable<T> objects,
70807066
boolean createIfNotExists,
@@ -7086,16 +7072,14 @@ public <T> WatchResponse partialUpdateObjectsWithTransformation(
70867072
throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method.");
70877073
}
70887074

7089-
return this.ingestionTransporter.push(
7075+
return this.ingestionTransporter.chunkedPush(
70907076
indexName,
7091-
new PushTaskPayload()
7092-
.setAction(
7093-
createIfNotExists
7094-
? com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT
7095-
: com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT_NO_CREATE
7096-
)
7097-
.setRecords(this.objectsToPushTaskRecords(objects)),
7077+
objects,
7078+
createIfNotExists
7079+
? com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT
7080+
: com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT_NO_CREATE,
70987081
waitForTasks,
7082+
batchSize,
70997083
null,
71007084
requestOptions
71017085
);

0 commit comments

Comments
 (0)