Skip to content

Commit bfaae38

Browse files
committed
fix: py
1 parent 1a5d52f commit bfaae38

File tree

1 file changed

+18
-7
lines changed

1 file changed

+18
-7
lines changed

templates/python/ingestion_helpers.mustache

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"""
1212
Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
1313
"""
14+
offset = 0
1415
records: List[PushTaskRecords] = []
1516
responses: List[WatchResponse] = []
1617
wait_batch_size = batch_size // 10
@@ -20,7 +21,7 @@
2021
records.append(obj) # pyright: ignore
2122
if len(records) == batch_size or i == len(objects) - 1:
2223
responses.append(
23-
{{^isSyncClient}}await {{/isSyncClient}}self.push(
24+
await self.push(
2425
index_name=index_name,
2526
push_task_payload={
2627
"action": action,
@@ -31,18 +32,27 @@
3132
)
3233
)
3334
records = []
34-
if wait_for_tasks and (len(responses) % wait_batch_size == 0 or i == len(objects) - 1):
35-
for response in responses[-wait_batch_size:]:
36-
{{^isSyncClient}}async {{/isSyncClient}}def _func(_: Optional[Event]) -> Event:
35+
if (
36+
wait_for_tasks
37+
and len(responses) > 0
38+
and (len(responses) % wait_batch_size == 0 or i == len(objects) - 1)
39+
):
40+
for response in responses[offset:offset+wait_batch_size]:
41+
42+
async def _func(_: Optional[Event]) -> Event:
3743
if response.event_id is None:
3844
raise ValueError(
3945
"received unexpected response from the push endpoint, eventID must not be undefined"
4046
)
4147
try:
42-
return {{^isSyncClient}}await {{/isSyncClient}}self.get_event(run_id=response.run_id, event_id=response.event_id, request_options=request_options)
48+
return await self.get_event(
49+
run_id=response.run_id,
50+
event_id=response.event_id,
51+
request_options=request_options,
52+
)
4353
except RequestException as e:
4454
if e.status_code == 404:
45-
return None # pyright: ignore
55+
return None # pyright: ignore
4656
raise e
4757

4858
_retry_count = 0
@@ -56,12 +66,13 @@
5666

5767
timeout = RetryTimeout()
5868

59-
{{^isSyncClient}}await {{/isSyncClient}}create_iterable{{#isSyncClient}}_sync{{/isSyncClient}}(
69+
await create_iterable(
6070
func=_func,
6171
validate=_validate,
6272
aggregator=_aggregator,
6373
timeout=lambda: timeout(_retry_count),
6474
error_validate=lambda _: _retry_count >= 50,
6575
error_message=lambda _: f"The maximum number of retries exceeded. (${_retry_count}/${50})",
6676
)
77+
offset += wait_batch_size
6778
return responses

0 commit comments

Comments
 (0)