Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions playground/php/src/search.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@
// }
// var_dump($rules);

$configForIngestion = $config->setFullHosts(['http://localhost:6689'])->setTransformationRegion('eu');
$configForIngestion = $config->setTransformationRegion('eu');

$clientWithTransformation = SearchClient::createWithConfig($configForIngestion);

var_dump($clientWithTransformation->saveObjectsWithTransformation('boyd', [['objectID' => '1', 'name' => 'Michel']], true));
var_dump($clientWithTransformation->replaceAllObjectsWithTransformation('boyd', [['objectID' => '1', 'name' => 'Michel']], true));
5 changes: 4 additions & 1 deletion playground/python/app/ingestion.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from os import environ
from asyncio import run

from algoliasearch.ingestion import __version__
from algoliasearch.ingestion.client import IngestionClient
from dotenv import load_dotenv

load_dotenv("../.env")

async def main():
print("IngestionClient version", __version__)

client = IngestionClient("FOO", "BAR")
client = IngestionClient(environ.get("ALGOLIA_APPLICATION_ID"), environ.get("ALGOLIA_ADMIN_KEY"), "eu")

print("client initialized", client)

Expand Down
46 changes: 24 additions & 22 deletions playground/python/app/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@
def main():
print("SearchClient version", __version__)

client = SearchClientSync(
environ.get("ALGOLIA_APPLICATION_ID"), environ.get("ALGOLIA_ADMIN_KEY")
)
client.add_user_agent("playground")
client.add_user_agent("bar", "baz")

print("user_agent", client._config._user_agent.get())
print("client initialized", client)

try:
resp = client.search_synonyms("foo")
print(resp)
client.browse_synonyms("foo", lambda _resp: print(_resp))
finally:
client.close()

print("client closed")

print("with transformations")

# client = SearchClientSync(
# environ.get("ALGOLIA_APPLICATION_ID"), environ.get("ALGOLIA_ADMIN_KEY")
# )
# client.add_user_agent("playground")
# client.add_user_agent("bar", "baz")
#
# print("user_agent", client._config._user_agent.get())
# print("client initialized", client)
#
# try:
# resp = client.search_synonyms("foo")
# print(resp)
# client.browse_synonyms("foo", lambda _resp: print(_resp))
# finally:
# client.close()
#
# print("client closed")
#
# print("with transformations")
#
config = SearchConfig(
environ.get("ALGOLIA_APPLICATION_ID"), environ.get("ALGOLIA_ADMIN_KEY")
)
Expand All @@ -46,10 +46,12 @@ def main():
print("user_agent", client._config._user_agent.get())

