Skip to content

Commit 0c6abfb

Browse files
committed
fix: php
1 parent 1b2d325 commit 0c6abfb

File tree

3 files changed

+103
-100
lines changed

3 files changed

+103
-100
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
ChunkedPush Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
3+
4+
@param indexName string - the index name to save objects into.
5+
@param objects []map[string]any - List of objects to save.
6+
@param action Action - The action to perform on the objects.
7+
@param referenceIndexName *string - This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name).
8+
@param opts ...ChunkedBatchOption - Optional parameters for the request.
9+
@return []ingestion.WatchResponse - List of push responses.
10+
@return error - Error if any.
11+
*/
12+
func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, action Action, referenceIndexName *string, opts ...ChunkedBatchOption) ([]ingestion.WatchResponse, error) {
13+
conf := config{
14+
headerParams: map[string]string{},
15+
waitForTasks: false,
16+
batchSize: 1000,
17+
}
18+
19+
for _, opt := range opts {
20+
opt.apply(&conf)
21+
}
22+
23+
records := make([]map[string]any, 0, len(objects)%conf.batchSize)
24+
responses := make([]ingestion.WatchResponse, 0, len(objects)%conf.batchSize)
25+
26+
for i, obj := range objects {
27+
records = append(records, obj)
28+
29+
if len(records) == conf.batchSize || i == len(objects)-1 {
30+
pushRecords := make([]ingestion.PushTaskRecords, 0, len(records))
31+
32+
rawRecords, err := json.Marshal(records)
33+
if err != nil {
34+
return nil, reportError("unable to marshal the given `objects`: %w", err)
35+
}
36+
37+
err = json.Unmarshal(rawRecords, &pushRecords)
38+
if err != nil {
39+
return nil, reportError("unable to unmarshal the given `objects` to an `[]ingestion.PushTaskRecords` payload: %w", err)
40+
}
41+
42+
request := c.NewApiPushRequest(
43+
indexName,
44+
ingestion.NewEmptyPushTaskPayload().
45+
SetAction(ingestion.Action(action)).
46+
SetRecords(pushRecords),
47+
)
48+
49+
if referenceIndexName != nil {
50+
request = request.WithReferenceIndexName(*referenceIndexName)
51+
}
52+
53+
resp, err := c.Push(request)
54+
if err != nil {
55+
return nil, err //nolint: wrapcheck
56+
}
57+
58+
responses = append(responses, *resp)
59+
records = make([]map[string]any, 0, len(objects)%conf.batchSize)
60+
}
61+
}
62+
63+
if conf.waitForTasks {
64+
for _, resp := range responses {
65+
_, err := CreateIterable( //nolint:wrapcheck
66+
func(*ingestion.Event, error) (*ingestion.Event, error) {
67+
if resp.EventID == nil {
68+
return nil, reportError("received unexpected response from the push endpoint, eventID must not be undefined")
69+
}
70+
71+
return c.GetEvent(c.NewApiGetEventRequest(resp.RunID, *resp.EventID))
72+
},
73+
func(response *ingestion.Event, err error) (bool, error) {
74+
var apiErr *APIError
75+
if errors.As(err, &apiErr) {
76+
return apiErr.Status != 404, nil
77+
}
78+
79+
return true, err
80+
},
81+
WithTimeout(func(count int) time.Duration { return time.Duration(min(500*count, 5000)) * time.Millisecond }), WithMaxRetries(50),
82+
)
83+
if err != nil {
84+
return nil, err
85+
}
86+
}
87+
}
88+
89+
return responses, nil
90+
}

templates/go/search_helpers.mustache

Lines changed: 10 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -592,101 +592,6 @@ func (c *APIClient) PartialUpdateObjects(indexName string, objects []map[string]
592592
return c.ChunkedBatch(indexName, objects, action, partialUpdateObjectsToChunkedBatchOptions(opts)...)
593593
}
594594

