Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,83 +13,16 @@
# limitations under the License.

"""
This library allows tracing HTTP elasticsearch made by the
This library allows tracing HTTP Elasticsearch requests made by the
`elasticsearch <https://elasticsearch-py.readthedocs.io/en/master/>`_ library.

Usage
-----

.. code-block:: python

from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
import elasticsearch


# instrument elasticsearch
ElasticsearchInstrumentor().instrument()

# Using elasticsearch as normal now will automatically generate spans
es = elasticsearch.Elasticsearch()
es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()})
es.get(index='my-index', doc_type='my-type', id=1)

Elasticsearch instrumentation prefixes operation names with the string "Elasticsearch". This
can be changed to a different string by either setting the `OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX`
environment variable or by passing the prefix as an argument to the instrumentor. For example,


.. code-block:: python

ElasticsearchInstrumentor("my-custom-prefix").instrument()


The `instrument` method accepts the following keyword args:

tracer_provider (TracerProvider) - an optional tracer provider
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
this function signature is:
def request_hook(span: Span, method: str, url: str, kwargs)
response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request
this function signature is:
def response_hook(span: Span, response: dict)

for example:

.. code: python

from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
import elasticsearch

def request_hook(span, method, url, kwargs):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_request_hook", "some-value")

def response_hook(span, response):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_response_hook", "some-value")

# instrument elasticsearch with request and response hooks
ElasticsearchInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook)

# Using elasticsearch as normal now will automatically generate spans,
# including user custom attributes added from the hooks
es = elasticsearch.Elasticsearch()
es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()})
es.get(index='my-index', doc_type='my-type', id=1)

API
---
"""

import re
from logging import getLogger
from os import environ
from typing import Collection

import elasticsearch
import elasticsearch.exceptions
from wrapt import wrap_function_wrapper as _wrap

from opentelemetry.instrumentation.elasticsearch.package import _instruments
from opentelemetry.instrumentation.elasticsearch.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
Expand All @@ -99,143 +32,139 @@ def response_hook(span, response):

logger = getLogger(__name__)


# Values to add as tags from the actual
# payload returned by Elasticsearch, if any.
_ATTRIBUTES_FROM_RESULT = [
"found",
"timed_out",
"took",
]

_ATTRIBUTES_FROM_RESULT = ["found", "timed_out", "took"]
_DEFAULT_OP_NAME = "request"
_regex_doc_url = re.compile(r"/_doc/([^/]+)")


class ElasticsearchInstrumentor(BaseInstrumentor):
"""An instrumentor for elasticsearch
See `BaseInstrumentor`
"""
"""An instrumentor for Elasticsearch."""

