Skip to content

Commit 5e207a9

Browse files
authored
Update __init__.py
1 parent ad84afa commit 5e207a9

File tree

1 file changed

+51
-127
lines changed
  • instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch

1 file changed

+51
-127
lines changed

instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py

Lines changed: 51 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -11,103 +11,61 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
This library allows tracing HTTP elasticsearch made by the
14+
15+
"""
16+
This library allows tracing HTTP Elasticsearch requests made by the
1517
`elasticsearch <https://elasticsearch-py.readthedocs.io/en/master/>`_ library.
16-
Usage
17-
-----
18-
.. code-block:: python
19-
from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
20-
import elasticsearch
21-
# instrument elasticsearch
22-
ElasticsearchInstrumentor().instrument()
23-
# Using elasticsearch as normal now will automatically generate spans
24-
es = elasticsearch.Elasticsearch()
25-
es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()})
26-
es.get(index='my-index', doc_type='my-type', id=1)
27-
Elasticsearch instrumentation prefixes operation names with the string "Elasticsearch". This
28-
can be changed to a different string by either setting the `OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX`
29-
environment variable or by passing the prefix as an argument to the instrumentor. For example,
30-
.. code-block:: python
31-
ElasticsearchInstrumentor("my-custom-prefix").instrument()
32-
The `instrument` method accepts the following keyword args:
33-
tracer_provider (TracerProvider) - an optional tracer provider
34-
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
35-
this function signature is:
36-
def request_hook(span: Span, method: str, url: str, kwargs)
37-
response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request
38-
this function signature is:
39-
def response_hook(span: Span, response: dict)
40-
for example:
41-
.. code: python
42-
from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
43-
import elasticsearch
44-
def request_hook(span, method, url, kwargs):
45-
if span and span.is_recording():
46-
span.set_attribute("custom_user_attribute_from_request_hook", "some-value")
47-
def response_hook(span, response):
48-
if span and span.is_recording():
49-
span.set_attribute("custom_user_attribute_from_response_hook", "some-value")
50-
# instrument elasticsearch with request and response hooks
51-
ElasticsearchInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook)
52-
# Using elasticsearch as normal now will automatically generate spans,
53-
# including user custom attributes added from the hooks
54-
es = elasticsearch.Elasticsearch()
55-
es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()})
56-
es.get(index='my-index', doc_type='my-type', id=1)
57-
API
58-
---
5918
"""
19+
6020
import re
6121
from logging import getLogger
6222
from os import environ
6323
from typing import Collection
6424
import elasticsearch
65-
import elasticsearch.exceptions
6625
from wrapt import wrap_function_wrapper as _wrap
6726
from opentelemetry.instrumentation.elasticsearch.package import _instruments
6827
from opentelemetry.instrumentation.elasticsearch.version import __version__
6928
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
7029
from opentelemetry.instrumentation.utils import unwrap
7130
from opentelemetry.semconv.trace import SpanAttributes
7231
from opentelemetry.trace import SpanKind, get_tracer
32+
7333
logger = getLogger(__name__)
74-
# Values to add as tags from the actual
75-
# payload returned by Elasticsearch, if any.
76-
_ATTRIBUTES_FROM_RESULT = [
77-
"found",
78-
"timed_out",
79-
"took",
80-
]
34+
35+
_ATTRIBUTES_FROM_RESULT = ["found", "timed_out", "took"]
8136
_DEFAULT_OP_NAME = "request"
37+
_regex_doc_url = re.compile(r"/_doc/([^/]+)")
38+
39+
8240
class ElasticsearchInstrumentor(BaseInstrumentor):
83-
"""An instrumentor for elasticsearch
84-
See `BaseInstrumentor`
85-
"""
41+
"""An instrumentor for Elasticsearch."""
42+
8643
def __init__(self, span_name_prefix=None):
87-
if not span_name_prefix:
88-
span_name_prefix = environ.get(
89-
"OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX",
90-
"Elasticsearch",
91-
)
44+
span_name_prefix = span_name_prefix or environ.get(
45+
"OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX", "Elasticsearch"
46+
)
9247
self._span_name_prefix = span_name_prefix.strip()
9348
super().__init__()
49+
9450
def instrumentation_dependencies(self) -> Collection[str]:
9551
return _instruments
9652