try:
resp = client.save_objects_with_transformation(
"foo", [{"objectID": "bar"}], wait_for_tasks=True
resp = client.replace_all_objects_with_transformation(
"boyd", [{"objectID": "bar"},{"objectID": "bar"},{"objectID": "bar"},{"objectID": "bar"},{"objectID": "bar"}], 2
)
print(resp)
except Exception as e:
print(e)
finally:
client.close()

Expand Down
20 changes: 10 additions & 10 deletions playground/python/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion scripts/cts/runCts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ export async function runCts(
assertValidTimeouts(languages.length);
assertChunkWrapperValid(languages.length - skip('dart'));
assertValidReplaceAllObjects(languages.length - skip('dart'));
assertValidReplaceAllObjectsWithTransformation(only('javascript'));
assertValidReplaceAllObjectsWithTransformation(
only('javascript') + only('go') + only('python') + only('java') + only('php'),
);
assertValidAccountCopyIndex(only('javascript'));
assertValidReplaceAllObjectsFailed(languages.length - skip('dart'));
assertValidReplaceAllObjectsScopes(languages.length - skip('dart'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ function addRoutes(app: Express): void {
)?.[1] as string;
expect(raowtState).to.include.keys(lang);
expect(req.body.action === 'addObject').to.equal(true);
expect(req.query.referenceIndexName === `cts_e2e_replace_all_objects_with_transformation_${lang}`).to.equal(true);

raowtState[lang].pushCount += req.body.records.length;

Expand All @@ -104,7 +105,14 @@ function addRoutes(app: Express): void {
});

app.get('/1/runs/:runID/events/:eventID', (req, res) => {
res.json({ status: 'finished' });
res.json({
status: 'succeeded',
eventID: '113b2068-6337-4c85-b5c2-e7b213d82921',
runID: 'b1b7a982-524c-40d2-bb7f-48aab075abda',
type: 'fetch',
batchSize: 1,
publishedAt: '2022-05-12T06:24:30.049Z',
});
});

app.get('/1/indexes/:indexName/task/:taskID', (req, res) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ method:
- Records
x-available-languages:
- javascript
- go
- java
- php
- python
operationId: replaceAllObjectsWithTransformation
summary: Replace all records in an index
description: |
Expand Down
19 changes: 13 additions & 6 deletions templates/go/api.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,23 @@ func toRequestOptions[T RequestOption](opts []T) []RequestOption {
return requestOpts
}

func toIngestionRequestOptions(opts []RequestOption) []ingestion.RequestOption {
requestOpts := make([]ingestion.RequestOption, 0, len(opts))
// toIngestionChunkedBatchOptions converts the current chunked batch opts to ingestion ones.
func toIngestionChunkedBatchOptions(opts []ChunkedBatchOption) []ingestion.ChunkedBatchOption {
conf := config{}

for _, opt := range opts {
if opt, ok := opt.(ingestion.RequestOption); ok {
requestOpts = append(requestOpts, opt)
}
opt.apply(&conf)
}

return requestOpts
ingestionOpts := make([]ingestion.ChunkedBatchOption, 0, len(opts))

if conf.batchSize > 0 {
ingestionOpts = append(ingestionOpts, ingestion.WithBatchSize(conf.batchSize))
}

ingestionOpts = append(ingestionOpts, ingestion.WithWaitForTasks(conf.waitForTasks))

return ingestionOpts
}

func toIterableOptions(opts []ChunkedBatchOption) []IterableOption {
Expand Down
2 changes: 1 addition & 1 deletion templates/go/ingestion_helpers.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ ChunkedPush Chunks the given `objects` list in subset of 1000 elements max in or
@return []WatchResponse - List of push responses.
@return error - Error if any.
*/
func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, action Action, referenceIndexName *string, opts ...RequestOption) ([]WatchResponse, error) {
func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, action Action, referenceIndexName *string, opts ...ChunkedBatchOption) ([]WatchResponse, error) {
conf := config{
headerParams: map[string]string{},
waitForTasks: false,
Expand Down
124 changes: 109 additions & 15 deletions templates/go/search_helpers.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,100 @@ func (c *APIClient) ChunkedBatch(indexName string, objects []map[string]any, act
return responses, nil
}

/*
ReplaceAllObjectsWithTransformation is similar to the `replaceAllObjects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must have been passed to the client instantiation method.
See https://api-clients-automation.netlify.app/docs/add-new-api-client#5-helpers for implementation details.

@param indexName string - the index name to replace objects into.
@param objects []map[string]any - List of objects to replace.
@param opts ...ReplaceAllObjectsOption - Optional parameters for the request.
@return *ReplaceAllObjectsResponse - The response of the replace all objects operation.
@return error - Error if any.
*/
func (c *APIClient) ReplaceAllObjectsWithTransformation(indexName string, objects []map[string]any, opts ...ReplaceAllObjectsOption) (*ReplaceAllObjectsWithTransformationResponse, error) {
if c.ingestionTransporter == nil {
return nil, reportError("`region` must be provided at client instantiation before calling this method.")
}

tmpIndexName := fmt.Sprintf("%s_tmp_%d", indexName, time.Now().UnixNano())

conf := config{
headerParams: map[string]string{},
scopes: []ScopeType{SCOPE_TYPE_SETTINGS, SCOPE_TYPE_RULES, SCOPE_TYPE_SYNONYMS},
}

for _, opt := range opts {
opt.apply(&conf)
}

opts = append(opts, WithWaitForTasks(true))

copyResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(indexName, NewOperationIndexParams(OPERATION_TYPE_COPY, tmpIndexName, WithOperationIndexParamsScope(conf.scopes))), toRequestOptions(opts)...)
if err != nil {
return nil, err
}

watchResp, err := c.ingestionTransporter.ChunkedPush(tmpIndexName, objects, ingestion.Action(ACTION_ADD_OBJECT), &indexName, toIngestionChunkedBatchOptions(replaceAllObjectsToChunkBactchOptions(opts))...)
if err != nil {
_, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName))

return nil, err //nolint:wrapcheck
}

_, err = c.WaitForTask(tmpIndexName, copyResp.TaskID, replaceAllObjectsToIterableOptions(opts)...)
if err != nil {
_, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName))

return nil, err
}

copyResp, err = c.OperationIndex(c.NewApiOperationIndexRequest(indexName, NewOperationIndexParams(OPERATION_TYPE_COPY, tmpIndexName, WithOperationIndexParamsScope(conf.scopes))), toRequestOptions(opts)...)
if err != nil {
_, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName))

return nil, err
}

_, err = c.WaitForTask(tmpIndexName, copyResp.TaskID, replaceAllObjectsToIterableOptions(opts)...)
if err != nil {
_, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName))

