Skip to content

Commit d132880

Browse files
Add timeout option to streaming_bulk()
1 parent e05d7f1 commit d132880

File tree

1 file changed

+37
-20
lines changed

1 file changed

+37
-20
lines changed

elasticsearch/helpers/actions.py

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import logging
1919
import time
20+
from enum import Enum
2021
from operator import methodcaller
2122
from queue import Queue
2223
from typing import (
@@ -44,8 +45,15 @@
4445

4546
logger = logging.getLogger("elasticsearch.helpers")
4647

47-
_TYPE_BULK_ACTION = Union[bytes, str, Dict[str, Any]]
48-
_TYPE_BULK_ACTION_HEADER = Dict[str, Any]
48+
49+
class BulkMeta(Enum):
    """Out-of-band control values that may be interleaved with regular
    bulk actions in the action stream consumed by the bulk helpers.

    ``flush``: instructs the chunker to emit whatever actions are
    currently buffered as a chunk immediately, instead of waiting for
    ``chunk_size`` / ``max_chunk_bytes`` to be reached.
    """

    flush = 1
51+
52+
53+
# Public, readable alias for the flush sentinel: yielding FLUSH_NOW from an
# action iterator forces the buffered bulk actions to be sent right away.
FLUSH_NOW = BulkMeta.flush
54+
55+
_TYPE_BULK_ACTION = Union[bytes, str, Dict[str, Any], BulkMeta]
56+
_TYPE_BULK_ACTION_HEADER = Union[Dict[str, Any], BulkMeta]
4957
_TYPE_BULK_ACTION_BODY = Union[None, bytes, Dict[str, Any]]
5058
_TYPE_BULK_ACTION_HEADER_AND_BODY = Tuple[
5159
_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY
@@ -62,6 +70,9 @@ def expand_action(data: _TYPE_BULK_ACTION) -> _TYPE_BULK_ACTION_HEADER_AND_BODY:
6270
if isinstance(data, (bytes, str)):
6371
return {"index": {}}, to_bytes(data, "utf-8")
6472

73+
if isinstance(data, BulkMeta):
74+
return data, {}
75+
6576
# make sure we don't alter the action
6677
data = data.copy()
6778
op_type: str = data.pop("_op_type", "index")
@@ -154,37 +165,43 @@ def feed(
154165
ret = None
155166
raw_action = action
156167
raw_data = data
157-
action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
158-
# +1 to account for the trailing new line character
159-
cur_size = len(action_bytes) + 1
160-
161-
data_bytes: Optional[bytes]
162-
if data is not None:
163-
data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
164-
cur_size += len(data_bytes) + 1
165-
else:
166-
data_bytes = None
168+
action_bytes = None
169+
data_bytes = None
170+
cur_size = 0
171+
if not isinstance(action, BulkMeta):
172+
action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
173+
# +1 to account for the trailing new line character
174+
cur_size = len(action_bytes) + 1
175+
176+
data_bytes: Optional[bytes]
177+
if data is not None:
178+
data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
179+
cur_size += len(data_bytes) + 1
180+
else:
181+
data_bytes = None
167182

168183
# full chunk, send it and start a new one
169184
if self.bulk_actions and (
170185
self.size + cur_size > self.max_chunk_bytes
171186
or self.action_count == self.chunk_size
187+
or (action == BulkMeta.flush and self.bulk_actions)
172188
):
173189
ret = (self.bulk_data, self.bulk_actions)
174190
self.bulk_actions = []
175191
self.bulk_data = []
176192
self.size = 0
177193
self.action_count = 0
178194

179-
self.bulk_actions.append(action_bytes)
180-
if data_bytes is not None:
181-
self.bulk_actions.append(data_bytes)
182-
self.bulk_data.append((raw_action, raw_data))
183-
else:
184-
self.bulk_data.append((raw_action,))
195+
if action_bytes is not None:
196+
self.bulk_actions.append(action_bytes)
197+
if data_bytes is not None:
198+
self.bulk_actions.append(data_bytes)
199+
self.bulk_data.append((raw_action, raw_data))
200+
else:
201+
self.bulk_data.append((raw_action,))
185202

186-
self.size += cur_size
187-
self.action_count += 1
203+
self.size += cur_size
204+
self.action_count += 1
188205
return ret
189206

190207
def flush(

0 commit comments

Comments
 (0)