@@ -592,6 +592,101 @@ func (c *APIClient) PartialUpdateObjects(indexName string, objects []map[string]
592592 return c.ChunkedBatch(indexName, objects, action, partialUpdateObjectsToChunkedBatchOptions(opts)...)
593593}
594594
595+ /*
596+ ChunkedPush Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
597+
598+ @param indexName string - the index name to save objects into.
599+ @param objects []map[string]any - List of objects to save.
600+ @param action Action - The action to perform on the objects.
601+ @param referenceIndexName *string - This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name).
602+ @param opts ...ChunkedBatchOption - Optional parameters for the request.
603+ @return []ingestion.WatchResponse - List of push responses.
604+ @return error - Error if any.
605+ */
606+ func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, action Action, referenceIndexName *string, opts ...ChunkedBatchOption) ([]ingestion.WatchResponse, error) {
607+ if c.ingestionTransporter == nil {
608+ return nil, reportError(" `region` must be provided at client instantiation before calling this method." )
609+ }
610+
611+ conf := config{
612+ headerParams: map[string]string{} ,
613+ waitForTasks: false,
614+ batchSize: 1000,
615+ }
616+
617+ for _, opt := range opts {
618+ opt.apply(&conf)
619+ }
620+
621+ records := make([]map[string]any, 0, len(objects)%conf.batchSize)
622+ responses := make([]ingestion.WatchResponse, 0, len(objects)%conf.batchSize)
623+
624+ for i, obj := range objects {
625+ records = append(records, obj)
626+
627+ if len(records) == conf.batchSize || i == len(objects)-1 {
628+ pushRecords := make([]ingestion.PushTaskRecords, 0, len(records))
629+
630+ rawRecords, err := json.Marshal(records)
631+ if err != nil {
632+ return nil, reportError(" unable to marshal the given `objects`: %w" , err)
633+ }
634+
635+ err = json.Unmarshal(rawRecords, &pushRecords)
636+ if err != nil {
637+ return nil, reportError(" unable to unmarshal the given `objects` to an `[]ingestion.PushTaskRecords` payload: %w" , err)
638+ }
639+
640+ request := c.ingestionTransporter.NewApiPushRequest(
641+ indexName,
642+ ingestion.NewEmptyPushTaskPayload().
643+ SetAction(ingestion.Action(action)).
644+ SetRecords(pushRecords),
645+ )
646+
647+ if referenceIndexName != nil {
648+ request = request.WithReferenceIndexName(*referenceIndexName)
649+ }
650+
651+ resp, err := c.ingestionTransporter.Push(request)
652+ if err != nil {
653+ return nil, err //nolint: wrapcheck
654+ }
655+
656+ responses = append(responses, *resp)
657+ records = make([]map[string]any, 0, len(objects)%conf.batchSize)
658+ }
659+ }
660+
661+ if conf.waitForTasks {
662+ for _, resp := range responses {
663+ _, err := CreateIterable( //nolint:wrapcheck
664+ func(*ingestion.Event, error) (*ingestion.Event, error) {
665+ if resp.EventID == nil {
666+ return nil, reportError(" received unexpected response from the push endpoint, eventID must not be undefined" )
667+ }
668+
669+ return c.ingestionTransporter.GetEvent(c.ingestionTransporter.NewApiGetEventRequest(resp.RunID, *resp.EventID))
670+ },
671+ func(response *ingestion.Event, err error) (bool, error) {
672+ var apiErr *APIError
673+ if errors.As(err, &apiErr) {
674+ return apiErr.Status != 404, nil
675+ }
676+
677+ return true, err
678+ },
679+ WithTimeout(func(count int) time.Duration { return time.Duration(min(200*count, 5000)) * time.Millisecond } ), WithMaxRetries(50),
680+ )
681+ if err != nil {
682+ return nil, err
683+ }
684+ }
685+ }
686+
687+ return responses, nil
688+ }
689+
595690/*
596691ChunkedBatch chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests.
597692
@@ -747,30 +842,7 @@ Helper: Similar to the `SaveObjects` method but requires a Push connector (https
747842 @return error - Error if any.
748843*/
749844func (c *APIClient) SaveObjectsWithTransformation(indexName string, objects []map[string]any, opts ...ChunkedBatchOption) ([]ingestion.WatchResponse, error) {
750- if c.ingestionTransporter == nil {
751- return nil, reportError(" `region` must be provided at client instantiation before calling this method." )
752- }
753-
754- var records []ingestion.PushTaskRecords
755-
756- rawObjects, err := json.Marshal(objects)
757- if err != nil {
758- return nil, reportError(" unable to marshal the given `objects`: %w" , err)
759- }
760-
761- err = json.Unmarshal(rawObjects, &records)
762- if err != nil {
763- return nil, reportError(" unable to unmarshal the given `objects` to an `[]ingestion.PushTaskRecords` payload: %w" , err)
764- }
765-
766- return c.ingestionTransporter.Push( //nolint:wrapcheck
767- c.ingestionTransporter.NewApiPushRequest(
768- indexName,
769- ingestion.NewEmptyPushTaskPayload().
770- SetAction(ingestion.ACTION_ADD_OBJECT).
771- SetRecords(records),
772- ),
773- )
845+ return c.ChunkedPush(indexName, objects, ACTION_ADD_OBJECT, nil, opts...)
774846}
775847
776848/*
@@ -783,46 +855,22 @@ Helper: Similar to the `PartialUpdateObjects` method but requires a Push connect
783855 @return error - Error if any.
784856*/
785857func (c *APIClient) PartialUpdateObjectsWithTransformation(indexName string, objects []map[string]any, opts ...PartialUpdateObjectsOption) ([]ingestion.WatchResponse, error) {
786- if c.ingestionTransporter == nil {
787- return nil, reportError(" `region` must be provided at client instantiation before calling this method." )
858+ conf := config{
859+ headerParams: map[string]string{} ,
860+ createIfNotExists: true,
788861 }
789862
790-
791- conf := config{
792- headerParams: map[string]string{} ,
793- createIfNotExists: true,
794- }
795-
796- for _, opt := range opts {
797- opt.apply(&conf)
798- }
799-
800- var action ingestion.Action
801-
802- if conf.createIfNotExists {
803- action = ingestion.ACTION_PARTIAL_UPDATE_OBJECT
804- } else {
805- action = ingestion.ACTION_PARTIAL_UPDATE_OBJECT_NO_CREATE
806- }
807-
808- var records []ingestion.PushTaskRecords
809-
810- rawObjects, err := json.Marshal(objects)
811- if err != nil {
812- return nil, reportError(" unable to marshal the given `objects`: %w" , err)
863+ for _, opt := range opts {
864+ opt.apply(&conf)
813865 }
814866
815- err = json.Unmarshal(rawObjects, &records)
816- if err != nil {
817- return nil, reportError(" unable to unmarshal the given `objects` to an `[]ingestion.PushTaskRecords` payload: %w" , err)
867+ var action Action
868+
869+ if conf.createIfNotExists {
870+ action = ACTION_PARTIAL_UPDATE_OBJECT
871+ } else {
872+ action = ACTION_PARTIAL_UPDATE_OBJECT_NO_CREATE
818873 }
819874
820- return c.ingestionTransporter.Push( //nolint:wrapcheck
821- c.ingestionTransporter.NewApiPushRequest(
822- indexName,
823- ingestion.NewEmptyPushTaskPayload().
824- SetAction(action).
825- SetRecords(records),
826- ),
827- )
875+ return c.ChunkedPush(indexName, objects, action, nil, partialUpdateObjectsToChunkedBatchOptions(opts)...)
828876}
0 commit comments