diff --git a/docs/sphinx/conf.py b/docs/sphinx/conf.py index e8ae94bd..85a6edae 100644 --- a/docs/sphinx/conf.py +++ b/docs/sphinx/conf.py @@ -45,5 +45,5 @@ intersphinx_mapping = { "python": ("https://docs.python.org/3", None), - "requests": ("https://docs.python-requests.org/en/latest", None), + "requests": ("https://requests.readthedocs.io/en/latest", None), } diff --git a/elastic_transport/_async_transport.py b/elastic_transport/_async_transport.py index 1e66f742..33017147 100644 --- a/elastic_transport/_async_transport.py +++ b/elastic_transport/_async_transport.py @@ -260,7 +260,9 @@ async def perform_request( # type: ignore[override, return] node: BaseAsyncNode = self.node_pool.get() # type: ignore[assignment] start_time = self._loop.time() try: - otel_span.set_node_metadata(node.host, node.port, node.base_url, target) + otel_span.set_node_metadata( + node.host, node.port, node.base_url, target, method + ) resp = await node.perform_request( method, target, @@ -364,6 +366,7 @@ async def perform_request( # type: ignore[override, return] # We either got a response we're happy with or # we've exhausted all of our retries so we return it. if not retry or attempt >= max_retries: + otel_span.set_db_response(resp.meta.status) return TransportApiResponse(resp.meta, body) else: _logger.warning( diff --git a/elastic_transport/_node/_http_aiohttp.py b/elastic_transport/_node/_http_aiohttp.py index be55b934..5ed17003 100644 --- a/elastic_transport/_node/_http_aiohttp.py +++ b/elastic_transport/_node/_http_aiohttp.py @@ -141,7 +141,6 @@ async def perform_request( # type: ignore[override] headers: Optional[HttpHeaders] = None, request_timeout: Union[DefaultType, Optional[float]] = DEFAULT, ) -> NodeApiResponse: - global _AIOHTTP_FIXED_HEAD_BUG if self.session is None: self._create_aiohttp_session() assert self.session is not None diff --git a/elastic_transport/_otel.py b/elastic_transport/_otel.py index d30d9719..1097a000 100644 --- a/elastic_transport/_otel.py +++ b/elastic_transport/_otel.py @@ -49,14 +49,25 @@ def __init__( self.body_strategy = body_strategy self.endpoint_id = endpoint_id + if self.otel_span: + self.otel_span.set_attribute("db.system.name", "elasticsearch") + if self.endpoint_id: + self.otel_span.set_attribute("db.operation.name", self.endpoint_id) + def set_node_metadata( - self, host: str, port: int, base_url: str, target: str + self, + host: str, + port: int, + base_url: str, + target: str, + method: str, ) -> None: if self.otel_span is None: return # url.full does not contain auth info which is passed as headers self.otel_span.set_attribute("url.full", base_url + target) + self.otel_span.set_attribute("http.request.method", method) self.otel_span.set_attribute("server.address", host) self.otel_span.set_attribute("server.port", port) @@ -66,10 +77,10 @@ def set_elastic_cloud_metadata(self, headers: Mapping[str, str]) -> None: cluster_name = headers.get("X-Found-Handling-Cluster") if cluster_name is not None: - self.otel_span.set_attribute("db.elasticsearch.cluster.name", cluster_name) + self.otel_span.set_attribute("db.namespace", cluster_name) node_name = headers.get("X-Found-Handling-Instance") if node_name is not None: - self.otel_span.set_attribute("db.elasticsearch.node.name", node_name) + self.otel_span.set_attribute("elasticsearch.node.name", node_name) def set_db_statement(self, serialized_body: bytes) -> None: if self.otel_span is None: @@ -79,5 +90,11 @@ def set_db_statement(self, serialized_body: bytes) -> None: return elif self.body_strategy == "raw" and self.endpoint_id in SEARCH_ENDPOINTS: self.otel_span.set_attribute( - "db.statement", serialized_body.decode("utf-8") + "db.query.text", serialized_body.decode("utf-8") ) + + def set_db_response(self, status_code: int) -> None: + if self.otel_span is None: + return + + self.otel_span.set_attribute("db.response.status_code", str(status_code)) diff --git a/elastic_transport/_transport.py b/elastic_transport/_transport.py index 3219e52c..32fb9e0f 100644 --- a/elastic_transport/_transport.py +++ b/elastic_transport/_transport.py @@ -338,7 +338,9 @@ def perform_request( # type: ignore[return] node = self.node_pool.get() start_time = time.time() try: - otel_span.set_node_metadata(node.host, node.port, node.base_url, target) + otel_span.set_node_metadata( + node.host, node.port, node.base_url, target, method + ) resp = node.perform_request( method, target, @@ -442,6 +444,7 @@ def perform_request( # type: ignore[return] # We either got a response we're happy with or # we've exhausted all of our retries so we return it. if not retry or attempt >= max_retries: + otel_span.set_db_response(resp.meta.status) return TransportApiResponse(resp.meta, body) else: _logger.warning( diff --git a/tests/async_/test_async_transport.py b/tests/async_/test_async_transport.py index f541b16d..24a869ce 100644 --- a/tests/async_/test_async_transport.py +++ b/tests/async_/test_async_transport.py @@ -393,7 +393,6 @@ async def test_sniff_on_start(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [NodeConfig("http", "localhost", 80)] @@ -420,7 +419,6 @@ async def test_sniff_before_requests(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [] @@ -446,7 +444,6 @@ async def test_sniff_on_node_failure(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [] @@ -551,7 +548,7 @@ async def test_sniffed_nodes_added_to_pool(async_sniff_callback): if async_sniff_callback: async def sniff_callback(*_): - nonlocal loop, sniffed_at + nonlocal sniffed_at await asyncio.sleep(0.1) sniffed_at = loop.time() return sniffed_nodes @@ -559,7 +556,7 @@ async def sniff_callback(*_): else: def sniff_callback(*_): - nonlocal loop, sniffed_at + nonlocal sniffed_at time.sleep(0.1) sniffed_at = loop.time() return sniffed_nodes diff --git a/tests/test_node_pool.py b/tests/test_node_pool.py index 22160155..49b14ab9 100644 --- a/tests/test_node_pool.py +++ b/tests/test_node_pool.py @@ -245,8 +245,6 @@ def __init__(self): self.nodes_gotten = 0 def run(self) -> None: - nonlocal pool - while time.time() < start + 2: node = pool.get() self.nodes_gotten += 1 diff --git a/tests/test_otel.py b/tests/test_otel.py index e1666ed0..2b580a13 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -41,6 +41,7 @@ def test_no_span(): 9200, "http://localhost:9200/", "_ml/anomaly_detectors/my-job/_open", + "POST", ) span.set_elastic_cloud_metadata( { @@ -65,6 +66,7 @@ def test_detailed_span(): 9200, "http://localhost:9200/", "_ml/anomaly_detectors/my-job/_open", + "POST", ) span.set_elastic_cloud_metadata( { @@ -72,16 +74,22 @@ def test_detailed_span(): "X-Found-Handling-Instance": "instance-0000000001", } ) + span.set_db_response(202) spans = memory_exporter.get_finished_spans() assert len(spans) == 1 assert spans[0].name == "ml.open_job" assert spans[0].attributes == { + "db.system.name": "elasticsearch", "url.full": "http://localhost:9200/_ml/anomaly_detectors/my-job/_open", + "http.request.method": "POST", "server.address": "localhost", "server.port": 9200, - "db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f", - "db.elasticsearch.node.name": "instance-0000000001", + "db.operation.name": "my-job/_open", + "http.request.method": "POST", + "db.namespace": "e9106fc68e3044f0b1475b04bf4ffd5f", + "elasticsearch.node.name": "instance-0000000001", + "db.response.status_code": "202", } @@ -95,5 +103,7 @@ def test_db_statement(): assert len(spans) == 1 assert spans[0].name == "search" assert spans[0].attributes == { - "db.statement": '{"query":{"match_all":{}}}', + "db.system.name": "elasticsearch", + "db.operation.name": "search", + "db.query.text": '{"query":{"match_all":{}}}', } diff --git a/tests/test_transport.py b/tests/test_transport.py index 07d063d2..08d8e04f 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -434,7 +434,6 @@ def test_sniff_on_start(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [NodeConfig("http", "localhost", 80)] @@ -458,7 +457,6 @@ def test_sniff_before_requests(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [] @@ -482,7 +480,6 @@ def test_sniff_on_node_failure(): calls = [] def sniff_callback(*args): - nonlocal calls calls.append(args) return [] @@ -655,8 +652,6 @@ def __init__(self): self.successful_requests = 0 def run(self) -> None: - nonlocal t, start - while time.time() < start + 2: t.perform_request("GET", "/") self.successful_requests += 1 diff --git a/utils/build-dists.py b/utils/build-dists.py index e5320faf..4217db72 100644 --- a/utils/build-dists.py +++ b/utils/build-dists.py @@ -54,7 +54,6 @@ def set_tmp_dir(): def run(argv, expect_exit_code=0): - global tmp_dir if tmp_dir is None: os.chdir(base_dir) else: