2121 Any ,
2222 AsyncIterable ,
2323 AsyncIterator ,
24+ Awaitable ,
2425 Callable ,
2526 Collection ,
2627 Dict ,
@@ -167,6 +168,7 @@ async def async_streaming_bulk(
167168 expand_action_callback : Callable [
168169 [_TYPE_BULK_ACTION ], _TYPE_BULK_ACTION_HEADER_AND_BODY
169170 ] = expand_action ,
171+ sleep : Callable [[float ],Awaitable [None ]] = asyncio .sleep ,
170172 raise_on_exception : bool = True ,
171173 max_retries : int = 0 ,
172174 initial_backoff : float = 2 ,
@@ -202,6 +204,7 @@ async def async_streaming_bulk(
202204 :arg expand_action_callback: callback executed on each action passed in,
203205 should return a tuple containing the action line and the data line
204206 (`None` if data line should be omitted).
207+ :arg sleep: custom callable defined for custom action on cancelling
205208 :arg retry_on_status: HTTP status code that will trigger a retry.
206209 (if `None` is specified only status 429 will retry).
207210 :arg max_retries: maximum number of times a document will be retried when
@@ -246,7 +249,7 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
246249 ]
247250 ] = []
248251 if attempt :
249- await asyncio . sleep (
252+ await sleep (
250253 min (max_backoff , initial_backoff * 2 ** (attempt - 1 ))
251254 )
252255
@@ -304,6 +307,7 @@ async def async_bulk(
304307 client : AsyncElasticsearch ,
305308 actions : Union [Iterable [_TYPE_BULK_ACTION ], AsyncIterable [_TYPE_BULK_ACTION ]],
306309 stats_only : bool = False ,
310+ sleep : Callable [[float ],Awaitable [None ]] = asyncio .sleep ,
307311 ignore_status : Union [int , Collection [int ]] = (),
308312 * args : Any ,
309313 ** kwargs : Any ,
@@ -329,6 +333,7 @@ async def async_bulk(
329333 :arg actions: iterator containing the actions
330334 :arg stats_only: if `True` only report number of successful/failed
331335 operations instead of just number of successful and a list of error responses
336+ :arg sleep: custom callable defined for custom action on cancelling
332337 :arg ignore_status: list of HTTP status code that you want to ignore
333338
334339 Any additional keyword arguments will be passed to
@@ -344,7 +349,7 @@ async def async_bulk(
344349 # make streaming_bulk yield successful results so we can count them
345350 kwargs ["yield_ok" ] = True
346351 async for ok , item in async_streaming_bulk (
347- client , actions , ignore_status = ignore_status , * args , ** kwargs # type: ignore[misc]
352+ client , actions , sleep = sleep , ignore_status = ignore_status , * args , ** kwargs # type: ignore[misc]
348353 ):
349354 # go through request-response pairs and detect failures
350355 if not ok :
0 commit comments