     Union,
 )

+from elastic_transport import OpenTelemetrySpan
+
 from .. import Elasticsearch
 from ..compat import to_bytes
 from ..exceptions import ApiError, NotFoundError, TransportError
@@ -322,6 +324,7 @@ def _process_bulk_chunk(
             Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
         ]
     ],
+    otel_span: OpenTelemetrySpan,
     raise_on_exception: bool = True,
     raise_on_error: bool = True,
     ignore_status: Union[int, Collection[int]] = (),
@@ -331,28 +334,29 @@ def _process_bulk_chunk(
331334 """
332335 Send a bulk request to elasticsearch and process the output.
333336 """
334- if isinstance (ignore_status , int ):
335- ignore_status = (ignore_status ,)
336-
337- try :
338- # send the actual request
339- resp = client .bulk (* args , operations = bulk_actions , ** kwargs ) # type: ignore[arg-type]
340- except ApiError as e :
341- gen = _process_bulk_chunk_error (
342- error = e ,
343- bulk_data = bulk_data ,
344- ignore_status = ignore_status ,
345- raise_on_exception = raise_on_exception ,
346- raise_on_error = raise_on_error ,
347- )
348- else :
349- gen = _process_bulk_chunk_success (
350- resp = resp .body ,
351- bulk_data = bulk_data ,
352- ignore_status = ignore_status ,
353- raise_on_error = raise_on_error ,
354- )
355- yield from gen
337+ with client ._otel .use_span (otel_span ):
338+ if isinstance (ignore_status , int ):
339+ ignore_status = (ignore_status ,)
340+
341+ try :
342+ # send the actual request
343+ resp = client .bulk (* args , operations = bulk_actions , ** kwargs ) # type: ignore[arg-type]
344+ except ApiError as e :
345+ gen = _process_bulk_chunk_error (
346+ error = e ,
347+ bulk_data = bulk_data ,
348+ ignore_status = ignore_status ,
349+ raise_on_exception = raise_on_exception ,
350+ raise_on_error = raise_on_error ,
351+ )
352+ else :
353+ gen = _process_bulk_chunk_success (
354+ resp = resp .body ,
355+ bulk_data = bulk_data ,
356+ ignore_status = ignore_status ,
357+ raise_on_error = raise_on_error ,
358+ )
359+ yield from gen


 def streaming_bulk(
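Note on the `use_span` wrapper above: `_process_bulk_chunk` is a generator, so the actual `client.bulk()` call only runs when a consumer iterates over it, usually after the frame that opened the span has returned. Re-entering the span with `client._otel.use_span(otel_span)` keeps each chunk's request attributed to the helper's span. As a rough sketch only, not `elastic_transport`'s actual implementation, a `use_span`-style helper built on the public `opentelemetry-api` package could look like this:

```python
# Hypothetical sketch of a use_span-style context manager, assuming the
# opentelemetry-api package; elastic_transport's real helper may differ.
from contextlib import contextmanager
from typing import Iterator

from opentelemetry import trace
from opentelemetry.trace import Span


@contextmanager
def use_span(span: Span) -> Iterator[Span]:
    # Re-activate an already-open span as the current span without ending
    # it on exit; the helper that created the span owns its lifetime.
    with trace.use_span(span, end_on_exit=False):
        yield span
```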
@@ -370,6 +374,7 @@ def streaming_bulk(
     max_backoff: float = 600,
     yield_ok: bool = True,
     ignore_status: Union[int, Collection[int]] = (),
+    span_name: str = "helpers.streaming_bulk",
     *args: Any,
     **kwargs: Any,
 ) -> Iterable[Tuple[bool, Dict[str, Any]]]:
@@ -406,73 +411,78 @@ def streaming_bulk(
     :arg yield_ok: if set to False will skip successful documents in the output
     :arg ignore_status: list of HTTP status code that you want to ignore
     """
-    client = client.options()
-    client._client_meta = (("h", "bp"),)
+    with client._otel.helpers_span(span_name) as otel_span:
+        client = client.options()
+        client._client_meta = (("h", "bp"),)

-    serializer = client.transport.serializers.get_serializer("application/json")
+        serializer = client.transport.serializers.get_serializer("application/json")

-    bulk_data: List[
-        Union[
-            Tuple[_TYPE_BULK_ACTION_HEADER],
-            Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
+        bulk_data: List[
+            Union[
+                Tuple[_TYPE_BULK_ACTION_HEADER],
+                Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
+            ]
         ]
-    ]
-    bulk_actions: List[bytes]
-    for bulk_data, bulk_actions in _chunk_actions(
-        map(expand_action_callback, actions), chunk_size, max_chunk_bytes, serializer
-    ):
-        for attempt in range(max_retries + 1):
-            to_retry: List[bytes] = []
-            to_retry_data: List[
-                Union[
-                    Tuple[_TYPE_BULK_ACTION_HEADER],
-                    Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
-                ]
-            ] = []
-            if attempt:
-                time.sleep(min(max_backoff, initial_backoff * 2 ** (attempt - 1)))
-
-            try:
-                for data, (ok, info) in zip(
-                    bulk_data,
-                    _process_bulk_chunk(
-                        client,
-                        bulk_actions,
+        bulk_actions: List[bytes]
+        for bulk_data, bulk_actions in _chunk_actions(
+            map(expand_action_callback, actions),
+            chunk_size,
+            max_chunk_bytes,
+            serializer,
+        ):
+            for attempt in range(max_retries + 1):
+                to_retry: List[bytes] = []
+                to_retry_data: List[
+                    Union[
+                        Tuple[_TYPE_BULK_ACTION_HEADER],
+                        Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
+                    ]
+                ] = []
+                if attempt:
+                    time.sleep(min(max_backoff, initial_backoff * 2 ** (attempt - 1)))
+
+                try:
+                    for data, (ok, info) in zip(
                         bulk_data,
-                        raise_on_exception,
-                        raise_on_error,
-                        ignore_status,
-                        *args,
-                        **kwargs,
-                    ),
-                ):
-                    if not ok:
-                        action, info = info.popitem()
-                        # retry if retries enabled, we get 429, and we are not
-                        # in the last attempt
-                        if (
-                            max_retries
-                            and info["status"] == 429
-                            and (attempt + 1) <= max_retries
-                        ):
-                            # _process_bulk_chunk expects bytes so we need to
-                            # re-serialize the data
-                            to_retry.extend(map(serializer.dumps, data))
-                            to_retry_data.append(data)
-                        else:
-                            yield ok, {action: info}
-                    elif yield_ok:
-                        yield ok, info
-
-            except ApiError as e:
-                # suppress 429 errors since we will retry them
-                if attempt == max_retries or e.status_code != 429:
-                    raise
-            else:
-                if not to_retry:
-                    break
-                # retry only subset of documents that didn't succeed
-                bulk_actions, bulk_data = to_retry, to_retry_data
+                        _process_bulk_chunk(
+                            client,
+                            bulk_actions,
+                            bulk_data,
+                            otel_span,
+                            raise_on_exception,
+                            raise_on_error,
+                            ignore_status,
+                            *args,
+                            **kwargs,
+                        ),
+                    ):
+                        if not ok:
+                            action, info = info.popitem()
+                            # retry if retries enabled, we get 429, and we are not
+                            # in the last attempt
+                            if (
+                                max_retries
+                                and info["status"] == 429
+                                and (attempt + 1) <= max_retries
+                            ):
+                                # _process_bulk_chunk expects bytes so we need to
+                                # re-serialize the data
+                                to_retry.extend(map(serializer.dumps, data))
+                                to_retry_data.append(data)
+                            else:
+                                yield ok, {action: info}
+                        elif yield_ok:
+                            yield ok, info
+
+                except ApiError as e:
+                    # suppress 429 errors since we will retry them
+                    if attempt == max_retries or e.status_code != 429:
+                        raise
+                else:
+                    if not to_retry:
+                        break
+                    # retry only subset of documents that didn't succeed
+                    bulk_actions, bulk_data = to_retry, to_retry_data


 def bulk(
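With this change, every chunked bulk request that `streaming_bulk` issues is recorded under a single parent span named by the new `span_name` argument (defaulting to `"helpers.streaming_bulk"`). Calling code does not need to change; a minimal usage sketch, with an illustrative URL and index name:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

# Illustrative connection details; substitute your own cluster and auth.
client = Elasticsearch("http://localhost:9200")

docs = ({"_index": "my-index", "_source": {"value": i}} for i in range(10_000))

# Every underlying _bulk request shares one "helpers.streaming_bulk" span.
for ok, item in streaming_bulk(client, docs, chunk_size=500):
    if not ok:
        print("failed:", item)
```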
@@ -519,7 +529,7 @@ def bulk(
     # make streaming_bulk yield successful results so we can count them
     kwargs["yield_ok"] = True
     for ok, item in streaming_bulk(
-        client, actions, ignore_status=ignore_status, *args, **kwargs  # type: ignore[misc]
+        client, actions, ignore_status=ignore_status, span_name="helpers.bulk", *args, **kwargs  # type: ignore[misc]
     ):
         # go through request-response pairs and detect failures
         if not ok:
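`bulk()` delegates to `streaming_bulk()`, and passing `span_name="helpers.bulk"` makes the traced span match the helper the caller actually invoked. Reusing `client` and `docs` from the sketch above:

```python
from elasticsearch.helpers import bulk

# bulk() drains streaming_bulk() and returns (success count, error list);
# its work is now reported under a span named "helpers.bulk".
success, errors = bulk(client, docs)
```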
@@ -589,27 +599,31 @@ def _setup_queues(self) -> None:
             ] = Queue(max(queue_size, thread_count))
             self._quick_put = self._inqueue.put

-    pool = BlockingPool(thread_count)
+    with client._otel.helpers_span("helpers.parallel_bulk") as otel_span:
+        pool = BlockingPool(thread_count)

-    try:
-        for result in pool.imap(
-            lambda bulk_chunk: list(
-                _process_bulk_chunk(
-                    client,
-                    bulk_chunk[1],
-                    bulk_chunk[0],
-                    ignore_status=ignore_status,  # type: ignore[misc]
-                    *args,
-                    **kwargs,
-                )
-            ),
-            _chunk_actions(expanded_actions, chunk_size, max_chunk_bytes, serializer),
-        ):
-            yield from result
-
-    finally:
-        pool.close()
-        pool.join()
+        try:
+            for result in pool.imap(
+                lambda bulk_chunk: list(
+                    _process_bulk_chunk(
+                        client,
+                        bulk_chunk[1],
+                        bulk_chunk[0],
+                        otel_span=otel_span,
+                        ignore_status=ignore_status,  # type: ignore[misc]
+                        *args,
+                        **kwargs,
+                    )
+                ),
+                _chunk_actions(
+                    expanded_actions, chunk_size, max_chunk_bytes, serializer
+                ),
+            ):
+                yield from result
+
+        finally:
+            pool.close()
+            pool.join()


 def scan(
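In `parallel_bulk`, the span is opened once in the coordinating thread and handed to every worker through the new `otel_span` argument, so chunks processed concurrently by the thread pool all attach to the same `helpers.parallel_bulk` span (each worker re-enters it via `use_span` inside `_process_bulk_chunk`). Usage is unchanged:

```python
from elasticsearch.helpers import parallel_bulk

# Four worker threads; their chunk requests share one
# "helpers.parallel_bulk" span.
for ok, item in parallel_bulk(client, docs, thread_count=4, chunk_size=500):
    if not ok:
        print("failed:", item)
```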