Skip to content

Commit 9563bc6

Browse files
unit tests
1 parent fe73e92 commit 9563bc6

File tree

3 files changed

+40
-13
lines changed

3 files changed

+40
-13
lines changed

elasticsearch/helpers/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,21 @@
1919
from .._utils import fixup_module_metadata
2020
from .actions import _chunk_actions # noqa: F401
2121
from .actions import _process_bulk_chunk # noqa: F401
22-
from .actions import bulk, expand_action, parallel_bulk, reindex, scan, streaming_bulk
22+
from .actions import (
23+
BULK_FLUSH,
24+
bulk,
25+
expand_action,
26+
parallel_bulk,
27+
reindex,
28+
scan,
29+
streaming_bulk,
30+
)
2331
from .errors import BulkIndexError, ScanError
2432

2533
__all__ = [
2634
"BulkIndexError",
2735
"ScanError",
36+
"BULK_FLUSH",
2837
"expand_action",
2938
"streaming_bulk",
3039
"bulk",

elasticsearch/helpers/actions.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@ class BulkMeta(Enum):
5050
flush = 1
5151

5252

53-
FLUSH_NOW = BulkMeta.flush
53+
BULK_FLUSH = BulkMeta.flush
5454

5555
_TYPE_BULK_ACTION = Union[bytes, str, Dict[str, Any], BulkMeta]
56-
_TYPE_BULK_ACTION_HEADER = Union[Dict[str, Any], BulkMeta]
56+
_TYPE_BULK_ACTION_HEADER = Dict[str, Any]
57+
_TYPE_BULK_ACTION_HEADER_AND_META = Union[Dict[str, Any], BulkMeta]
5758
_TYPE_BULK_ACTION_BODY = Union[None, bytes, Dict[str, Any]]
5859
_TYPE_BULK_ACTION_HEADER_AND_BODY = Tuple[
59-
_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY
60+
_TYPE_BULK_ACTION_HEADER_AND_META, _TYPE_BULK_ACTION_BODY
6061
]
6162

6263

@@ -150,7 +151,7 @@ def __init__(
150151
] = []
151152

152153
def feed(
153-
self, action: _TYPE_BULK_ACTION_HEADER, data: _TYPE_BULK_ACTION_BODY
154+
self, action: _TYPE_BULK_ACTION_HEADER_AND_META, data: _TYPE_BULK_ACTION_BODY
154155
) -> Optional[
155156
Tuple[
156157
List[
@@ -163,17 +164,14 @@ def feed(
163164
]
164165
]:
165166
ret = None
166-
raw_action = action
167-
raw_data = data
168-
action_bytes = None
169-
data_bytes = None
167+
action_bytes = b""
168+
data_bytes: Optional[bytes] = None
170169
cur_size = 0
171170
if not isinstance(action, BulkMeta):
172171
action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
173172
# +1 to account for the trailing new line character
174173
cur_size = len(action_bytes) + 1
175174

176-
data_bytes: Optional[bytes]
177175
if data is not None:
178176
data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
179177
cur_size += len(data_bytes) + 1
@@ -192,13 +190,13 @@ def feed(
192190
self.size = 0
193191
self.action_count = 0
194192

195-
if action_bytes is not None:
193+
if not isinstance(action, BulkMeta):
196194
self.bulk_actions.append(action_bytes)
197195
if data_bytes is not None:
198196
self.bulk_actions.append(data_bytes)
199-
self.bulk_data.append((raw_action, raw_data))
197+
self.bulk_data.append((action, data))
200198
else:
201-
self.bulk_data.append((raw_action,))
199+
self.bulk_data.append((action,))
202200

203201
self.size += cur_size
204202
self.action_count += 1

test_elasticsearch/test_helpers.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ def setup_method(self, _):
8585
def test_expand_action(self):
8686
assert helpers.expand_action({}) == ({"index": {}}, {})
8787
assert helpers.expand_action({"key": "val"}) == ({"index": {}}, {"key": "val"})
88+
assert helpers.expand_action(helpers.BULK_FLUSH) == (helpers.BULK_FLUSH, {})
8889

8990
def test_expand_action_actions(self):
9091
assert helpers.expand_action(
@@ -178,6 +179,25 @@ def test_chunks_are_chopped_by_byte_size_properly(self):
178179
chunk = b"".join(chunk_actions)
179180
assert len(chunk) <= max_byte_size
180181

182+
def test_chunks_are_chopped_by_flush(self):
183+
flush = helpers.expand_action(helpers.BULK_FLUSH)
184+
actions = (
185+
self.actions[:3]
186+
+ [flush] * 2 # two consecutive flushes after 3 items
187+
+ self.actions[3:4]
188+
+ [flush] # flush after one more item
189+
+ self.actions[4:]
190+
+ [flush] # flush at the end
191+
)
192+
chunks = list(helpers._chunk_actions(actions, 100, 99999999, JSONSerializer()))
193+
assert 3 == len(chunks)
194+
assert len(chunks[0][0]) == 3
195+
assert len(chunks[0][1]) == 6
196+
assert len(chunks[1][0]) == 1
197+
assert len(chunks[1][1]) == 2
198+
assert len(chunks[2][0]) == 96
199+
assert len(chunks[2][1]) == 192
200+
181201

182202
class TestExpandActions:
183203
@pytest.mark.parametrize("action", ["whatever", b"whatever"])

0 commit comments

Comments
 (0)