diff --git a/elasticsearch/_async/helpers.py b/elasticsearch/_async/helpers.py
index 061ba2414..93da6220e 100644
--- a/elasticsearch/_async/helpers.py
+++ b/elasticsearch/_async/helpers.py
@@ -97,8 +97,7 @@ async def _chunk_actions(
     data: _TYPE_BULK_ACTION_BODY
     if not flush_after_seconds:
         async for action, data in actions:
-            ret = chunker.feed(action, data)
-            if ret:
+            for ret in chunker.feed(action, data):
                 yield ret
     else:
         sender, receiver = create_memory_object_stream[
@@ -128,8 +127,7 @@ async def get_items() -> None:
                 if action is BulkMeta.done:
                     break

-                ret = chunker.feed(action, data)
-                if ret:
+                for ret in chunker.feed(action, data):
                     yield ret

    ret = chunker.flush()
diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py
index 22d4ebe22..320253f1b 100644
--- a/elasticsearch/helpers/actions.py
+++ b/elasticsearch/helpers/actions.py
@@ -158,7 +158,7 @@ def feed(
         self,
         action: _TYPE_BULK_ACTION_HEADER_WITH_META,
         data: _TYPE_BULK_ACTION_BODY,
-    ) -> Optional[
+    ) -> List[
         Tuple[
             List[
                 Union[
@@ -169,43 +169,42 @@
             List[bytes],
         ]
     ]:
-        ret = None
-        action_bytes = b""
-        data_bytes: Optional[bytes] = None
-        cur_size = 0
-        if not isinstance(action, BulkMeta):
-            action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
-            # +1 to account for the trailing new line character
-            cur_size = len(action_bytes) + 1
-
-            if data is not None:
-                data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
-                cur_size += len(data_bytes) + 1
-            else:
-                data_bytes = None
+        if action is BulkMeta.flush and self.bulk_actions:
+            return [self.flush()]
+        elif isinstance(action, BulkMeta):
+            return []
+
+        action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
+        # +1 to account for the trailing new line character
+        cur_size = len(action_bytes) + 1
+        if data is not None:
+            data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
+            cur_size += len(data_bytes) + 1
+        else:
+            data_bytes = None

-        # full chunk, send it and start a new one
-        if self.bulk_actions and (
-            self.size + cur_size > self.max_chunk_bytes
-            or self.action_count == self.chunk_size
-            or (action == BulkMeta.flush and self.bulk_actions)
-        ):
-            ret = (self.bulk_data, self.bulk_actions)
-            self.bulk_actions = []
-            self.bulk_data = []
-            self.size = 0
-            self.action_count = 0
-
-        if not isinstance(action, BulkMeta):
-            self.bulk_actions.append(action_bytes)
-            if data_bytes is not None:
-                self.bulk_actions.append(data_bytes)
-                self.bulk_data.append((action, data))
-            else:
-                self.bulk_data.append((action,))
+        ret = []
+
+        # FORWARD CHECK: if the incoming action doesn't fit in the active
+        # chunk, flush the chunk and hold the action for the next one.
+        if self.bulk_actions and self.size + cur_size > self.max_chunk_bytes:
+            ret.append(self.flush())
+
+        self.bulk_actions.append(action_bytes)
+        if data_bytes is not None:
+            self.bulk_actions.append(data_bytes)
+            self.bulk_data.append((action, data))
+        else:
+            self.bulk_data.append((action,))
+        self.size += cur_size
+        self.action_count += 1
+
+        # PRESENT CHECK: the active chunk must not exceed the slot or size
+        # limits. Note: the incoming action+data's byte size can exceed
+        # max_chunk_bytes all on its own, hence the >= comparison.
+        if self.action_count == self.chunk_size or self.size >= self.max_chunk_bytes:
+            ret.append(self.flush())

-        self.size += cur_size
-        self.action_count += 1
         return ret

     def flush(
@@ -221,11 +220,14 @@
             List[bytes],
         ]
     ]:
-        ret = None
-        if self.bulk_actions:
-            ret = (self.bulk_data, self.bulk_actions)
-            self.bulk_actions = []
-            self.bulk_data = []
+        if not self.bulk_actions:
+            return None
+
+        ret = (self.bulk_data, self.bulk_actions)
+        self.bulk_actions = []
+        self.bulk_data = []
+        self.size = 0
+        self.action_count = 0

         return ret

@@ -256,8 +258,7 @@ def _chunk_actions(

     if not flush_after_seconds:
         for action, data in actions:
-            ret = chunker.feed(action, data)
-            if ret:
+            for ret in chunker.feed(action, data):
                 yield ret
     else:
         item_queue: queue.Queue[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY] = (
@@ -284,8 +285,7 @@ def get_items() -> None:
                 if action is BulkMeta.done:
                     break

-                ret = chunker.feed(action, data)
-                if ret:
+                for ret in chunker.feed(action, data):
                     yield ret

    ret = chunker.flush()
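
Note for reviewers: below is a minimal, self-contained sketch of the new feed() contract, not library code. ToyChunker and its byte-string items are hypothetical stand-ins for the real chunker and its serialized actions; the sketch only mirrors the FORWARD/PRESENT checks from the patch. It illustrates why the return type changed from Optional[...] to List[...]: a single oversized action can now trigger two flushes in one feed() call (the forward flush of the previous chunk plus the present flush of the oversized one), so callers simply iterate over whatever comes back.

from typing import List


class ToyChunker:
    """Hypothetical stand-in that mirrors the patched feed()/flush() flow."""

    def __init__(self, chunk_size: int, max_chunk_bytes: int) -> None:
        self.chunk_size = chunk_size
        self.max_chunk_bytes = max_chunk_bytes
        self.items: List[bytes] = []
        self.size = 0

    def flush(self) -> List[bytes]:
        # Emit the active chunk and reset all counters, like the patched
        # flush() that now also zeroes size and action_count.
        ret, self.items, self.size = self.items, [], 0
        return ret

    def feed(self, item: bytes) -> List[List[bytes]]:
        ret: List[List[bytes]] = []
        # FORWARD CHECK: the incoming item would overflow the active chunk,
        # so emit that chunk first and start a new one for this item.
        if self.items and self.size + len(item) > self.max_chunk_bytes:
            ret.append(self.flush())
        self.items.append(item)
        self.size += len(item)
        # PRESENT CHECK: the now-active chunk hit a limit; a single oversized
        # item can exceed max_chunk_bytes on its own, hence >= rather than >.
        if len(self.items) == self.chunk_size or self.size >= self.max_chunk_bytes:
            ret.append(self.flush())
        return ret


chunker = ToyChunker(chunk_size=3, max_chunk_bytes=10)
for item in [b"aaaa", b"bbbb", b"xxxxxxxxxxxx", b"d"]:
    # feed() may yield zero, one, or two chunks per call, which is why the
    # real signature changed from Optional[...] to List[...].
    for chunk in chunker.feed(item):
        print(chunk)
leftover = chunker.flush()  # callers still flush() once the input ends
if leftover:
    print(leftover)

Running this prints [b'aaaa', b'bbbb'] and [b'xxxxxxxxxxxx'] from the same feed() call (forward flush, then present flush of the oversized item), and finally [b'd'] from the trailing flush(), matching the for-loop pattern the callers in both helpers now use.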