Skip to content

Commit ac0e4e0

Browse files
authored
Update __init__.py
1 parent 1874501 commit ac0e4e0

File tree

1 file changed

+113
-107
lines changed
  • instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch

1 file changed

+113
-107
lines changed

instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py

Lines changed: 113 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -11,111 +11,79 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
1514
"""
1615
This library allows tracing HTTP requests made by the
1716
`elasticsearch <https://elasticsearch-py.readthedocs.io/en/master/>`_ library.
18-
1917
Usage
2018
-----
21-
2219
.. code-block:: python
23-
2420
from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
2521
import elasticsearch
26-
27-
2822
# instrument elasticsearch
2923
ElasticsearchInstrumentor().instrument()
30-
3124
# Using elasticsearch as normal now will automatically generate spans
3225
es = elasticsearch.Elasticsearch()
3326
es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()})
3427
es.get(index='my-index', doc_type='my-type', id=1)
35-
3628
Elasticsearch instrumentation prefixes operation names with the string "Elasticsearch". This
3729
can be changed to a different string by either setting the `OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX`
3830
environment variable or by passing the prefix as an argument to the instrumentor. For example,
39-
40-
4131
.. code-block:: python
42-
4332
ElasticsearchInstrumentor("my-custom-prefix").instrument()
44-
45-
4633
The `instrument` method accepts the following keyword args:
47-
4834
tracer_provider (TracerProvider) - an optional tracer provider
4935
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
5036
this function signature is:
5137
def request_hook(span: Span, method: str, url: str, kwargs)
5238
response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request
5339
this function signature is:
5440
def response_hook(span: Span, response: dict)
55-
5641
for example:
57-
5842
.. code-block:: python
59-
6043
from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
6144
import elasticsearch
62-
6345
def request_hook(span, method, url, kwargs):
6446
if span and span.is_recording():
6547
span.set_attribute("custom_user_attribute_from_request_hook", "some-value")
66-
6748
def response_hook(span, response):
6849
if span and span.is_recording():
6950
span.set_attribute("custom_user_attribute_from_response_hook", "some-value")
70-
7151
# instrument elasticsearch with request and response hooks
7252
ElasticsearchInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook)
73-
7453
# Using elasticsearch as normal now will automatically generate spans,
7554
# including user custom attributes added from the hooks
7655
es = elasticsearch.Elasticsearch()
7756
es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()})
7857
es.get(index='my-index', doc_type='my-type', id=1)
79-
8058
API
8159
---
8260
"""
83-
8461
import re
8562
from logging import getLogger
8663
from os import environ
8764
from typing import Collection
88-
8965
import elasticsearch
9066
import elasticsearch.exceptions
9167
from wrapt import wrap_function_wrapper as _wrap
92-
9368
from opentelemetry.instrumentation.elasticsearch.package import _instruments
9469
from opentelemetry.instrumentation.elasticsearch.version import __version__
9570
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
9671
from opentelemetry.instrumentation.utils import unwrap
9772
from opentelemetry.semconv.trace import SpanAttributes
9873
from opentelemetry.trace import SpanKind, get_tracer
99-
10074
logger = getLogger(__name__)
101-
102-
10375
# Values to add as tags from the actual
10476
# payload returned by Elasticsearch, if any.
10577
_ATTRIBUTES_FROM_RESULT = [
10678
"found",
10779
"timed_out",
10880
"took",
10981
]
110-
11182
_DEFAULT_OP_NAME = "request"
112-
113-
11483
class ElasticsearchInstrumentor(BaseInstrumentor):
11584
"""An instrumentor for elasticsearch
11685
See `BaseInstrumentor`
11786
"""
118-
11987
def __init__(self, span_name_prefix=None):
12088
if not span_name_prefix:
12189
span_name_prefix = environ.get(
@@ -124,80 +92,108 @@ def __init__(self, span_name_prefix=None):
12492
)
12593
self._span_name_prefix = span_name_prefix.strip()
12694
super().__init__()
127-
12895
def instrumentation_dependencies(self) -> Collection[str]:
12996
return _instruments
13097

