diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index 6f29b1f43d..c8d36a1e5a 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -13,83 +13,16 @@ # limitations under the License. """ -This library allows tracing HTTP elasticsearch made by the +This library allows tracing HTTP Elasticsearch requests made by the `elasticsearch `_ library. - -Usage ------ - -.. code-block:: python - - from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor - import elasticsearch - - - # instrument elasticsearch - ElasticsearchInstrumentor().instrument() - - # Using elasticsearch as normal now will automatically generate spans - es = elasticsearch.Elasticsearch() - es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()}) - es.get(index='my-index', doc_type='my-type', id=1) - -Elasticsearch instrumentation prefixes operation names with the string "Elasticsearch". This -can be changed to a different string by either setting the `OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX` -environment variable or by passing the prefix as an argument to the instrumentor. For example, - - -.. code-block:: python - - ElasticsearchInstrumentor("my-custom-prefix").instrument() - - -The `instrument` method accepts the following keyword args: - -tracer_provider (TracerProvider) - an optional tracer provider -request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request - this function signature is: - def request_hook(span: Span, method: str, url: str, kwargs) -response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request - this function signature is: - def response_hook(span: Span, response: dict) - -for example: - -.. code: python - - from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor - import elasticsearch - - def request_hook(span, method, url, kwargs): - if span and span.is_recording(): - span.set_attribute("custom_user_attribute_from_request_hook", "some-value") - - def response_hook(span, response): - if span and span.is_recording(): - span.set_attribute("custom_user_attribute_from_response_hook", "some-value") - - # instrument elasticsearch with request and response hooks - ElasticsearchInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook) - - # Using elasticsearch as normal now will automatically generate spans, - # including user custom attributes added from the hooks - es = elasticsearch.Elasticsearch() - es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()}) - es.get(index='my-index', doc_type='my-type', id=1) - -API ---- """ import re from logging import getLogger from os import environ from typing import Collection - import elasticsearch -import elasticsearch.exceptions from wrapt import wrap_function_wrapper as _wrap - from opentelemetry.instrumentation.elasticsearch.package import _instruments from opentelemetry.instrumentation.elasticsearch.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -99,29 +32,18 @@ def response_hook(span, response): logger = getLogger(__name__) - -# Values to add as tags from the actual -# payload returned by Elasticsearch, if any. -_ATTRIBUTES_FROM_RESULT = [ - "found", - "timed_out", - "took", -] - +_ATTRIBUTES_FROM_RESULT = ["found", "timed_out", "took"] _DEFAULT_OP_NAME = "request" +_regex_doc_url = re.compile(r"/_doc/([^/]+)") class ElasticsearchInstrumentor(BaseInstrumentor): - """An instrumentor for elasticsearch - See `BaseInstrumentor` - """ + """An instrumentor for Elasticsearch.""" def __init__(self, span_name_prefix=None): - if not span_name_prefix: - span_name_prefix = environ.get( - "OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX", - "Elasticsearch", - ) + span_name_prefix = span_name_prefix or environ.get( + "OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX", "Elasticsearch" + ) self._span_name_prefix = span_name_prefix.strip() super().__init__() @@ -129,113 +51,120 @@ def instrumentation_dependencies(self) -> Collection[str]: return _instruments def _instrument(self, **kwargs): - """ - Instruments elasticsarch module - """ tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(__name__, __version__, tracer_provider) request_hook = kwargs.get("request_hook") response_hook = kwargs.get("response_hook") + _wrap( - elasticsearch, - "Transport.perform_request", + elasticsearch.Transport, + "perform_request", _wrap_perform_request( tracer, self._span_name_prefix, request_hook, response_hook ), ) + _wrap( + elasticsearch.AsyncTransport, + "perform_request", + _wrap_perform_async_request( + tracer, self._span_name_prefix, request_hook, response_hook + ), + ) def _uninstrument(self, **kwargs): unwrap(elasticsearch.Transport, "perform_request") + unwrap(elasticsearch.AsyncTransport, "perform_request") -_regex_doc_url = re.compile(r"/_doc/([^/]+)") +def _extract(args, kwargs, span_name_prefix): + method, url = None, None + try: + method, url, *_ = args + except IndexError: + logger.warning( + "Expected perform_request to receive two positional arguments. Got %d", + len(args), + ) + + op_name = f"{span_name_prefix}{url or method or _DEFAULT_OP_NAME}" + doc_id = None + + if url: + match = _regex_doc_url.search(url) + if match: + doc_span = match.span() + op_name = f"{span_name_prefix}{url[:doc_span[0]]}/_doc/:id{url[doc_span[1]:]}" + doc_id = match.group(1) -# search api https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html -_regex_search_url = re.compile(r"/([^/]+)/_search[/]?") + return method, url, op_name, kwargs.get("body"), kwargs.get("params", {}), doc_id -def _wrap_perform_request( - tracer, span_name_prefix, request_hook=None, response_hook=None -): - # pylint: disable=R0912,R0914 +def _set_span_attributes(span, url, method, body, params, doc_id): + attributes = {SpanAttributes.DB_SYSTEM: "elasticsearch"} + if url: + attributes["elasticsearch.url"] = url + if method: + attributes["elasticsearch.method"] = method + if body: + attributes[SpanAttributes.DB_STATEMENT] = str(body) + if params: + attributes["elasticsearch.params"] = str(params) + if doc_id: + attributes["elasticsearch.id"] = doc_id + + for key, value in attributes.items(): + span.set_attribute(key, value) + + +def _set_span_attributes_from_rv(span, return_value): + for member in _ATTRIBUTES_FROM_RESULT: + if member in return_value: + span.set_attribute(f"elasticsearch.{member}", str(return_value[member])) + + +def _wrap_perform_request(tracer, span_name_prefix, request_hook=None, response_hook=None): def wrapper(wrapped, _, args, kwargs): - method = url = None - try: - method, url, *_ = args - except IndexError: - logger.warning( - "expected perform_request to receive two positional arguments. " - "Got %d", - len(args), - ) - - op_name = span_name_prefix + (url or method or _DEFAULT_OP_NAME) - - doc_id = None - search_target = None - - if url: - # TODO: This regex-based solution avoids creating an unbounded number of span names, but should be replaced by instrumenting individual Elasticsearch methods instead of Transport.perform_request() - # A limitation of the regex is that only the '_doc' mapping type is supported. Mapping types are deprecated since Elasticsearch 7 - # https://github.com/open-telemetry/opentelemetry-python-contrib/issues/708 - match = _regex_doc_url.search(url) - if match is not None: - # Remove the full document ID from the URL - doc_span = match.span() - op_name = ( - span_name_prefix - + url[: doc_span[0]] - + "/_doc/:id" - + url[doc_span[1] :] - ) - # Put the document ID in attributes - doc_id = match.group(1) - match = _regex_search_url.search(url) - if match is not None: - op_name = span_name_prefix + "//_search" - search_target = match.group(1) - - params = kwargs.get("params", {}) - body = kwargs.get("body", None) - - with tracer.start_as_current_span( - op_name, - kind=SpanKind.CLIENT, - ) as span: + method, url, op_name, body, params, doc_id = _extract(args, kwargs, span_name_prefix) + with tracer.start_as_current_span(op_name, kind=SpanKind.CLIENT) as span: if callable(request_hook): request_hook(span, method, url, kwargs) if span.is_recording(): - attributes = { - SpanAttributes.DB_SYSTEM: "elasticsearch", - } - if url: - attributes["elasticsearch.url"] = url - if method: - attributes["elasticsearch.method"] = method - if body: - attributes[SpanAttributes.DB_STATEMENT] = str(body) - if params: - attributes["elasticsearch.params"] = str(params) - if doc_id: - attributes["elasticsearch.id"] = doc_id - if search_target: - attributes["elasticsearch.target"] = search_target - for key, value in attributes.items(): - span.set_attribute(key, value) - - rv = wrapped(*args, **kwargs) - if isinstance(rv, dict) and span.is_recording(): - for member in _ATTRIBUTES_FROM_RESULT: - if member in rv: - span.set_attribute( - f"elasticsearch.{member}", - str(rv[member]), - ) + _set_span_attributes(span, url, method, body, params, doc_id) + + return_value = wrapped(*args, **kwargs) + + if isinstance(return_value, dict) and span.is_recording(): + _set_span_attributes_from_rv(span, return_value) if callable(response_hook): - response_hook(span, rv) - return rv + response_hook(span, return_value) + + return return_value return wrapper + + +def _wrap_perform_async_request(tracer, span_name_prefix, request_hook=None, response_hook=None): + async def wrapper(wrapped, _, args, kwargs): + method, url, op_name, body, params, doc_id = _extract(args, kwargs, span_name_prefix) + + with tracer.start_as_current_span(op_name, kind=SpanKind.CLIENT) as span: + if callable(request_hook): + request_hook(span, method, url, kwargs) + + if span.is_recording(): + _set_span_attributes(span, url, method, body, params, doc_id) + + return_value = await wrapped(*args, **kwargs) + + if isinstance(return_value, dict) and span.is_recording(): + _set_span_attributes_from_rv(span, return_value) + + if callable(response_hook): + response_hook(span, return_value) + + return return_value + + return wrapper \ No newline at end of file