595-
/*
596-
ChunkedPush Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
597-
598-
@param indexName string - the index name to save objects into.
599-
@param objects []map[string]any - List of objects to save.
600-
@param action Action - The action to perform on the objects.
601-
@param referenceIndexName *string - This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name).
602-
@param opts ...ChunkedBatchOption - Optional parameters for the request.
603-
@return []ingestion.WatchResponse - List of push responses.
604-
@return error - Error if any.
605-
*/
606-
func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, action Action, referenceIndexName *string, opts ...ChunkedBatchOption) ([]ingestion.WatchResponse, error) {
607-
if c.ingestionTransporter == nil {
608-
return nil, reportError("`region` must be provided at client instantiation before calling this method.")
609-
}
610-
611-
conf := config{
612-
headerParams: map[string]string{},
613-
waitForTasks: false,
614-
batchSize: 1000,
615-
}
616-
617-
for _, opt := range opts {
618-
opt.apply(&conf)
619-
}
620-
621-
records := make([]map[string]any, 0, len(objects)%conf.batchSize)
622-
responses := make([]ingestion.WatchResponse, 0, len(objects)%conf.batchSize)
623-
624-
for i, obj := range objects {
625-
records = append(records, obj)
626-
627-
if len(records) == conf.batchSize || i == len(objects)-1 {
628-
pushRecords := make([]ingestion.PushTaskRecords, 0, len(records))
629-
630-
rawRecords, err := json.Marshal(records)
631-
if err != nil {
632-
return nil, reportError("unable to marshal the given `objects`: %w", err)
633-
}
634-
635-
err = json.Unmarshal(rawRecords, &pushRecords)
636-
if err != nil {
637-
return nil, reportError("unable to unmarshal the given `objects` to an `[]ingestion.PushTaskRecords` payload: %w", err)
638-
}
639-
640-
request := c.ingestionTransporter.NewApiPushRequest(
641-
indexName,
642-
ingestion.NewEmptyPushTaskPayload().
643-
SetAction(ingestion.Action(action)).
644-
SetRecords(pushRecords),
645-
)
646-
647-
if referenceIndexName != nil {
648-
request = request.WithReferenceIndexName(*referenceIndexName)
649-
}
650-
651-
resp, err := c.ingestionTransporter.Push(request)
652-
if err != nil {
653-
return nil, err //nolint: wrapcheck
654-
}
655-
656-
responses = append(responses, *resp)
657-
records = make([]map[string]any, 0, len(objects)%conf.batchSize)
658-
}
659-
}
660-
661-
if conf.waitForTasks {
662-
for _, resp := range responses {
663-
_, err := CreateIterable( //nolint:wrapcheck
664-
func(*ingestion.Event, error) (*ingestion.Event, error) {
665-
if resp.EventID == nil {
666-
return nil, reportError("received unexpected response from the push endpoint, eventID must not be undefined")
667-
}
668-
669-
return c.ingestionTransporter.GetEvent(c.ingestionTransporter.NewApiGetEventRequest(resp.RunID, *resp.EventID))
670-
},
671-
func(response *ingestion.Event, err error) (bool, error) {
672-
var apiErr *APIError
673-
if errors.As(err, &apiErr) {
674-
return apiErr.Status != 404, nil
675-
}
676-
677-
return true, err
678-
},
679-
WithTimeout(func(count int) time.Duration { return time.Duration(min(500*count, 5000)) * time.Millisecond }), WithMaxRetries(50),
680-
)
681-
if err != nil {
682-
return nil, err
683-
}
684-
}
685-
}
686-
687-
return responses, nil
688-
}
689-
690595
/*
691596
ChunkedBatch chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests.
692597

@@ -842,7 +747,11 @@ Helper: Similar to the `SaveObjects` method but requires a Push connector (https
842747
@return error - Error if any.
843748
*/
844749
func (c *APIClient) SaveObjectsWithTransformation(indexName string, objects []map[string]any, opts ...ChunkedBatchOption) ([]ingestion.WatchResponse, error) {
845-
return c.ChunkedPush(indexName, objects, ACTION_ADD_OBJECT, nil, opts...)
750+
if c.ingestionTransporter == nil {
751+
return nil, reportError("`region` must be provided at client instantiation before calling this method.")
752+
}
753+
754+
return c.ingestionTransporter.ChunkedPush(indexName, objects, ACTION_ADD_OBJECT, nil, opts...)
846755
}
847756

848757
/*
@@ -855,6 +764,10 @@ Helper: Similar to the `PartialUpdateObjects` method but requires a Push connect
855764
@return error - Error if any.
856765
*/
857766
func (c *APIClient) PartialUpdateObjectsWithTransformation(indexName string, objects []map[string]any, opts ...PartialUpdateObjectsOption) ([]ingestion.WatchResponse, error) {
767+
if c.ingestionTransporter == nil {
768+
return nil, reportError("`region` must be provided at client instantiation before calling this method.")
769+
}
770+
858771
conf := config{
859772
headerParams: map[string]string{},
860773
createIfNotExists: true,
@@ -872,5 +785,5 @@ func (c *APIClient) PartialUpdateObjectsWithTransformation(indexName string, obj
872785
action = ACTION_PARTIAL_UPDATE_OBJECT_NO_CREATE
873786
}
874787

875-
return c.ChunkedPush(indexName, objects, action, nil, partialUpdateObjectsToChunkedBatchOptions(opts)...)
788+
return c.ingestionTransporter.ChunkedPush(indexName, objects, action, nil, partialUpdateObjectsToChunkedBatchOptions(opts)...)
876789
}

templates/php/api.mustache

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,15 +342,15 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException;
342342
$records[] = $object;
343343
344344
if (sizeof($records) === $batchSize || $count === sizeof($objects) - 1) {
345-
$responses[] = $this->ingestionTransporter->push($indexName, ['action' => $action, 'records' => $records], false, $referenceIndexName, $requestOptions);
345+
$responses[] = $this->push($indexName, ['action' => $action, 'records' => $records], false, $referenceIndexName, $requestOptions);
346346
$records = [];
347347
}
348348

349349
++$count;
350350
}
351351

352352
if (!empty($records)) {
353-
$responses[] = $this->ingestionTransporter->push($indexName, ['action' => $action, 'records' => $records], false, $referenceIndexName, $requestOptions);
353+
$responses[] = $this->push($indexName, ['action' => $action, 'records' => $records], false, $referenceIndexName, $requestOptions);
354354
}
355355

356356
if ($waitForTasks && !empty($responses)) {
@@ -362,7 +362,7 @@ use Algolia\AlgoliaSearch\Exceptions\NotFoundException;
362362
363363
while ($retry < 50) {
364364
try {
365-
$resp = $this->ingestionTransporter->getEvent($response->runID, $response->eventID);
365+
$resp = $this->getEvent($response->runID, $response->eventID);
366366
367367
break;
368368
} catch (NotFoundException $e) {

0 commit comments

Comments
 (0)