6 changes: 2 additions & 4 deletions elasticsearch/_async/helpers.py
@@ -97,8 +97,7 @@ async def _chunk_actions(
     data: _TYPE_BULK_ACTION_BODY
     if not flush_after_seconds:
         async for action, data in actions:
-            ret = chunker.feed(action, data)
-            if ret:
+            for ret in chunker.feed(action, data):
                 yield ret
     else:
         sender, receiver = create_memory_object_stream[
@@ -128,8 +127,7 @@ async def get_items() -> None:

                 if action is BulkMeta.done:
                     break
-                ret = chunker.feed(action, data)
-                if ret:
+                for ret in chunker.feed(action, data):
                     yield ret

     ret = chunker.flush()
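A note on the pattern this hunk switches to: feed() used to return either None or a single ready chunk, so callers guarded with "if ret:"; it now returns a list of zero or more chunks, so callers simply iterate. Below is a minimal standalone sketch of that contract change; the Chunk alias and the drain_old/drain_new helpers are made up for illustration and are not part of the library.

from typing import List, Optional, Tuple

# Shape of one ready chunk, mirroring the real return value: (bulk_data, bulk_actions).
Chunk = Tuple[list, list]

def drain_old(ret: Optional[Chunk]) -> List[Chunk]:
    # Old contract: at most one chunk per feed() call, or None.
    return [ret] if ret else []

def drain_new(rets: List[Chunk]) -> List[Chunk]:
    # New contract: any number of chunks per feed() call, for example the
    # previous chunk plus an oversized action flushed on its own.
    return list(rets)

assert drain_old(None) == []
assert drain_new([]) == []
assert len(drain_new([([("index", "a")], [b"a"]), ([("index", "b")], [b"b"])])) == 2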
90 changes: 45 additions & 45 deletions elasticsearch/helpers/actions.py
@@ -158,7 +158,7 @@ def feed(
         self,
         action: _TYPE_BULK_ACTION_HEADER_WITH_META,
         data: _TYPE_BULK_ACTION_BODY,
-    ) -> Optional[
+    ) -> List[
         Tuple[
             List[
                 Union[
@@ -169,43 +169,42 @@
             List[bytes],
         ]
     ]:
-        ret = None
-        action_bytes = b""
-        data_bytes: Optional[bytes] = None
-        cur_size = 0
-        if not isinstance(action, BulkMeta):
-            action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
-            # +1 to account for the trailing new line character
-            cur_size = len(action_bytes) + 1
-
-            if data is not None:
-                data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
-                cur_size += len(data_bytes) + 1
-            else:
-                data_bytes = None
+        if action is BulkMeta.flush and self.bulk_actions:
+            return [self.flush()]
+        elif isinstance(action, BulkMeta):
+            return []
+
+        action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
+        # +1 to account for the trailing new line character
+        cur_size = len(action_bytes) + 1
+        if data is not None:
+            data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
+            cur_size += len(data_bytes) + 1
+        else:
+            data_bytes = None

-        # full chunk, send it and start a new one
-        if self.bulk_actions and (
-            self.size + cur_size > self.max_chunk_bytes
-            or self.action_count == self.chunk_size
-            or (action == BulkMeta.flush and self.bulk_actions)
-        ):
-            ret = (self.bulk_data, self.bulk_actions)
-            self.bulk_actions = []
-            self.bulk_data = []
-            self.size = 0
-            self.action_count = 0
-
-        if not isinstance(action, BulkMeta):
-            self.bulk_actions.append(action_bytes)
-            if data_bytes is not None:
-                self.bulk_actions.append(data_bytes)
-                self.bulk_data.append((action, data))
-            else:
-                self.bulk_data.append((action,))
+        ret = []
+
+        # FORWARD CHECK: if the currently incoming action doesn't fit in the
+        # active chunk, flush it and hold the action for the next chunk.
+        if self.bulk_actions and self.size + cur_size > self.max_chunk_bytes:
+            ret.append(self.flush())
+
+        self.bulk_actions.append(action_bytes)
+        if data_bytes is not None:
+            self.bulk_actions.append(data_bytes)
+            self.bulk_data.append((action, data))
+        else:
+            self.bulk_data.append((action,))
+        self.size += cur_size
+        self.action_count += 1
+
+        # PRESENT CHECK: the active chunk must not exceed slots nor size limits
+        # Note: it could be that incoming action+data's byte size could exceed
+        # max_chunk_bytes all alone, hence we need to check using >=
+        if self.action_count == self.chunk_size or self.size >= self.max_chunk_bytes:
+            ret.append(self.flush())

-            self.size += cur_size
-            self.action_count += 1
         return ret

     def flush(
@@ -221,11 +220,14 @@
             List[bytes],
         ]
     ]:
-        ret = None
-        if self.bulk_actions:
-            ret = (self.bulk_data, self.bulk_actions)
-            self.bulk_actions = []
-            self.bulk_data = []
+        if not self.bulk_actions:
+            return None
+
+        ret = (self.bulk_data, self.bulk_actions)
+        self.bulk_actions = []
+        self.bulk_data = []
+        self.size = 0
+        self.action_count = 0
         return ret


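To see why feed() now returns a list, here is a simplified, self-contained model of the new FORWARD/PRESENT check logic. It tracks sizes only and skips serialization and BulkMeta handling; ToyChunker and every name in it are invented for this sketch and are not the library's _ActionChunker.

from typing import List, Tuple

class ToyChunker:
    """Size-only model of the FORWARD/PRESENT checks in feed()."""

    def __init__(self, chunk_size: int, max_chunk_bytes: int) -> None:
        self.chunk_size = chunk_size
        self.max_chunk_bytes = max_chunk_bytes
        self.items: List[str] = []
        self.size = 0

    def flush(self) -> Tuple[List[str], int]:
        chunk = (self.items, self.size)
        self.items, self.size = [], 0
        return chunk

    def feed(self, item: str) -> List[Tuple[List[str], int]]:
        cur_size = len(item)
        ready = []

        # FORWARD CHECK: the incoming item does not fit in the active chunk,
        # so flush what is buffered and start a new chunk with this item.
        if self.items and self.size + cur_size > self.max_chunk_bytes:
            ready.append(self.flush())

        self.items.append(item)
        self.size += cur_size

        # PRESENT CHECK: the active chunk is now full (>= because a single
        # item can exceed max_chunk_bytes on its own).
        if len(self.items) == self.chunk_size or self.size >= self.max_chunk_bytes:
            ready.append(self.flush())

        return ready

chunker = ToyChunker(chunk_size=500, max_chunk_bytes=10)
assert chunker.feed("aaaa") == []            # buffered, nothing ready yet
two = chunker.feed("b" * 20)                 # oversized item: flushes "aaaa" AND itself
assert [items for items, _ in two] == [["aaaa"], ["b" * 20]]

A single feed() call can therefore hand back two chunks, which the old Optional return type could not express without extra state.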
@@ -256,8 +258,7 @@ def _chunk_actions(

     if not flush_after_seconds:
         for action, data in actions:
-            ret = chunker.feed(action, data)
-            if ret:
+            for ret in chunker.feed(action, data):
                 yield ret
     else:
         item_queue: queue.Queue[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY] = (
@@ -284,8 +285,7 @@ def get_items() -> None:

             if action is BulkMeta.done:
                 break
-            ret = chunker.feed(action, data)
-            if ret:
+            for ret in chunker.feed(action, data):
                 yield ret

     ret = chunker.flush()
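Both _chunk_actions variants also lean on the new early return at the top of feed(): a BulkMeta.flush action with buffered data emits exactly one chunk, while a flush on an empty buffer, or any other meta action, emits nothing; whatever is still buffered at the very end is picked up by the trailing chunker.flush() shown above. A minimal sketch of just that branch follows, with the made-up names Meta and feed_meta standing in for BulkMeta and the real method.

from enum import Enum
from typing import List

class Meta(Enum):          # stand-in for BulkMeta (illustrative only)
    done = 1
    flush = 2

def feed_meta(pending: List[bytes], action) -> List[List[bytes]]:
    """Mirror of the new early-return logic in feed() for meta actions."""
    if action is Meta.flush and pending:
        chunk, pending[:] = list(pending), []   # flush everything buffered
        return [chunk]
    elif isinstance(action, Meta):
        return []                               # flush on empty buffer, or done: nothing
    raise NotImplementedError("real actions are handled further down in feed()")

assert feed_meta([b"doc"], Meta.flush) == [[b"doc"]]
assert feed_meta([], Meta.flush) == []
assert feed_meta([b"doc"], Meta.done) == []

Assuming the flush_after_seconds branch feeds a flush sentinel when the queue read times out (that part of the loop is not visible in this diff, so treat it as an assumption), the empty-buffer case above is what keeps idle ticks from producing empty bulk requests.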