def __init__(self, span_name_prefix=None):
if not span_name_prefix:
span_name_prefix = environ.get(
"OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX",
"Elasticsearch",
)
span_name_prefix = span_name_prefix or environ.get(
"OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX", "Elasticsearch"
)
self._span_name_prefix = span_name_prefix.strip()
super().__init__()

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
"""
Instruments elasticsarch module
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)
request_hook = kwargs.get("request_hook")
response_hook = kwargs.get("response_hook")

_wrap(
elasticsearch,
"Transport.perform_request",
elasticsearch.Transport,
"perform_request",
_wrap_perform_request(
tracer, self._span_name_prefix, request_hook, response_hook
),
)
_wrap(
elasticsearch.AsyncTransport,
"perform_request",
_wrap_perform_async_request(
tracer, self._span_name_prefix, request_hook, response_hook
),
)

def _uninstrument(self, **kwargs):
unwrap(elasticsearch.Transport, "perform_request")
unwrap(elasticsearch.AsyncTransport, "perform_request")


_regex_doc_url = re.compile(r"/_doc/([^/]+)")
def _extract(args, kwargs, span_name_prefix):
method, url = None, None
try:
method, url, *_ = args
except IndexError:
logger.warning(
"Expected perform_request to receive two positional arguments. Got %d",
len(args),
)

op_name = f"{span_name_prefix}{url or method or _DEFAULT_OP_NAME}"
doc_id = None

if url:
match = _regex_doc_url.search(url)
if match:
doc_span = match.span()
op_name = f"{span_name_prefix}{url[:doc_span[0]]}/_doc/:id{url[doc_span[1]:]}"
doc_id = match.group(1)

# search api https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html
_regex_search_url = re.compile(r"/([^/]+)/_search[/]?")
return method, url, op_name, kwargs.get("body"), kwargs.get("params", {}), doc_id


def _wrap_perform_request(
tracer, span_name_prefix, request_hook=None, response_hook=None
):
# pylint: disable=R0912,R0914
def _set_span_attributes(span, url, method, body, params, doc_id):
attributes = {SpanAttributes.DB_SYSTEM: "elasticsearch"}
if url:
attributes["elasticsearch.url"] = url
if method:
attributes["elasticsearch.method"] = method
if body:
attributes[SpanAttributes.DB_STATEMENT] = str(body)
if params:
attributes["elasticsearch.params"] = str(params)
if doc_id:
attributes["elasticsearch.id"] = doc_id

for key, value in attributes.items():
span.set_attribute(key, value)


def _set_span_attributes_from_rv(span, return_value):
for member in _ATTRIBUTES_FROM_RESULT:
if member in return_value:
span.set_attribute(f"elasticsearch.{member}", str(return_value[member]))


def _wrap_perform_request(tracer, span_name_prefix, request_hook=None, response_hook=None):
def wrapper(wrapped, _, args, kwargs):
method = url = None
try:
method, url, *_ = args
except IndexError:
logger.warning(
"expected perform_request to receive two positional arguments. "
"Got %d",
len(args),
)

op_name = span_name_prefix + (url or method or _DEFAULT_OP_NAME)

doc_id = None
search_target = None

if url:
# TODO: This regex-based solution avoids creating an unbounded number of span names, but should be replaced by instrumenting individual Elasticsearch methods instead of Transport.perform_request()
# A limitation of the regex is that only the '_doc' mapping type is supported. Mapping types are deprecated since Elasticsearch 7
# https://github.com/open-telemetry/opentelemetry-python-contrib/issues/708
match = _regex_doc_url.search(url)
if match is not None:
# Remove the full document ID from the URL
doc_span = match.span()
op_name = (
span_name_prefix
+ url[: doc_span[0]]
+ "/_doc/:id"
+ url[doc_span[1] :]
)
# Put the document ID in attributes
doc_id = match.group(1)
match = _regex_search_url.search(url)
if match is not None:
op_name = span_name_prefix + "/<target>/_search"
search_target = match.group(1)

params = kwargs.get("params", {})
body = kwargs.get("body", None)

with tracer.start_as_current_span(
op_name,
kind=SpanKind.CLIENT,
) as span:
method, url, op_name, body, params, doc_id = _extract(args, kwargs, span_name_prefix)

with tracer.start_as_current_span(op_name, kind=SpanKind.CLIENT) as span:
if callable(request_hook):
request_hook(span, method, url, kwargs)

if span.is_recording():
attributes = {
SpanAttributes.DB_SYSTEM: "elasticsearch",
}
if url:
attributes["elasticsearch.url"] = url
if method:
attributes["elasticsearch.method"] = method
if body:
attributes[SpanAttributes.DB_STATEMENT] = str(body)
if params:
attributes["elasticsearch.params"] = str(params)
if doc_id:
attributes["elasticsearch.id"] = doc_id
if search_target:
attributes["elasticsearch.target"] = search_target
for key, value in attributes.items():
span.set_attribute(key, value)

rv = wrapped(*args, **kwargs)
if isinstance(rv, dict) and span.is_recording():
for member in _ATTRIBUTES_FROM_RESULT:
if member in rv:
span.set_attribute(
f"elasticsearch.{member}",
str(rv[member]),
)
_set_span_attributes(span, url, method, body, params, doc_id)

return_value = wrapped(*args, **kwargs)

if isinstance(return_value, dict) and span.is_recording():
_set_span_attributes_from_rv(span, return_value)

if callable(response_hook):
response_hook(span, rv)
return rv
response_hook(span, return_value)

return return_value

return wrapper


def _wrap_perform_async_request(tracer, span_name_prefix, request_hook=None, response_hook=None):
async def wrapper(wrapped, _, args, kwargs):
method, url, op_name, body, params, doc_id = _extract(args, kwargs, span_name_prefix)

with tracer.start_as_current_span(op_name, kind=SpanKind.CLIENT) as span:
if callable(request_hook):
request_hook(span, method, url, kwargs)

if span.is_recording():
_set_span_attributes(span, url, method, body, params, doc_id)

return_value = await wrapped(*args, **kwargs)

if isinstance(return_value, dict) and span.is_recording():
_set_span_attributes_from_rv(span, return_value)

if callable(response_hook):
response_hook(span, return_value)

return return_value

return wrapper
Loading