3535run method or the executor's worker thread."
3636"""
3737
38+ from __future__ import annotations
39+
3840import threading
3941from concurrent import futures
40- from typing import Collection
42+ from typing import TYPE_CHECKING , Any , Callable , Collection
4143
42- from wrapt import wrap_function_wrapper
44+ from wrapt import (
45+ wrap_function_wrapper , # type: ignore[reportUnknownVariableType]
46+ )
4347
4448from opentelemetry import context
4549from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
4650from opentelemetry .instrumentation .threading .package import _instruments
4751from opentelemetry .instrumentation .utils import unwrap
4852
53+ if TYPE_CHECKING :
54+ from typing import Protocol , TypeVar
55+
56+ R = TypeVar ("R" )
57+
58+ class HasOtelContext (Protocol ):
59+ _otel_context : context .Context
60+
4961
5062class ThreadingInstrumentor (BaseInstrumentor ):
5163 __WRAPPER_START_METHOD = "start"
@@ -55,12 +67,12 @@ class ThreadingInstrumentor(BaseInstrumentor):
5567 def instrumentation_dependencies (self ) -> Collection [str ]:
5668 return _instruments
5769
58- def _instrument (self , ** kwargs ):
70+ def _instrument (self , ** kwargs : Any ):
5971 self ._instrument_thread ()
6072 self ._instrument_timer ()
6173 self ._instrument_thread_pool ()
6274
63- def _uninstrument (self , ** kwargs ):
75+ def _uninstrument (self , ** kwargs : Any ):
6476 self ._uninstrument_thread ()
6577 self ._uninstrument_timer ()
6678 self ._uninstrument_thread_pool ()
@@ -117,12 +129,22 @@ def _uninstrument_thread_pool():
117129 )
118130
119131 @staticmethod
120- def __wrap_threading_start (call_wrapped , instance , args , kwargs ):
132+ def __wrap_threading_start (
133+ call_wrapped : Callable [[], None ],
134+ instance : HasOtelContext ,
135+ args : ...,
136+ kwargs : ...,
137+ ) -> None :
121138 instance ._otel_context = context .get_current ()
122139 return call_wrapped (* args , ** kwargs )
123140
124141 @staticmethod
125- def __wrap_threading_run (call_wrapped , instance , args , kwargs ):
142+ def __wrap_threading_run (
143+ call_wrapped : Callable [..., R ],
144+ instance : HasOtelContext ,
145+ args : tuple [Any , ...],
146+ kwargs : dict [str , Any ],
147+ ) -> R :
126148 token = None
127149 try :
128150 token = context .attach (instance ._otel_context )
@@ -131,12 +153,17 @@ def __wrap_threading_run(call_wrapped, instance, args, kwargs):
131153 context .detach (token )
132154
133155 @staticmethod
134- def __wrap_thread_pool_submit (call_wrapped , instance , args , kwargs ):
156+ def __wrap_thread_pool_submit (
157+ call_wrapped : Callable [..., R ],
158+ instance : futures .ThreadPoolExecutor ,
159+ args : tuple [Callable [..., Any ], ...],
160+ kwargs : dict [str , Any ],
161+ ) -> R :
135162 # obtain the original function and wrapped kwargs
136163 original_func = args [0 ]
137164 otel_context = context .get_current ()
138165
139- def wrapped_func (* func_args , ** func_kwargs ) :
166+ def wrapped_func (* func_args : Any , ** func_kwargs : Any ) -> R :
140167 token = None
141168 try :
142169 token = context .attach (otel_context )
@@ -145,5 +172,5 @@ def wrapped_func(*func_args, **func_kwargs):
145172 context .detach (token )
146173
147174 # replace the original function with the wrapped function
148- new_args = (wrapped_func ,) + args [1 :]
175+ new_args : tuple [ Callable [..., Any ], ...] = (wrapped_func ,) + args [1 :]
149176 return call_wrapped (* new_args , ** kwargs )
0 commit comments