Skip to content

Commit a8cf8d9

Browse files
Sparkycz authored and sethmlarson committed
[7.x] Add ignore_status option to bulk helpers
1 parent e4cdd45 commit a8cf8d9

File tree

6 files changed

+121
-12
lines changed

6 files changed

+121
-12
lines changed

elasticsearch/_async/helpers.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,25 +59,33 @@ async def _process_bulk_chunk(
5959
bulk_data,
6060
raise_on_exception=True,
6161
raise_on_error=True,
62+
ignore_status=(),
6263
*args,
6364
**kwargs
6465
):
6566
"""
6667
Send a bulk request to elasticsearch and process the output.
6768
"""
69+
if not isinstance(ignore_status, (list, tuple)):
70+
ignore_status = (ignore_status,)
71+
6872
try:
6973
# send the actual request
7074
resp = await client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
7175
except TransportError as e:
7276
gen = _process_bulk_chunk_error(
7377
error=e,
7478
bulk_data=bulk_data,
79+
ignore_status=ignore_status,
7580
raise_on_exception=raise_on_exception,
7681
raise_on_error=raise_on_error,
7782
)
7883
else:
7984
gen = _process_bulk_chunk_success(
80-
resp=resp, bulk_data=bulk_data, raise_on_error=raise_on_error
85+
resp=resp,
86+
bulk_data=bulk_data,
87+
ignore_status=ignore_status,
88+
raise_on_error=raise_on_error,
8189
)
8290
for item in gen:
8391
yield item
@@ -121,6 +129,7 @@ async def async_streaming_bulk(
121129
initial_backoff=2,
122130
max_backoff=600,
123131
yield_ok=True,
132+
ignore_status=(),
124133
*args,
125134
**kwargs
126135
):
@@ -156,6 +165,7 @@ async def async_streaming_bulk(
156165
2**retry_number``
157166
:arg max_backoff: maximum number of seconds a retry will wait
158167
:arg yield_ok: if set to False will skip successful documents in the output
168+
:arg ignore_status: list of HTTP status code that you want to ignore
159169
"""
160170

161171
async def map_actions():
@@ -182,6 +192,7 @@ async def map_actions():
182192
bulk_data,
183193
raise_on_exception,
184194
raise_on_error,
195+
ignore_status,
185196
*args,
186197
**kwargs
187198
),
@@ -218,7 +229,9 @@ async def map_actions():
218229
bulk_actions, bulk_data = to_retry, to_retry_data
219230

220231

221-
async def async_bulk(client, actions, stats_only=False, *args, **kwargs):
232+
async def async_bulk(
233+
client, actions, stats_only=False, ignore_status=(), *args, **kwargs
234+
):
222235
"""
223236
Helper for the :meth:`~elasticsearch.AsyncElasticsearch.bulk` api that provides
224237
a more human friendly interface - it consumes an iterator of actions and
@@ -240,6 +253,7 @@ async def async_bulk(client, actions, stats_only=False, *args, **kwargs):
240253
:arg actions: iterator containing the actions
241254
:arg stats_only: if `True` only report number of successful/failed
242255
operations instead of just number of successful and a list of error responses
256+
:arg ignore_status: list of HTTP status code that you want to ignore
243257
244258
Any additional keyword arguments will be passed to
245259
:func:`~elasticsearch.helpers.async_streaming_bulk` which is used to execute

elasticsearch/_async/helpers.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def _process_bulk_chunk(
4848
bulk_data: Any,
4949
raise_on_exception: bool = ...,
5050
raise_on_error: bool = ...,
51+
ignore_status: Optional[Union[int, Collection[int]]] = ...,
5152
*args: Any,
5253
**kwargs: Any
5354
) -> AsyncGenerator[Tuple[bool, Any], None]: ...
@@ -67,13 +68,15 @@ def async_streaming_bulk(
6768
initial_backoff: Union[float, int] = ...,
6869
max_backoff: Union[float, int] = ...,
6970
yield_ok: bool = ...,
71+
ignore_status: Optional[Union[int, Collection[int]]] = ...,
7072
*args: Any,
7173
**kwargs: Any
7274
) -> AsyncGenerator[Tuple[bool, Any], None]: ...
7375
async def async_bulk(
7476
client: AsyncElasticsearch,
7577
actions: Union[Iterable[Any], AsyncIterable[Any]],
7678
stats_only: bool = ...,
79+
ignore_status: Optional[Union[int, Collection[int]]] = ...,
7780
*args: Any,
7881
**kwargs: Any
7982
) -> Tuple[int, Union[int, List[Any]]]: ...

elasticsearch/helpers/actions.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -161,16 +161,18 @@ def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
161161
yield ret
162162

163163

164-
def _process_bulk_chunk_success(resp, bulk_data, raise_on_error=True):
164+
def _process_bulk_chunk_success(resp, bulk_data, ignore_status, raise_on_error=True):
165165
# if raise on error is set, we need to collect errors per chunk before raising them
166166
errors = []
167167

168168
# go through request-response pairs and detect failures
169169
for data, (op_type, item) in zip(
170170
bulk_data, map(methodcaller("popitem"), resp["items"])
171171
):
172-
ok = 200 <= item.get("status", 500) < 300
173-
if not ok and raise_on_error:
172+
status_code = item.get("status", 500)
173+
174+
ok = 200 <= status_code < 300
175+
if not ok and raise_on_error and status_code not in ignore_status:
174176
# include original document source
175177
if len(data) > 1:
176178
item["data"] = data[1]
@@ -186,10 +188,10 @@ def _process_bulk_chunk_success(resp, bulk_data, raise_on_error=True):
186188

187189

188190
def _process_bulk_chunk_error(
189-
error, bulk_data, raise_on_exception=True, raise_on_error=True
191+
error, bulk_data, ignore_status, raise_on_exception=True, raise_on_error=True
190192
):
191193
# default behavior - just propagate exception
192-
if raise_on_exception:
194+
if raise_on_exception and error.status_code not in ignore_status:
193195
raise error
194196

195197
# if we are not propagating, mark all actions in current chunk as failed
@@ -206,7 +208,7 @@ def _process_bulk_chunk_error(
206208
exc_errors.append({op_type: info})
207209

208210
# emulate standard behavior for failed actions
209-
if raise_on_error:
211+
if raise_on_error and error.status_code not in ignore_status:
210212
raise BulkIndexError(
211213
"%i document(s) failed to index." % len(exc_errors), exc_errors
212214
)
@@ -221,6 +223,7 @@ def _process_bulk_chunk(
221223
bulk_data,
222224
raise_on_exception=True,
223225
raise_on_error=True,
226+
ignore_status=(),
224227
*args,
225228
**kwargs
226229
):
@@ -229,19 +232,26 @@ def _process_bulk_chunk(
229232
"""
230233
kwargs = _add_helper_meta_to_kwargs(kwargs, "bp")
231234

235+
if not isinstance(ignore_status, (list, tuple)):
236+
ignore_status = (ignore_status,)
237+
232238
try:
233239
# send the actual request
234240
resp = client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
235241
except TransportError as e:
236242
gen = _process_bulk_chunk_error(
237243
error=e,
238244
bulk_data=bulk_data,
245+
ignore_status=ignore_status,
239246
raise_on_exception=raise_on_exception,
240247
raise_on_error=raise_on_error,
241248
)
242249
else:
243250
gen = _process_bulk_chunk_success(
244-
resp=resp, bulk_data=bulk_data, raise_on_error=raise_on_error
251+
resp=resp,
252+
bulk_data=bulk_data,
253+
ignore_status=ignore_status,
254+
raise_on_error=raise_on_error,
245255
)
246256
for item in gen:
247257
yield item
@@ -266,6 +276,7 @@ def streaming_bulk(
266276
initial_backoff=2,
267277
max_backoff=600,
268278
yield_ok=True,
279+
ignore_status=(),
269280
*args,
270281
**kwargs
271282
):
@@ -301,6 +312,7 @@ def streaming_bulk(
301312
2**retry_number``
302313
:arg max_backoff: maximum number of seconds a retry will wait
303314
:arg yield_ok: if set to False will skip successful documents in the output
315+
:arg ignore_status: list of HTTP status code that you want to ignore
304316
"""
305317
actions = map(expand_action_callback, actions)
306318

@@ -322,6 +334,7 @@ def streaming_bulk(
322334
bulk_data,
323335
raise_on_exception,
324336
raise_on_error,
337+
ignore_status,
325338
*args,
326339
**kwargs
327340
),
@@ -358,7 +371,7 @@ def streaming_bulk(
358371
bulk_actions, bulk_data = to_retry, to_retry_data
359372

360373

361-
def bulk(client, actions, stats_only=False, *args, **kwargs):
374+
def bulk(client, actions, stats_only=False, ignore_status=(), *args, **kwargs):
362375
"""
363376
Helper for the :meth:`~elasticsearch.Elasticsearch.bulk` api that provides
364377
a more human friendly interface - it consumes an iterator of actions and
@@ -380,6 +393,7 @@ def bulk(client, actions, stats_only=False, *args, **kwargs):
380393
:arg actions: iterator containing the actions
381394
:arg stats_only: if `True` only report number of successful/failed
382395
operations instead of just number of successful and a list of error responses
396+
:arg ignore_status: list of HTTP status code that you want to ignore
383397
384398
Any additional keyword arguments will be passed to
385399
:func:`~elasticsearch.helpers.streaming_bulk` which is used to execute
@@ -393,7 +407,9 @@ def bulk(client, actions, stats_only=False, *args, **kwargs):
393407

394408
# make streaming_bulk yield successful results so we can count them
395409
kwargs["yield_ok"] = True
396-
for ok, item in streaming_bulk(client, actions, *args, **kwargs):
410+
for ok, item in streaming_bulk(
411+
client, actions, ignore_status=ignore_status, *args, **kwargs
412+
):
397413
# go through request-response pairs and detect failures
398414
if not ok:
399415
if not stats_only:
@@ -413,6 +429,7 @@ def parallel_bulk(
413429
max_chunk_bytes=100 * 1024 * 1024,
414430
queue_size=4,
415431
expand_action_callback=expand_action,
432+
ignore_status=(),
416433
*args,
417434
**kwargs
418435
):
@@ -433,6 +450,7 @@ def parallel_bulk(
433450
(`None` if data line should be omitted).
434451
:arg queue_size: size of the task queue between the main thread (producing
435452
chunks to send) and the processing threads.
453+
:arg ignore_status: list of HTTP status code that you want to ignore
436454
"""
437455
# Avoid importing multiprocessing unless parallel_bulk is used
438456
# to avoid exceptions on restricted environments like App Engine
@@ -454,7 +472,12 @@ def _setup_queues(self):
454472
for result in pool.imap(
455473
lambda bulk_chunk: list(
456474
_process_bulk_chunk(
457-
client, bulk_chunk[1], bulk_chunk[0], *args, **kwargs
475+
client,
476+
bulk_chunk[1],
477+
bulk_chunk[0],
478+
ignore_status=ignore_status,
479+
*args,
480+
**kwargs
458481
)
459482
),
460483
_chunk_actions(

elasticsearch/helpers/actions.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,15 @@ def streaming_bulk(
6161
initial_backoff: Union[float, int] = ...,
6262
max_backoff: Union[float, int] = ...,
6363
yield_ok: bool = ...,
64+
ignore_status: Optional[Union[int, Collection[int]]] = ...,
6465
*args: Any,
6566
**kwargs: Any
6667
) -> Generator[Tuple[bool, Any], None, None]: ...
6768
def bulk(
6869
client: Elasticsearch,
6970
actions: Iterable[Any],
7071
stats_only: bool = ...,
72+
ignore_status: Optional[Union[int, Collection[int]]] = ...,
7173
*args: Any,
7274
**kwargs: Any
7375
) -> Tuple[int, Union[int, List[Any]]]: ...
@@ -79,6 +81,7 @@ def parallel_bulk(
7981
max_chunk_bytes: int = ...,
8082
queue_size: int = ...,
8183
expand_action_callback: Callable[[Any], Tuple[Dict[str, Any], Optional[Any]]] = ...,
84+
ignore_status: Optional[Union[int, Collection[int]]] = ...,
8285
*args: Any,
8386
**kwargs: Any
8487
) -> Generator[Tuple[bool, Any], None, None]: ...

test_elasticsearch/test_async/test_server/test_helpers.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,39 @@ async def test_error_is_raised(self, async_client):
336336
with pytest.raises(helpers.BulkIndexError):
337337
await helpers.async_bulk(async_client, [{"a": 42}, {"a": "c"}], index="i")
338338

339+
async def test_ignore_error_if_raised(self, async_client):
340+
# ignore the status code 400 in tuple
341+
await helpers.async_bulk(
342+
async_client, [{"a": 42}, {"a": "c"}], index="i", ignore_status=(400,)
343+
)
344+
345+
# ignore the status code 400 in list
346+
await helpers.async_bulk(
347+
async_client,
348+
[{"a": 42}, {"a": "c"}],
349+
index="i",
350+
ignore_status=[
351+
400,
352+
],
353+
)
354+
355+
# ignore the status code 400
356+
await helpers.async_bulk(
357+
async_client, [{"a": 42}, {"a": "c"}], index="i", ignore_status=400
358+
)
359+
360+
# ignore only the status code in the `ignore_status` argument
361+
with pytest.raises(helpers.BulkIndexError):
362+
await helpers.async_bulk(
363+
async_client, [{"a": 42}, {"a": "c"}], index="i", ignore_status=(444,)
364+
)
365+
366+
# ignore transport error exception
367+
failing_client = FailingBulkClient(async_client)
368+
await helpers.async_bulk(
369+
failing_client, [{"a": 42}], index="i", ignore_status=(599,)
370+
)
371+
339372
async def test_errors_are_collected_properly(self, async_client):
340373
await async_client.indices.create(
341374
"i",

test_elasticsearch/test_server/test_helpers.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,39 @@ def test_error_is_raised(self):
303303
index="i",
304304
)
305305

306+
def test_ignore_error_if_raised(self):
307+
# ignore the status code 400 in tuple
308+
helpers.bulk(
309+
self.client, [{"a": 42}, {"a": "c"}], index="i", ignore_status=(400,)
310+
)
311+
312+
# ignore the status code 400 in list
313+
helpers.bulk(
314+
self.client,
315+
[{"a": 42}, {"a": "c"}],
316+
index="i",
317+
ignore_status=[
318+
400,
319+
],
320+
)
321+
322+
# ignore the status code 400
323+
helpers.bulk(self.client, [{"a": 42}, {"a": "c"}], index="i", ignore_status=400)
324+
325+
# ignore only the status code in the `ignore_status` argument
326+
self.assertRaises(
327+
helpers.BulkIndexError,
328+
helpers.bulk,
329+
self.client,
330+
[{"a": 42}, {"a": "c"}],
331+
index="i",
332+
ignore_status=(444,),
333+
)
334+
335+
# ignore transport error exception
336+
failing_client = FailingBulkClient(self.client)
337+
helpers.bulk(failing_client, [{"a": 42}], index="i", ignore_status=(599,))
338+
306339
def test_errors_are_collected_properly(self):
307340
self.client.indices.create(
308341
"i",

0 commit comments

Comments (0)