From 916186a0fd4d1221f497b178a932718332d8d78c Mon Sep 17 00:00:00 2001 From: shortcuts Date: Wed, 18 Jun 2025 16:30:17 +0200 Subject: [PATCH 1/2] fix(clients): allow chunked requests on WithTransformation methods --- .../__tests__/algoliasearch.common.test.ts | 80 --------- .../helpers/chunkedPush.yml | 4 + specs/ingestion/spec.yml | 3 + ...partialUpdateObjectsWithTransformation.yml | 4 +- .../helpers/saveObjectsWithTransformation.yml | 4 +- templates/go/api.mustache | 112 ++++-------- templates/go/helpers.mustache | 121 +++++++++++++ templates/go/ingestion_helpers.mustache | 90 ++++++++++ templates/go/search_helpers.mustache | 120 ++----------- templates/java/api.mustache | 1 + templates/java/api_helpers.mustache | 162 +++++++++++++----- .../algoliasearch/builds/definition.mustache | 115 +------------ .../javascript/clients/api-single.mustache | 4 +- .../clients/client/api/imports.mustache | 8 + .../client/api/ingestionHelpers.mustache | 69 ++++++++ .../client/model/clientMethodProps.mustache | 33 ++++ templates/php/api.mustache | 89 +++++++++- templates/python/api.mustache | 9 +- templates/python/ingestion_helpers.mustache | 64 +++++++ templates/python/search_helpers.mustache | 32 +--- ...artialUpdateObjectsWithTransformation.json | 4 +- .../search/saveObjectsWithTransformation.json | 4 +- 22 files changed, 681 insertions(+), 451 deletions(-) rename specs/{search => ingestion}/helpers/chunkedPush.yml (97%) create mode 100644 templates/go/helpers.mustache create mode 100644 templates/go/ingestion_helpers.mustache create mode 100644 templates/javascript/clients/client/api/ingestionHelpers.mustache create mode 100644 templates/python/ingestion_helpers.mustache diff --git a/clients/algoliasearch-client-javascript/packages/algoliasearch/__tests__/algoliasearch.common.test.ts b/clients/algoliasearch-client-javascript/packages/algoliasearch/__tests__/algoliasearch.common.test.ts index c76d2baa185..1a0b00573d2 100644 --- a/clients/algoliasearch-client-javascript/packages/algoliasearch/__tests__/algoliasearch.common.test.ts +++ b/clients/algoliasearch-client-javascript/packages/algoliasearch/__tests__/algoliasearch.common.test.ts @@ -168,86 +168,6 @@ describe('api', () => { }), ).rejects.toThrow('`transformation.region` must be provided at client instantiation before calling this method.'); }); - - test('exposes the transformation methods at the root of the client', async () => { - const ingestionClient = algoliasearch('APP_ID', 'API_KEY', { - requester: browserEchoRequester(), - transformation: { region: 'us' }, - }); - - expect(ingestionClient.saveObjectsWithTransformation).not.toBeUndefined(); - - let res = (await ingestionClient.saveObjectsWithTransformation({ - indexName: 'foo', - objects: [{ objectID: 'bar', baz: 42 }], - waitForTasks: true, - })) as unknown as EchoResponse; - - expect(res.headers).toEqual( - expect.objectContaining({ - 'x-algolia-application-id': 'APP_ID', - 'x-algolia-api-key': 'API_KEY', - }), - ); - expect(res.url.startsWith('https://data.us.algolia.com/1/push/foo?watch=true')).toBeTruthy(); - expect(res.data).toEqual({ - action: 'addObject', - records: [ - { - baz: 42, - objectID: 'bar', - }, - ], - }); - expect(ingestionClient.partialUpdateObjectsWithTransformation).not.toBeUndefined(); - - res = (await ingestionClient.partialUpdateObjectsWithTransformation({ - indexName: 'foo', - objects: [{ objectID: 'bar', baz: 42 }], - waitForTasks: true, - createIfNotExists: true, - })) as unknown as EchoResponse; - - expect(res.headers).toEqual( - expect.objectContaining({ - 'x-algolia-application-id': 'APP_ID', - 'x-algolia-api-key': 'API_KEY', - }), - ); - expect(res.url.startsWith('https://data.us.algolia.com/1/push/foo?watch=true')).toBeTruthy(); - expect(res.data).toEqual({ - action: 'partialUpdateObject', - records: [ - { - baz: 42, - objectID: 'bar', - }, - ], - }); - - res = (await ingestionClient.partialUpdateObjectsWithTransformation({ - indexName: 'foo', - objects: [{ objectID: 'bar', baz: 42 }], - waitForTasks: true, - })) as unknown as EchoResponse; - - expect(res.headers).toEqual( - expect.objectContaining({ - 'x-algolia-application-id': 'APP_ID', - 'x-algolia-api-key': 'API_KEY', - }), - ); - expect(res.url.startsWith('https://data.us.algolia.com/1/push/foo?watch=true')).toBeTruthy(); - expect(res.data).toEqual({ - action: 'partialUpdateObjectNoCreate', - records: [ - { - baz: 42, - objectID: 'bar', - }, - ], - }); - }); }); }); diff --git a/specs/search/helpers/chunkedPush.yml b/specs/ingestion/helpers/chunkedPush.yml similarity index 97% rename from specs/search/helpers/chunkedPush.yml rename to specs/ingestion/helpers/chunkedPush.yml index b0a3c92f357..eb9173a413f 100644 --- a/specs/search/helpers/chunkedPush.yml +++ b/specs/ingestion/helpers/chunkedPush.yml @@ -5,6 +5,10 @@ method: - Records x-available-languages: - javascript + - go + - java + - php + - python operationId: chunkedPush summary: Replace all records in an index description: | diff --git a/specs/ingestion/spec.yml b/specs/ingestion/spec.yml index 9d7a3cb6825..4f0712ec058 100644 --- a/specs/ingestion/spec.yml +++ b/specs/ingestion/spec.yml @@ -202,3 +202,6 @@ paths: # ############### /setClientApiKey: $ref: '../common/helpers/setClientApiKey.yml#/method' + + /chunkedPush: + $ref: 'helpers/chunkedPush.yml#/method' diff --git a/specs/search/helpers/partialUpdateObjectsWithTransformation.yml b/specs/search/helpers/partialUpdateObjectsWithTransformation.yml index bf1bae0dc13..544f7ebeeee 100644 --- a/specs/search/helpers/partialUpdateObjectsWithTransformation.yml +++ b/specs/search/helpers/partialUpdateObjectsWithTransformation.yml @@ -61,6 +61,8 @@ method: content: application/json: schema: - $ref: '../../common/schemas/ingestion/WatchResponse.yml' + type: array + items: + $ref: '../../common/schemas/ingestion/WatchResponse.yml' '400': $ref: '../../common/responses/IndexNotFound.yml' diff --git a/specs/search/helpers/saveObjectsWithTransformation.yml b/specs/search/helpers/saveObjectsWithTransformation.yml index ca487d7dc22..db8d7f7de79 100644 --- a/specs/search/helpers/saveObjectsWithTransformation.yml +++ b/specs/search/helpers/saveObjectsWithTransformation.yml @@ -54,6 +54,8 @@ method: content: application/json: schema: - $ref: '../../common/schemas/ingestion/WatchResponse.yml' + type: array + items: + $ref: '../../common/schemas/ingestion/WatchResponse.yml' '400': $ref: '../../common/responses/IndexNotFound.yml' diff --git a/templates/go/api.mustache b/templates/go/api.mustache index d31d81d6c2c..e82cd9db638 100644 --- a/templates/go/api.mustache +++ b/templates/go/api.mustache @@ -33,6 +33,17 @@ type config struct { headerParams map[string]string timeouts transport.RequestConfiguration + {{#isIngestionClient}} + // -- ChunkedPush options + waitForTasks bool + batchSize int + + // -- Iterable options + maxRetries int + timeout func(int) time.Duration + aggregator func(any, error) + {{/isIngestionClient}} + {{#isSearchClient}} // -- ChunkedBatch options waitForTasks bool @@ -102,42 +113,6 @@ func WithConnectTimeout(timeout time.Duration) requestOption { {{#isSearchClient}} -// --------- ChunkedBatch options --------- - -type ChunkedBatchOption interface { - RequestOption - chunkedBatch() -} - -type chunkedBatchOption func(*config) - -var ( - _ ChunkedBatchOption = (*chunkedBatchOption)(nil) - _ ChunkedBatchOption = (*requestOption)(nil) -) - -func (c chunkedBatchOption) apply(conf *config) { - c(conf) -} - -func (c chunkedBatchOption) chunkedBatch() {} - -func (r requestOption) chunkedBatch() {} - -// WithWaitForTasks whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. -func WithWaitForTasks(waitForTasks bool) chunkedBatchOption { - return chunkedBatchOption(func(c *config) { - c.waitForTasks = waitForTasks - }) -} - -// WithBatchSize the size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. -func WithBatchSize(batchSize int) chunkedBatchOption { - return chunkedBatchOption(func(c *config) { - c.batchSize = batchSize - }) -} - // --------- PartialUpdateObjects options --------- type PartialUpdateObjectsOption interface { @@ -206,49 +181,6 @@ func WithScopes(scopes []ScopeType) replaceAllObjectsOption { }) } -// --------- Iterable options ---------. - -type IterableOption interface { - RequestOption - iterable() -} - -type iterableOption func(*config) - -var ( - _ IterableOption = (*iterableOption)(nil) - _ IterableOption = (*requestOption)(nil) -) - -func (i iterableOption) apply(c *config) { - i(c) -} - -func (r requestOption) iterable() {} - -func (i iterableOption) iterable() {} - -// WithMaxRetries the maximum number of retry. Default to 50. -func WithMaxRetries(maxRetries int) iterableOption { - return iterableOption(func(c *config) { - c.maxRetries = maxRetries - }) -} - -// WithTimeout he function to decide how long to wait between retries. Default to min(retryCount * 200, 5000) -func WithTimeout(timeout func(int) time.Duration) iterableOption { - return iterableOption(func(c *config) { - c.timeout = timeout - }) -} - -// WithAggregator the function to aggregate the results of the iterable. -func WithAggregator(aggregator func(any, error)) iterableOption { - return iterableOption(func(c *config) { - c.aggregator = aggregator - }) -} - // --------- WaitForKey options ---------. type WaitForApiKeyOption interface { @@ -295,6 +227,18 @@ func toRequestOptions[T RequestOption](opts []T) []RequestOption { return requestOpts } +func toIngestionRequestOptions(opts []RequestOption) []ingestion.RequestOption { + requestOpts := make([]ingestion.RequestOption, 0, len(opts)) + + for _, opt := range opts { + if opt, ok := opt.(ingestion.RequestOption); ok { + requestOpts = append(requestOpts, opt) + } + } + + return requestOpts +} + func toIterableOptions(opts []ChunkedBatchOption) []IterableOption { iterableOpts := make([]IterableOption, 0, len(opts)) @@ -571,5 +515,13 @@ func (c *APIClient) {{nickname}}({{#hasParams}}r {{#structPrefix}}{{&classname}} {{/operations}} {{#isSearchClient}} +{{> helpers}} + {{> search_helpers}} -{{/isSearchClient}} \ No newline at end of file +{{/isSearchClient}} + +{{#isIngestionClient}} +{{> helpers}} + +{{> ingestion_helpers}} +{{/isIngestionClient}} \ No newline at end of file diff --git a/templates/go/helpers.mustache b/templates/go/helpers.mustache new file mode 100644 index 00000000000..67ede6bd9ca --- /dev/null +++ b/templates/go/helpers.mustache @@ -0,0 +1,121 @@ +// --------- ChunkedBatch options --------- + +type ChunkedBatchOption interface { + RequestOption + chunkedBatch() +} + +type chunkedBatchOption func(*config) + +var ( + _ ChunkedBatchOption = (*chunkedBatchOption)(nil) + _ ChunkedBatchOption = (*requestOption)(nil) +) + +func (c chunkedBatchOption) apply(conf *config) { + c(conf) +} + +func (c chunkedBatchOption) chunkedBatch() {} + +func (r requestOption) chunkedBatch() {} + +// WithWaitForTasks whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. +func WithWaitForTasks(waitForTasks bool) chunkedBatchOption { + return chunkedBatchOption(func(c *config) { + c.waitForTasks = waitForTasks + }) +} + +// WithBatchSize the size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. +func WithBatchSize(batchSize int) chunkedBatchOption { + return chunkedBatchOption(func(c *config) { + c.batchSize = batchSize + }) +} + +// --------- Iterable options ---------. + +type IterableOption interface { + RequestOption + iterable() +} + +type iterableOption func(*config) + +var ( + _ IterableOption = (*iterableOption)(nil) + _ IterableOption = (*requestOption)(nil) +) + +func (i iterableOption) apply(c *config) { + i(c) +} + +func (r requestOption) iterable() {} + +func (i iterableOption) iterable() {} + +// WithMaxRetries the maximum number of retry. Default to 50. +func WithMaxRetries(maxRetries int) iterableOption { + return iterableOption(func(c *config) { + c.maxRetries = maxRetries + }) +} + +// WithTimeout he function to decide how long to wait between retries. Default to min(retryCount * 200, 5000) +func WithTimeout(timeout func(int) time.Duration) iterableOption { + return iterableOption(func(c *config) { + c.timeout = timeout + }) +} + +// WithAggregator the function to aggregate the results of the iterable. +func WithAggregator(aggregator func(any, error)) iterableOption { + return iterableOption(func(c *config) { + c.aggregator = aggregator + }) +} + +func CreateIterable[T any](execute func(*T, error) (*T, error), validate func(*T, error) (bool, error), opts ...IterableOption) (*T, error) { + conf := config{ + headerParams: map[string]string{}, + maxRetries: -1, + timeout: func(count int) time.Duration { + return 0 * time.Millisecond + }, + } + + for _, opt := range opts { + opt.apply(&conf) + } + + var executor func(*T, error) (*T, error) + + retryCount := 0 + + executor = func(previousResponse *T, previousError error) (*T, error) { + response, responseErr := execute(previousResponse, previousError) + + retryCount++ + + if conf.aggregator != nil { + conf.aggregator(response, responseErr) + } + + canStop, err := validate(response, responseErr) + if canStop || err != nil { + return response, err + } + + if conf.maxRetries >= 0 && retryCount >= conf.maxRetries { + return nil, errs.NewWaitError(fmt.Sprintf("The maximum number of retries exceeded. (%d/%d)", retryCount, conf.maxRetries)) + } + + time.Sleep(conf.timeout(retryCount)) + + return executor(response, responseErr) + } + + return executor(nil, nil) +} \ No newline at end of file diff --git a/templates/go/ingestion_helpers.mustache b/templates/go/ingestion_helpers.mustache new file mode 100644 index 00000000000..c1cd379ab96 --- /dev/null +++ b/templates/go/ingestion_helpers.mustache @@ -0,0 +1,90 @@ +/* +ChunkedPush Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/). + + @param indexName string - the index name to save objects into. + @param objects []map[string]any - List of objects to save. + @param action Action - The action to perform on the objects. + @param referenceIndexName *string - This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name). + @param opts ...ChunkedBatchOption - Optional parameters for the request. + @return []WatchResponse - List of push responses. + @return error - Error if any. +*/ +func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, action Action, referenceIndexName *string, opts ...RequestOption) ([]WatchResponse, error) { + conf := config{ + headerParams: map[string]string{}, + waitForTasks: false, + batchSize: 1000, + } + + for _, opt := range opts { + opt.apply(&conf) + } + + records := make([]map[string]any, 0, len(objects)%conf.batchSize) + responses := make([]WatchResponse, 0, len(objects)%conf.batchSize) + + for i, obj := range objects { + records = append(records, obj) + + if len(records) == conf.batchSize || i == len(objects)-1 { + pushRecords := make([]PushTaskRecords, 0, len(records)) + + rawRecords, err := json.Marshal(records) + if err != nil { + return nil, reportError("unable to marshal the given `objects`: %w", err) + } + + err = json.Unmarshal(rawRecords, &pushRecords) + if err != nil { + return nil, reportError("unable to unmarshal the given `objects` to an `[]PushTaskRecords` payload: %w", err) + } + + request := c.NewApiPushRequest( + indexName, + NewEmptyPushTaskPayload(). + SetAction(action). + SetRecords(pushRecords), + ) + + if referenceIndexName != nil { + request = request.WithReferenceIndexName(*referenceIndexName) + } + + resp, err := c.Push(request) + if err != nil { + return nil, err //nolint: wrapcheck + } + + responses = append(responses, *resp) + records = make([]map[string]any, 0, len(objects)%conf.batchSize) + } + } + + if conf.waitForTasks { + for _, resp := range responses { + _, err := CreateIterable( //nolint:wrapcheck + func(*Event, error) (*Event, error) { + if resp.EventID == nil { + return nil, reportError("received unexpected response from the push endpoint, eventID must not be undefined") + } + + return c.GetEvent(c.NewApiGetEventRequest(resp.RunID, *resp.EventID)) + }, + func(response *Event, err error) (bool, error) { + var apiErr *APIError + if errors.As(err, &apiErr) { + return apiErr.Status != 404, nil + } + + return true, err + }, + WithTimeout(func(count int) time.Duration { return time.Duration(min(500*count, 5000)) * time.Millisecond }), WithMaxRetries(50), + ) + if err != nil { + return nil, err + } + } + } + + return responses, nil +} \ No newline at end of file diff --git a/templates/go/search_helpers.mustache b/templates/go/search_helpers.mustache index a830cd95aec..2df49c6712d 100644 --- a/templates/go/search_helpers.mustache +++ b/templates/go/search_helpers.mustache @@ -1,46 +1,3 @@ -func CreateIterable[T any](execute func(*T, error) (*T, error), validate func(*T, error) (bool, error), opts ...IterableOption) (*T, error) { - conf := config{ - headerParams: map[string]string{}, - maxRetries: -1, - timeout: func(count int) time.Duration { - return 0 * time.Millisecond - }, - } - - for _, opt := range opts { - opt.apply(&conf) - } - - var executor func(*T, error) (*T, error) - - retryCount := 0 - - executor = func(previousResponse *T, previousError error) (*T, error) { - response, responseErr := execute(previousResponse, previousError) - - retryCount++ - - if conf.aggregator != nil { - conf.aggregator(response, responseErr) - } - - canStop, err := validate(response, responseErr) - if canStop || err != nil { - return response, err - } - - if conf.maxRetries >= 0 && retryCount >= conf.maxRetries { - return nil, errs.NewWaitError(fmt.Sprintf("The maximum number of retries exceeded. (%d/%d)", retryCount, conf.maxRetries)) - } - - time.Sleep(conf.timeout(retryCount)) - - return executor(response, responseErr) - } - - return executor(nil, nil) -} - /* SearchForHits calls the `search` method but with certainty that we will only request Algolia records (hits) and not facets. Disclaimer: We don't assert that the parameters you pass to this method only contains `hits` requests to prevent impacting search performances, this helper is purely for typing purposes. @@ -746,31 +703,12 @@ Helper: Similar to the `SaveObjects` method but requires a Push connector (https @return []BatchResponse - List of batch responses. @return error - Error if any. */ -func (c *APIClient) SaveObjectsWithTransformation(indexName string, objects []map[string]any, opts ...ChunkedBatchOption) (*ingestion.WatchResponse, error) { - if c.ingestionTransporter == nil { +func (c *APIClient) SaveObjectsWithTransformation(indexName string, objects []map[string]any, opts ...ChunkedBatchOption) ([]ingestion.WatchResponse, error) { + if c.ingestionTransporter == nil { return nil, reportError("`region` must be provided at client instantiation before calling this method.") - } - - var records []ingestion.PushTaskRecords - - rawObjects, err := json.Marshal(objects) - if err != nil { - return nil, reportError("unable to marshal the given `objects`: %w", err) - } - - err = json.Unmarshal(rawObjects, &records) - if err != nil { - return nil, reportError("unable to unmarshal the given `objects` to an `[]ingestion.PushTaskRecords` payload: %w", err) - } + } - return c.ingestionTransporter.Push( //nolint:wrapcheck - c.ingestionTransporter.NewApiPushRequest( - indexName, - ingestion.NewEmptyPushTaskPayload(). - SetAction(ingestion.ACTION_ADD_OBJECT). - SetRecords(records), - ), - ) + return c.ingestionTransporter.ChunkedPush(indexName, objects, ingestion.Action(ACTION_ADD_OBJECT), nil, toIngestionRequestOptions(toRequestOptions(opts))...) //nolint:wrapcheck } /* @@ -782,47 +720,27 @@ Helper: Similar to the `PartialUpdateObjects` method but requires a Push connect @return []BatchResponse - List of batch responses. @return error - Error if any. */ -func (c *APIClient) PartialUpdateObjectsWithTransformation(indexName string, objects []map[string]any, opts ...PartialUpdateObjectsOption) (*ingestion.WatchResponse, error) { - if c.ingestionTransporter == nil { +func (c *APIClient) PartialUpdateObjectsWithTransformation(indexName string, objects []map[string]any, opts ...PartialUpdateObjectsOption) ([]ingestion.WatchResponse, error) { + if c.ingestionTransporter == nil { return nil, reportError("`region` must be provided at client instantiation before calling this method.") - } - - - conf := config{ - headerParams: map[string]string{}, - createIfNotExists: true, } - for _, opt := range opts { - opt.apply(&conf) - } - - var action ingestion.Action - - if conf.createIfNotExists { - action = ingestion.ACTION_PARTIAL_UPDATE_OBJECT - } else { - action = ingestion.ACTION_PARTIAL_UPDATE_OBJECT_NO_CREATE - } - - var records []ingestion.PushTaskRecords + conf := config{ + headerParams: map[string]string{}, + createIfNotExists: true, + } - rawObjects, err := json.Marshal(objects) - if err != nil { - return nil, reportError("unable to marshal the given `objects`: %w", err) + for _, opt := range opts { + opt.apply(&conf) } - err = json.Unmarshal(rawObjects, &records) - if err != nil { - return nil, reportError("unable to unmarshal the given `objects` to an `[]ingestion.PushTaskRecords` payload: %w", err) + var action Action + + if conf.createIfNotExists { + action = ACTION_PARTIAL_UPDATE_OBJECT + } else { + action = ACTION_PARTIAL_UPDATE_OBJECT_NO_CREATE } - return c.ingestionTransporter.Push( //nolint:wrapcheck - c.ingestionTransporter.NewApiPushRequest( - indexName, - ingestion.NewEmptyPushTaskPayload(). - SetAction(action). - SetRecords(records), - ), - ) + return c.ingestionTransporter.ChunkedPush(indexName, objects, ingestion.Action(action), nil, toIngestionRequestOptions(toRequestOptions(opts))...) //nolint:wrapcheck } \ No newline at end of file diff --git a/templates/java/api.mustache b/templates/java/api.mustache index 49c23c6185f..8f7f38baad8 100644 --- a/templates/java/api.mustache +++ b/templates/java/api.mustache @@ -46,6 +46,7 @@ import java.util.Base64; import javax.annotation.Nonnull; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; +import com.algolia.model.ingestion.Event; import com.algolia.model.ingestion.PushTaskPayload; import com.algolia.model.ingestion.PushTaskRecords; import com.algolia.model.ingestion.WatchResponse; diff --git a/templates/java/api_helpers.mustache b/templates/java/api_helpers.mustache index ded924a9bb3..f05a67645f6 100644 --- a/templates/java/api_helpers.mustache +++ b/templates/java/api_helpers.mustache @@ -1,3 +1,106 @@ +{{#isIngestionClient}} +private List objectsToPushTaskRecords(Iterable objects) { + try { + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(objects); + + return mapper.readValue(json, new TypeReference>() {}); + } catch (Exception e) { + throw new AlgoliaRuntimeException( + "each object must have an `objectID` key in order to be indexed" + ); + } +} + +/** + * Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit + * in `push` requests by leveraging the Transformation pipeline setup in the Push connector + * (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/). + * + * @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to + * make it fit in `batch` requests. + * @param indexName - The `indexName` to replace `objects` in. + * @param objects - The array of `objects` to store in the given Algolia `indexName`. + * @param action - The `batch` `action` to perform on the given array of `objects`. + * @param waitForTasks - Whether or not we should wait until every `batch` tasks has been + * processed, this operation may slow the total execution time of this method but is more + * reliable. + * @param batchSize - The size of the chunk of `objects`. The number of `batch` calls will be + * equal to `length(objects) / batchSize`. Defaults to 1000. + * @param referenceIndexName - This is required when targeting an index that does not have a push + * connector setup (e.g. a tmp index), but you wish to attach another index's transformation + * to it (e.g. the source index name). + * @param requestOptions - The requestOptions to send along with the query, they will be forwarded + * to the `getTask` method and merged with the transporter requestOptions. + */ +public List chunkedPush( + String indexName, + Iterable objects, + Action action, + boolean waitForTasks, + int batchSize, + String referenceIndexName, + RequestOptions requestOptions +) { + List responses = new ArrayList<>(); + List records = new ArrayList<>(); + + for (T item : objects) { + if (records.size() == batchSize) { + WatchResponse watch = + this.push( + indexName, + new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)), + waitForTasks, + referenceIndexName, + requestOptions + ); + responses.add(watch); + records.clear(); + } + + records.add(item); + } + + if (records.size() > 0) { + WatchResponse watch = + this.push( + indexName, + new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)), + waitForTasks, + referenceIndexName, + requestOptions + ); + responses.add(watch); + } + + if (waitForTasks) { + responses.forEach(response -> { + TaskUtils.retryUntil( + () -> { + try { + return this.getEvent(response.getRunID(), response.getEventID()); + } catch (AlgoliaApiException e) { + if (e.getStatusCode() == 404) { + return null; + } + + throw e; + } + }, + (Event resp) -> { + return resp != null; + }, + 50, + null + ); + } + ); + } + + return responses; +} +{{/isIngestionClient}} {{#isSearchClient}} /** * Helper: Wait for a task to complete with `indexName` and `taskID`. @@ -665,7 +768,7 @@ public List chunkedBatch( * @throws AlgoliaApiException When the API sends an http error code * @throws AlgoliaRuntimeException When an error occurred during the serialization */ -public WatchResponse saveObjectsWithTransformation(String indexName, Iterable objects) { +public List saveObjectsWithTransformation(String indexName, Iterable objects) { return saveObjectsWithTransformation(indexName, objects, null); } @@ -680,7 +783,7 @@ public WatchResponse saveObjectsWithTransformation(String indexName, Iterabl * @param requestOptions The requestOptions to send along with the query, they will be merged with * the transporter requestOptions. (optional) */ -public WatchResponse saveObjectsWithTransformation(String indexName, Iterable objects, RequestOptions requestOptions) { +public List saveObjectsWithTransformation(String indexName, Iterable objects, RequestOptions requestOptions) { return saveObjectsWithTransformation(indexName, objects, false, requestOptions); } @@ -698,7 +801,7 @@ public WatchResponse saveObjectsWithTransformation(String indexName, Iterabl * @param requestOptions The requestOptions to send along with the query, they will be merged with * the transporter requestOptions. (optional) */ -public WatchResponse saveObjectsWithTransformation( +public List saveObjectsWithTransformation( String indexName, Iterable objects, boolean waitForTasks, @@ -723,7 +826,7 @@ public WatchResponse saveObjectsWithTransformation( * @param requestOptions The requestOptions to send along with the query, they will be merged with * the transporter requestOptions. (optional) */ -public WatchResponse saveObjectsWithTransformation( +public List saveObjectsWithTransformation( String indexName, Iterable objects, boolean waitForTasks, @@ -734,26 +837,7 @@ public WatchResponse saveObjectsWithTransformation( throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method."); } - return this.ingestionTransporter.push( - indexName, - new PushTaskPayload().setAction(com.algolia.model.ingestion.Action.ADD_OBJECT).setRecords(this.objectsToPushTaskRecords(objects)), - waitForTasks, - null, - requestOptions - ); -} - -private List objectsToPushTaskRecords(Iterable objects) { - try { - ObjectMapper mapper = new ObjectMapper(); - String json = mapper.writeValueAsString(objects); - - return mapper.readValue(json, new TypeReference>() {}); - } catch (Exception e) { - throw new AlgoliaRuntimeException( - "each object must have an `objectID` key in order to be used with the" + " WithTransformation methods" - ); - } + return this.ingestionTransporter.chunkedPush(indexName, objects, com.algolia.model.ingestion.Action.ADD_OBJECT, waitForTasks, batchSize, null, requestOptions); } /** @@ -891,7 +975,7 @@ public List deleteObjects(String indexName, List objectID * @param createIfNotExists To be provided if non-existing objects are passed, otherwise, the call * will fail. */ -public WatchResponse partialUpdateObjectsWithTransformation(String indexName, Iterable objects, boolean createIfNotExists) { +public List partialUpdateObjectsWithTransformation(String indexName, Iterable objects, boolean createIfNotExists) { return partialUpdateObjectsWithTransformation(indexName, objects, createIfNotExists, false, null); } @@ -909,7 +993,7 @@ public WatchResponse partialUpdateObjectsWithTransformation(String indexName * processed, this operation may slow the total execution time of this method but is more * reliable. */ -public WatchResponse partialUpdateObjectsWithTransformation( +public List partialUpdateObjectsWithTransformation( String indexName, Iterable objects, boolean createIfNotExists, @@ -934,7 +1018,7 @@ public WatchResponse partialUpdateObjectsWithTransformation( * @param requestOptions The requestOptions to send along with the query, they will be merged with * the transporter requestOptions. (optional) */ -public WatchResponse partialUpdateObjectsWithTransformation( +public List partialUpdateObjectsWithTransformation( String indexName, Iterable objects, boolean createIfNotExists, @@ -962,7 +1046,7 @@ public WatchResponse partialUpdateObjectsWithTransformation( * @param requestOptions The requestOptions to send along with the query, they will be merged with * the transporter requestOptions. (optional) */ -public WatchResponse partialUpdateObjectsWithTransformation( +public List partialUpdateObjectsWithTransformation( String indexName, Iterable objects, boolean createIfNotExists, @@ -974,19 +1058,15 @@ public WatchResponse partialUpdateObjectsWithTransformation( throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method."); } - return this.ingestionTransporter.push( - indexName, - new PushTaskPayload() - .setAction( - createIfNotExists - ? com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT - : com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT_NO_CREATE - ) - .setRecords(this.objectsToPushTaskRecords(objects)), - waitForTasks, - null, - requestOptions - ); + return this.ingestionTransporter.chunkedPush( + indexName, + objects, + createIfNotExists ? com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT : com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT_NO_CREATE, + waitForTasks, + batchSize, + null, + requestOptions + ); } /** diff --git a/templates/javascript/clients/algoliasearch/builds/definition.mustache b/templates/javascript/clients/algoliasearch/builds/definition.mustache index d15b7ec5f57..69591890cef 100644 --- a/templates/javascript/clients/algoliasearch/builds/definition.mustache +++ b/templates/javascript/clients/algoliasearch/builds/definition.mustache @@ -10,7 +10,7 @@ import type { {{#lambda.titlecase}}{{{dependencyName}}}{{/lambda.titlecase}}Clie import type { ChunkedBatchOptions, ReplaceAllObjectsOptions, ReplaceAllObjectsWithTransformationResponse } from '@algolia/client-search'; import type { PartialUpdateObjectsOptions, SaveObjectsOptions } from '@algolia/client-search'; -import type { PushTaskRecords, WatchResponse } from '@algolia/ingestion'; +import type { WatchResponse } from '@algolia/ingestion'; import type { InitClientOptions, @@ -44,7 +44,7 @@ export type Algoliasearch = SearchClient & { * @param saveObjects.waitForTasks - Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `batch` method and merged with the transporter requestOptions. */ - saveObjectsWithTransformation: (options: SaveObjectsOptions, requestOptions?: RequestOptions | undefined) => Promise; + saveObjectsWithTransformation: (options: SaveObjectsOptions, requestOptions?: RequestOptions | undefined) => Promise>; /** * Helper: Similar to the `partialUpdateObjects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must have been passed to the client instantiation method. @@ -58,7 +58,7 @@ export type Algoliasearch = SearchClient & { * @param partialUpdateObjects.waitForTasks - Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `getTask` method and merged with the transporter requestOptions. */ - partialUpdateObjectsWithTransformation: (options: PartialUpdateObjectsOptions, requestOptions?: RequestOptions | undefined) => Promise; + partialUpdateObjectsWithTransformation: (options: PartialUpdateObjectsOptions, requestOptions?: RequestOptions | undefined) => Promise>; /** * Helper: Similar to the `replaceAllObjects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must have been passed to the client instantiation method. @@ -75,20 +75,6 @@ export type Algoliasearch = SearchClient & { options: ReplaceAllObjectsOptions, requestOptions?: RequestOptions | undefined, ) => Promise; - - /** - * Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/). - * - * @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests. - * @param chunkedPush - The `chunkedPush` object. - * @param chunkedPush.indexName - The `indexName` to replace `objects` in. - * @param chunkedPush.objects - The array of `objects` to store in the given Algolia `indexName`. - * @param chunkedPush.action - The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`. - * @param chunkedPush.waitForTasks - Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. - * @param chunkedPush.batchSize - The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. - * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `getEvent` method and merged with the transporter requestOptions. - */ - chunkedPush: (options: ChunkedBatchOptions, requestOptions?: RequestOptions) => Promise>; }; export type TransformationOptions = { @@ -127,7 +113,7 @@ export function algoliasearch( return { ...client, - async saveObjectsWithTransformation({ indexName, objects, waitForTasks }, requestOptions): Promise { + async saveObjectsWithTransformation({ indexName, objects, waitForTasks }, requestOptions): Promise> { if (!ingestionTransporter) { throw new Error('`transformation.region` must be provided at client instantiation before calling this method.'); } @@ -136,47 +122,12 @@ export function algoliasearch( throw new Error('`region` must be provided when leveraging the transformation pipeline'); } - return ingestionTransporter?.push( - { - indexName, - watch: waitForTasks, - pushTaskPayload: { - action: 'addObject', - records: objects as PushTaskRecords[], - }, - }, - requestOptions, - ); + return ingestionTransporter.chunkedPush({ indexName, objects, action: 'addObject', waitForTasks }, requestOptions); }, async partialUpdateObjectsWithTransformation( { indexName, objects, createIfNotExists, waitForTasks }, requestOptions, - ): Promise { - if (!ingestionTransporter) { - throw new Error('`transformation.region` must be provided at client instantiation before calling this method.'); - } - - if (!options?.transformation?.region) { - throw new Error('`region` must be provided when leveraging the transformation pipeline'); - } - - return ingestionTransporter?.push( - { - indexName, - watch: waitForTasks, - pushTaskPayload: { - action: createIfNotExists ? 'partialUpdateObject' : 'partialUpdateObjectNoCreate', - records: objects as PushTaskRecords[], - }, - }, - requestOptions, - ); - }, - - async chunkedPush( - { indexName, objects, action = 'addObject', waitForTasks, batchSize = 1000 }: ChunkedBatchOptions, - requestOptions?: RequestOptions, ): Promise> { if (!ingestionTransporter) { throw new Error('`transformation.region` must be provided at client instantiation before calling this method.'); @@ -186,57 +137,7 @@ export function algoliasearch( throw new Error('`region` must be provided when leveraging the transformation pipeline'); } - let records: Array = []; - const responses: Array = []; - - const objectEntries = objects.entries(); - for (const [i, obj] of objectEntries) { - records.push(obj as PushTaskRecords); - if (records.length === batchSize || i === objects.length - 1) { - responses.push( - await ingestionTransporter.push( - { indexName, pushTaskPayload: { action, records }, watch: waitForTasks }, - requestOptions, - ), - ); - records = []; - } - } - - let retryCount = 0; - - if (waitForTasks) { - for (const resp of responses) { - if (!resp.eventID) { - throw new Error('received unexpected response from the push endpoint, eventID must not be undefined'); - } - - await createIterablePromise({ - func: async () => { - if (resp.eventID === undefined || !resp.eventID) { - throw new Error('received unexpected response from the push endpoint, eventID must not be undefined'); - } - - return ingestionTransporter.getEvent({ runID: resp.runID, eventID: resp.eventID }).catch((error: ApiError) => { - if (error.status === 404) { - return undefined; - } - - throw error; - }) - }, - validate: (response) => response !== undefined, - aggregator: () => (retryCount += 1), - error: { - validate: () => retryCount >= 50, - message: () => `The maximum number of retries exceeded. (${retryCount}/${50})`, - }, - timeout: (): number => Math.min(retryCount * 500, 5000), - }); - } - } - - return responses; + return ingestionTransporter.chunkedPush({ indexName, objects, action: createIfNotExists ? 'partialUpdateObject' : 'partialUpdateObjectNoCreate', waitForTasks }, requestOptions); }, async replaceAllObjectsWithTransformation( @@ -271,8 +172,8 @@ export function algoliasearch( requestOptions, ); - const watchResponses = await this.chunkedPush( - { indexName: tmpIndexName, objects, waitForTasks: true, batchSize }, + const watchResponses = await ingestionTransporter.chunkedPush( + { indexName: tmpIndexName, objects, waitForTasks: true, batchSize, referenceIndexName: indexName }, requestOptions, ); diff --git a/templates/javascript/clients/api-single.mustache b/templates/javascript/clients/api-single.mustache index e4ff19d3ab3..41b210f5d85 100644 --- a/templates/javascript/clients/api-single.mustache +++ b/templates/javascript/clients/api-single.mustache @@ -7,7 +7,6 @@ export const apiClientVersion = '{{packageVersion}}'; {{#operations}} {{> client/api/hosts}} - {{#isIngestionClient}} {{> client/api/guards}} {{/isIngestionClient}} @@ -93,6 +92,9 @@ export function create{{#lambda.titlecase}}{{clientName}}{{/lambda.titlecase}}({ } }, + {{#isIngestionClient}} + {{> client/api/ingestionHelpers}} + {{/isIngestionClient}} {{#isSearchClient}} {{> client/api/helpers}} {{/isSearchClient}} diff --git a/templates/javascript/clients/client/api/imports.mustache b/templates/javascript/clients/client/api/imports.mustache index e26313348d9..a1893408be2 100644 --- a/templates/javascript/clients/client/api/imports.mustache +++ b/templates/javascript/clients/client/api/imports.mustache @@ -12,6 +12,10 @@ import { {{#isCompositionFullClient}} createIterablePromise, {{/isCompositionFullClient}} + {{#isIngestionClient}} + createIterablePromise, + ApiError, + {{/isIngestionClient}} } from '@algolia/client-common'; import type { CreateClientOptions, @@ -31,6 +35,9 @@ import type { {{classname}} } from '{{filename}}'; {{#operations}} import type { + {{#isIngestionClient}} + ChunkedPushOptions, + {{/isIngestionClient}} {{#isSearchClient}} BrowseOptions, ChunkedBatchOptions, @@ -76,4 +83,5 @@ import type { import type { SubscriptionTrigger } from '../model/subscriptionTrigger'; import type { TaskCreateTrigger } from '../model/taskCreateTrigger'; import type { Trigger } from '../model/trigger'; + import type { PushTaskRecords } from '../model/pushTaskRecords'; {{/isIngestionClient}} \ No newline at end of file diff --git a/templates/javascript/clients/client/api/ingestionHelpers.mustache b/templates/javascript/clients/client/api/ingestionHelpers.mustache new file mode 100644 index 00000000000..ad9ff089179 --- /dev/null +++ b/templates/javascript/clients/client/api/ingestionHelpers.mustache @@ -0,0 +1,69 @@ +/** + * Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/). + * + * @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests. + * @param chunkedPush - The `chunkedPush` object. + * @param chunkedPush.indexName - The `indexName` to replace `objects` in. + * @param chunkedPush.objects - The array of `objects` to store in the given Algolia `indexName`. + * @param chunkedPush.action - The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`. + * @param chunkedPush.waitForTasks - Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. + * @param chunkedPush.batchSize - The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + * @param chunkedPush.referenceIndexName - This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name). + * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `getEvent` method and merged with the transporter requestOptions. + */ +async chunkedPush( + { indexName, objects, action = 'addObject', waitForTasks, batchSize = 1000, referenceIndexName }: ChunkedPushOptions, + requestOptions?: RequestOptions, +): Promise> { + let records: Array = []; + const responses: Array = []; + + const objectEntries = objects.entries(); + for (const [i, obj] of objectEntries) { + records.push(obj as PushTaskRecords); + if (records.length === batchSize || i === objects.length - 1) { + responses.push( + await this.push( + { indexName, pushTaskPayload: { action, records }, referenceIndexName }, + requestOptions, + ), + ); + records = []; + } + } + + let retryCount = 0; + + if (waitForTasks) { + for (const resp of responses) { + if (!resp.eventID) { + throw new Error('received unexpected response from the push endpoint, eventID must not be undefined'); + } + + await createIterablePromise({ + func: async () => { + if (resp.eventID === undefined || !resp.eventID) { + throw new Error('received unexpected response from the push endpoint, eventID must not be undefined'); + } + + return this.getEvent({ runID: resp.runID, eventID: resp.eventID }).catch((error: ApiError) => { + if (error.status === 404) { + return undefined; + } + + throw error; + }) + }, + validate: (response) => response !== undefined, + aggregator: () => (retryCount += 1), + error: { + validate: () => retryCount >= 50, + message: () => `The maximum number of retries exceeded. (${retryCount}/${50})`, + }, + timeout: (): number => Math.min(retryCount * 500, 5000), + }); + } + } + + return responses; +}, diff --git a/templates/javascript/clients/client/model/clientMethodProps.mustache b/templates/javascript/clients/client/model/clientMethodProps.mustache index 8400fc8e3bf..a004582c24d 100644 --- a/templates/javascript/clients/client/model/clientMethodProps.mustache +++ b/templates/javascript/clients/client/model/clientMethodProps.mustache @@ -236,5 +236,38 @@ export type WaitForCompositionTaskOptions = { compositionID: string; }; {{/isCompositionFullClient}} +{{#isIngestionClient}} +export type ChunkedPushOptions = { + /** + * The `indexName` to replace `objects` in. + */ + indexName: string; + + /** + * The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`. + */ + action?: Action | undefined; + + /** + * Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. + */ + waitForTasks?: boolean | undefined; + + /** + * The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + */ + batchSize?: number | undefined; + + /** + * This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name). + */ + referenceIndexName?: string | undefined; + + /** + * The array of `objects` to store in the given Algolia `indexName`. + */ + objects: Array>; +} +{{/isIngestionClient}} {{/apiInfo.apis.0}} \ No newline at end of file diff --git a/templates/php/api.mustache b/templates/php/api.mustache index 3609586c56e..3e295e78197 100644 --- a/templates/php/api.mustache +++ b/templates/php/api.mustache @@ -313,6 +313,79 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; {{/operation}} + {{#isIngestionClient}} + /** + * Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests. + * + * @param string $indexName the `indexName` to replace `objects` in + * @param array $objects the array of `objects` to store in the given Algolia `indexName` + * @param array $action the `batch` `action` to perform on the given array of `objects`, defaults to `addObject` + * @param bool $waitForTasks whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable + * @param array $batchSize The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + * @param array $referenceIndexName This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name). + * @param array $requestOptions Request options + */ + public function chunkedPush( + $indexName, + $objects, + $action = 'addObject', + $waitForTasks = true, + $referenceIndexName = null, + $batchSize = 1000, + $requestOptions = [] + ) { + $responses = []; + $records = []; + $count = 0; + + foreach ($objects as $object) { + $records[] = $object; + + if (sizeof($records) === $batchSize || $count === sizeof($objects) - 1) { + $responses[] = $this->push($indexName, ['action' => $action, 'records' => $records], false, $referenceIndexName, $requestOptions); + $records = []; + } + + ++$count; + } + + if (!empty($records)) { + $responses[] = $this->push($indexName, ['action' => $action, 'records' => $records], false, $referenceIndexName, $requestOptions); + } + + if ($waitForTasks && !empty($responses)) { + $timeoutCalculation = 'Algolia\AlgoliaSearch\Support\Helpers::linearTimeout'; + + foreach ($responses as $response) { + $this->waitForTask($indexName, $response['taskID']); + $retry = 0; + + while ($retry < 50) { + try { + $resp = $this->getEvent($response->runID, $response->eventID); + + break; + } catch (NotFoundException $e) { + if (404 === $e->getCode()) { + return null; + } + } + + ++$retry; + usleep( + call_user_func_array($timeoutCalculation, [$this->config->getWaitTaskTimeBeforeRetry(), $retry]) + ); + } + + throw new ExceededRetriesException('Maximum number of retries (50) exceeded.'); + } + } + + return $responses; + } + + {{/isIngestionClient}} + {{#isSearchClient}} /** * Wait for a task to complete with `indexName` and `taskID`. @@ -555,11 +628,11 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; */ public function saveObjectsWithTransformation($indexName, $objects, $waitForTasks = false, $batchSize = 1000, $requestOptions = []) { - if (null == $this->ingestionTransporter) { - throw new \InvalidArgumentException('`setTransformationRegion` must have been called before calling this method.'); - } + if (null == $this->ingestionTransporter) { + throw new \InvalidArgumentException('`setTransformationRegion` must have been called before calling this method.'); + } - return $this->ingestionTransporter->push($indexName, ['action'=>'addObject', 'records'=>$objects], $waitForTasks, $requestOptions); + return $this->ingestionTransporter->chunkedPush($indexName, $objects, 'addObject', $waitForTasks, $batchSize, $requestOptions); } /** @@ -611,11 +684,11 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; */ public function partialUpdateObjectsWithTransformation($indexName, $objects, $createIfNotExists, $waitForTasks = false, $batchSize = 1000, $requestOptions = []) { - if (null == $this->ingestionTransporter) { - throw new \InvalidArgumentException('`setTransformationRegion` must have been called before calling this method.'); - } + if (null == $this->ingestionTransporter) { + throw new \InvalidArgumentException('`setTransformationRegion` must have been called before calling this method.'); + } - return $this->ingestionTransporter->push($indexName, ['action'=>(true == $createIfNotExists) ? 'partialUpdateObject' : 'partialUpdateObjectNoCreate', 'records'=>$objects], $waitForTasks, $requestOptions); + return $this->ingestionTransporter->chunkedPush($indexName, $objects, ($createIfNotExists == TRUE) ? 'partialUpdateObject' : 'partialUpdateObjectNoCreate', $waitForTasks, $batchSize, $requestOptions); } /** diff --git a/templates/python/api.mustache b/templates/python/api.mustache index 2d465fabd2a..7b77864e1e8 100644 --- a/templates/python/api.mustache +++ b/templates/python/api.mustache @@ -12,10 +12,14 @@ from algoliasearch.search.models import ( SecuredApiKeyRestrictions, ) -from algoliasearch.ingestion.models import WatchResponse +from algoliasearch.ingestion.models import (WatchResponse) from algoliasearch.ingestion.config import IngestionConfig from algoliasearch.ingestion.client import (IngestionClient, IngestionClientSync) +from algoliasearch.ingestion.models import Action as IngestionAction {{/isSearchClient}} +{{#isIngestionClient}} +from algoliasearch.ingestion.models import (Action, WatchResponse, Event, PushTaskRecords) +{{/isIngestionClient}} {{#operations}}{{#operation}}{{#imports}} from algoliasearch.{{packageName}}.models import {{{.}}} @@ -130,6 +134,9 @@ class {{classname}}{{#isSyncClient}}Sync{{/isSyncClient}}: {{#isSearchClient}} {{> search_helpers}} {{/isSearchClient}} + {{#isIngestionClient}} +{{> ingestion_helpers}} + {{/isIngestionClient}} {{#operation}} diff --git a/templates/python/ingestion_helpers.mustache b/templates/python/ingestion_helpers.mustache new file mode 100644 index 00000000000..9ce9b4a4594 --- /dev/null +++ b/templates/python/ingestion_helpers.mustache @@ -0,0 +1,64 @@ + {{^isSyncClient}}async {{/isSyncClient}}def chunked_push( + self, + index_name: str, + objects: List[Dict[str, Any]], + action: Action = Action.ADDOBJECT, + wait_for_tasks: bool = False, + batch_size: int = 1000, + reference_index_name: Optional[str] = None, + request_options: Optional[Union[dict, RequestOptions]] = None, + ) -> List[WatchResponse]: + """ + Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/). + """ + records: List[PushTaskRecords] = [] + responses: List[WatchResponse] = [] + for i, obj in enumerate(objects): + records.append(obj) # pyright: ignore + if len(records) == batch_size or i == len(objects) - 1: + responses.append( + {{^isSyncClient}}await {{/isSyncClient}}self.push( + index_name=index_name, + push_task_payload={ + "action": action, + "records": records, + }, + reference_index_name=reference_index_name, + request_options=request_options, + ) + ) + requests = [] + if wait_for_tasks: + for response in responses: + {{^isSyncClient}}async {{/isSyncClient}}def _func(_: Optional[Event]) -> Event: + if response.event_id is None: + raise ValueError( + "received unexpected response from the push endpoint, eventID must not be undefined" + ) + try: + return {{^isSyncClient}}await {{/isSyncClient}}self.get_event(run_id=response.run_id, event_id=response.event_id, request_options=request_options) + except RequestException as e: + if e.status_code == 404: + return None # pyright: ignore + raise e + + _retry_count = 0 + + def _aggregator(_: Event | None) -> None: + nonlocal _retry_count + _retry_count += 1 + + def _validate(_resp: Event | None) -> bool: + return _resp is not None + + timeout = RetryTimeout() + + {{^isSyncClient}}await {{/isSyncClient}}create_iterable{{#isSyncClient}}_sync{{/isSyncClient}}( + func=_func, + validate=_validate, + aggregator=_aggregator, + timeout=lambda: timeout(_retry_count), + error_validate=lambda _: _retry_count >= 50, + error_message=lambda _: f"The maximum number of retries exceeded. (${_retry_count}/${50})", + ) + return responses diff --git a/templates/python/search_helpers.mustache b/templates/python/search_helpers.mustache index 19a41ef269c..eb4a7b65382 100644 --- a/templates/python/search_helpers.mustache +++ b/templates/python/search_helpers.mustache @@ -74,7 +74,7 @@ "`apiKey` is required when waiting for an `update` operation." ) - {{^isSyncClient}}async {{/isSyncClient}}def _func(_prev: Optional[GetApiKeyResponse]) -> GetApiKeyResponse: + {{^isSyncClient}}async {{/isSyncClient}}def _func(_: Optional[GetApiKeyResponse]) -> GetApiKeyResponse: try: return {{^isSyncClient}}await {{/isSyncClient}}self.get_api_key(key=key, request_options=request_options) except RequestException as e: @@ -209,7 +209,7 @@ page = search_synonyms_params.page or 0 search_synonyms_params.hits_per_page = hits_per_page - {{^isSyncClient}}async {{/isSyncClient}}def _func(_prev: Optional[SearchSynonymsResponse]) -> SearchSynonymsResponse: + {{^isSyncClient}}async {{/isSyncClient}}def _func(_: Optional[SearchSynonymsResponse]) -> SearchSynonymsResponse: nonlocal page resp = {{^isSyncClient}}await {{/isSyncClient}}self.search_synonyms( index_name=index_name, @@ -299,23 +299,13 @@ wait_for_tasks: bool = False, batch_size: int = 1000, request_options: Optional[Union[dict, RequestOptions]] = None, - ) -> WatchResponse: + ) -> List[WatchResponse]: """ Helper: Similar to the `save_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client's config at instantiation. """ if self._ingestion_transporter is None: raise ValueError("`region` must be provided at client instantiation before calling this method.") - - return {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.push( - index_name=index_name, - push_task_payload={ - "action": Action.ADDOBJECT, - "records": objects, - }, - watch=wait_for_tasks, - request_options=request_options, - ) - + return {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.chunked_push(index_name=index_name, objects=objects, action=IngestionAction.ADDOBJECT, wait_for_tasks=wait_for_tasks, batch_size=batch_size, request_options=request_options) {{^isSyncClient}}async {{/isSyncClient}}def delete_objects( self, @@ -352,23 +342,13 @@ wait_for_tasks: bool = False, batch_size: int = 1000, request_options: Optional[Union[dict, RequestOptions]] = None, - ) -> WatchResponse: + ) -> List[WatchResponse]: """ Helper: Similar to the `partial_update_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client instantiation method. """ if self._ingestion_transporter is None: raise ValueError("`region` must be provided at client instantiation before calling this method.") - - return {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.push( - index_name=index_name, - push_task_payload={ - "action": Action.PARTIALUPDATEOBJECT if create_if_not_exists else Action.PARTIALUPDATEOBJECTNOCREATE, - "records": objects, - }, - watch=wait_for_tasks, - request_options=request_options, - ) - + return {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.chunked_push(index_name=index_name, objects=objects, action=IngestionAction.PARTIALUPDATEOBJECT if create_if_not_exists else IngestionAction.PARTIALUPDATEOBJECTNOCREATE, wait_for_tasks=wait_for_tasks, batch_size=batch_size, request_options=request_options) {{^isSyncClient}}async {{/isSyncClient}}def chunked_batch( self, diff --git a/tests/CTS/client/search/partialUpdateObjectsWithTransformation.json b/tests/CTS/client/search/partialUpdateObjectsWithTransformation.json index 4383129ff08..e9ec529ddcc 100644 --- a/tests/CTS/client/search/partialUpdateObjectsWithTransformation.json +++ b/tests/CTS/client/search/partialUpdateObjectsWithTransformation.json @@ -35,12 +35,12 @@ }, "expected": { "type": "response", - "match": { + "match": [{ "runID": "b1b7a982-524c-40d2-bb7f-48aab075abda", "eventID": "113b2068-6337-4c85-b5c2-e7b213d82925", "message": "OK", "createdAt": "2022-05-12T06:24:30.049Z" - } + }] } } ] diff --git a/tests/CTS/client/search/saveObjectsWithTransformation.json b/tests/CTS/client/search/saveObjectsWithTransformation.json index 51d616b0b6d..afafc6621ff 100644 --- a/tests/CTS/client/search/saveObjectsWithTransformation.json +++ b/tests/CTS/client/search/saveObjectsWithTransformation.json @@ -34,12 +34,12 @@ }, "expected": { "type": "response", - "match": { + "match": [{ "runID": "b1b7a982-524c-40d2-bb7f-48aab075abda", "eventID": "113b2068-6337-4c85-b5c2-e7b213d82925", "message": "OK", "createdAt": "2022-05-12T06:24:30.049Z" - } + }] } } ] From 2eec2ac2596f860c4abe66dd65d2165b25676344 Mon Sep 17 00:00:00 2001 From: shortcuts Date: Wed, 18 Jun 2025 16:38:31 +0200 Subject: [PATCH 2/2] chore: description --- templates/java/api_helpers.mustache | 2 +- .../clients/algoliasearch/builds/definition.mustache | 4 ++-- templates/php/api.mustache | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/templates/java/api_helpers.mustache b/templates/java/api_helpers.mustache index f05a67645f6..294d7662bdc 100644 --- a/templates/java/api_helpers.mustache +++ b/templates/java/api_helpers.mustache @@ -31,7 +31,7 @@ private List objectsToPushTaskRecords(Iterable objects) * connector setup (e.g. a tmp index), but you wish to attach another index's transformation * to it (e.g. the source index name). * @param requestOptions - The requestOptions to send along with the query, they will be forwarded - * to the `getTask` method and merged with the transporter requestOptions. + * to the `getEvent` method and merged with the transporter requestOptions. */ public List chunkedPush( String indexName, diff --git a/templates/javascript/clients/algoliasearch/builds/definition.mustache b/templates/javascript/clients/algoliasearch/builds/definition.mustache index 69591890cef..36b9b9bf3cd 100644 --- a/templates/javascript/clients/algoliasearch/builds/definition.mustache +++ b/templates/javascript/clients/algoliasearch/builds/definition.mustache @@ -42,7 +42,7 @@ export type Algoliasearch = SearchClient & { * @param saveObjects.objects - The array of `objects` to store in the given Algolia `indexName`. * @param saveObjects.batchSize - The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. * @param saveObjects.waitForTasks - Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. - * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `batch` method and merged with the transporter requestOptions. + * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `push` method and merged with the transporter requestOptions. */ saveObjectsWithTransformation: (options: SaveObjectsOptions, requestOptions?: RequestOptions | undefined) => Promise>; @@ -56,7 +56,7 @@ export type Algoliasearch = SearchClient & { * @param partialUpdateObjects.createIfNotExists - To be provided if non-existing objects are passed, otherwise, the call will fail.. * @param partialUpdateObjects.batchSize - The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. * @param partialUpdateObjects.waitForTasks - Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. - * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `getTask` method and merged with the transporter requestOptions. + * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `push` method and merged with the transporter requestOptions. */ partialUpdateObjectsWithTransformation: (options: PartialUpdateObjectsOptions, requestOptions?: RequestOptions | undefined) => Promise>; diff --git a/templates/php/api.mustache b/templates/php/api.mustache index 3e295e78197..a5a253e59fd 100644 --- a/templates/php/api.mustache +++ b/templates/php/api.mustache @@ -321,7 +321,7 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; * @param array $objects the array of `objects` to store in the given Algolia `indexName` * @param array $action the `batch` `action` to perform on the given array of `objects`, defaults to `addObject` * @param bool $waitForTasks whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable - * @param array $batchSize The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + * @param array $batchSize The size of the chunk of `objects`. The number of `push` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. * @param array $referenceIndexName This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name). * @param array $requestOptions Request options */ @@ -623,7 +623,7 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; * @param string $indexName the `indexName` to replace `objects` in * @param array $objects the array of `objects` to store in the given Algolia `indexName` * @param bool $waitForTasks Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable - * @param array $batchSize The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + * @param array $batchSize The size of the chunk of `objects`. The number of `push` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. * @param array $requestOptions Request options */ public function saveObjectsWithTransformation($indexName, $objects, $waitForTasks = false, $batchSize = 1000, $requestOptions = []) @@ -679,7 +679,7 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException; * @param array $objects the array of `objects` to store in the given Algolia `indexName` * @param bool $createIfNotExists To be provided if non-existing objects are passed, otherwise, the call will fail.. * @param bool $waitForTasks Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable - * @param array $batchSize The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + * @param array $batchSize The size of the chunk of `objects`. The number of `push` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. * @param array $requestOptions Request options */ public function partialUpdateObjectsWithTransformation($indexName, $objects, $createIfNotExists, $waitForTasks = false, $batchSize = 1000, $requestOptions = [])