Skip to content

Commit 1c2800f

Browse files
committed
Use a collection of status codes instead of a callback function for status retries
1 parent 8f12f96 commit 1c2800f

File tree

4 files changed

+26
-48
lines changed

4 files changed

+26
-48
lines changed

elasticsearch/_async/helpers.py

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,6 @@ async def azip(
158158
pass
159159

160160

161-
def _retry_for_status(status: int) -> bool:
162-
if status == 429:
163-
return True
164-
return False
165-
166-
167161
async def async_streaming_bulk(
168162
client: AsyncElasticsearch,
169163
actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]],
@@ -173,13 +167,13 @@ async def async_streaming_bulk(
173167
expand_action_callback: Callable[
174168
[_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
175169
] = expand_action,
176-
retry_for_status_callback: Callable[[int], bool] = _retry_for_status,
177170
raise_on_exception: bool = True,
178171
max_retries: int = 0,
179172
initial_backoff: float = 2,
180173
max_backoff: float = 600,
181174
yield_ok: bool = True,
182175
ignore_status: Union[int, Collection[int]] = (),
176+
retry_on_status: Union[int, Collection[int]] = (429,),
183177
*args: Any,
184178
**kwargs: Any,
185179
) -> AsyncIterable[Tuple[bool, Dict[str, Any]]]:
@@ -191,7 +185,7 @@ async def async_streaming_bulk(
191185
entire input is consumed and sent.
192186
193187
If you specify ``max_retries`` it will also retry any documents that were
194-
rejected with a ``429`` status code. Use ``retry_for_status_callback`` to
188+
rejected with a ``429`` status code. Use ``retry_on_status`` to
195189
configure which status codes will be retried. To do this it will wait
196190
(**by calling time.sleep which will block**) for ``initial_backoff`` seconds
197191
and then, every subsequent rejection for the same chunk, for double the time
@@ -208,12 +202,11 @@ async def async_streaming_bulk(
208202
:arg expand_action_callback: callback executed on each action passed in,
209203
should return a tuple containing the action line and the data line
210204
(`None` if data line should be omitted).
211-
:arg retry_for_status_callback: callback executed on each item's status,
212-
should return a True if the status require a retry and False if not.
205+
:arg retry_on_status: HTTP status code(s) that will trigger a retry; accepts a single int or a collection of ints.
213206
(defaults to ``(429,)``, i.e. only status 429 is retried).
214207
:arg max_retries: maximum number of times a document will be retried when
215-
retry_for_status_callback (defaulting to ``429``) is received,
216-
set to 0 (default) for no retries on retry_for_status_callback
208+
a status code listed in ``retry_on_status`` (defaulting to ``429``) is received,
209+
set to 0 (default) for no retries
217210
:arg initial_backoff: number of seconds we should wait before the first
218211
retry. Any subsequent retries will be powers of ``initial_backoff *
219212
2**retry_number``
@@ -225,6 +218,9 @@ async def async_streaming_bulk(
225218
client = client.options()
226219
client._client_meta = (("h", "bp"),)
227220

221+
if isinstance(retry_on_status, int):
222+
retry_on_status = (retry_on_status,)
223+
228224
async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
229225
async for item in aiter(actions):
230226
yield expand_action_callback(item)
@@ -277,10 +273,10 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
277273
if not ok:
278274
action, info = info.popitem()
279275
# retry if retries enabled, we are not in the last attempt,
280-
# and retry_for_status_callback is true (defaulting to 429)
276+
# and the status code is in retry_on_status (defaulting to 429)
281277
if (
282278
max_retries
283-
and retry_for_status_callback(info["status"])
279+
and info["status"] in retry_on_status
284280
and (attempt + 1) <= max_retries
285281
):
286282
# _process_bulk_chunk expects strings so we need to
@@ -293,11 +289,9 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
293289
yield ok, info
294290

295291
except ApiError as e:
296-
# suppress any status which retry_for_status_callback is true (defaulting to 429)
292+
# suppress any status in retry_on_status (429 by default)
297293
# since we will retry them
298-
if attempt == max_retries or not retry_for_status_callback(
299-
e.status_code
300-
):
294+
if attempt == max_retries or e.status_code not in retry_on_status:
301295
raise
302296
else:
303297
if not to_retry:

elasticsearch/helpers/actions.py

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -359,12 +359,6 @@ def _process_bulk_chunk(
359359
yield from gen
360360

361361

362-
def _retry_for_status(status: int) -> bool:
363-
if status == 429:
364-
return True
365-
return False
366-
367-
368362
def streaming_bulk(
369363
client: Elasticsearch,
370364
actions: Iterable[_TYPE_BULK_ACTION],
@@ -374,13 +368,13 @@ def streaming_bulk(
374368
expand_action_callback: Callable[
375369
[_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
376370
] = expand_action,
377-
retry_for_status_callback: Callable[[int], bool] = _retry_for_status,
378371
raise_on_exception: bool = True,
379372
max_retries: int = 0,
380373
initial_backoff: float = 2,
381374
max_backoff: float = 600,
382375
yield_ok: bool = True,
383376
ignore_status: Union[int, Collection[int]] = (),
377+
retry_on_status: Union[int, Collection[int]] = (429,),
384378
span_name: str = "helpers.streaming_bulk",
385379
*args: Any,
386380
**kwargs: Any,
@@ -393,7 +387,7 @@ def streaming_bulk(
393387
entire input is consumed and sent.
394388
395389
If you specify ``max_retries`` it will also retry any documents that were
396-
rejected with a ``429`` status code. Use ``retry_for_status_callback`` to
390+
rejected with a ``429`` status code. Use ``retry_on_status`` to
397391
configure which status codes will be retried. To do this it will wait
398392
(**by calling time.sleep which will block**) for ``initial_backoff`` seconds
399393
and then, every subsequent rejection for the same chunk, for double the time
@@ -410,12 +404,11 @@ def streaming_bulk(
410404
:arg expand_action_callback: callback executed on each action passed in,
411405
should return a tuple containing the action line and the data line
412406
(`None` if data line should be omitted).
413-
:arg retry_for_status_callback: callback executed on each item's status,
414-
should return a True if the status require a retry and False if not.
407+
:arg retry_on_status: HTTP status code(s) that will trigger a retry; accepts a single int or a collection of ints.
415408
(defaults to ``(429,)``, i.e. only status 429 is retried).
416409
:arg max_retries: maximum number of times a document will be retried when
417-
retry_for_status_callback (defaulting to ``429``) is received,
418-
set to 0 (default) for no retries on retry_for_status_callback
410+
a status code listed in ``retry_on_status`` (defaulting to ``429``) is received,
411+
set to 0 (default) for no retries
419412
:arg initial_backoff: number of seconds we should wait before the first
420413
retry. Any subsequent retries will be powers of ``initial_backoff *
421414
2**retry_number``
@@ -427,6 +420,9 @@ def streaming_bulk(
427420
client = client.options()
428421
client._client_meta = (("h", "bp"),)
429422

423+
if isinstance(retry_on_status, int):
424+
retry_on_status = (retry_on_status,)
425+
430426
serializer = client.transport.serializers.get_serializer("application/json")
431427

432428
bulk_data: List[
@@ -471,10 +467,10 @@ def streaming_bulk(
471467
if not ok:
472468
action, info = info.popitem()
473469
# retry if retries enabled, we are not in the last attempt,
474-
# and retry_for_status_callback is true (defaulting to 429)
470+
# and the status code is in retry_on_status (defaulting to 429)
475471
if (
476472
max_retries
477-
and retry_for_status_callback(info["status"])
473+
and info["status"] in retry_on_status
478474
and (attempt + 1) <= max_retries
479475
):
480476
# _process_bulk_chunk expects bytes so we need to
@@ -487,11 +483,9 @@ def streaming_bulk(
487483
yield ok, info
488484

489485
except ApiError as e:
490-
# suppress any status which retry_for_status_callback is true (defaulting to 429)
486+
# suppress any status in retry_on_status (429 by default)
491487
# since we will retry them
492-
if attempt == max_retries or not retry_for_status_callback(
493-
e.status_code
494-
):
488+
if attempt == max_retries or e.status_code not in retry_on_status:
495489
raise
496490
else:
497491
if not to_retry:

test_elasticsearch/test_async/test_server/test_helpers.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -312,11 +312,6 @@ async def test_connection_timeout_is_retried_with_retry_status_callback(
312312
{"_index": "i", "_id": 42, "f": "v"},
313313
]
314314

315-
def _retry_for_connection_timeout(status):
316-
if status == 522:
317-
return True
318-
return False
319-
320315
results = [
321316
x
322317
async for x in helpers.async_streaming_bulk(
@@ -325,7 +320,7 @@ def _retry_for_connection_timeout(status):
325320
raise_on_exception=False,
326321
raise_on_error=False,
327322
chunk_size=1,
328-
retry_for_status_callback=_retry_for_connection_timeout,
323+
retry_on_status=522,
329324
max_retries=1,
330325
initial_backoff=0,
331326
)

test_elasticsearch/test_server/test_helpers.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,11 +305,6 @@ def test_connection_timeout_is_retried_with_retry_status_callback(sync_client):
305305
{"_index": "i", "_id": 42, "f": "v"},
306306
]
307307

308-
def _retry_for_connection_timeout(status):
309-
if status == 522:
310-
return True
311-
return False
312-
313308
results = list(
314309
helpers.streaming_bulk(
315310
failing_client,
@@ -318,7 +313,7 @@ def _retry_for_connection_timeout(status):
318313
raise_on_exception=False,
319314
raise_on_error=False,
320315
chunk_size=1,
321-
retry_for_status_callback=_retry_for_connection_timeout,
316+
retry_on_status=522,
322317
max_retries=1,
323318
initial_backoff=0,
324319
)

0 commit comments

Comments
 (0)