From 0dd652e2a6e4ee5bd33fcc1ac51eb89ce85aec90 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Wed, 16 Oct 2024 16:09:14 +0200 Subject: [PATCH 01/18] httpx: rewrote patching to use wrapt instead of subclassing client --- .../instrumentation/httpx/__init__.py | 298 ++++++++++++------ .../tests/test_httpx_integration.py | 30 +- 2 files changed, 214 insertions(+), 114 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index b9b9a31d3e..8e27f849f5 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -193,10 +193,10 @@ async def async_response_hook(span, request, response): import logging import typing -from asyncio import iscoroutinefunction from types import TracebackType import httpx +from wrapt import wrap_function_wrapper from opentelemetry.instrumentation._semconv import ( _get_schema_url, @@ -217,6 +217,7 @@ async def async_response_hook(span, request, response): from opentelemetry.instrumentation.utils import ( http_status_to_status_code, is_http_instrumentation_enabled, + unwrap, ) from opentelemetry.propagate import inject from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE @@ -731,44 +732,183 @@ def _instrument(self, **kwargs): ``async_request_hook``: Async ``request_hook`` for ``httpx.AsyncClient`` ``async_response_hook``: Async``response_hook`` for ``httpx.AsyncClient`` """ - self._original_client = httpx.Client - self._original_async_client = httpx.AsyncClient - request_hook = kwargs.get("request_hook") - response_hook = kwargs.get("response_hook") - async_request_hook = kwargs.get("async_request_hook") - async_response_hook = kwargs.get("async_response_hook") - if callable(request_hook): - _InstrumentedClient._request_hook = request_hook - if callable(async_request_hook) and iscoroutinefunction( - async_request_hook - ): - _InstrumentedAsyncClient._request_hook = async_request_hook - if callable(response_hook): - _InstrumentedClient._response_hook = response_hook - if callable(async_response_hook) and iscoroutinefunction( - async_response_hook - ): - _InstrumentedAsyncClient._response_hook = async_response_hook tracer_provider = kwargs.get("tracer_provider") - _InstrumentedClient._tracer_provider = tracer_provider - _InstrumentedAsyncClient._tracer_provider = tracer_provider - # Intentionally using a private attribute here, see: - # https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2538#discussion_r1610603719 - httpx.Client = httpx._api.Client = _InstrumentedClient - httpx.AsyncClient = _InstrumentedAsyncClient + self._request_hook = kwargs.get("request_hook") + self._response_hook = kwargs.get("response_hook") + self._async_request_hook = kwargs.get("async_request_hook") + self._async_response_hook = kwargs.get("async_response_hook") + + if getattr(self, "__instrumented", False): + print("already instrumented") + return + + _OpenTelemetrySemanticConventionStability._initialize() + self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.HTTP, + ) + self._tracer = get_tracer( + __name__, + instrumenting_library_version=__version__, + tracer_provider=tracer_provider, + schema_url=_get_schema_url(self._sem_conv_opt_in_mode), + ) + + wrap_function_wrapper( + "httpx", + "HTTPTransport.handle_request", + self._handle_request_wrapper, + ) + wrap_function_wrapper( + "httpx", + "AsyncHTTPTransport.handle_async_request", + self._handle_async_request_wrapper, + ) + + self.__instrumented = True def _uninstrument(self, **kwargs): - httpx.Client = httpx._api.Client = self._original_client - httpx.AsyncClient = self._original_async_client - _InstrumentedClient._tracer_provider = None - _InstrumentedClient._request_hook = None - _InstrumentedClient._response_hook = None - _InstrumentedAsyncClient._tracer_provider = None - _InstrumentedAsyncClient._request_hook = None - _InstrumentedAsyncClient._response_hook = None + import httpx + + unwrap(httpx.HTTPTransport, "handle_request") + unwrap(httpx.AsyncHTTPTransport, "handle_async_request") + + def _handle_request_wrapper(self, wrapped, instance, args, kwargs): + if not is_http_instrumentation_enabled(): + return wrapped(*args, **kwargs) + + method, url, headers, stream, extensions = _extract_parameters( + args, kwargs + ) + method_original = method.decode() + span_name = _get_default_span_name(method_original) + span_attributes = {} + # apply http client response attributes according to semconv + _apply_request_client_attributes_to_span( + span_attributes, + url, + method_original, + self._sem_conv_opt_in_mode, + ) + + request_info = RequestInfo(method, url, headers, stream, extensions) + + with self._tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT, attributes=span_attributes + ) as span: + exception = None + if callable(self._request_hook): + self._request_hook(span, request_info) + + _inject_propagation_headers(headers, args, kwargs) + + try: + response = wrapped(*args, **kwargs) + except Exception as exc: # pylint: disable=W0703 + exception = exc + response = getattr(exc, "response", None) + + if isinstance(response, (httpx.Response, tuple)): + status_code, headers, stream, extensions, http_version = ( + _extract_response(response) + ) + + if span.is_recording(): + # apply http client response attributes according to semconv + _apply_response_client_attributes_to_span( + span, + status_code, + http_version, + self._sem_conv_opt_in_mode, + ) + if callable(self._response_hook): + self._response_hook( + span, + request_info, + ResponseInfo(status_code, headers, stream, extensions), + ) + + if exception: + if span.is_recording() and _report_new( + self._sem_conv_opt_in_mode + ): + span.set_attribute( + ERROR_TYPE, type(exception).__qualname__ + ) + raise exception.with_traceback(exception.__traceback__) + + return response + + async def _handle_async_request_wrapper( + self, wrapped, instance, args, kwargs + ): + if not is_http_instrumentation_enabled(): + return await wrapped(*args, **kwargs) + + method, url, headers, stream, extensions = _extract_parameters( + args, kwargs + ) + method_original = method.decode() + span_name = _get_default_span_name(method_original) + span_attributes = {} + # apply http client response attributes according to semconv + _apply_request_client_attributes_to_span( + span_attributes, + url, + method_original, + self._sem_conv_opt_in_mode, + ) + + request_info = RequestInfo(method, url, headers, stream, extensions) + + with self._tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT, attributes=span_attributes + ) as span: + exception = None + if callable(self._async_request_hook): + await self._async_request_hook(span, request_info) + + _inject_propagation_headers(headers, args, kwargs) + + try: + response = await wrapped(*args, **kwargs) + except Exception as exc: # pylint: disable=W0703 + exception = exc + response = getattr(exc, "response", None) + + if isinstance(response, (httpx.Response, tuple)): + status_code, headers, stream, extensions, http_version = ( + _extract_response(response) + ) + + if span.is_recording(): + # apply http client response attributes according to semconv + _apply_response_client_attributes_to_span( + span, + status_code, + http_version, + self._sem_conv_opt_in_mode, + ) + + if callable(self._async_response_hook): + await self._async_response_hook( + span, + request_info, + ResponseInfo(status_code, headers, stream, extensions), + ) + + if exception: + if span.is_recording() and _report_new( + self._sem_conv_opt_in_mode + ): + span.set_attribute( + ERROR_TYPE, type(exception).__qualname__ + ) + raise exception.with_traceback(exception.__traceback__) + + return response - @staticmethod def instrument_client( + self, client: typing.Union[httpx.Client, httpx.AsyncClient], tracer_provider: TracerProvider = None, request_hook: typing.Union[ @@ -788,67 +928,27 @@ def instrument_client( response_hook: A hook that receives the span, request, and response that is called right before the span ends """ - # pylint: disable=protected-access - if not hasattr(client, "_is_instrumented_by_opentelemetry"): - client._is_instrumented_by_opentelemetry = False - if not client._is_instrumented_by_opentelemetry: - if isinstance(client, httpx.Client): - client._original_transport = client._transport - client._original_mounts = client._mounts.copy() - transport = client._transport or httpx.HTTPTransport() - client._transport = SyncOpenTelemetryTransport( - transport, - tracer_provider=tracer_provider, - request_hook=request_hook, - response_hook=response_hook, - ) - client._is_instrumented_by_opentelemetry = True - client._mounts.update( - { - url_pattern: ( - SyncOpenTelemetryTransport( - transport, - tracer_provider=tracer_provider, - request_hook=request_hook, - response_hook=response_hook, - ) - if transport is not None - else transport - ) - for url_pattern, transport in client._original_mounts.items() - } - ) - - if isinstance(client, httpx.AsyncClient): - transport = client._transport or httpx.AsyncHTTPTransport() - client._original_mounts = client._mounts.copy() - client._transport = AsyncOpenTelemetryTransport( - transport, - tracer_provider=tracer_provider, - request_hook=request_hook, - response_hook=response_hook, - ) - client._is_instrumented_by_opentelemetry = True - client._mounts.update( - { - url_pattern: ( - AsyncOpenTelemetryTransport( - transport, - tracer_provider=tracer_provider, - request_hook=request_hook, - response_hook=response_hook, - ) - if transport is not None - else transport - ) - for url_pattern, transport in client._original_mounts.items() - } - ) - else: + if getattr(client, "_is_instrumented_by_opentelemetry", False): _logger.warning( "Attempting to instrument Httpx client while already instrumented" ) + return + + if hasattr(client._transport, "handle_request"): + wrap_function_wrapper( + client._transport, + "handle_request", + self._handle_request_wrapper, + ) + client._is_instrumented_by_opentelemetry = True + if hasattr(client._transport, "handle_async_request"): + wrap_function_wrapper( + client._transport, + "handle_async_request", + self._handle_async_request_wrapper, + ) + client._is_instrumented_by_opentelemetry = True @staticmethod def uninstrument_client( @@ -859,15 +959,9 @@ def uninstrument_client( Args: client: The httpx Client or AsyncClient instance """ - if hasattr(client, "_original_transport"): - client._transport = client._original_transport - del client._original_transport + if hasattr(client._transport, "handle_request"): + unwrap(client._transport, "handle_request") + client._is_instrumented_by_opentelemetry = False + elif hasattr(client._transport, "handle_async_request"): + unwrap(client._transport, "handle_async_request") client._is_instrumented_by_opentelemetry = False - if hasattr(client, "_original_mounts"): - client._mounts = client._original_mounts.copy() - del client._original_mounts - else: - _logger.warning( - "Attempting to uninstrument Httpx " - "client while already uninstrumented" - ) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index 0d055515e0..ea9ddf5f21 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -166,11 +166,14 @@ def setUp(self): ) ) + HTTPXClientInstrumentor().instrument() + # pylint: disable=invalid-name def tearDown(self): super().tearDown() self.env_patch.stop() respx.stop() + HTTPXClientInstrumentor().uninstrument() def assert_span( self, exporter: "SpanExporter" = None, num_spans: int = 1 @@ -743,6 +746,8 @@ def setUp(self): super().setUp() HTTPXClientInstrumentor().instrument() self.client = self.create_client() + + def tearDown(self): HTTPXClientInstrumentor().uninstrument() def create_proxy_mounts(self): @@ -769,6 +774,7 @@ def test_custom_tracer_provider(self): result = self.create_tracer_provider(resource=resource) tracer_provider, exporter = result + HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=tracer_provider ) @@ -787,6 +793,7 @@ def test_response_hook(self): else "response_hook" ) response_hook_kwargs = {response_hook_key: self.response_hook} + HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, **response_hook_kwargs, @@ -808,6 +815,7 @@ def test_response_hook(self): HTTPXClientInstrumentor().uninstrument() def test_response_hook_sync_async_kwargs(self): + HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, response_hook=_response_hook, @@ -819,7 +827,7 @@ def test_response_hook_sync_async_kwargs(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual( - span.attributes, + dict(span.attributes), { SpanAttributes.HTTP_METHOD: "GET", SpanAttributes.HTTP_URL: self.URL, @@ -836,6 +844,7 @@ def test_request_hook(self): else "request_hook" ) request_hook_kwargs = {request_hook_key: self.request_hook} + HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, **request_hook_kwargs, @@ -849,6 +858,7 @@ def test_request_hook(self): HTTPXClientInstrumentor().uninstrument() def test_request_hook_sync_async_kwargs(self): + HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, request_hook=_request_hook, @@ -863,6 +873,7 @@ def test_request_hook_sync_async_kwargs(self): HTTPXClientInstrumentor().uninstrument() def test_request_hook_no_span_update(self): + HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, request_hook=self.no_update_request_hook, @@ -876,6 +887,7 @@ def test_request_hook_no_span_update(self): HTTPXClientInstrumentor().uninstrument() def test_not_recording(self): + HTTPXClientInstrumentor().uninstrument() with mock.patch("opentelemetry.trace.INVALID_SPAN") as mock_span: HTTPXClientInstrumentor().instrument( tracer_provider=trace.NoOpTracerProvider() @@ -894,6 +906,7 @@ def test_not_recording(self): HTTPXClientInstrumentor().uninstrument() def test_suppress_instrumentation_new_client(self): + HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument() with suppress_http_instrumentation(): client = self.create_client() @@ -904,6 +917,7 @@ def test_suppress_instrumentation_new_client(self): HTTPXClientInstrumentor().uninstrument() def test_instrument_client(self): + HTTPXClientInstrumentor().uninstrument() client = self.create_client() HTTPXClientInstrumentor().instrument_client(client) result = self.perform_request(self.URL, client=client) @@ -929,10 +943,7 @@ def test_instrumentation_without_client(self): self.URL, ) - HTTPXClientInstrumentor().uninstrument() - def test_uninstrument(self): - HTTPXClientInstrumentor().instrument() HTTPXClientInstrumentor().uninstrument() client = self.create_client() result = self.perform_request(self.URL, client=client) @@ -942,6 +953,7 @@ def test_uninstrument(self): self.assert_span(num_spans=0) def test_uninstrument_client(self): + HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().uninstrument_client(self.client) result = self.perform_request(self.URL) @@ -950,7 +962,6 @@ def test_uninstrument_client(self): self.assert_span(num_spans=0) def test_uninstrument_new_client(self): - HTTPXClientInstrumentor().instrument() client1 = self.create_client() HTTPXClientInstrumentor().uninstrument_client(client1) @@ -973,7 +984,6 @@ def test_uninstrument_new_client(self): def test_instrument_proxy(self): proxy_mounts = self.create_proxy_mounts() - HTTPXClientInstrumentor().instrument() client = self.create_client(mounts=proxy_mounts) self.perform_request(self.URL, client=client) self.assert_span(num_spans=1) @@ -982,9 +992,9 @@ def test_instrument_proxy(self): 2, (SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport), ) - HTTPXClientInstrumentor().uninstrument() def test_instrument_client_with_proxy(self): + HTTPXClientInstrumentor().uninstrument() proxy_mounts = self.create_proxy_mounts() client = self.create_client(mounts=proxy_mounts) self.assert_proxy_mounts( @@ -1005,7 +1015,6 @@ def test_instrument_client_with_proxy(self): def test_uninstrument_client_with_proxy(self): proxy_mounts = self.create_proxy_mounts() - HTTPXClientInstrumentor().instrument() client = self.create_client(mounts=proxy_mounts) self.assert_proxy_mounts( client._mounts.values(), @@ -1068,7 +1077,7 @@ def create_client( transport: typing.Optional[SyncOpenTelemetryTransport] = None, **kwargs, ): - return httpx.Client(transport=transport, **kwargs) + return httpx.Client(**kwargs) def perform_request( self, @@ -1188,10 +1197,7 @@ class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest): def setUp(self): super().setUp() - HTTPXClientInstrumentor().instrument() - self.client = self.create_client() self.client2 = self.create_client() - HTTPXClientInstrumentor().uninstrument() def create_client( self, From 06bc9b51ccca0c4601527a46b5ea7e9b64ea170e Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Wed, 16 Oct 2024 16:40:46 +0200 Subject: [PATCH 02/18] Remove dead code --- .../src/opentelemetry/instrumentation/httpx/__init__.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index 8e27f849f5..984eea89b2 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -738,10 +738,6 @@ def _instrument(self, **kwargs): self._async_request_hook = kwargs.get("async_request_hook") self._async_response_hook = kwargs.get("async_response_hook") - if getattr(self, "__instrumented", False): - print("already instrumented") - return - _OpenTelemetrySemanticConventionStability._initialize() self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( _OpenTelemetryStabilitySignalType.HTTP, @@ -764,8 +760,6 @@ def _instrument(self, **kwargs): self._handle_async_request_wrapper, ) - self.__instrumented = True - def _uninstrument(self, **kwargs): import httpx From cc7b97eb49fef1827034d0ef625a2bc79c848b4b Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Thu, 17 Oct 2024 15:40:21 +0200 Subject: [PATCH 03/18] Handle hooks in instrument_client --- .../instrumentation/httpx/__init__.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index 984eea89b2..42b4d3a6f7 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -193,6 +193,7 @@ async def async_response_hook(span, request, response): import logging import typing +from asyncio import iscoroutinefunction from types import TracebackType import httpx @@ -929,6 +930,20 @@ def instrument_client( ) return + if iscoroutinefunction(request_hook): + self._async_request_hook = request_hook + self._request_hook = None + else: + self._request_hook = request_hook + self._async_request_hook = None + + if iscoroutinefunction(response_hook): + self._async_response_hook = response_hook + self._response_hook = None + else: + self._response_hook = response_hook + self._async_response_hook = None + if hasattr(client._transport, "handle_request"): wrap_function_wrapper( client._transport, From f0de350894f82e3a56930083fb0d817893560704 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Fri, 18 Oct 2024 14:09:55 +0200 Subject: [PATCH 04/18] Update some test helper --- .../tests/test_httpx_integration.py | 51 +++++++++++++++---- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index ea9ddf5f21..eb58bf03ed 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -21,6 +21,7 @@ import httpx import respx +from wrapt import ObjectProxy import opentelemetry.instrumentation.httpx from opentelemetry import trace @@ -168,6 +169,10 @@ def setUp(self): HTTPXClientInstrumentor().instrument() + def print_spans(self, spans): + for span in spans: + print(span.name, span.attributes) + # pylint: disable=invalid-name def tearDown(self): super().tearDown() @@ -181,7 +186,9 @@ def assert_span( if exporter is None: exporter = self.memory_exporter span_list = exporter.get_finished_spans() - self.assertEqual(num_spans, len(span_list)) + self.assertEqual( + num_spans, len(span_list), self.print_spans(span_list) + ) if num_spans == 0: return None if num_spans == 1: @@ -760,14 +767,25 @@ def create_proxy_mounts(self): ), } - def assert_proxy_mounts(self, mounts, num_mounts, transport_type): + def assert_proxy_mounts(self, mounts, num_mounts, transport_type=None): self.assertEqual(len(mounts), num_mounts) for transport in mounts: with self.subTest(transport): - self.assertIsInstance( - transport, - transport_type, - ) + if transport_type: + self.assertIsInstance( + transport, + transport_type, + ) + else: + handler = getattr(transport, "handle_request", None) + if not handler: + handler = getattr( + transport, "handle_async_request" + ) + self.assertTrue( + isinstance(handler, ObjectProxy) + and getattr(handler, "__wrapped__") + ) def test_custom_tracer_provider(self): resource = resources.Resource.create({}) @@ -990,9 +1008,25 @@ def test_instrument_proxy(self): self.assert_proxy_mounts( client._mounts.values(), 2, - (SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport), ) + def print_handler(self, client): + transport = client._transport + handler = getattr( + transport, + "handle_request", + getattr(transport, "handle_async_request", None), + ) + print( + handler, + ( + getattr(handler, "__wrapped__", "no wrapped") + if handler + else "no handler" + ), + ) + return handler + def test_instrument_client_with_proxy(self): HTTPXClientInstrumentor().uninstrument() proxy_mounts = self.create_proxy_mounts() @@ -1009,7 +1043,6 @@ def test_instrument_client_with_proxy(self): self.assert_proxy_mounts( client._mounts.values(), 2, - (SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport), ) HTTPXClientInstrumentor().uninstrument_client(client) @@ -1019,13 +1052,13 @@ def test_uninstrument_client_with_proxy(self): self.assert_proxy_mounts( client._mounts.values(), 2, - (SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport), ) HTTPXClientInstrumentor().uninstrument_client(client) result = self.perform_request(self.URL, client=client) self.assertEqual(result.text, "Hello!") + # FIXME: this does fail if uninstrument() has been called before and is a change of behaviour from before self.assert_span(num_spans=0) self.assert_proxy_mounts( client._mounts.values(), From b15b83cb8423112178c586270279c9d470fbc25d Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 22 Oct 2024 15:44:35 +0200 Subject: [PATCH 05/18] green tests \o/ --- .../instrumentation/httpx/__init__.py | 49 +++++++++++++++++-- .../tests/test_httpx_integration.py | 35 ++++--------- 2 files changed, 54 insertions(+), 30 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index 42b4d3a6f7..8e7357aac2 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -736,8 +736,18 @@ def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") self._request_hook = kwargs.get("request_hook") self._response_hook = kwargs.get("response_hook") - self._async_request_hook = kwargs.get("async_request_hook") - self._async_response_hook = kwargs.get("async_response_hook") + _async_request_hook = kwargs.get("async_request_hook") + self._async_request_hook = ( + _async_request_hook + if iscoroutinefunction(_async_request_hook) + else None + ) + _async_response_hook = kwargs.get("async_response_hook") + self._async_response_hook = ( + _async_response_hook + if iscoroutinefunction(_async_response_hook) + else None + ) _OpenTelemetrySemanticConventionStability._initialize() self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( @@ -829,7 +839,7 @@ def _handle_request_wrapper(self, wrapped, instance, args, kwargs): span.set_attribute( ERROR_TYPE, type(exception).__qualname__ ) - raise exception.with_traceback(exception.__traceback__) + raise exception return response @@ -898,7 +908,7 @@ async def _handle_async_request_wrapper( span.set_attribute( ERROR_TYPE, type(exception).__qualname__ ) - raise exception.with_traceback(exception.__traceback__) + raise exception return response @@ -930,6 +940,19 @@ def instrument_client( ) return + # FIXME: sharing state in the instrumentor instance maybe it's not that great, need to pass tracer and semconv to each + # instance separately + _OpenTelemetrySemanticConventionStability._initialize() + self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.HTTP, + ) + self._tracer = get_tracer( + __name__, + instrumenting_library_version=__version__, + tracer_provider=tracer_provider, + schema_url=_get_schema_url(self._sem_conv_opt_in_mode), + ) + if iscoroutinefunction(request_hook): self._async_request_hook = request_hook self._request_hook = None @@ -950,6 +973,13 @@ def instrument_client( "handle_request", self._handle_request_wrapper, ) + for transport in client._mounts.values(): + # FIXME: check it's not wrapped already? + wrap_function_wrapper( + transport, + "handle_request", + self._handle_request_wrapper, + ) client._is_instrumented_by_opentelemetry = True if hasattr(client._transport, "handle_async_request"): wrap_function_wrapper( @@ -957,6 +987,13 @@ def instrument_client( "handle_async_request", self._handle_async_request_wrapper, ) + for transport in client._mounts.values(): + # FIXME: check it's not wrapped already? + wrap_function_wrapper( + transport, + "handle_async_request", + self._handle_async_request_wrapper, + ) client._is_instrumented_by_opentelemetry = True @staticmethod @@ -970,7 +1007,11 @@ def uninstrument_client( """ if hasattr(client._transport, "handle_request"): unwrap(client._transport, "handle_request") + for transport in client._mounts.values(): + unwrap(transport, "handle_request") client._is_instrumented_by_opentelemetry = False elif hasattr(client._transport, "handle_async_request"): unwrap(client._transport, "handle_async_request") + for transport in client._mounts.values(): + unwrap(transport, "handle_async_request") client._is_instrumented_by_opentelemetry = False diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index eb58bf03ed..bf7f6caeae 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -167,8 +167,6 @@ def setUp(self): ) ) - HTTPXClientInstrumentor().instrument() - def print_spans(self, spans): for span in spans: print(span.name, span.attributes) @@ -751,8 +749,9 @@ def create_proxy_transport(self, url: str): def setUp(self): super().setUp() - HTTPXClientInstrumentor().instrument() self.client = self.create_client() + # FIXME: calling instrument() instead fixes 13*2 tests :( + HTTPXClientInstrumentor().instrument_client(self.client) def tearDown(self): HTTPXClientInstrumentor().uninstrument() @@ -792,7 +791,6 @@ def test_custom_tracer_provider(self): result = self.create_tracer_provider(resource=resource) tracer_provider, exporter = result - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=tracer_provider ) @@ -802,7 +800,6 @@ def test_custom_tracer_provider(self): self.assertEqual(result.text, "Hello!") span = self.assert_span(exporter=exporter) self.assertIs(span.resource, resource) - HTTPXClientInstrumentor().uninstrument() def test_response_hook(self): response_hook_key = ( @@ -811,7 +808,6 @@ def test_response_hook(self): else "response_hook" ) response_hook_kwargs = {response_hook_key: self.response_hook} - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, **response_hook_kwargs, @@ -830,10 +826,8 @@ def test_response_hook(self): HTTP_RESPONSE_BODY: "Hello!", }, ) - HTTPXClientInstrumentor().uninstrument() def test_response_hook_sync_async_kwargs(self): - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, response_hook=_response_hook, @@ -845,7 +839,7 @@ def test_response_hook_sync_async_kwargs(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual( - dict(span.attributes), + span.attributes, { SpanAttributes.HTTP_METHOD: "GET", SpanAttributes.HTTP_URL: self.URL, @@ -853,7 +847,6 @@ def test_response_hook_sync_async_kwargs(self): HTTP_RESPONSE_BODY: "Hello!", }, ) - HTTPXClientInstrumentor().uninstrument() def test_request_hook(self): request_hook_key = ( @@ -862,7 +855,6 @@ def test_request_hook(self): else "request_hook" ) request_hook_kwargs = {request_hook_key: self.request_hook} - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, **request_hook_kwargs, @@ -873,10 +865,8 @@ def test_request_hook(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual(span.name, "GET" + self.URL) - HTTPXClientInstrumentor().uninstrument() def test_request_hook_sync_async_kwargs(self): - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, request_hook=_request_hook, @@ -888,10 +878,8 @@ def test_request_hook_sync_async_kwargs(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual(span.name, "GET" + self.URL) - HTTPXClientInstrumentor().uninstrument() def test_request_hook_no_span_update(self): - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, request_hook=self.no_update_request_hook, @@ -902,10 +890,8 @@ def test_request_hook_no_span_update(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual(span.name, "GET") - HTTPXClientInstrumentor().uninstrument() def test_not_recording(self): - HTTPXClientInstrumentor().uninstrument() with mock.patch("opentelemetry.trace.INVALID_SPAN") as mock_span: HTTPXClientInstrumentor().instrument( tracer_provider=trace.NoOpTracerProvider() @@ -921,10 +907,8 @@ def test_not_recording(self): self.assertTrue(mock_span.is_recording.called) self.assertFalse(mock_span.set_attribute.called) self.assertFalse(mock_span.set_status.called) - HTTPXClientInstrumentor().uninstrument() def test_suppress_instrumentation_new_client(self): - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument() with suppress_http_instrumentation(): client = self.create_client() @@ -932,10 +916,8 @@ def test_suppress_instrumentation_new_client(self): self.assertEqual(result.text, "Hello!") self.assert_span(num_spans=0) - HTTPXClientInstrumentor().uninstrument() def test_instrument_client(self): - HTTPXClientInstrumentor().uninstrument() client = self.create_client() HTTPXClientInstrumentor().instrument_client(client) result = self.perform_request(self.URL, client=client) @@ -962,6 +944,7 @@ def test_instrumentation_without_client(self): ) def test_uninstrument(self): + HTTPXClientInstrumentor().instrument() HTTPXClientInstrumentor().uninstrument() client = self.create_client() result = self.perform_request(self.URL, client=client) @@ -971,7 +954,6 @@ def test_uninstrument(self): self.assert_span(num_spans=0) def test_uninstrument_client(self): - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().uninstrument_client(self.client) result = self.perform_request(self.URL) @@ -980,6 +962,7 @@ def test_uninstrument_client(self): self.assert_span(num_spans=0) def test_uninstrument_new_client(self): + HTTPXClientInstrumentor().instrument() client1 = self.create_client() HTTPXClientInstrumentor().uninstrument_client(client1) @@ -1002,6 +985,7 @@ def test_uninstrument_new_client(self): def test_instrument_proxy(self): proxy_mounts = self.create_proxy_mounts() + HTTPXClientInstrumentor().instrument() client = self.create_client(mounts=proxy_mounts) self.perform_request(self.URL, client=client) self.assert_span(num_spans=1) @@ -1028,7 +1012,6 @@ def print_handler(self, client): return handler def test_instrument_client_with_proxy(self): - HTTPXClientInstrumentor().uninstrument() proxy_mounts = self.create_proxy_mounts() client = self.create_client(mounts=proxy_mounts) self.assert_proxy_mounts( @@ -1048,6 +1031,7 @@ def test_instrument_client_with_proxy(self): def test_uninstrument_client_with_proxy(self): proxy_mounts = self.create_proxy_mounts() + HTTPXClientInstrumentor().instrument() client = self.create_client(mounts=proxy_mounts) self.assert_proxy_mounts( client._mounts.values(), @@ -1110,7 +1094,7 @@ def create_client( transport: typing.Optional[SyncOpenTelemetryTransport] = None, **kwargs, ): - return httpx.Client(**kwargs) + return httpx.Client(transport=transport, **kwargs) def perform_request( self, @@ -1231,6 +1215,7 @@ class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest): def setUp(self): super().setUp() self.client2 = self.create_client() + HTTPXClientInstrumentor().instrument_client(self.client2) def create_client( self, @@ -1284,7 +1269,6 @@ def test_async_response_hook_does_nothing_if_not_coroutine(self): SpanAttributes.HTTP_STATUS_CODE: 200, }, ) - HTTPXClientInstrumentor().uninstrument() def test_async_request_hook_does_nothing_if_not_coroutine(self): HTTPXClientInstrumentor().instrument( @@ -1297,4 +1281,3 @@ def test_async_request_hook_does_nothing_if_not_coroutine(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual(span.name, "GET") - HTTPXClientInstrumentor().uninstrument() From 94295ac7c6241d5f3f757350c5c5b6b4cfc89d60 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 22 Oct 2024 15:48:18 +0200 Subject: [PATCH 06/18] check that sync hooks are callable --- .../src/opentelemetry/instrumentation/httpx/__init__.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index 8e7357aac2..9631e1003c 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -734,8 +734,12 @@ def _instrument(self, **kwargs): ``async_response_hook``: Async``response_hook`` for ``httpx.AsyncClient`` """ tracer_provider = kwargs.get("tracer_provider") - self._request_hook = kwargs.get("request_hook") - self._response_hook = kwargs.get("response_hook") + _request_hook = kwargs.get("request_hook") + self._request_hook = _request_hook if callable(_request_hook) else None + _response_hook = kwargs.get("response_hook") + self._response_hook = ( + _response_hook if callable(_response_hook) else None + ) _async_request_hook = kwargs.get("async_request_hook") self._async_request_hook = ( _async_request_hook From ccffa9b4335dbbd9c96bf1108e28deb6b294be88 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 22 Oct 2024 15:50:14 +0200 Subject: [PATCH 07/18] Remove debug helpers --- .../tests/test_httpx_integration.py | 25 +------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index bf7f6caeae..58193adff6 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -167,10 +167,6 @@ def setUp(self): ) ) - def print_spans(self, spans): - for span in spans: - print(span.name, span.attributes) - # pylint: disable=invalid-name def tearDown(self): super().tearDown() @@ -184,9 +180,7 @@ def assert_span( if exporter is None: exporter = self.memory_exporter span_list = exporter.get_finished_spans() - self.assertEqual( - num_spans, len(span_list), self.print_spans(span_list) - ) + self.assertEqual(num_spans, len(span_list)) if num_spans == 0: return None if num_spans == 1: @@ -994,23 +988,6 @@ def test_instrument_proxy(self): 2, ) - def print_handler(self, client): - transport = client._transport - handler = getattr( - transport, - "handle_request", - getattr(transport, "handle_async_request", None), - ) - print( - handler, - ( - getattr(handler, "__wrapped__", "no wrapped") - if handler - else "no handler" - ), - ) - return handler - def test_instrument_client_with_proxy(self): proxy_mounts = self.create_proxy_mounts() client = self.create_client(mounts=proxy_mounts) From 986f3db52a616a123bbe9011fdf297890018d813 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 22 Oct 2024 15:50:59 +0200 Subject: [PATCH 08/18] remove FIXME --- .../tests/test_httpx_integration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index 58193adff6..7e4141eb28 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -744,7 +744,6 @@ def create_proxy_transport(self, url: str): def setUp(self): super().setUp() self.client = self.create_client() - # FIXME: calling instrument() instead fixes 13*2 tests :( HTTPXClientInstrumentor().instrument_client(self.client) def tearDown(self): From 3a2706baf86b5b1a37260f071c74844bb675b17b Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 22 Oct 2024 16:35:42 +0200 Subject: [PATCH 09/18] Reduce use of shared state in the instrumentor class --- .../instrumentation/httpx/__init__.py | 161 +++++++++++------- 1 file changed, 103 insertions(+), 58 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index 9631e1003c..9141711891 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -194,6 +194,7 @@ async def async_response_hook(span, request, response): import logging import typing from asyncio import iscoroutinefunction +from functools import partial from types import TracebackType import httpx @@ -734,45 +735,53 @@ def _instrument(self, **kwargs): ``async_response_hook``: Async``response_hook`` for ``httpx.AsyncClient`` """ tracer_provider = kwargs.get("tracer_provider") - _request_hook = kwargs.get("request_hook") - self._request_hook = _request_hook if callable(_request_hook) else None - _response_hook = kwargs.get("response_hook") - self._response_hook = ( - _response_hook if callable(_response_hook) else None - ) - _async_request_hook = kwargs.get("async_request_hook") - self._async_request_hook = ( - _async_request_hook - if iscoroutinefunction(_async_request_hook) + request_hook = kwargs.get("request_hook") + response_hook = kwargs.get("response_hook") + async_request_hook = kwargs.get("async_request_hook") + async_request_hook = ( + async_request_hook + if iscoroutinefunction(async_request_hook) else None ) - _async_response_hook = kwargs.get("async_response_hook") - self._async_response_hook = ( - _async_response_hook - if iscoroutinefunction(_async_response_hook) + async_response_hook = kwargs.get("async_response_hook") + async_response_hook = ( + async_response_hook + if iscoroutinefunction(async_response_hook) else None ) _OpenTelemetrySemanticConventionStability._initialize() - self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( _OpenTelemetryStabilitySignalType.HTTP, ) - self._tracer = get_tracer( + tracer = get_tracer( __name__, instrumenting_library_version=__version__, tracer_provider=tracer_provider, - schema_url=_get_schema_url(self._sem_conv_opt_in_mode), + schema_url=_get_schema_url(sem_conv_opt_in_mode), ) wrap_function_wrapper( "httpx", "HTTPTransport.handle_request", - self._handle_request_wrapper, + partial( + self._handle_request_wrapper, + tracer=tracer, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, + request_hook=request_hook, + response_hook=response_hook, + ), ) wrap_function_wrapper( "httpx", "AsyncHTTPTransport.handle_async_request", - self._handle_async_request_wrapper, + partial( + self._handle_async_request_wrapper, + tracer=tracer, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, + async_request_hook=async_request_hook, + async_response_hook=async_response_hook, + ), ) def _uninstrument(self, **kwargs): @@ -781,7 +790,17 @@ def _uninstrument(self, **kwargs): unwrap(httpx.HTTPTransport, "handle_request") unwrap(httpx.AsyncHTTPTransport, "handle_async_request") - def _handle_request_wrapper(self, wrapped, instance, args, kwargs): + def _handle_request_wrapper( + self, + wrapped, + instance, + args, + kwargs, + tracer, + sem_conv_opt_in_mode, + request_hook, + response_hook, + ): if not is_http_instrumentation_enabled(): return wrapped(*args, **kwargs) @@ -796,17 +815,17 @@ def _handle_request_wrapper(self, wrapped, instance, args, kwargs): span_attributes, url, method_original, - self._sem_conv_opt_in_mode, + sem_conv_opt_in_mode, ) request_info = RequestInfo(method, url, headers, stream, extensions) - with self._tracer.start_as_current_span( + with tracer.start_as_current_span( span_name, kind=SpanKind.CLIENT, attributes=span_attributes ) as span: exception = None - if callable(self._request_hook): - self._request_hook(span, request_info) + if callable(request_hook): + request_hook(span, request_info) _inject_propagation_headers(headers, args, kwargs) @@ -827,19 +846,17 @@ def _handle_request_wrapper(self, wrapped, instance, args, kwargs): span, status_code, http_version, - self._sem_conv_opt_in_mode, + sem_conv_opt_in_mode, ) - if callable(self._response_hook): - self._response_hook( + if callable(response_hook): + response_hook( span, request_info, ResponseInfo(status_code, headers, stream, extensions), ) if exception: - if span.is_recording() and _report_new( - self._sem_conv_opt_in_mode - ): + if span.is_recording() and _report_new(sem_conv_opt_in_mode): span.set_attribute( ERROR_TYPE, type(exception).__qualname__ ) @@ -848,7 +865,15 @@ def _handle_request_wrapper(self, wrapped, instance, args, kwargs): return response async def _handle_async_request_wrapper( - self, wrapped, instance, args, kwargs + self, + wrapped, + instance, + args, + kwargs, + tracer, + sem_conv_opt_in_mode, + async_request_hook, + async_response_hook, ): if not is_http_instrumentation_enabled(): return await wrapped(*args, **kwargs) @@ -864,17 +889,17 @@ async def _handle_async_request_wrapper( span_attributes, url, method_original, - self._sem_conv_opt_in_mode, + sem_conv_opt_in_mode, ) request_info = RequestInfo(method, url, headers, stream, extensions) - with self._tracer.start_as_current_span( + with tracer.start_as_current_span( span_name, kind=SpanKind.CLIENT, attributes=span_attributes ) as span: exception = None - if callable(self._async_request_hook): - await self._async_request_hook(span, request_info) + if callable(async_request_hook): + await async_request_hook(span, request_info) _inject_propagation_headers(headers, args, kwargs) @@ -895,20 +920,18 @@ async def _handle_async_request_wrapper( span, status_code, http_version, - self._sem_conv_opt_in_mode, + sem_conv_opt_in_mode, ) - if callable(self._async_response_hook): - await self._async_response_hook( + if callable(async_response_hook): + await async_response_hook( span, request_info, ResponseInfo(status_code, headers, stream, extensions), ) if exception: - if span.is_recording() and _report_new( - self._sem_conv_opt_in_mode - ): + if span.is_recording() and _report_new(sem_conv_opt_in_mode): span.set_attribute( ERROR_TYPE, type(exception).__qualname__ ) @@ -944,59 +967,81 @@ def instrument_client( ) return - # FIXME: sharing state in the instrumentor instance maybe it's not that great, need to pass tracer and semconv to each - # instance separately _OpenTelemetrySemanticConventionStability._initialize() - self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( _OpenTelemetryStabilitySignalType.HTTP, ) - self._tracer = get_tracer( + tracer = get_tracer( __name__, instrumenting_library_version=__version__, tracer_provider=tracer_provider, - schema_url=_get_schema_url(self._sem_conv_opt_in_mode), + schema_url=_get_schema_url(sem_conv_opt_in_mode), ) if iscoroutinefunction(request_hook): - self._async_request_hook = request_hook - self._request_hook = None + async_request_hook = request_hook + request_hook = None else: - self._request_hook = request_hook - self._async_request_hook = None + request_hook = request_hook + async_request_hook = None if iscoroutinefunction(response_hook): - self._async_response_hook = response_hook - self._response_hook = None + async_response_hook = response_hook + response_hook = None else: - self._response_hook = response_hook - self._async_response_hook = None + response_hook = response_hook + async_response_hook = None if hasattr(client._transport, "handle_request"): wrap_function_wrapper( client._transport, "handle_request", - self._handle_request_wrapper, + partial( + self._handle_request_wrapper, + tracer=tracer, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, + request_hook=request_hook, + response_hook=response_hook, + ), ) for transport in client._mounts.values(): # FIXME: check it's not wrapped already? wrap_function_wrapper( transport, "handle_request", - self._handle_request_wrapper, + partial( + self._handle_request_wrapper, + tracer=tracer, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, + request_hook=request_hook, + response_hook=response_hook, + ), ) client._is_instrumented_by_opentelemetry = True if hasattr(client._transport, "handle_async_request"): wrap_function_wrapper( client._transport, "handle_async_request", - self._handle_async_request_wrapper, + partial( + self._handle_async_request_wrapper, + tracer=tracer, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, + async_request_hook=async_request_hook, + async_response_hook=async_response_hook, + ), ) for transport in client._mounts.values(): # FIXME: check it's not wrapped already? wrap_function_wrapper( transport, "handle_async_request", - self._handle_async_request_wrapper, + partial( + self._handle_async_request_wrapper, + tracer=tracer, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, + async_request_hook=async_request_hook, + async_response_hook=async_response_hook, + ), ) client._is_instrumented_by_opentelemetry = True From dfb3b48f1001a6083f1022764d9f28482e6a71e7 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 22 Oct 2024 16:56:38 +0200 Subject: [PATCH 10/18] Cast span attributes to dict when comparing against a dict --- .../tests/test_httpx_integration.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index 7e4141eb28..d520e3a9aa 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -206,7 +206,7 @@ def test_basic(self): self.assertEqual(span.name, "GET") self.assertEqual( - span.attributes, + dict(span.attributes), { SpanAttributes.HTTP_METHOD: "GET", SpanAttributes.HTTP_URL: self.URL, @@ -230,7 +230,7 @@ def test_nonstandard_http_method(self): self.assertIs(span.kind, trace.SpanKind.CLIENT) self.assertEqual(span.name, "HTTP") self.assertEqual( - span.attributes, + dict(span.attributes), { SpanAttributes.HTTP_METHOD: "_OTHER", SpanAttributes.HTTP_URL: self.URL, @@ -254,7 +254,7 @@ def test_nonstandard_http_method_new_semconv(self): self.assertIs(span.kind, trace.SpanKind.CLIENT) self.assertEqual(span.name, "HTTP") self.assertEqual( - span.attributes, + dict(span.attributes), { HTTP_REQUEST_METHOD: "_OTHER", URL_FULL: self.URL, @@ -294,7 +294,7 @@ def test_basic_new_semconv(self): SpanAttributes.SCHEMA_URL, ) self.assertEqual( - span.attributes, + dict(span.attributes), { HTTP_REQUEST_METHOD: "GET", URL_FULL: url, @@ -329,7 +329,7 @@ def test_basic_both_semconv(self): ) self.assertEqual( - span.attributes, + dict(span.attributes), { SpanAttributes.HTTP_METHOD: "GET", HTTP_REQUEST_METHOD: "GET", @@ -456,7 +456,7 @@ def test_requests_500_error(self): span = self.assert_span() self.assertEqual( - span.attributes, + dict(span.attributes), { SpanAttributes.HTTP_METHOD: "GET", SpanAttributes.HTTP_URL: self.URL, @@ -512,7 +512,7 @@ def test_requests_timeout_exception_new_semconv(self): span = self.assert_span() self.assertEqual( - span.attributes, + dict(span.attributes), { HTTP_REQUEST_METHOD: "GET", URL_FULL: url, @@ -533,7 +533,7 @@ def test_requests_timeout_exception_both_semconv(self): span = self.assert_span() self.assertEqual( - span.attributes, + dict(span.attributes), { SpanAttributes.HTTP_METHOD: "GET", HTTP_REQUEST_METHOD: "GET", @@ -634,7 +634,7 @@ def test_response_hook(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual( - span.attributes, + dict(span.attributes), { SpanAttributes.HTTP_METHOD: "GET", SpanAttributes.HTTP_URL: self.URL, @@ -811,7 +811,7 @@ def test_response_hook(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual( - span.attributes, + dict(span.attributes), { SpanAttributes.HTTP_METHOD: "GET", SpanAttributes.HTTP_URL: self.URL, @@ -832,7 +832,7 @@ def test_response_hook_sync_async_kwargs(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual( - span.attributes, + dict(span.attributes), { SpanAttributes.HTTP_METHOD: "GET", SpanAttributes.HTTP_URL: self.URL, From f356e90a864072898d96f18b8c41aa581f3c1d4c Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 22 Oct 2024 20:35:04 +0200 Subject: [PATCH 11/18] Please some pylint errors --- .../instrumentation/httpx/__init__.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index 9141711891..2536f36596 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=too-many-lines """ Usage ----- @@ -785,13 +786,11 @@ def _instrument(self, **kwargs): ) def _uninstrument(self, **kwargs): - import httpx - unwrap(httpx.HTTPTransport, "handle_request") unwrap(httpx.AsyncHTTPTransport, "handle_async_request") - def _handle_request_wrapper( - self, + @staticmethod + def _handle_request_wrapper( # pylint: disable=too-many-locals wrapped, instance, args, @@ -864,8 +863,8 @@ def _handle_request_wrapper( return response - async def _handle_async_request_wrapper( - self, + @staticmethod + async def _handle_async_request_wrapper( # pylint: disable=too-many-locals wrapped, instance, args, @@ -982,14 +981,14 @@ def instrument_client( async_request_hook = request_hook request_hook = None else: - request_hook = request_hook + # request_hook already set async_request_hook = None if iscoroutinefunction(response_hook): async_response_hook = response_hook response_hook = None else: - response_hook = response_hook + # response_hook already set async_response_hook = None if hasattr(client._transport, "handle_request"): From 921162a3316dd5645f9fac5b25b52d561f449188 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Wed, 23 Oct 2024 16:15:39 +0200 Subject: [PATCH 12/18] Add tests for subclassed clients --- .../tests/test_httpx_integration.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index d520e3a9aa..3efe081034 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -1182,6 +1182,21 @@ def perform_request( def create_proxy_transport(self, url): return httpx.HTTPTransport(proxy=httpx.Proxy(url)) + def test_can_instrument_subclassed_client(self): + class CustomClient(httpx.Client): + pass + + client = CustomClient() + self.assertFalse( + isinstance(client._transport.handle_request, ObjectProxy) + ) + + HTTPXClientInstrumentor().instrument() + + self.assertTrue( + isinstance(client._transport.handle_request, ObjectProxy) + ) + class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest): response_hook = staticmethod(_async_response_hook) @@ -1257,3 +1272,18 @@ def test_async_request_hook_does_nothing_if_not_coroutine(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual(span.name, "GET") + + def test_can_instrument_subclassed_async_client(self): + class CustomAsyncClient(httpx.AsyncClient): + pass + + client = CustomAsyncClient() + self.assertFalse( + isinstance(client._transport.handle_async_request, ObjectProxy) + ) + + HTTPXClientInstrumentor().instrument() + + self.assertTrue( + isinstance(client._transport.handle_async_request, ObjectProxy) + ) From eede6a6a57fd50d2189b22a5b6cb2d5af88818bf Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Thu, 24 Oct 2024 15:23:33 +0200 Subject: [PATCH 13/18] Add missing wrapt dependency --- .../opentelemetry-instrumentation-httpx/pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/pyproject.toml b/instrumentation/opentelemetry-instrumentation-httpx/pyproject.toml index 599091716b..c986fac4a1 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-httpx/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "opentelemetry-instrumentation == 0.49b0.dev", "opentelemetry-semantic-conventions == 0.49b0.dev", "opentelemetry-util-http == 0.49b0.dev", + "wrapt >= 1.0.0, < 2.0.0", ] [project.optional-dependencies] From 1951496951785837d95364d16e8d34a653bd0a23 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 28 Oct 2024 10:32:38 +0100 Subject: [PATCH 14/18] Add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 867f30afbb..1b73c88a23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2871](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2871)) - `opentelemetry-instrumentation` Don't fail distro loading if instrumentor raises ImportError, instead skip them ([#2923](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2923)) +- `opentelemetry-instrumentation-httpx` Rewrote instrumentation to use wrapt instead of subclassing + ([#2909](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2909)) ## Version 1.27.0/0.48b0 (2024-08-28) From deb4a1a560749bcd7f785af936fe537122eddc61 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 28 Oct 2024 21:47:25 +0100 Subject: [PATCH 15/18] Update instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py --- .../tests/test_httpx_integration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index 3efe081034..07699700c4 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -1018,7 +1018,6 @@ def test_uninstrument_client_with_proxy(self): result = self.perform_request(self.URL, client=client) self.assertEqual(result.text, "Hello!") - # FIXME: this does fail if uninstrument() has been called before and is a change of behaviour from before self.assert_span(num_spans=0) self.assert_proxy_mounts( client._mounts.values(), From f18b86941491def258d7862aee54231871ac8df4 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 28 Oct 2024 22:12:11 +0100 Subject: [PATCH 16/18] Update instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py --- .../src/opentelemetry/instrumentation/httpx/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index 2536f36596..d5b77e9c29 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -859,7 +859,7 @@ def _handle_request_wrapper( # pylint: disable=too-many-locals span.set_attribute( ERROR_TYPE, type(exception).__qualname__ ) - raise exception + raise exception.with_traceback(exception.__traceback__) return response From f4573e31db8c167f3d9948143fc9f5985d1e13e0 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 28 Oct 2024 22:14:09 +0100 Subject: [PATCH 17/18] Update instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py --- .../src/opentelemetry/instrumentation/httpx/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index d5b77e9c29..c10e322f9b 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -934,7 +934,7 @@ async def _handle_async_request_wrapper( # pylint: disable=too-many-locals span.set_attribute( ERROR_TYPE, type(exception).__qualname__ ) - raise exception + raise exception.with_traceback(exception.__traceback__) return response From b37344666543f271b1e714664743f111fc3d4bdc Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 29 Oct 2024 21:25:41 +0100 Subject: [PATCH 18/18] Apply suggestions from code review --- .../src/opentelemetry/instrumentation/httpx/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index c10e322f9b..d3a2cecfe6 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -1004,7 +1004,6 @@ def instrument_client( ), ) for transport in client._mounts.values(): - # FIXME: check it's not wrapped already? wrap_function_wrapper( transport, "handle_request", @@ -1030,7 +1029,6 @@ def instrument_client( ), ) for transport in client._mounts.values(): - # FIXME: check it's not wrapped already? wrap_function_wrapper( transport, "handle_async_request",