
Commit c0c9bce

fix: chunker withheld the last batch until the next action arrived, even if the batch was already ready.
1 parent 42f1834 commit c0c9bce
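
In other words, feed() previously returned at most one chunk, and a batch that became full on the current action was withheld until a following feed() noticed the full buffer. Below is a minimal sketch of the eager behavior the fix introduces, using a hypothetical, simplified stand-in (EagerChunker counts items only; the real _ActionChunker also tracks byte size):

from typing import List, Tuple


class EagerChunker:
    """Hypothetical stand-in: buffers items and emits every chunk that
    becomes ready, on the very feed() call that completes it."""

    def __init__(self, chunk_size: int = 2) -> None:
        self.chunk_size = chunk_size
        self.items: List[bytes] = []

    def feed(self, item: bytes) -> List[Tuple[bytes, ...]]:
        # Returning a list (instead of Optional) lets a chunk that fills
        # up on this call be emitted immediately, rather than being
        # withheld until the next action arrives.
        self.items.append(item)
        ready: List[Tuple[bytes, ...]] = []
        if len(self.items) == self.chunk_size:
            ready.append(self.flush())
        return ready

    def flush(self) -> Tuple[bytes, ...]:
        chunk = tuple(self.items)
        self.items = []
        return chunk


chunker = EagerChunker(chunk_size=2)
assert chunker.feed(b"a") == []              # chunk not full yet
assert chunker.feed(b"b") == [(b"a", b"b")]  # full: emitted right away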

File tree

2 files changed (+47, -49 lines)

elasticsearch/_async/helpers.py

Lines changed: 2 additions & 4 deletions

@@ -97,8 +97,7 @@ async def _chunk_actions(
     data: _TYPE_BULK_ACTION_BODY
     if not flush_after_seconds:
         async for action, data in actions:
-            ret = chunker.feed(action, data)
-            if ret:
+            for ret in chunker.feed(action, data):
                 yield ret
     else:
         sender, receiver = create_memory_object_stream[
@@ -128,8 +127,7 @@ async def get_items() -> None:
 
             if action is BulkMeta.done:
                 break
-            ret = chunker.feed(action, data)
-            if ret:
+            for ret in chunker.feed(action, data):
                 yield ret
 
         ret = chunker.flush()
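
On the caller side, the change is the same in the sync and async helpers: iterate over whatever feed() returns instead of testing an Optional. A sketch of that pattern, reusing the hypothetical EagerChunker from the sketch above:

# feed() now returns a possibly-empty list, so the Optional check
# disappears and a single feed() can yield more than one ready chunk.
def chunked(items, chunker):
    for item in items:
        for ret in chunker.feed(item):
            yield ret
    final = chunker.flush()
    if final:
        yield final


print(list(chunked([b"a", b"b", b"c"], EagerChunker(chunk_size=2))))
# [(b'a', b'b'), (b'c',)]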

elasticsearch/helpers/actions.py

Lines changed: 45 additions & 45 deletions

@@ -158,7 +158,7 @@ def feed(
         self,
         action: _TYPE_BULK_ACTION_HEADER_WITH_META,
         data: _TYPE_BULK_ACTION_BODY,
-    ) -> Optional[
+    ) -> List[
         Tuple[
             List[
                 Union[
@@ -169,43 +169,42 @@ def feed(
             List[bytes],
         ]
     ]:
-        ret = None
-        action_bytes = b""
-        data_bytes: Optional[bytes] = None
-        cur_size = 0
-        if not isinstance(action, BulkMeta):
-            action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
-            # +1 to account for the trailing new line character
-            cur_size = len(action_bytes) + 1
-
-            if data is not None:
-                data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
-                cur_size += len(data_bytes) + 1
-            else:
-                data_bytes = None
+        if action is BulkMeta.flush and self.bulk_actions:
+            return [self.flush()]
+        elif isinstance(action, BulkMeta):
+            return []
+
+        action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
+        # +1 to account for the trailing new line character
+        cur_size = len(action_bytes) + 1
+        if data is not None:
+            data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
+            cur_size += len(data_bytes) + 1
+        else:
+            data_bytes = None
 
-        # full chunk, send it and start a new one
-        if self.bulk_actions and (
-            self.size + cur_size > self.max_chunk_bytes
-            or self.action_count == self.chunk_size
-            or (action == BulkMeta.flush and self.bulk_actions)
-        ):
-            ret = (self.bulk_data, self.bulk_actions)
-            self.bulk_actions = []
-            self.bulk_data = []
-            self.size = 0
-            self.action_count = 0
-
-        if not isinstance(action, BulkMeta):
-            self.bulk_actions.append(action_bytes)
-            if data_bytes is not None:
-                self.bulk_actions.append(data_bytes)
-            self.bulk_data.append((action, data))
-        else:
-            self.bulk_data.append((action,))
+        ret = []
+
+        # FORWARD CHECK: if the currently incoming action doesn't fit in the
+        # active chunk, flush it and hold the action for the next chunk.
+        if self.bulk_actions and self.size + cur_size > self.max_chunk_bytes:
+            ret.append(self.flush())
+
+        self.bulk_actions.append(action_bytes)
+        if data_bytes is not None:
+            self.bulk_actions.append(data_bytes)
+            self.bulk_data.append((action, data))
+        else:
+            self.bulk_data.append((action,))
+        self.size += cur_size
+        self.action_count += 1
+
+        # PRESENT CHECK: the active chunk must not exceed slots nor size limits
+        # Note: it could be that incoming action+data's byte size could exceed
+        # max_chunk_bytes all alone, hence we need to check using >=
+        if self.action_count == self.chunk_size or self.size >= self.max_chunk_bytes:
+            ret.append(self.flush())
 
-        self.size += cur_size
-        self.action_count += 1
         return ret
 
     def flush(
@@ -221,11 +220,14 @@ def flush(
                 List[bytes],
             ]
         ]:
-        ret = None
-        if self.bulk_actions:
-            ret = (self.bulk_data, self.bulk_actions)
-            self.bulk_actions = []
-            self.bulk_data = []
+        if not self.bulk_actions:
+            return None
+
+        ret = (self.bulk_data, self.bulk_actions)
+        self.bulk_actions = []
+        self.bulk_data = []
+        self.size = 0
+        self.action_count = 0
         return ret
 
 
@@ -256,8 +258,7 @@ def _chunk_actions(
 
     if not flush_after_seconds:
         for action, data in actions:
-            ret = chunker.feed(action, data)
-            if ret:
+            for ret in chunker.feed(action, data):
                 yield ret
     else:
         item_queue: queue.Queue[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY] = (
@@ -284,8 +285,7 @@ def get_items() -> None:
 
             if action is BulkMeta.done:
                 break
-            ret = chunker.feed(action, data)
-            if ret:
+            for ret in chunker.feed(action, data):
                 yield ret
 
         ret = chunker.flush()
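
The reworked feed() splits the old combined condition into a FORWARD CHECK (would the incoming action overflow the active chunk?) and a PRESENT CHECK (did the chunk just become full?). A short walk-through of the size arithmetic with made-up numbers (max_chunk_bytes = 10 is purely illustrative, following the diff's costing of len(bytes) + 1 per serialized line):

max_chunk_bytes = 10

# FORWARD CHECK: 6 buffered bytes plus a 6-byte incoming action would
# overflow (12 > 10), so the buffered chunk is flushed first and the
# incoming action opens the next chunk.
size, cur_size = 6, 6
assert size + cur_size > max_chunk_bytes

# PRESENT CHECK: a single 12-byte action overflows on its own (12 >= 10);
# ">=" ensures this oversized one-action chunk is flushed on the same
# feed() call instead of being withheld until the next action arrives,
# which is the behavior this commit fixes.
size = 12
assert size >= max_chunk_bytes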