9753
def _instrument(self, **kwargs):
98-
"""
99-
100-
Instruments elasticsearch module
101-
"""
10254
tracer_provider = kwargs.get("tracer_provider")
10355
tracer = get_tracer(__name__, __version__, tracer_provider)
104-
@@ -143,49 +143,124 @@ def _instrument(self, **kwargs):
56+
request_hook = kwargs.get("request_hook")
57+
response_hook = kwargs.get("response_hook")
58+
59+
_wrap(
60+
elasticsearch.Transport,
61+
"perform_request",
62+
_wrap_perform_request(
10563
tracer, self._span_name_prefix, request_hook, response_hook
10664
),
10765
)
10866
_wrap(
109-
elasticsearch,
110-
"AsyncTransport.perform_request",
67+
elasticsearch.AsyncTransport,
68+
"perform_request",
11169
_wrap_perform_async_request(
11270
tracer, self._span_name_prefix, request_hook, response_hook
11371
),
@@ -118,49 +76,31 @@ def _uninstrument(self, **kwargs):
11876
unwrap(elasticsearch.AsyncTransport, "perform_request")
11977

12078

121-
_regex_doc_url = re.compile(r"/_doc/([^/]+)")
122-
123-
12479
def _extract(args, kwargs, span_name_prefix):
125-
method = url = None
80+
method, url = None, None
12681
try:
12782
method, url, *_ = args
12883
except IndexError:
12984
logger.warning(
130-
"expected perform_request to receive two positional arguments. "
131-
"Got %d",
85+
"Expected perform_request to receive two positional arguments. Got %d",
13286
len(args),
13387
)
134-
op_name = span_name_prefix + (url or method or _DEFAULT_OP_NAME)
88+
89+
op_name = f"{span_name_prefix}{url or method or _DEFAULT_OP_NAME}"
13590
doc_id = None
91+
13692
if url:
137-
# TODO: This regex-based solution avoids creating an unbounded number of span names,
138-
# but should be replaced by instrumenting individual Elasticsearch methods instead of
139-
# Transport.perform_request()
140-
# A limitation of the regex is that only the '_doc' mapping type is supported.
141-
# Mapping types are deprecated since Elasticsearch 7
142-
# https://github.com/open-telemetry/opentelemetry-python-contrib/issues/708
14393
match = _regex_doc_url.search(url)
144-
if match is not None:
145-
# Remove the full document ID from the URL
94+
if match:
14695
doc_span = match.span()
147-
op_name = (
148-
span_name_prefix
149-
+ url[: doc_span[0]]
150-
+ "/_doc/:id"
151-
+ url[doc_span[1] :]
152-
)
153-
# Put the document ID in attributes
96+
op_name = f"{span_name_prefix}{url[:doc_span[0]]}/_doc/:id{url[doc_span[1]:]}"
15497
doc_id = match.group(1)
155-
params = kwargs.get("params", {})
156-
body = kwargs.get("body", None)
157-
return method, url, op_name, body, params, doc_id
98+
99+
return method, url, op_name, kwargs.get("body"), kwargs.get("params", {}), doc_id
158100

159101

160102
def _set_span_attributes(span, url, method, body, params, doc_id):
161-
attributes = {
162-
SpanAttributes.DB_SYSTEM: "elasticsearch",
163-
}
103+
attributes = {SpanAttributes.DB_SYSTEM: "elasticsearch"}
164104
if url:
165105
attributes["elasticsearch.url"] = url
166106
if method:
@@ -171,76 +111,60 @@ def _set_span_attributes(span, url, method, body, params, doc_id):
171111
attributes["elasticsearch.params"] = str(params)
172112
if doc_id:
173113
attributes["elasticsearch.id"] = doc_id
114+
174115
for key, value in attributes.items():
175116
span.set_attribute(key, value)
176117

177118

178119
def _set_span_attributes_from_rv(span, return_value):
179120
for member in _ATTRIBUTES_FROM_RESULT:
180121
if member in return_value:
181-
span.set_attribute(
182-
f"elasticsearch.{member}",
183-
str(return_value[member]),
184-
)
122+
span.set_attribute(f"elasticsearch.{member}", str(return_value[member]))
185123

186124

187-
def _wrap_perform_request(
188-
tracer, span_name_prefix, request_hook=None, response_hook=None
189-
):
190-
# pylint: disable=R0912
125+
def _wrap_perform_request(tracer, span_name_prefix, request_hook=None, response_hook=None):
191126
def wrapper(wrapped, _, args, kwargs):
192-
193-
method, url, op_name, body, params, doc_id = _extract(
194-
args, kwargs, span_name_prefix
195-
)
196-
with tracer.start_as_current_span(
197-
op_name,
198-
kind=SpanKind.CLIENT,
199-
) as span:
127+
method, url, op_name, body, params, doc_id = _extract(args, kwargs, span_name_prefix)
200128

129+
with tracer.start_as_current_span(op_name, kind=SpanKind.CLIENT) as span:
201130
if callable(request_hook):
202131
request_hook(span, method, url, kwargs)
203132

204133
if span.is_recording():
205134
_set_span_attributes(span, url, method, body, params, doc_id)
206135

207136
return_value = wrapped(*args, **kwargs)
137+
208138
if isinstance(return_value, dict) and span.is_recording():
209139
_set_span_attributes_from_rv(span, return_value)
210140

211141
if callable(response_hook):
212142
response_hook(span, return_value)
143+
213144
return return_value
214145

215146
return wrapper
216147

217148

218-
def _wrap_perform_async_request(
219-
tracer, span_name_prefix, request_hook=None, response_hook=None
220-
):
221-
# pylint: disable=R0912
149+
def _wrap_perform_async_request(tracer, span_name_prefix, request_hook=None, response_hook=None):
222150
async def wrapper(wrapped, _, args, kwargs):
223-
method, url, op_name, body, params, doc_id = _extract(
224-
args, kwargs, span_name_prefix
225-
)
151+
method, url, op_name, body, params, doc_id = _extract(args, kwargs, span_name_prefix)
226152

227-
with tracer.start_as_current_span(
228-
op_name,
229-
@@ -196,33 +271,14 @@ def wrapper(wrapped, _, args, kwargs):
153+
with tracer.start_as_current_span(op_name, kind=SpanKind.CLIENT) as span:
154+
if callable(request_hook):
230155
request_hook(span, method, url, kwargs)
231156

232157
if span.is_recording():
233-
234-
_set_span_attributes(span, url, method, body, params, doc_id)
158+
_set_span_attributes(span, url, method, body, params, doc_id)
235159

236160
return_value = await wrapped(*args, **kwargs)
161+
237162
if isinstance(return_value, dict) and span.is_recording():
238163
_set_span_attributes_from_rv(span, return_value)
239164

240165
if callable(response_hook):
241-
response_hook(span, return_value)
166+
response_hook(span, return_value)
167+
242168
return return_value
243169

244-
return wrapper
245-
246-
170+
return wrapper

0 commit comments

Comments
 (0)