12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
15
+ import logging
15
16
import typing
16
17
17
18
import httpx
31
32
from opentelemetry .trace .span import Span
32
33
from opentelemetry .trace .status import Status
33
34
35
+ _logger = logging .getLogger (__name__ )
36
+
34
37
URL = typing .Tuple [bytes , bytes , typing .Optional [int ], bytes ]
35
38
Headers = typing .List [typing .Tuple [bytes , bytes ]]
36
39
RequestHook = typing .Callable [[Span , "RequestInfo" ], None ]
@@ -258,98 +261,48 @@ async def handle_async_request(
258
261
return status_code , headers , stream , extensions
259
262
260
263
261
- def _instrument (
262
- tracer_provider : TracerProvider = None ,
263
- request_hook : typing .Optional [RequestHook ] = None ,
264
- response_hook : typing .Optional [ResponseHook ] = None ,
265
- ) -> None :
266
- """Enables tracing of all Client and AsyncClient instances
267
-
268
- When a Client or AsyncClient gets created, a telemetry transport is passed
269
- in to the instance.
270
- """
271
- # pylint:disable=unused-argument
272
- def instrumented_sync_send (wrapped , instance , args , kwargs ):
273
- if context .get_value ("suppress_instrumentation" ):
274
- return wrapped (* args , ** kwargs )
264
+ class _InstrumentedClient (httpx .Client ):
275
265
276
- transport = instance ._transport or httpx .HTTPTransport ()
277
- telemetry_transport = SyncOpenTelemetryTransport (
278
- transport ,
279
- tracer_provider = tracer_provider ,
280
- request_hook = request_hook ,
281
- response_hook = response_hook ,
282
- )
266
+ _tracer_provider = None
267
+ _request_hook = None
268
+ _response_hook = None
283
269
284
- instance . _transport = telemetry_transport
285
- return wrapped (* args , ** kwargs )
270
+ def __init__ ( self , * args , ** kwargs ):
271
+ super (). __init__ (* args , ** kwargs )
286
272
287
- async def instrumented_async_send (wrapped , instance , args , kwargs ):
288
- if context .get_value ("suppress_instrumentation" ):
289
- return await wrapped (* args , ** kwargs )
273
+ self ._original_transport = self ._transport
274
+ self ._is_instrumented_by_opentelemetry = True
290
275
291
- transport = instance ._transport or httpx .AsyncHTTPTransport ()
292
- telemetry_transport = AsyncOpenTelemetryTransport (
293
- transport ,
294
- tracer_provider = tracer_provider ,
295
- request_hook = request_hook ,
296
- response_hook = response_hook ,
276
+ self ._transport = SyncOpenTelemetryTransport (
277
+ self ._transport ,
278
+ tracer_provider = _InstrumentedClient ._tracer_provider ,
279
+ request_hook = _InstrumentedClient ._request_hook ,
280
+ response_hook = _InstrumentedClient ._response_hook ,
297
281
)
298
282
299
- instance ._transport = telemetry_transport
300
- return await wrapped (* args , ** kwargs )
301
283
302
- wrapt . wrap_function_wrapper (httpx .Client , "send" , instrumented_sync_send )
284
+ class _InstrumentedAsyncClient (httpx .AsyncClient ):
303
285
304
- wrapt . wrap_function_wrapper (
305
- httpx . AsyncClient , "send" , instrumented_async_send
306
- )
286
+ _tracer_provider = None
287
+ _request_hook = None
288
+ _response_hook = None
307
289
290
+ def __init__ (self , * args , ** kwargs ):
291
+ super ().__init__ (* args , ** kwargs )
308
292
309
- def _instrument_client (
310
- client : typing .Union [httpx .Client , httpx .AsyncClient ],
311
- tracer_provider : TracerProvider = None ,
312
- request_hook : typing .Optional [RequestHook ] = None ,
313
- response_hook : typing .Optional [ResponseHook ] = None ,
314
- ) -> None :
315
- """Enables instrumentation for the given Client or AsyncClient"""
316
- # pylint: disable=protected-access
317
- if isinstance (client , httpx .Client ):
318
- transport = client ._transport or httpx .HTTPTransport ()
319
- telemetry_transport = SyncOpenTelemetryTransport (
320
- transport ,
321
- tracer_provider = tracer_provider ,
322
- request_hook = request_hook ,
323
- response_hook = response_hook ,
324
- )
325
- elif isinstance (client , httpx .AsyncClient ):
326
- transport = client ._transport or httpx .AsyncHTTPTransport ()
327
- telemetry_transport = AsyncOpenTelemetryTransport (
328
- transport ,
329
- tracer_provider = tracer_provider ,
330
- request_hook = request_hook ,
331
- response_hook = response_hook ,
332
- )
333
- else :
334
- raise TypeError ("Invalid client provided" )
335
- client ._transport = telemetry_transport
293
+ self ._original_transport = self ._transport
294
+ self ._is_instrumented_by_opentelemetry = True
336
295
337
-
338
- def _uninstrument () -> None :
339
- """Disables instrumenting for all newly created Client and AsyncClient instances"""
340
- unwrap (httpx .Client , "send" )
341
- unwrap (httpx .AsyncClient , "send" )
342
-
343
-
344
- def _uninstrument_client (
345
- client : typing .Union [httpx .Client , httpx .AsyncClient ]
346
- ) -> None :
347
- """Disables instrumentation for the given Client or AsyncClient"""
348
- # pylint: disable=protected-access
349
- unwrap (client , "send" )
296
+ self ._transport = AsyncOpenTelemetryTransport (
297
+ self ._transport ,
298
+ tracer_provider = _InstrumentedAsyncClient ._tracer_provider ,
299
+ request_hook = _InstrumentedAsyncClient ._request_hook ,
300
+ response_hook = _InstrumentedAsyncClient ._response_hook ,
301
+ )
350
302
351
303
352
304
class HTTPXClientInstrumentor (BaseInstrumentor ):
305
+ # pylint: disable=protected-access,attribute-defined-outside-init
353
306
"""An instrumentor for httpx Client and AsyncClient
354
307
355
308
See `BaseInstrumentor`
@@ -369,14 +322,31 @@ def _instrument(self, **kwargs):
369
322
``response_hook``: A hook that receives the span, request, and response
370
323
that is called right before the span ends
371
324
"""
372
- _instrument (
373
- tracer_provider = kwargs .get ("tracer_provider" ),
374
- request_hook = kwargs .get ("request_hook" ),
375
- response_hook = kwargs .get ("response_hook" ),
376
- )
325
+ self ._original_client = httpx .Client
326
+ self ._original_async_client = httpx .AsyncClient
327
+ request_hook = kwargs .get ("request_hook" )
328
+ response_hook = kwargs .get ("response_hook" )
329
+ if callable (request_hook ):
330
+ _InstrumentedClient ._request_hook = request_hook
331
+ _InstrumentedAsyncClient ._request_hook = request_hook
332
+ if callable (response_hook ):
333
+ _InstrumentedClient ._response_hook = response_hook
334
+ _InstrumentedAsyncClient ._response_hook = response_hook
335
+ tracer_provider = kwargs .get ("tracer_provider" )
336
+ _InstrumentedClient ._tracer_provider = tracer_provider
337
+ _InstrumentedAsyncClient ._tracer_provider = tracer_provider
338
+ httpx .Client = _InstrumentedClient
339
+ httpx .AsyncClient = _InstrumentedAsyncClient
377
340
378
341
def _uninstrument (self , ** kwargs ):
379
- _uninstrument ()
342
+ httpx .Client = self ._original_client
343
+ httpx .AsyncClient = self ._original_async_client
344
+ _InstrumentedClient ._tracer_provider = None
345
+ _InstrumentedClient ._request_hook = None
346
+ _InstrumentedClient ._response_hook = None
347
+ _InstrumentedAsyncClient ._tracer_provider = None
348
+ _InstrumentedAsyncClient ._request_hook = None
349
+ _InstrumentedAsyncClient ._response_hook = None
380
350
381
351
@staticmethod
382
352
def instrument_client (
@@ -395,12 +365,34 @@ def instrument_client(
395
365
response_hook: A hook that receives the span, request, and response
396
366
that is called right before the span ends
397
367
"""
398
- _instrument_client (
399
- client ,
400
- tracer_provider = tracer_provider ,
401
- request_hook = request_hook ,
402
- response_hook = response_hook ,
403
- )
368
+ # pylint: disable=protected-access
369
+ if not hasattr (client , "_is_instrumented_by_opentelemetry" ):
370
+ client ._is_instrumented_by_opentelemetry = False
371
+
372
+ if not client ._is_instrumented_by_opentelemetry :
373
+ if isinstance (client , httpx .Client ):
374
+ client ._original_transport = client ._transport
375
+ transport = client ._transport or httpx .HTTPTransport ()
376
+ client ._transport = SyncOpenTelemetryTransport (
377
+ transport ,
378
+ tracer_provider = tracer_provider ,
379
+ request_hook = request_hook ,
380
+ response_hook = response_hook ,
381
+ )
382
+ client ._is_instrumented_by_opentelemetry = True
383
+ if isinstance (client , httpx .AsyncClient ):
384
+ transport = client ._transport or httpx .AsyncHTTPTransport ()
385
+ client ._transport = AsyncOpenTelemetryTransport (
386
+ transport ,
387
+ tracer_provider = tracer_provider ,
388
+ request_hook = request_hook ,
389
+ response_hook = response_hook ,
390
+ )
391
+ client ._is_instrumented_by_opentelemetry = True
392
+ else :
393
+ _logger .warning (
394
+ "Attempting to instrument Httpx client while already instrumented"
395
+ )
404
396
405
397
@staticmethod
406
398
def uninstrument_client (
@@ -411,4 +403,12 @@ def uninstrument_client(
411
403
Args:
412
404
client: The httpx Client or AsyncClient instance
413
405
"""
414
- _uninstrument_client (client )
406
+ if hasattr (client , "_original_transport" ):
407
+ client ._transport = client ._original_transport
408
+ del client ._original_transport
409
+ client ._is_instrumented_by_opentelemetry = False
410
+ else :
411
+ _logger .warning (
412
+ "Attempting to uninstrument Httpx "
413
+ "client while already uninstrumented"
414
+ )
0 commit comments