Commit 7fdc5d9
feat: python
Parent: 1573a9e

File tree: 3 files changed, 75 additions & 73 deletions

templates/python/api.mustache

Lines changed: 7 additions & 1 deletion
@@ -12,10 +12,13 @@ from algoliasearch.search.models import (
     SecuredApiKeyRestrictions,
 )
 
-from algoliasearch.ingestion.models import (WatchResponse, Event, PushTaskRecords)
+from algoliasearch.ingestion.models import (WatchResponse)
 from algoliasearch.ingestion.config import IngestionConfig
 from algoliasearch.ingestion.client import (IngestionClient, IngestionClientSync)
 {{/isSearchClient}}
+{{#isIngestionClient}}
+from algoliasearch.ingestion.models import (Action, WatchResponse, Event, PushTaskRecords)
+{{/isIngestionClient}}
 
 {{#operations}}{{#operation}}{{#imports}}
 from algoliasearch.{{packageName}}.models import {{{.}}}
@@ -130,6 +133,9 @@ class {{classname}}{{#isSyncClient}}Sync{{/isSyncClient}}:
 {{#isSearchClient}}
 {{> search_helpers}}
 {{/isSearchClient}}
+{{#isIngestionClient}}
+{{> ingestion_helpers}}
+{{/isIngestionClient}}
 
 {{#operation}}
 
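For context, here is roughly what the import hunk renders to for each client, as a sketch assuming exactly one of the two flags is set per generated client (all names taken verbatim from the diff):

# Rendered for the search client ({{#isSearchClient}} branch): only
# WatchResponse is still imported here, since chunked_push moved out.
from algoliasearch.ingestion.models import (WatchResponse)
from algoliasearch.ingestion.config import IngestionConfig
from algoliasearch.ingestion.client import (IngestionClient, IngestionClientSync)

# Rendered for the ingestion client ({{#isIngestionClient}} branch), which now
# needs Action, Event, and PushTaskRecords itself in order to host chunked_push.
from algoliasearch.ingestion.models import (Action, WatchResponse, Event, PushTaskRecords)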
templates/python/ingestion_helpers.mustache (new file; filename inferred from the {{> ingestion_helpers}} partial above)

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+{{^isSyncClient}}async {{/isSyncClient}}def chunked_push(
+    self,
+    index_name: str,
+    objects: List[Dict[str, Any]],
+    action: Action = Action.ADDOBJECT,
+    wait_for_tasks: bool = False,
+    batch_size: int = 1000,
+    reference_index_name: Optional[str] = None,
+    request_options: Optional[Union[dict, RequestOptions]] = None,
+) -> List[WatchResponse]:
+    """
+    Helper: Chunks the given `objects` list into subsets of `batch_size` elements (1000 by default) so that each chunk fits in a `push` request, leveraging the Transformation pipeline set up in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
+    """
+    records: List[PushTaskRecords] = []
+    responses: List[WatchResponse] = []
+    for i, obj in enumerate(objects):
+        records.append(obj)  # pyright: ignore
+        if len(records) == batch_size or i == len(objects) - 1:
+            responses.append(
+                {{^isSyncClient}}await {{/isSyncClient}}self.push(
+                    index_name=index_name,
+                    push_task_payload={
+                        "action": action,
+                        "records": records,
+                    },
+                    reference_index_name=reference_index_name,
+                    request_options=request_options,
+                )
+            )
+            records = []
+    if wait_for_tasks:
+        for response in responses:
+            {{^isSyncClient}}async {{/isSyncClient}}def _func(_: Optional[Event]) -> Event:
+                if response.event_id is None:
+                    raise ValueError(
+                        "received unexpected response from the push endpoint, eventID must not be undefined"
+                    )
+                try:
+                    return {{^isSyncClient}}await {{/isSyncClient}}self.get_event(run_id=response.run_id, event_id=response.event_id, request_options=request_options)
+                except RequestException as e:
+                    if e.status_code == 404:
+                        return None  # pyright: ignore
+                    raise e
+
+            _retry_count = 0
+
+            def _aggregator(_: Event | None) -> None:
+                nonlocal _retry_count
+                _retry_count += 1
+
+            def _validate(_resp: Event | None) -> bool:
+                return _resp is not None
+
+            timeout = RetryTimeout()
+
+            {{^isSyncClient}}await {{/isSyncClient}}create_iterable{{#isSyncClient}}_sync{{/isSyncClient}}(
+                func=_func,
+                validate=_validate,
+                aggregator=_aggregator,
+                timeout=lambda: timeout(_retry_count),
+                error_validate=lambda _: _retry_count >= 50,
+                error_message=lambda _: f"The maximum number of retries has been exceeded. ({_retry_count}/50)",
+            )
+    return responses
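To illustrate what the generated helper does, a minimal usage sketch follows (hypothetical credentials, index name, and record data; the constructor call is an assumption, while the batching and polling behavior matches the template above):

import asyncio

from algoliasearch.ingestion.client import IngestionClient

async def main() -> None:
    # Hypothetical credentials; the region selects the ingestion endpoint.
    client = IngestionClient("YOUR_APP_ID", "YOUR_API_KEY", "us")

    # 2500 records: chunked_push flushes a batch every 1000 records and on
    # the last record, so this sends three push requests (1000/1000/500).
    objects = [{"objectID": str(i), "name": f"record {i}"} for i in range(2500)]

    responses = await client.chunked_push(
        index_name="my_index",  # hypothetical index
        objects=objects,
        wait_for_tasks=True,  # poll get_event (up to 50 retries) until each push event is observable
    )
    print(f"{len(responses)} push batches accepted")

asyncio.run(main())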

templates/python/search_helpers.mustache

Lines changed: 4 additions & 72 deletions
@@ -303,7 +303,9 @@
     """
     Helper: Similar to the `save_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client's config at instantiation.
     """
-    return {{^isSyncClient}}await {{/isSyncClient}}self.chunked_push(index_name=index_name, objects=objects, action=Action.ADDOBJECT, wait_for_tasks=wait_for_tasks, batch_size=batch_size, request_options=request_options)
+    if self._ingestion_transporter is None:
+        raise ValueError("`region` must be provided at client instantiation before calling this method.")
+    return {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.chunked_push(index_name=index_name, objects=objects, action=Action.ADDOBJECT, wait_for_tasks=wait_for_tasks, batch_size=batch_size, request_options=request_options)
 
 {{^isSyncClient}}async {{/isSyncClient}}def delete_objects(
     self,
@@ -344,79 +346,9 @@
     """
     Helper: Similar to the `partial_update_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client instantiation method.
     """
-    return {{^isSyncClient}}await {{/isSyncClient}}self.chunked_push(index_name=index_name, objects=objects, action=Action.PARTIALUPDATEOBJECT if create_if_not_exists else Action.PARTIALUPDATEOBJECTNOCREATE, wait_for_tasks=wait_for_tasks, batch_size=batch_size, request_options=request_options)
-
-{{^isSyncClient}}async {{/isSyncClient}}def chunked_push(
-    self,
-    index_name: str,
-    objects: List[Dict[str, Any]],
-    action: Action = Action.ADDOBJECT,
-    wait_for_tasks: bool = False,
-    batch_size: int = 1000,
-    reference_index_name: Optional[str] = None,
-    request_options: Optional[Union[dict, RequestOptions]] = None,
-) -> List[WatchResponse]:
-    """
-    Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
-    """
-
     if self._ingestion_transporter is None:
         raise ValueError("`region` must be provided at client instantiation before calling this method.")
-    records: List[PushTaskRecords] = []
-    responses: List[WatchResponse] = []
-    for i, obj in enumerate(objects):
-        records.append(obj)  # pyright: ignore
-        if len(records) == batch_size or i == len(objects) - 1:
-            responses.append(
-                {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.push(
-                    index_name=index_name,
-                    push_task_payload={
-                        "action": action,
-                        "records": records,
-                    },
-                    reference_index_name=reference_index_name,
-                    request_options=request_options,
-                )
-            )
-    requests = []
-    if wait_for_tasks:
-        for response in responses:
-            {{^isSyncClient}}async {{/isSyncClient}}def _func(_: Optional[Event]) -> Event:
-                if self._ingestion_transporter is None:
-                    raise ValueError(
-                        "`region` must be provided at client instantiation before calling this method."
-                    )
-                if response.event_id is None:
-                    raise ValueError(
-                        "received unexpected response from the push endpoint, eventID must not be undefined"
-                    )
-                try:
-                    return {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.get_event(run_id=response.run_id, event_id=response.event_id, request_options=request_options)
-                except RequestException as e:
-                    if e.status_code == 404:
-                        return None  # pyright: ignore
-                    raise e
-
-            _retry_count = 0
-
-            def _aggregator(_: Event | None) -> None:
-                nonlocal _retry_count
-                _retry_count += 1
-
-            def _validate(_resp: Event | None) -> bool:
-                return _resp is not None
-
-            timeout = RetryTimeout()
-
-            {{^isSyncClient}}await {{/isSyncClient}}create_iterable{{#isSyncClient}}_sync{{/isSyncClient}}(
-                func=_func,
-                validate=_validate,
-                aggregator=_aggregator,
-                timeout=lambda: timeout(_retry_count),
-                error_validate=lambda _: _retry_count >= 50,
-                error_message=lambda _: f"The maximum number of retries exceeded. (${_retry_count}/${50})",
-            )
-    return responses
+    return {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.chunked_push(index_name=index_name, objects=objects, action=Action.PARTIALUPDATEOBJECT if create_if_not_exists else Action.PARTIALUPDATEOBJECTNOCREATE, wait_for_tasks=wait_for_tasks, batch_size=batch_size, request_options=request_options)
 
 {{^isSyncClient}}async {{/isSyncClient}}def chunked_batch(
     self,
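For the search-client side, a minimal usage sketch of the refactored helper (hypothetical credentials and index name; exactly how the region reaches the client's config may differ, so the constructor call below is an assumption):

from algoliasearch.search.client import SearchClientSync

# Hypothetical wiring: providing a region at instantiation is what sets up the
# internal _ingestion_transporter; without it, the helper raises
# ValueError("`region` must be provided at client instantiation before calling this method.")
client = SearchClientSync("YOUR_APP_ID", "YOUR_API_KEY", "us")

# Delegates to _ingestion_transporter.chunked_push with Action.ADDOBJECT.
responses = client.save_objects_with_transformation(
    index_name="my_index",  # hypothetical index
    objects=[{"objectID": "1", "name": "Tom"}],
    wait_for_tasks=True,
)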