return nil, err
}

moveResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(tmpIndexName, NewOperationIndexParams(OPERATION_TYPE_MOVE, indexName)), toRequestOptions(opts)...)
if err != nil {
_, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName))

return nil, err
}

_, err = c.WaitForTask(tmpIndexName, moveResp.TaskID, replaceAllObjectsToIterableOptions(opts)...)
if err != nil {
_, _ = c.DeleteIndex(c.NewApiDeleteIndexRequest(tmpIndexName))

return nil, err
}

var searchWatchResp []WatchResponse

rawResp, err := json.Marshal(watchResp)
if err != nil {
return nil, fmt.Errorf("unable to convert the ingestion WatchResponse to search WatchResponse: %w", err)
}

err = json.Unmarshal(rawResp, &searchWatchResp)
if err != nil {
return nil, fmt.Errorf("unable to convert the ingestion WatchResponse to search WatchResponse: %w", err)
}

return &ReplaceAllObjectsWithTransformationResponse{
CopyOperationResponse: *copyResp,
WatchResponses: searchWatchResp,
MoveOperationResponse: *moveResp,
}, nil
}

/*
ReplaceAllObjects replaces all objects (records) in the given `indexName` with the given `objects`. A temporary index is created during this process in order to backup your data.
See https://api-clients-automation.netlify.app/docs/add-new-api-client#5-helpers for implementation details.
Expand Down Expand Up @@ -708,7 +802,7 @@ func (c *APIClient) SaveObjectsWithTransformation(indexName string, objects []ma
return nil, reportError("`region` must be provided at client instantiation before calling this method.")
}

return c.ingestionTransporter.ChunkedPush(indexName, objects, ingestion.Action(ACTION_ADD_OBJECT), nil, toIngestionRequestOptions(toRequestOptions(opts))...) //nolint:wrapcheck
return c.ingestionTransporter.ChunkedPush(indexName, objects, ingestion.Action(ACTION_ADD_OBJECT), nil, toIngestionChunkedBatchOptions(opts)...) //nolint:wrapcheck
}

/*
Expand All @@ -725,22 +819,22 @@ func (c *APIClient) PartialUpdateObjectsWithTransformation(indexName string, obj
return nil, reportError("`region` must be provided at client instantiation before calling this method.")
}

conf := config{
headerParams: map[string]string{},
createIfNotExists: true,
}
conf := config{
headerParams: map[string]string{},
createIfNotExists: true,
}

for _, opt := range opts {
opt.apply(&conf)
}
for _, opt := range opts {
opt.apply(&conf)
}

var action Action
var action Action

if conf.createIfNotExists {
action = ACTION_PARTIAL_UPDATE_OBJECT
} else {
action = ACTION_PARTIAL_UPDATE_OBJECT_NO_CREATE
}
if conf.createIfNotExists {
action = ACTION_PARTIAL_UPDATE_OBJECT
} else {
action = ACTION_PARTIAL_UPDATE_OBJECT_NO_CREATE
}

return c.ingestionTransporter.ChunkedPush(indexName, objects, ingestion.Action(action), nil, toIngestionRequestOptions(toRequestOptions(opts))...) //nolint:wrapcheck
return c.ingestionTransporter.ChunkedPush(indexName, objects, ingestion.Action(action), nil, toIngestionChunkedBatchOptions(partialUpdateObjectsToChunkedBatchOptions(opts))...) //nolint:wrapcheck
}
Loading