13198
def _instrument(self, **kwargs):
13299
"""
133-
Instruments elasticsearch module
100+
101+
Instruments elasticsearch module
134102
"""
135103
tracer_provider = kwargs.get("tracer_provider")
136104
tracer = get_tracer(__name__, __version__, tracer_provider)
137-
request_hook = kwargs.get("request_hook")
138-
response_hook = kwargs.get("response_hook")
105+
@@ -143,49 +143,124 @@ def _instrument(self, **kwargs):
106+
tracer, self._span_name_prefix, request_hook, response_hook
107+
),
108+
)
139109
_wrap(
140110
elasticsearch,
141111
"AsyncTransport.perform_request",
142-
_wrap_perform_request(
112+
_wrap_perform_async_request(
143113
tracer, self._span_name_prefix, request_hook, response_hook
144114
),
145115
)
146116

147117
def _uninstrument(self, **kwargs):
118+
unwrap(elasticsearch.Transport, "perform_request")
148119
unwrap(elasticsearch.AsyncTransport, "perform_request")
149120

150121

151122
_regex_doc_url = re.compile(r"/_doc/([^/]+)")
152123

153-
# search api https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html
154-
_regex_search_url = re.compile(r"/([^/]+)/_search[/]?")
124+
125+
def _extract(args, kwargs, span_name_prefix):
    """Derive span naming and attribute inputs from a ``perform_request`` call.

    Args:
        args: Positional arguments given to ``perform_request``; the first
            two are expected to be the HTTP method and the URL.
        kwargs: Keyword arguments given to ``perform_request``; ``params``
            and ``body`` are read from it when present.
        span_name_prefix: String prepended to the generated operation name.

    Returns:
        A ``(method, url, op_name, body, params, doc_id)`` tuple. ``method``
        and ``url`` are ``None`` when fewer than two positional arguments
        were supplied. ``doc_id`` is ``None`` unless the URL contains a
        ``/_doc/<id>`` segment, in which case the id is replaced by ``:id``
        in ``op_name`` and returned separately.
    """
    method = url = None
    try:
        method, url, *_ = args
    except ValueError:
        # Star-unpacking too few values raises ValueError, not IndexError.
        # Catching the right exception keeps tracing best-effort: the
        # wrapped request still runs and only a warning is logged.
        logger.warning(
            "expected perform_request to receive two positional arguments. "
            "Got %d",
            len(args),
        )
    op_name = span_name_prefix + (url or method or _DEFAULT_OP_NAME)
    doc_id = None
    if url:
        # TODO: This regex-based solution avoids creating an unbounded number of span names,
        # but should be replaced by instrumenting individual Elasticsearch methods instead of
        # Transport.perform_request()
        # A limitation of the regex is that only the '_doc' mapping type is supported.
        # Mapping types are deprecated since Elasticsearch 7
        # https://github.com/open-telemetry/opentelemetry-python-contrib/issues/708
        match = _regex_doc_url.search(url)
        if match is not None:
            # Remove the full document ID from the URL
            doc_span = match.span()
            op_name = (
                span_name_prefix
                + url[: doc_span[0]]
                + "/_doc/:id"
                + url[doc_span[1] :]
            )
            # Put the document ID in attributes
            doc_id = match.group(1)
    params = kwargs.get("params", {})
    body = kwargs.get("body", None)
    return method, url, op_name, body, params, doc_id
159+
160+
161+
def _set_span_attributes(span, url, method, body, params, doc_id):
    """Record the request details on ``span`` as attributes.

    ``db.system`` is always set to ``"elasticsearch"``. Each of the other
    values is recorded only when it is truthy; ``body`` and ``params`` are
    stored in their ``str()`` form.
    """
    # (attribute name, raw value, optional converter) in emission order.
    optional_fields = (
        ("elasticsearch.url", url, None),
        ("elasticsearch.method", method, None),
        (SpanAttributes.DB_STATEMENT, body, str),
        ("elasticsearch.params", params, str),
        ("elasticsearch.id", doc_id, None),
    )
    span_attributes = {SpanAttributes.DB_SYSTEM: "elasticsearch"}
    for name, raw, to_text in optional_fields:
        if not raw:
            continue
        span_attributes[name] = to_text(raw) if to_text else raw
    # Collect everything first, then write to the span in one pass.
    for name, value in span_attributes.items():
        span.set_attribute(name, value)
177+
178+
179+
def _set_span_attributes_from_rv(span, return_value):
    """Copy selected response fields from ``return_value`` onto ``span``.

    Every name in ``_ATTRIBUTES_FROM_RESULT`` that is present in
    ``return_value`` is stored as ``elasticsearch.<name>`` with its value
    converted to a string.
    """
    present = (
        name for name in _ATTRIBUTES_FROM_RESULT if name in return_value
    )
    for name in present:
        span.set_attribute(f"elasticsearch.{name}", str(return_value[name]))
155186

156187

157188
def _wrap_perform_request(
158189
tracer, span_name_prefix, request_hook=None, response_hook=None
159190
):
160-
# pylint: disable=R0912,R0914
191+
# pylint: disable=R0912
161192
def wrapper(wrapped, _, args, kwargs):
162-
method = url = None
163-
try:
164-
method, url, *_ = args
165-
except IndexError:
166-
logger.warning(
167-
"expected perform_request to receive two positional arguments. "
168-
"Got %d",
169-
len(args),
170-
)
171-
172-
op_name = span_name_prefix + (url or method or _DEFAULT_OP_NAME)
173-
174-
doc_id = None
175-
search_target = None
176-
177-
if url:
178-
# TODO: This regex-based solution avoids creating an unbounded number of span names, but should be replaced by instrumenting individual Elasticsearch methods instead of Transport.perform_request()
179-
# A limitation of the regex is that only the '_doc' mapping type is supported. Mapping types are deprecated since Elasticsearch 7
180-
# https://github.com/open-telemetry/opentelemetry-python-contrib/issues/708
181-
match = _regex_doc_url.search(url)
182-
if match is not None:
183-
# Remove the full document ID from the URL
184-
doc_span = match.span()
185-
op_name = (
186-
span_name_prefix
187-
+ url[: doc_span[0]]
188-
+ "/_doc/:id"
189-
+ url[doc_span[1] :]
190-
)
191-
# Put the document ID in attributes
192-
doc_id = match.group(1)
193-
match = _regex_search_url.search(url)
194-
if match is not None:
195-
op_name = span_name_prefix + "/<target>/_search"
196-
search_target = match.group(1)
197-
198-
params = kwargs.get("params", {})
199-
body = kwargs.get("body", None)
200-
193+
194+
method, url, op_name, body, params, doc_id = _extract(
195+
args, kwargs, span_name_prefix
196+
)
201197
with tracer.start_as_current_span(
202198
op_name,
203199
kind=SpanKind.CLIENT,
@@ -207,35 +203,45 @@ def wrapper(wrapped, _, args, kwargs):
207203
request_hook(span, method, url, kwargs)
208204

209205
if span.is_recording():
210-
attributes = {
211-
SpanAttributes.DB_SYSTEM: "elasticsearch",
212-
}
213-
if url:
214-
attributes["elasticsearch.url"] = url
215-
if method:
216-
attributes["elasticsearch.method"] = method
217-
if body:
218-
attributes[SpanAttributes.DB_STATEMENT] = str(body)
219-
if params:
220-
attributes["elasticsearch.params"] = str(params)
221-
if doc_id:
222-
attributes["elasticsearch.id"] = doc_id
223-
if search_target:
224-
attributes["elasticsearch.target"] = search_target
225-
for key, value in attributes.items():
226-
span.set_attribute(key, value)
227-
228-
rv = wrapped(*args, **kwargs)
229-
if isinstance(rv, dict) and span.is_recording():
230-
for member in _ATTRIBUTES_FROM_RESULT:
231-
if member in rv:
232-
span.set_attribute(
233-
f"elasticsearch.{member}",
234-
str(rv[member]),
235-
)
206+
_set_span_attributes(span, url, method, body, params, doc_id)
207+
208+
return_value = wrapped(*args, **kwargs)
209+
if isinstance(return_value, dict) and span.is_recording():
210+
_set_span_attributes_from_rv(span, return_value)
211+
212+
if callable(response_hook):
213+
response_hook(span, return_value)
214+
return return_value
215+
216+
return wrapper
217+
218+
219+
def _wrap_perform_async_request(
220+
tracer, span_name_prefix, request_hook=None, response_hook=None
221+
):
222+
# pylint: disable=R0912
223+
async def wrapper(wrapped, _, args, kwargs):
224+
method, url, op_name, body, params, doc_id = _extract(
225+
args, kwargs, span_name_prefix
226+
)
227+
228+
with tracer.start_as_current_span(
229+
op_name,
230+
@@ -196,33 +271,14 @@ def wrapper(wrapped, _, args, kwargs):
231+
request_hook(span, method, url, kwargs)
232+
233+
if span.is_recording():
234+
235+
_set_span_attributes(span, url, method, body, params, doc_id)
236+
237+
return_value = await wrapped(*args, **kwargs)
238+
if isinstance(return_value, dict) and span.is_recording():
239+
_set_span_attributes_from_rv(span, return_value)
236240

237241
if callable(response_hook):
238-
response_hook(span, rv)
239-
return rv
242+
response_hook(span, return_value)
243+
return return_value
240244

241245
return wrapper
246+
247+

0 commit comments

Comments
 (0)