Skip to content

Commit 7e76f8d

Browse files
authored
Remove OpenTelemetry class (elastic#157)
Since deserialization happens in the main client, this is the place where the OpenTelemetry class and context manager should live.
1 parent f432e04 commit 7e76f8d

File tree

5 files changed

+44
-214
lines changed

5 files changed

+44
-214
lines changed

elastic_transport/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
Urllib3HttpNode,
4242
)
4343
from ._node_pool import NodePool, NodeSelector, RandomSelector, RoundRobinSelector
44+
from ._otel import OpenTelemetrySpan
4445
from ._response import ApiResponse as ApiResponse
4546
from ._response import BinaryApiResponse as BinaryApiResponse
4647
from ._response import HeadApiResponse as HeadApiResponse
@@ -79,6 +80,7 @@
7980
"NodePool",
8081
"NodeSelector",
8182
"ObjectApiResponse",
83+
"OpenTelemetrySpan",
8284
"RandomSelector",
8385
"RequestsHttpNode",
8486
"RoundRobinSelector",

elastic_transport/_async_transport.py

Lines changed: 4 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def __init__(
174174
# sniffing. Uses '_sniffing_task' instead.
175175
self._sniffing_lock = None # type: ignore[assignment]
176176

177-
async def perform_request( # type: ignore[override]
177+
async def perform_request( # type: ignore[override, return]
178178
self,
179179
method: str,
180180
target: str,
@@ -186,8 +186,7 @@ async def perform_request( # type: ignore[override]
186186
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
187187
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
188188
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
189-
endpoint_id: Optional[str] = None,
190-
path_parts: Optional[Mapping[str, str]] = None,
189+
otel_span: Union[OpenTelemetrySpan, DefaultType] = DEFAULT,
191190
) -> TransportApiResponse:
192191
"""
193192
Perform the actual request. Retrieve a node from the node
@@ -211,47 +210,9 @@ async def perform_request( # type: ignore[override]
211210
:arg retry_on_timeout: Set to true to retry after timeout errors.
212211
:arg request_timeout: Amount of time to wait for a response to fail with a timeout error.
213212
:arg client_meta: Extra client metadata key-value pairs to send in the client meta header.
214-
:arg endpoint_id: The endpoint id of the request, such as `ml.close_job`.
215-
Used for OpenTelemetry instrumentation.
216-
:arg path_paths: Dictionary with all dynamic value in the url path.
217-
Used for OpenTelemetry instrumentation.
213+
:arg otel_span: OpenTelemetry span used to add metadata to the span.
218214
:returns: Tuple of the :class:`elastic_transport.ApiResponseMeta` with the deserialized response.
219215
"""
220-
path_parts = path_parts if path_parts is not None else {}
221-
with self.otel.span(
222-
method,
223-
endpoint_id=endpoint_id,
224-
path_parts=path_parts,
225-
) as otel_span:
226-
response = await self._perform_request(
227-
method,
228-
target,
229-
body=body,
230-
headers=headers,
231-
max_retries=max_retries,
232-
retry_on_status=retry_on_status,
233-
retry_on_timeout=retry_on_timeout,
234-
request_timeout=request_timeout,
235-
client_meta=client_meta,
236-
otel_span=otel_span,
237-
)
238-
otel_span.set_elastic_cloud_metadata(response.meta.headers)
239-
return response
240-
241-
async def _perform_request( # type: ignore[override,return]
242-
self,
243-
method: str,
244-
target: str,
245-
*,
246-
body: Optional[Any] = None,
247-
headers: Union[Mapping[str, Any], DefaultType] = DEFAULT,
248-
max_retries: Union[int, DefaultType] = DEFAULT,
249-
retry_on_status: Union[Collection[int], DefaultType] = DEFAULT,
250-
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
251-
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
252-
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
253-
otel_span: OpenTelemetrySpan,
254-
) -> TransportApiResponse:
255216
await self._async_call()
256217

257218
if headers is DEFAULT:
@@ -261,6 +222,7 @@ async def _perform_request( # type: ignore[override,return]
261222
max_retries = resolve_default(max_retries, self.max_retries)
262223
retry_on_timeout = resolve_default(retry_on_timeout, self.retry_on_timeout)
263224
retry_on_status = resolve_default(retry_on_status, self.retry_on_status)
225+
otel_span = resolve_default(otel_span, OpenTelemetrySpan(None))
264226

265227
if self.meta_header:
266228
request_headers["x-elastic-client-meta"] = ",".join(

elastic_transport/_otel.py

Lines changed: 8 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,13 @@
1717

1818
from __future__ import annotations
1919

20-
import contextlib
21-
import os
22-
from typing import Generator, Mapping, Optional
20+
from typing import TYPE_CHECKING, Mapping
2321

24-
try:
25-
from opentelemetry import trace
26-
from opentelemetry.trace import Span
27-
28-
_tracer: trace.Tracer | None = trace.get_tracer("elastic-transport")
29-
except ModuleNotFoundError:
30-
_tracer = None
22+
if TYPE_CHECKING:
23+
from typing import Literal
3124

25+
from opentelemetry.trace import Span
3226

33-
# Valid values for the enabled config are 'true' and 'false'. Default is 'true'.
34-
ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"
35-
# Describes how to handle search queries in the request body when assigned to
36-
# a span attribute.
37-
# Valid values are 'omit' and 'raw'.
38-
# Default is 'omit' as 'raw' has security implications.
39-
BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY"
40-
DEFAULT_BODY_STRATEGY = "omit"
4127

4228
# A list of the Elasticsearch endpoints that qualify as "search" endpoints. The search query in
4329
# the request body may be captured for these endpoints, depending on the body capture strategy.
@@ -57,9 +43,10 @@
5743
class OpenTelemetrySpan:
5844
def __init__(
5945
self,
60-
otel_span: Optional[Span],
61-
endpoint_id: Optional[str] = None,
62-
body_strategy: Optional[str] = None,
46+
otel_span: Span | None,
47+
endpoint_id: str | None = None,
48+
# TODO import Literal at the top-level when dropping Python 3.7
49+
body_strategy: 'Literal["omit", "raw"]' = "omit",
6350
):
6451
self.otel_span = otel_span
6552
self.body_strategy = body_strategy
@@ -97,50 +84,3 @@ def set_db_statement(self, serialized_body: bytes) -> None:
9784
self.otel_span.set_attribute(
9885
"db.statement", serialized_body.decode("utf-8")
9986
)
100-
101-
102-
class OpenTelemetry:
103-
def __init__(
104-
self,
105-
enabled: bool | None = None,
106-
tracer: trace.Tracer | None = None,
107-
body_strategy: str | None = None,
108-
):
109-
if enabled is None:
110-
enabled = os.environ.get(ENABLED_ENV_VAR, "false") != "false"
111-
self.tracer = tracer or _tracer
112-
self.enabled = enabled and self.tracer is not None
113-
114-
if body_strategy is not None:
115-
self.body_strategy = body_strategy
116-
else:
117-
self.body_strategy = os.environ.get(
118-
BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY
119-
)
120-
121-
@contextlib.contextmanager
122-
def span(
123-
self,
124-
method: str,
125-
*,
126-
endpoint_id: Optional[str],
127-
path_parts: Mapping[str, str],
128-
) -> Generator[OpenTelemetrySpan, None, None]:
129-
if not self.enabled or self.tracer is None:
130-
yield OpenTelemetrySpan(None)
131-
return
132-
133-
span_name = endpoint_id or method
134-
with self.tracer.start_as_current_span(span_name) as otel_span:
135-
otel_span.set_attribute("http.request.method", method)
136-
otel_span.set_attribute("db.system", "elasticsearch")
137-
if endpoint_id is not None:
138-
otel_span.set_attribute("db.operation", endpoint_id)
139-
for key, value in path_parts.items():
140-
otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)
141-
142-
yield OpenTelemetrySpan(
143-
otel_span,
144-
endpoint_id=endpoint_id,
145-
body_strategy=self.body_strategy,
146-
)

elastic_transport/_transport.py

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
Urllib3HttpNode,
6161
)
6262
from ._node_pool import NodePool, NodeSelector
63-
from ._otel import OpenTelemetry, OpenTelemetrySpan
63+
from ._otel import OpenTelemetrySpan
6464
from ._serializer import DEFAULT_SERIALIZERS, Serializer, SerializerCollection
6565
from ._version import __version__
6666
from .client_utils import client_meta_version, resolve_default
@@ -226,9 +226,6 @@ def __init__(
226226
self.retry_on_status = retry_on_status
227227
self.retry_on_timeout = retry_on_timeout
228228

229-
# Instrumentation
230-
self.otel = OpenTelemetry()
231-
232229
# Build the NodePool from all the options
233230
node_pool_kwargs: Dict[str, Any] = {}
234231
if node_selector_class is not None:
@@ -256,7 +253,7 @@ def __init__(
256253
if sniff_on_start:
257254
self.sniff(True)
258255

259-
def perform_request(
256+
def perform_request( # type: ignore[return]
260257
self,
261258
method: str,
262259
target: str,
@@ -268,8 +265,7 @@ def perform_request(
268265
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
269266
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
270267
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
271-
endpoint_id: Optional[str] = None,
272-
path_parts: Optional[Mapping[str, str]] = None,
268+
otel_span: Union[OpenTelemetrySpan, DefaultType] = DEFAULT,
273269
) -> TransportApiResponse:
274270
"""
275271
Perform the actual request. Retrieve a node from the node
@@ -293,54 +289,18 @@ def perform_request(
293289
:arg retry_on_timeout: Set to true to retry after timeout errors.
294290
:arg request_timeout: Amount of time to wait for a response to fail with a timeout error.
295291
:arg client_meta: Extra client metadata key-value pairs to send in the client meta header.
296-
:arg endpoint_id: The endpoint id of the request, such as `ml.close_job`.
297-
Used for OpenTelemetry instrumentation.
298-
:arg path_paths: Dictionary with all dynamic value in the url path.
299-
Used for OpenTelemetry instrumentation.
292+
:arg otel_span: OpenTelemetry span used to add metadata to the span.
293+
300294
:returns: Tuple of the :class:`elastic_transport.ApiResponseMeta` with the deserialized response.
301295
"""
302-
path_parts = path_parts if path_parts is not None else {}
303-
with self.otel.span(
304-
method,
305-
endpoint_id=endpoint_id,
306-
path_parts=path_parts,
307-
) as otel_span:
308-
response = self._perform_request(
309-
method,
310-
target,
311-
body=body,
312-
headers=headers,
313-
max_retries=max_retries,
314-
retry_on_status=retry_on_status,
315-
retry_on_timeout=retry_on_timeout,
316-
request_timeout=request_timeout,
317-
client_meta=client_meta,
318-
otel_span=otel_span,
319-
)
320-
otel_span.set_elastic_cloud_metadata(response.meta.headers)
321-
return response
322-
323-
def _perform_request( # type: ignore[return]
324-
self,
325-
method: str,
326-
target: str,
327-
*,
328-
body: Optional[Any] = None,
329-
headers: Union[Mapping[str, Any], DefaultType] = DEFAULT,
330-
max_retries: Union[int, DefaultType] = DEFAULT,
331-
retry_on_status: Union[Collection[int], DefaultType] = DEFAULT,
332-
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
333-
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
334-
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
335-
otel_span: OpenTelemetrySpan,
336-
) -> TransportApiResponse:
337296
if headers is DEFAULT:
338297
request_headers = HttpHeaders()
339298
else:
340299
request_headers = HttpHeaders(headers)
341300
max_retries = resolve_default(max_retries, self.max_retries)
342301
retry_on_timeout = resolve_default(retry_on_timeout, self.retry_on_timeout)
343302
retry_on_status = resolve_default(retry_on_status, self.retry_on_status)
303+
otel_span = resolve_default(otel_span, OpenTelemetrySpan(None))
344304

345305
if self.meta_header:
346306
request_headers["x-elastic-client-meta"] = ",".join(

0 commit comments

Comments
 (0)