
Commit 12762fb

bulk timeouts
1 parent 9563bc6 commit 12762fb


3 files changed (+150, -34 lines)


elasticsearch/_async/helpers.py

Lines changed: 44 additions & 6 deletions
```diff
@@ -39,6 +39,8 @@
     _TYPE_BULK_ACTION_BODY,
     _TYPE_BULK_ACTION_HEADER,
     _TYPE_BULK_ACTION_HEADER_AND_BODY,
+    _TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY,
+    BulkMeta,
     _ActionChunker,
     _process_bulk_chunk_error,
     _process_bulk_chunk_success,
@@ -54,9 +56,10 @@
 
 
 async def _chunk_actions(
-    actions: AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY],
+    actions: AsyncIterable[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY],
     chunk_size: int,
     max_chunk_bytes: int,
+    flush_after_seconds: Optional[float],
     serializer: Serializer,
 ) -> AsyncIterable[
     Tuple[
@@ -76,10 +79,44 @@ async def _chunk_actions(
     chunker = _ActionChunker(
         chunk_size=chunk_size, max_chunk_bytes=max_chunk_bytes, serializer=serializer
     )
-    async for action, data in actions:
-        ret = chunker.feed(action, data)
-        if ret:
-            yield ret
+
+    if not flush_after_seconds:
+        async for action, data in actions:
+            ret = chunker.feed(action, data)
+            if ret:
+                yield ret
+    else:
+        item_queue: asyncio.Queue[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY] = (
+            asyncio.Queue()
+        )
+
+        async def get_items() -> None:
+            try:
+                async for item in actions:
+                    await item_queue.put(item)
+            except Exception:
+                await item_queue.put((BulkMeta.done, None))
+                raise
+            await item_queue.put((BulkMeta.done, None))
+
+        item_getter_job = asyncio.create_task(get_items())
+
+        timeout: Optional[float] = flush_after_seconds
+        while True:
+            try:
+                action, data = await asyncio.wait_for(item_queue.get(), timeout=timeout)
+                timeout = flush_after_seconds
+            except asyncio.TimeoutError:
+                action, data = BulkMeta.flush, None
+                timeout = None
+
+            if action is BulkMeta.done:
+                break
+            ret = chunker.feed(action, data)
+            if ret:
+                yield ret
+        await item_getter_job
+
     ret = chunker.flush()
     if ret:
         yield ret
```
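The timed branch above decouples reading the source iterator from feeding the chunker: a background task pumps items into an asyncio.Queue, and the consumer waits on the queue with asyncio.wait_for, treating a timeout as an implicit flush and then disabling the timer until the next item arrives. Below is a minimal, self-contained sketch of that pattern; the names (Marker, batches, pump) and the batching of plain strings are illustrative only, not part of the client.

```python
import asyncio
import enum
from typing import AsyncIterator, List, Optional, Union


class Marker(enum.Enum):
    flush = 1
    done = 2


async def batches(
    items: AsyncIterator[str],
    flush_after_seconds: Optional[float],
) -> AsyncIterator[List[str]]:
    """Group items into batches of 100, or fewer after a quiet period."""
    item_queue: "asyncio.Queue[Union[str, Marker]]" = asyncio.Queue()

    async def pump() -> None:
        async for item in items:
            await item_queue.put(item)
        await item_queue.put(Marker.done)  # sentinel: the producer is finished

    pump_task = asyncio.create_task(pump())

    batch: List[str] = []
    timeout = flush_after_seconds
    while True:
        try:
            item = await asyncio.wait_for(item_queue.get(), timeout=timeout)
            timeout = flush_after_seconds  # an item arrived, re-arm the timer
        except asyncio.TimeoutError:
            item = Marker.flush
            timeout = None  # don't time out again until new data shows up

        if item is Marker.done:
            break
        if item is Marker.flush:
            if batch:
                yield batch
                batch = []
            continue
        batch.append(item)
        if len(batch) >= 100:
            yield batch
            batch = []

    await pump_task
    if batch:
        yield batch
```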
```diff
@@ -162,6 +199,7 @@ async def async_streaming_bulk(
     actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]],
     chunk_size: int = 500,
     max_chunk_bytes: int = 100 * 1024 * 1024,
+    flush_after_seconds: Optional[float] = None,
     raise_on_error: bool = True,
     expand_action_callback: Callable[
         [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
@@ -234,7 +272,7 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
     ]
     bulk_actions: List[bytes]
     async for bulk_data, bulk_actions in _chunk_actions(
-        map_actions(), chunk_size, max_chunk_bytes, serializer
+        map_actions(), chunk_size, max_chunk_bytes, flush_after_seconds, serializer
     ):
         for attempt in range(max_retries + 1):
             to_retry: List[bytes] = []
```
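With the new parameter on async_streaming_bulk, a caller can combine size-based and time-based flushing. The following is a usage sketch based only on the signature change in this diff; the cluster URL, index name, and document generator are placeholders.

```python
import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk


async def main() -> None:
    client = AsyncElasticsearch("http://localhost:9200")

    async def generate_docs():
        # Stand-in for a slow or bursty source, e.g. messages read off a queue.
        for i in range(1000):
            yield {"_index": "my-index", "_id": i, "value": i}
            await asyncio.sleep(0.1)

    async for ok, item in async_streaming_bulk(
        client,
        generate_docs(),
        chunk_size=500,
        flush_after_seconds=5.0,  # ship a partial chunk if no action arrives for 5s
    ):
        if not ok:
            print("failed:", item)

    await client.close()


asyncio.run(main())
```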

elasticsearch/helpers/actions.py

Lines changed: 78 additions & 18 deletions
```diff
@@ -16,10 +16,11 @@
 # under the License.
 
 import logging
+import queue
 import time
 from enum import Enum
 from operator import methodcaller
-from queue import Queue
+from threading import Thread
 from typing import (
     Any,
     Callable,
@@ -48,16 +49,23 @@
 
 class BulkMeta(Enum):
     flush = 1
+    done = 2
 
 
 BULK_FLUSH = BulkMeta.flush
 
-_TYPE_BULK_ACTION = Union[bytes, str, Dict[str, Any], BulkMeta]
+_TYPE_BULK_ACTION = Union[bytes, str, Dict[str, Any]]
 _TYPE_BULK_ACTION_HEADER = Dict[str, Any]
-_TYPE_BULK_ACTION_HEADER_AND_META = Union[Dict[str, Any], BulkMeta]
 _TYPE_BULK_ACTION_BODY = Union[None, bytes, Dict[str, Any]]
 _TYPE_BULK_ACTION_HEADER_AND_BODY = Tuple[
-    _TYPE_BULK_ACTION_HEADER_AND_META, _TYPE_BULK_ACTION_BODY
+    _TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY
+]
+
+_TYPE_BULK_ACTION_WITH_META = Union[bytes, str, Dict[str, Any], BulkMeta]
+_TYPE_BULK_ACTION_HEADER_WITH_META = Union[Dict[str, Any], BulkMeta]
+_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY = Union[
+    Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
+    Tuple[BulkMeta, Any],
 ]
 
 
@@ -71,9 +79,6 @@ def expand_action(data: _TYPE_BULK_ACTION) -> _TYPE_BULK_ACTION_HEADER_AND_BODY:
     if isinstance(data, (bytes, str)):
         return {"index": {}}, to_bytes(data, "utf-8")
 
-    if isinstance(data, BulkMeta):
-        return data, {}
-
     # make sure we don't alter the action
     data = data.copy()
     op_type: str = data.pop("_op_type", "index")
```
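The type split above keeps BulkMeta values out of the public _TYPE_BULK_ACTION alias (and out of expand_action) while still letting streaming callers interleave BULK_FLUSH markers with ordinary actions, which is how the updated tests use helpers.BULK_FLUSH. A small usage sketch, with a placeholder index name, URL, and made-up documents:

```python
from elasticsearch import Elasticsearch, helpers


def actions_with_manual_flush():
    # Placeholder documents; the index name "events" is illustrative only.
    for i in range(10):
        yield {"_index": "events", "_id": i, "value": i}
        if i % 3 == 2:
            # Ask the chunker to ship whatever it has buffered so far.
            yield helpers.BULK_FLUSH


client = Elasticsearch("http://localhost:9200")
for ok, item in helpers.streaming_bulk(client, actions_with_manual_flush()):
    if not ok:
        print("failed:", item)
```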
```diff
@@ -151,7 +156,9 @@ def __init__(
         ] = []
 
     def feed(
-        self, action: _TYPE_BULK_ACTION_HEADER_AND_META, data: _TYPE_BULK_ACTION_BODY
+        self,
+        action: _TYPE_BULK_ACTION_HEADER_WITH_META,
+        data: _TYPE_BULK_ACTION_BODY,
     ) -> Optional[
         Tuple[
             List[
@@ -224,9 +231,10 @@ def flush(
 
 
 def _chunk_actions(
-    actions: Iterable[_TYPE_BULK_ACTION_HEADER_AND_BODY],
+    actions: Iterable[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY],
    chunk_size: int,
    max_chunk_bytes: int,
+    flush_after_seconds: Optional[float],
     serializer: Serializer,
 ) -> Iterable[
     Tuple[
```
```diff
@@ -246,10 +254,48 @@ def _chunk_actions(
     chunker = _ActionChunker(
         chunk_size=chunk_size, max_chunk_bytes=max_chunk_bytes, serializer=serializer
     )
-    for action, data in actions:
-        ret = chunker.feed(action, data)
-        if ret:
-            yield ret
+
+    if not flush_after_seconds:
+        for action, data in actions:
+            ret = chunker.feed(action, data)
+            if ret:
+                yield ret
+    else:
+        item_queue: queue.Queue[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY] = (
+            queue.Queue()
+        )
+
+        def get_items() -> None:
+            ret = None
+            try:
+                for item in actions:
+                    item_queue.put(item)
+            except BaseException as exc:
+                ret = exc
+            item_queue.put((BulkMeta.done, ret))
+
+        item_getter_job = Thread(target=get_items)
+        item_getter_job.start()
+
+        timeout: Optional[float] = flush_after_seconds
+        while True:
+            try:
+                action, data = item_queue.get(timeout=timeout)
+                timeout = flush_after_seconds
+            except queue.Empty:
+                action, data = BulkMeta.flush, None
+                timeout = None
+
+            if action is BulkMeta.done:
+                if isinstance(data, BaseException):
+                    raise data
+                break
+            ret = chunker.feed(action, data)
+            if ret:
+                yield ret
+
+        item_getter_job.join()
+
     ret = chunker.flush()
     if ret:
         yield ret
```
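The synchronous version cannot await a timer, so the source iterable is drained by a helper Thread and the consuming generator blocks on queue.Queue.get with a timeout; an exception raised while iterating the producer rides along with the done sentinel and is re-raised on the consumer side. The following standalone sketch shows the same producer/consumer shape; the names (timed_batches, pump, _DONE) and the batch size of 100 are illustrative, not part of the client.

```python
import queue
import threading
import time
from typing import Any, Iterable, Iterator, List, Optional, Tuple

_DONE = "done"  # tag marking the end of the producer; an exception may ride along


def timed_batches(
    items: Iterable[Any], flush_after_seconds: Optional[float]
) -> Iterator[List[Any]]:
    """Yield batches of up to 100 items, or fewer when the source goes quiet."""
    item_queue: "queue.Queue[Tuple[str, Any]]" = queue.Queue()

    def pump() -> None:
        error: Optional[BaseException] = None
        try:
            for item in items:
                item_queue.put(("item", item))
        except BaseException as exc:  # forward the failure to the consumer side
            error = exc
        item_queue.put((_DONE, error))

    worker = threading.Thread(target=pump)
    worker.start()

    batch: List[Any] = []
    timeout = flush_after_seconds
    while True:
        try:
            tag, payload = item_queue.get(timeout=timeout)
            timeout = flush_after_seconds
        except queue.Empty:
            # Quiet period elapsed: emit a partial batch, then block until new data.
            if batch:
                yield batch
                batch = []
            timeout = None
            continue

        if tag == _DONE:
            if isinstance(payload, BaseException):
                raise payload  # re-raise the producer's exception here
            break
        batch.append(payload)
        if len(batch) >= 100:
            yield batch
            batch = []

    worker.join()
    if batch:
        yield batch


# With a source that pauses longer than the flush interval, single-item
# batches come out as soon as the timer fires:
def slow_source() -> Iterator[int]:
    for i in range(3):
        time.sleep(0.5)
        yield i


for chunk in timed_batches(slow_source(), flush_after_seconds=0.1):
    print(chunk)  # [0], then [1], then [2]
```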
```diff
@@ -376,9 +422,10 @@ def _process_bulk_chunk(
 
 def streaming_bulk(
     client: Elasticsearch,
-    actions: Iterable[_TYPE_BULK_ACTION],
+    actions: Iterable[_TYPE_BULK_ACTION_WITH_META],
     chunk_size: int = 500,
     max_chunk_bytes: int = 100 * 1024 * 1024,
+    flush_after_seconds: Optional[float] = None,
     raise_on_error: bool = True,
     expand_action_callback: Callable[
         [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
@@ -440,6 +487,13 @@ def streaming_bulk(
 
     serializer = client.transport.serializers.get_serializer("application/json")
 
+    def expand_action_with_meta(
+        data: _TYPE_BULK_ACTION_WITH_META,
+    ) -> _TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY:
+        if isinstance(data, BulkMeta):
+            return data, None
+        return expand_action_callback(data)
+
     bulk_data: List[
         Union[
             Tuple[_TYPE_BULK_ACTION_HEADER],
@@ -448,9 +502,10 @@ def streaming_bulk(
     ]
     bulk_actions: List[bytes]
     for bulk_data, bulk_actions in _chunk_actions(
-        map(expand_action_callback, actions),
+        map(expand_action_with_meta, actions),
         chunk_size,
         max_chunk_bytes,
+        flush_after_seconds,
         serializer,
     ):
         for attempt in range(max_retries + 1):
```
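For the synchronous helper the new parameter is used the same way as in the async example earlier. A usage sketch with a placeholder URL, index name, and source generator:

```python
import time

from elasticsearch import Elasticsearch, helpers

client = Elasticsearch("http://localhost:9200")


def tail_events():
    # Stand-in for a bursty source (e.g. reading from a socket or message bus).
    for i in range(100):
        time.sleep(0.5)
        yield {"_index": "logs", "message": f"event {i}"}


for ok, item in helpers.streaming_bulk(
    client,
    tail_events(),
    chunk_size=500,
    flush_after_seconds=2.0,  # send a partial chunk after 2s without new actions
):
    if not ok:
        print("failed:", item)
```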
```diff
@@ -572,6 +627,7 @@ def parallel_bulk(
     thread_count: int = 4,
     chunk_size: int = 500,
     max_chunk_bytes: int = 100 * 1024 * 1024,
+    flush_after_seconds: Optional[float] = None,
     queue_size: int = 4,
     expand_action_callback: Callable[
         [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
@@ -611,7 +667,7 @@ def _setup_queues(self) -> None:
             super()._setup_queues()  # type: ignore[misc]
             # The queue must be at least the size of the number of threads to
             # prevent hanging when inserting sentinel values during teardown.
-            self._inqueue: Queue[
+            self._inqueue: queue.Queue[
                 Tuple[
                     List[
                         Union[
@@ -620,7 +676,7 @@ def _setup_queues(self) -> None:
                         ],
                     List[bytes],
                 ]
-            ] = Queue(max(queue_size, thread_count))
+            ] = queue.Queue(max(queue_size, thread_count))
             self._quick_put = self._inqueue.put
 
     with client._otel.helpers_span("helpers.parallel_bulk") as otel_span:
@@ -640,7 +696,11 @@ def _setup_queues(self) -> None:
                 )
             ),
            _chunk_actions(
-                expanded_actions, chunk_size, max_chunk_bytes, serializer
+                expanded_actions,
+                chunk_size,
+                max_chunk_bytes,
+                flush_after_seconds,
+                serializer,
            ),
        ):
            yield from result
```
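parallel_bulk simply threads flush_after_seconds through to _chunk_actions, so from the caller's perspective it is one more keyword argument. A usage sketch with placeholder documents and index name:

```python
from elasticsearch import Elasticsearch, helpers

client = Elasticsearch("http://localhost:9200")

docs = ({"_index": "my-index", "value": i} for i in range(10_000))

for ok, item in helpers.parallel_bulk(
    client,
    docs,
    thread_count=4,
    chunk_size=500,
    flush_after_seconds=5.0,  # forwarded to the chunker, as in the diff above
):
    if not ok:
        print("failed:", item)
```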

test_elasticsearch/test_helpers.py

Lines changed: 28 additions & 10 deletions
```diff
@@ -18,6 +18,7 @@
 import pickle
 import threading
 import time
+from typing import Optional
 from unittest import mock
 
 import pytest
@@ -85,7 +86,6 @@ def setup_method(self, _):
     def test_expand_action(self):
         assert helpers.expand_action({}) == ({"index": {}}, {})
         assert helpers.expand_action({"key": "val"}) == ({"index": {}}, {"key": "val"})
-        assert helpers.expand_action(helpers.BULK_FLUSH) == (helpers.BULK_FLUSH, {})
 
     def test_expand_action_actions(self):
         assert helpers.expand_action(
@@ -157,30 +157,44 @@ def test__source_metadata_or_source(self):
             {"_source": {"key2": "val2"}, "key": "val", "_op_type": "update"}
         ) == ({"update": {}}, {"key2": "val2"})
 
-    def test_chunks_are_chopped_by_byte_size(self):
+    @pytest.mark.parametrize("flush_seconds", [None, 10])
+    def test_chunks_are_chopped_by_byte_size(self, flush_seconds: Optional[float]):
         assert 100 == len(
-            list(helpers._chunk_actions(self.actions, 100000, 1, JSONSerializer()))
+            list(
+                helpers._chunk_actions(
+                    self.actions, 100000, 1, flush_seconds, JSONSerializer()
+                )
+            )
         )
 
-    def test_chunks_are_chopped_by_chunk_size(self):
+    @pytest.mark.parametrize("flush_seconds", [None, 10])
+    def test_chunks_are_chopped_by_chunk_size(self, flush_seconds: Optional[float]):
         assert 10 == len(
-            list(helpers._chunk_actions(self.actions, 10, 99999999, JSONSerializer()))
+            list(
+                helpers._chunk_actions(
+                    self.actions, 10, 99999999, flush_seconds, JSONSerializer()
+                )
+            )
         )
 
-    def test_chunks_are_chopped_by_byte_size_properly(self):
+    @pytest.mark.parametrize("flush_seconds", [None, 10])
+    def test_chunks_are_chopped_by_byte_size_properly(
+        self, flush_seconds: Optional[float]
+    ):
         max_byte_size = 170
         chunks = list(
             helpers._chunk_actions(
-                self.actions, 100000, max_byte_size, JSONSerializer()
+                self.actions, 100000, max_byte_size, flush_seconds, JSONSerializer()
             )
         )
         assert 25 == len(chunks)
         for chunk_data, chunk_actions in chunks:
             chunk = b"".join(chunk_actions)
             assert len(chunk) <= max_byte_size
 
-    def test_chunks_are_chopped_by_flush(self):
-        flush = helpers.expand_action(helpers.BULK_FLUSH)
+    @pytest.mark.parametrize("flush_seconds", [None, 10])
+    def test_chunks_are_chopped_by_flush(self, flush_seconds: Optional[float]):
+        flush = (helpers.BULK_FLUSH, None)
         actions = (
             self.actions[:3]
             + [flush] * 2  # two consecutive flushes after 3 items
@@ -189,7 +203,11 @@ def test_chunks_are_chopped_by_flush(self):
             + self.actions[4:]
             + [flush]  # flush at the end
         )
-        chunks = list(helpers._chunk_actions(actions, 100, 99999999, JSONSerializer()))
+        chunks = list(
+            helpers._chunk_actions(
+                actions, 100, 99999999, flush_seconds, JSONSerializer()
+            )
+        )
         assert 3 == len(chunks)
         assert len(chunks[0][0]) == 3
         assert len(chunks[0][1]) == 6
```
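The parametrized tests above only check that passing a flush interval does not change count- and byte-based chunking; they do not exercise the timer itself. A sketch of what such a test could look like is below. It is not part of this commit, assumes JSONSerializer is importable the same way the existing tests import it, and relies on real sleeps, so the concrete durations are illustrative.

```python
import time
from typing import Any, Iterator

from elasticsearch import helpers
from elasticsearch.serializer import JSONSerializer


def test_partial_chunk_is_flushed_after_quiet_period() -> None:
    # Hypothetical test: a pause longer than flush_after_seconds should force
    # the already-buffered action out in its own chunk.
    def slow_actions() -> Iterator[Any]:
        yield {"index": {}}, {"value": 1}
        time.sleep(0.5)  # longer than the 0.1s flush interval below
        yield {"index": {}}, {"value": 2}

    chunks = list(
        helpers._chunk_actions(
            slow_actions(), 100, 99999999, 0.1, JSONSerializer()
        )
    )
    assert len(chunks) == 2
```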
