Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,86 +168,6 @@ describe('api', () => {
}),
).rejects.toThrow('`transformation.region` must be provided at client instantiation before calling this method.');
});

test('exposes the transformation methods at the root of the client', async () => {
const ingestionClient = algoliasearch('APP_ID', 'API_KEY', {
requester: browserEchoRequester(),
transformation: { region: 'us' },
});

expect(ingestionClient.saveObjectsWithTransformation).not.toBeUndefined();

let res = (await ingestionClient.saveObjectsWithTransformation({
indexName: 'foo',
objects: [{ objectID: 'bar', baz: 42 }],
waitForTasks: true,
})) as unknown as EchoResponse;

expect(res.headers).toEqual(
expect.objectContaining({
'x-algolia-application-id': 'APP_ID',
'x-algolia-api-key': 'API_KEY',
}),
);
expect(res.url.startsWith('https://data.us.algolia.com/1/push/foo?watch=true')).toBeTruthy();
expect(res.data).toEqual({
action: 'addObject',
records: [
{
baz: 42,
objectID: 'bar',
},
],
});
expect(ingestionClient.partialUpdateObjectsWithTransformation).not.toBeUndefined();

res = (await ingestionClient.partialUpdateObjectsWithTransformation({
indexName: 'foo',
objects: [{ objectID: 'bar', baz: 42 }],
waitForTasks: true,
createIfNotExists: true,
})) as unknown as EchoResponse;

expect(res.headers).toEqual(
expect.objectContaining({
'x-algolia-application-id': 'APP_ID',
'x-algolia-api-key': 'API_KEY',
}),
);
expect(res.url.startsWith('https://data.us.algolia.com/1/push/foo?watch=true')).toBeTruthy();
expect(res.data).toEqual({
action: 'partialUpdateObject',
records: [
{
baz: 42,
objectID: 'bar',
},
],
});

res = (await ingestionClient.partialUpdateObjectsWithTransformation({
indexName: 'foo',
objects: [{ objectID: 'bar', baz: 42 }],
waitForTasks: true,
})) as unknown as EchoResponse;

expect(res.headers).toEqual(
expect.objectContaining({
'x-algolia-application-id': 'APP_ID',
'x-algolia-api-key': 'API_KEY',
}),
);
expect(res.url.startsWith('https://data.us.algolia.com/1/push/foo?watch=true')).toBeTruthy();
expect(res.data).toEqual({
action: 'partialUpdateObjectNoCreate',
records: [
{
baz: 42,
objectID: 'bar',
},
],
});
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ method:
- Records
x-available-languages:
- javascript
- go
- java
- php
- python
operationId: chunkedPush
summary: Replace all records in an index
description: |
Expand Down
3 changes: 3 additions & 0 deletions specs/ingestion/spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,6 @@ paths:
# ###############
/setClientApiKey:
$ref: '../common/helpers/setClientApiKey.yml#/method'

/chunkedPush:
$ref: 'helpers/chunkedPush.yml#/method'
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ method:
content:
application/json:
schema:
$ref: '../../common/schemas/ingestion/WatchResponse.yml'
type: array
items:
$ref: '../../common/schemas/ingestion/WatchResponse.yml'
'400':
$ref: '../../common/responses/IndexNotFound.yml'
4 changes: 3 additions & 1 deletion specs/search/helpers/saveObjectsWithTransformation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ method:
content:
application/json:
schema:
$ref: '../../common/schemas/ingestion/WatchResponse.yml'
type: array
items:
$ref: '../../common/schemas/ingestion/WatchResponse.yml'
'400':
$ref: '../../common/responses/IndexNotFound.yml'
112 changes: 32 additions & 80 deletions templates/go/api.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ type config struct {
headerParams map[string]string
timeouts transport.RequestConfiguration

{{#isIngestionClient}}
// -- ChunkedPush options
waitForTasks bool
batchSize int

// -- Iterable options
maxRetries int
timeout func(int) time.Duration
aggregator func(any, error)
{{/isIngestionClient}}

{{#isSearchClient}}
// -- ChunkedBatch options
waitForTasks bool
Expand Down Expand Up @@ -102,42 +113,6 @@ func WithConnectTimeout(timeout time.Duration) requestOption {

{{#isSearchClient}}

// --------- ChunkedBatch options ---------

type ChunkedBatchOption interface {
RequestOption
chunkedBatch()
}

type chunkedBatchOption func(*config)

var (
_ ChunkedBatchOption = (*chunkedBatchOption)(nil)
_ ChunkedBatchOption = (*requestOption)(nil)
)

func (c chunkedBatchOption) apply(conf *config) {
c(conf)
}

func (c chunkedBatchOption) chunkedBatch() {}

func (r requestOption) chunkedBatch() {}

// WithWaitForTasks whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable.
func WithWaitForTasks(waitForTasks bool) chunkedBatchOption {
return chunkedBatchOption(func(c *config) {
c.waitForTasks = waitForTasks
})
}

// WithBatchSize the size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
func WithBatchSize(batchSize int) chunkedBatchOption {
return chunkedBatchOption(func(c *config) {
c.batchSize = batchSize
})
}

// --------- PartialUpdateObjects options ---------

type PartialUpdateObjectsOption interface {
Expand Down Expand Up @@ -206,49 +181,6 @@ func WithScopes(scopes []ScopeType) replaceAllObjectsOption {
})
}

// --------- Iterable options ---------.

type IterableOption interface {
RequestOption
iterable()
}

type iterableOption func(*config)

var (
_ IterableOption = (*iterableOption)(nil)
_ IterableOption = (*requestOption)(nil)
)

func (i iterableOption) apply(c *config) {
i(c)
}

func (r requestOption) iterable() {}

func (i iterableOption) iterable() {}

// WithMaxRetries the maximum number of retry. Default to 50.
func WithMaxRetries(maxRetries int) iterableOption {
return iterableOption(func(c *config) {
c.maxRetries = maxRetries
})
}

// WithTimeout he function to decide how long to wait between retries. Default to min(retryCount * 200, 5000)
func WithTimeout(timeout func(int) time.Duration) iterableOption {
return iterableOption(func(c *config) {
c.timeout = timeout
})
}

// WithAggregator the function to aggregate the results of the iterable.
func WithAggregator(aggregator func(any, error)) iterableOption {
return iterableOption(func(c *config) {
c.aggregator = aggregator
})
}

// --------- WaitForKey options ---------.

type WaitForApiKeyOption interface {
Expand Down Expand Up @@ -295,6 +227,18 @@ func toRequestOptions[T RequestOption](opts []T) []RequestOption {
return requestOpts
}

func toIngestionRequestOptions(opts []RequestOption) []ingestion.RequestOption {
requestOpts := make([]ingestion.RequestOption, 0, len(opts))

for _, opt := range opts {
if opt, ok := opt.(ingestion.RequestOption); ok {
requestOpts = append(requestOpts, opt)
}
}

return requestOpts
}

func toIterableOptions(opts []ChunkedBatchOption) []IterableOption {
iterableOpts := make([]IterableOption, 0, len(opts))

Expand Down Expand Up @@ -571,5 +515,13 @@ func (c *APIClient) {{nickname}}({{#hasParams}}r {{#structPrefix}}{{&classname}}
{{/operations}}

{{#isSearchClient}}
{{> helpers}}

{{> search_helpers}}
{{/isSearchClient}}
{{/isSearchClient}}

{{#isIngestionClient}}
{{> helpers}}

{{> ingestion_helpers}}
{{/isIngestionClient}}
121 changes: 121 additions & 0 deletions templates/go/helpers.mustache
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// --------- ChunkedBatch options ---------

type ChunkedBatchOption interface {
RequestOption
chunkedBatch()
}

type chunkedBatchOption func(*config)

var (
_ ChunkedBatchOption = (*chunkedBatchOption)(nil)
_ ChunkedBatchOption = (*requestOption)(nil)
)

func (c chunkedBatchOption) apply(conf *config) {
c(conf)
}

func (c chunkedBatchOption) chunkedBatch() {}

func (r requestOption) chunkedBatch() {}

// WithWaitForTasks whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable.
func WithWaitForTasks(waitForTasks bool) chunkedBatchOption {
return chunkedBatchOption(func(c *config) {
c.waitForTasks = waitForTasks
})
}

// WithBatchSize the size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
func WithBatchSize(batchSize int) chunkedBatchOption {
return chunkedBatchOption(func(c *config) {
c.batchSize = batchSize
})
}

// --------- Iterable options ---------.

type IterableOption interface {
RequestOption
iterable()
}

type iterableOption func(*config)

var (
_ IterableOption = (*iterableOption)(nil)
_ IterableOption = (*requestOption)(nil)
)

func (i iterableOption) apply(c *config) {
i(c)
}

func (r requestOption) iterable() {}

func (i iterableOption) iterable() {}

// WithMaxRetries the maximum number of retry. Default to 50.
func WithMaxRetries(maxRetries int) iterableOption {
return iterableOption(func(c *config) {
c.maxRetries = maxRetries
})
}

// WithTimeout he function to decide how long to wait between retries. Default to min(retryCount * 200, 5000)
func WithTimeout(timeout func(int) time.Duration) iterableOption {
return iterableOption(func(c *config) {
c.timeout = timeout
})
}

// WithAggregator the function to aggregate the results of the iterable.
func WithAggregator(aggregator func(any, error)) iterableOption {
return iterableOption(func(c *config) {
c.aggregator = aggregator
})
}

func CreateIterable[T any](execute func(*T, error) (*T, error), validate func(*T, error) (bool, error), opts ...IterableOption) (*T, error) {
conf := config{
headerParams: map[string]string{},
maxRetries: -1,
timeout: func(count int) time.Duration {
return 0 * time.Millisecond
},
}

for _, opt := range opts {
opt.apply(&conf)
}

var executor func(*T, error) (*T, error)

retryCount := 0

executor = func(previousResponse *T, previousError error) (*T, error) {
response, responseErr := execute(previousResponse, previousError)

retryCount++

if conf.aggregator != nil {
conf.aggregator(response, responseErr)
}

canStop, err := validate(response, responseErr)
if canStop || err != nil {
return response, err
}

if conf.maxRetries >= 0 && retryCount >= conf.maxRetries {
return nil, errs.NewWaitError(fmt.Sprintf("The maximum number of retries exceeded. (%d/%d)", retryCount, conf.maxRetries))
}

time.Sleep(conf.timeout(retryCount))

return executor(response, responseErr)
}

return executor(nil, nil)
}
Loading