From e6c9c441afa0e4f82c302fa8bc9089fd242760fd Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Fri, 11 Jul 2025 15:34:19 +0100 Subject: [PATCH 1/2] Revert "Update opentelemetry conventions (#245) (#249)" This reverts commit c7e210dee5c9bc9e59257b608d458b263059c2b4. --- docs/sphinx/conf.py | 2 +- elastic_transport/_async_transport.py | 5 +---- elastic_transport/_node/_http_aiohttp.py | 1 + elastic_transport/_otel.py | 25 ++++-------------------- elastic_transport/_transport.py | 5 +---- tests/async_/test_async_transport.py | 7 +++++-- tests/test_node_pool.py | 2 ++ tests/test_otel.py | 16 +++------------ tests/test_transport.py | 5 +++++ utils/build-dists.py | 1 + 10 files changed, 24 insertions(+), 45 deletions(-) diff --git a/docs/sphinx/conf.py b/docs/sphinx/conf.py index 85a6edae..e8ae94bd 100644 --- a/docs/sphinx/conf.py +++ b/docs/sphinx/conf.py @@ -45,5 +45,5 @@ intersphinx_mapping = { "python": ("https://docs.python.org/3", None), - "requests": ("https://requests.readthedocs.io/en/latest", None), + "requests": ("https://docs.python-requests.org/en/latest", None), } diff --git a/elastic_transport/_async_transport.py b/elastic_transport/_async_transport.py index 33017147..1e66f742 100644 --- a/elastic_transport/_async_transport.py +++ b/elastic_transport/_async_transport.py @@ -260,9 +260,7 @@ async def perform_request( # type: ignore[override, return] node: BaseAsyncNode = self.node_pool.get() # type: ignore[assignment] start_time = self._loop.time() try: - otel_span.set_node_metadata( - node.host, node.port, node.base_url, target, method - ) + otel_span.set_node_metadata(node.host, node.port, node.base_url, target) resp = await node.perform_request( method, target, @@ -366,7 +364,6 @@ async def perform_request( # type: ignore[override, return] # We either got a response we're happy with or # we've exhausted all of our retries so we return it. if not retry or attempt >= max_retries: - otel_span.set_db_response(resp.meta.status) return TransportApiResponse(resp.meta, body) else: _logger.warning( diff --git a/elastic_transport/_node/_http_aiohttp.py b/elastic_transport/_node/_http_aiohttp.py index 5ed17003..be55b934 100644 --- a/elastic_transport/_node/_http_aiohttp.py +++ b/elastic_transport/_node/_http_aiohttp.py @@ -141,6 +141,7 @@ async def perform_request( # type: ignore[override] headers: Optional[HttpHeaders] = None, request_timeout: Union[DefaultType, Optional[float]] = DEFAULT, ) -> NodeApiResponse: + global _AIOHTTP_FIXED_HEAD_BUG if self.session is None: self._create_aiohttp_session() assert self.session is not None diff --git a/elastic_transport/_otel.py b/elastic_transport/_otel.py index 1097a000..d30d9719 100644 --- a/elastic_transport/_otel.py +++ b/elastic_transport/_otel.py @@ -49,25 +49,14 @@ def __init__( self.body_strategy = body_strategy self.endpoint_id = endpoint_id - if self.otel_span: - self.otel_span.set_attribute("db.system.name", "elasticsearch") - if self.endpoint_id: - self.otel_span.set_attribute("db.operation.name", self.endpoint_id) - def set_node_metadata( - self, - host: str, - port: int, - base_url: str, - target: str, - method: str, + self, host: str, port: int, base_url: str, target: str ) -> None: if self.otel_span is None: return # url.full does not contain auth info which is passed as headers self.otel_span.set_attribute("url.full", base_url + target) - self.otel_span.set_attribute("http.request.method", method) self.otel_span.set_attribute("server.address", host) self.otel_span.set_attribute("server.port", port) @@ -77,10 +66,10 @@ def set_elastic_cloud_metadata(self, headers: Mapping[str, str]) -> None: cluster_name = headers.get("X-Found-Handling-Cluster") if cluster_name is not None: - self.otel_span.set_attribute("db.namespace", cluster_name) + self.otel_span.set_attribute("db.elasticsearch.cluster.name", cluster_name) node_name = headers.get("X-Found-Handling-Instance") if node_name is not None: - self.otel_span.set_attribute("elasticsearch.node.name", node_name) + self.otel_span.set_attribute("db.elasticsearch.node.name", node_name) def set_db_statement(self, serialized_body: bytes) -> None: if self.otel_span is None: @@ -90,11 +79,5 @@ def set_db_statement(self, serialized_body: bytes) -> None: return elif self.body_strategy == "raw" and self.endpoint_id in SEARCH_ENDPOINTS: self.otel_span.set_attribute( - "db.query.text", serialized_body.decode("utf-8") + "db.statement", serialized_body.decode("utf-8") ) - - def set_db_response(self, status_code: int) -> None: - if self.otel_span is None: - return - - self.otel_span.set_attribute("db.response.status_code", str(status_code)) diff --git a/elastic_transport/_transport.py b/elastic_transport/_transport.py index 32fb9e0f..3219e52c 100644 --- a/elastic_transport/_transport.py +++ b/elastic_transport/_transport.py @@ -338,9 +338,7 @@ def perform_request( # type: ignore[return] node = self.node_pool.get() start_time = time.time() try: - otel_span.set_node_metadata( - node.host, node.port, node.base_url, target, method - ) + otel_span.set_node_metadata(node.host, node.port, node.base_url, target) resp = node.perform_request( method, target, @@ -444,7 +442,6 @@ def perform_request( # type: ignore[return] # We either got a response we're happy with or # we've exhausted all of our retries so we return it. if not retry or attempt >= max_retries: - otel_span.set_db_response(resp.meta.status) return TransportApiResponse(resp.meta, body) else: _logger.warning( diff --git a/tests/async_/test_async_transport.py b/tests/async_/test_async_transport.py index 24a869ce..f541b16d 100644 --- a/tests/async_/test_async_transport.py +++ b/tests/async_/test_async_transport.py @@ -393,6 +393,7 @@ async def test_sniff_on_start(): calls = [] def sniff_callback(*args): + nonlocal calls calls.append(args) return [NodeConfig("http", "localhost", 80)] @@ -419,6 +420,7 @@ async def test_sniff_before_requests(): calls = [] def sniff_callback(*args): + nonlocal calls calls.append(args) return [] @@ -444,6 +446,7 @@ async def test_sniff_on_node_failure(): calls = [] def sniff_callback(*args): + nonlocal calls calls.append(args) return [] @@ -548,7 +551,7 @@ async def test_sniffed_nodes_added_to_pool(async_sniff_callback): if async_sniff_callback: async def sniff_callback(*_): - nonlocal sniffed_at + nonlocal loop, sniffed_at await asyncio.sleep(0.1) sniffed_at = loop.time() return sniffed_nodes @@ -556,7 +559,7 @@ async def sniff_callback(*_): else: def sniff_callback(*_): - nonlocal sniffed_at + nonlocal loop, sniffed_at time.sleep(0.1) sniffed_at = loop.time() return sniffed_nodes diff --git a/tests/test_node_pool.py b/tests/test_node_pool.py index 49b14ab9..22160155 100644 --- a/tests/test_node_pool.py +++ b/tests/test_node_pool.py @@ -245,6 +245,8 @@ def __init__(self): self.nodes_gotten = 0 def run(self) -> None: + nonlocal pool + while time.time() < start + 2: node = pool.get() self.nodes_gotten += 1 diff --git a/tests/test_otel.py b/tests/test_otel.py index 2b580a13..e1666ed0 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -41,7 +41,6 @@ def test_no_span(): 9200, "http://localhost:9200/", "_ml/anomaly_detectors/my-job/_open", - "POST", ) span.set_elastic_cloud_metadata( { @@ -66,7 +65,6 @@ def test_detailed_span(): 9200, "http://localhost:9200/", "_ml/anomaly_detectors/my-job/_open", - "POST", ) span.set_elastic_cloud_metadata( { @@ -74,22 +72,16 @@ def test_detailed_span(): "X-Found-Handling-Instance": "instance-0000000001", } ) - span.set_db_response(202) spans = memory_exporter.get_finished_spans() assert len(spans) == 1 assert spans[0].name == "ml.open_job" assert spans[0].attributes == { - "db.system.name": "elasticsearch", "url.full": "http://localhost:9200/_ml/anomaly_detectors/my-job/_open", - "http.request.method": "POST", "server.address": "localhost", "server.port": 9200, - "db.operation.name": "my-job/_open", - "http.request.method": "POST", - "db.namespace": "e9106fc68e3044f0b1475b04bf4ffd5f", - "elasticsearch.node.name": "instance-0000000001", - "db.response.status_code": "202", + "db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f", + "db.elasticsearch.node.name": "instance-0000000001", } @@ -103,7 +95,5 @@ def test_db_statement(): assert len(spans) == 1 assert spans[0].name == "search" assert spans[0].attributes == { - "db.system.name": "elasticsearch", - "db.operation.name": "search", - "db.query.text": '{"query":{"match_all":{}}}', + "db.statement": '{"query":{"match_all":{}}}', } diff --git a/tests/test_transport.py b/tests/test_transport.py index 08d8e04f..07d063d2 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -434,6 +434,7 @@ def test_sniff_on_start(): calls = [] def sniff_callback(*args): + nonlocal calls calls.append(args) return [NodeConfig("http", "localhost", 80)] @@ -457,6 +458,7 @@ def test_sniff_before_requests(): calls = [] def sniff_callback(*args): + nonlocal calls calls.append(args) return [] @@ -480,6 +482,7 @@ def test_sniff_on_node_failure(): calls = [] def sniff_callback(*args): + nonlocal calls calls.append(args) return [] @@ -652,6 +655,8 @@ def __init__(self): self.successful_requests = 0 def run(self) -> None: + nonlocal t, start + while time.time() < start + 2: t.perform_request("GET", "/") self.successful_requests += 1 diff --git a/utils/build-dists.py b/utils/build-dists.py index 4217db72..e5320faf 100644 --- a/utils/build-dists.py +++ b/utils/build-dists.py @@ -54,6 +54,7 @@ def set_tmp_dir(): def run(argv, expect_exit_code=0): + global tmp_dir if tmp_dir is None: os.chdir(base_dir) else: From 2c9637952969cafc579ec505dbe7b25f9168aa07 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Mon, 14 Jul 2025 14:28:05 +0100 Subject: [PATCH 2/2] keep the linter updates --- elastic_transport/_node/_http_aiohttp.py | 1 - tests/async_/test_async_transport.py | 7 ++----- tests/test_node_pool.py | 2 -- tests/test_transport.py | 5 ----- utils/build-dists.py | 1 - 5 files changed, 2 insertions(+), 14 deletions(-) diff --git a/elastic_transport/_node/_http_aiohttp.py b/elastic_transport/_node/_http_aiohttp.py index be55b934..5ed17003 100644 --- a/elastic_transport/_node/_http_aiohttp.py +++ b/elastic_transport/_node/_http_aiohttp.py @@ -141,7 +141,6 @@ async def perform_request( # type: ignore[override] headers: Optional[HttpHeaders] = None, request_timeout: Union[DefaultType, Optional[float]] = DEFAULT, ) -> NodeApiResponse: - global _AIOHTTP_FIXED_HEAD_BUG if self.session is None: self._create_aiohttp_session() assert self.session is not None diff --git a/tests/async_/test_async_transport.py b/tests/async_/test_async_transport.py index f541b16d..24a869ce 100644 --- a/tests/async_/test_async_transport.py +++ b/tests/async_/test_async_transport.py @@ -393,7 +393,6 @@ async def test_sniff_on_start(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [NodeConfig("http", "localhost", 80)] @@ -420,7 +419,6 @@ async def test_sniff_before_requests(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [] @@ -446,7 +444,6 @@ async def test_sniff_on_node_failure(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [] @@ -551,7 +548,7 @@ async def test_sniffed_nodes_added_to_pool(async_sniff_callback): if async_sniff_callback: async def sniff_callback(*_): - nonlocal loop, sniffed_at + nonlocal sniffed_at await asyncio.sleep(0.1) sniffed_at = loop.time() return sniffed_nodes @@ -559,7 +556,7 @@ async def sniff_callback(*_): else: def sniff_callback(*_): - nonlocal loop, sniffed_at + nonlocal sniffed_at time.sleep(0.1) sniffed_at = loop.time() return sniffed_nodes diff --git a/tests/test_node_pool.py b/tests/test_node_pool.py index 22160155..49b14ab9 100644 --- a/tests/test_node_pool.py +++ b/tests/test_node_pool.py @@ -245,8 +245,6 @@ def __init__(self): self.nodes_gotten = 0 def run(self) -> None: - nonlocal pool - while time.time() < start + 2: node = pool.get() self.nodes_gotten += 1 diff --git a/tests/test_transport.py b/tests/test_transport.py index 07d063d2..08d8e04f 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -434,7 +434,6 @@ def test_sniff_on_start(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [NodeConfig("http", "localhost", 80)] @@ -458,7 +457,6 @@ def test_sniff_before_requests(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [] @@ -482,7 +480,6 @@ def test_sniff_on_node_failure(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [] @@ -655,8 +652,6 @@ def __init__(self): self.successful_requests = 0 def run(self) -> None: - nonlocal t, start - while time.time() < start + 2: t.perform_request("GET", "/") self.successful_requests += 1 diff --git a/utils/build-dists.py b/utils/build-dists.py index e5320faf..4217db72 100644 --- a/utils/build-dists.py +++ b/utils/build-dists.py @@ -54,7 +54,6 @@ def set_tmp_dir(): def run(argv, expect_exit_code=0): - global tmp_dir if tmp_dir is None: os.chdir(base_dir) else: