11from __future__ import absolute_import
2+ from __future__ import annotations
23
34import _thread
45import abc
56import os .path
67import sys
78import time
8- import types
9- import typing
9+ from types import CodeType
10+ from types import FrameType
11+ from types import ModuleType
12+ from typing import Any
13+ from typing import Callable
14+ from typing import Dict
15+ from typing import List
16+ from typing import Optional
17+ from typing import Tuple
18+ from typing import Type
1019
1120import wrapt
1221
1524from ddtrace .profiling import collector
1625from ddtrace .profiling .collector import _task
1726from ddtrace .profiling .collector import _traceback
27+ from ddtrace .profiling .event import DDFrame
1828from ddtrace .settings .profiling import config
1929from ddtrace .trace import Tracer
2030
2131
22- T = typing .TypeVar ("T" )
23-
24-
25- def _current_thread () -> typing .Tuple [int , str ]:
26- thread_id = _thread .get_ident ()
32+ def _current_thread () -> Tuple [int , str ]:
33+ thread_id : int = _thread .get_ident ()
2734 return thread_id , _threading .get_thread_name (thread_id )
2835
2936
3037# We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrappers function will
3138# appear in the stack trace and we need to hide it.
39+ WRAPT_C_EXT : bool
3240if os .environ .get ("WRAPT_DISABLE_EXTENSIONS" ):
3341 WRAPT_C_EXT = False
3442else :
@@ -44,55 +52,69 @@ def _current_thread() -> typing.Tuple[int, str]:
4452class _ProfiledLock (wrapt .ObjectProxy ):
4553 def __init__ (
4654 self ,
47- wrapped : typing . Any ,
48- tracer : typing . Optional [Tracer ],
55+ wrapped : Any ,
56+ tracer : Optional [Tracer ],
4957 max_nframes : int ,
5058 capture_sampler : collector .CaptureSampler ,
5159 endpoint_collection_enabled : bool ,
5260 ) -> None :
5361 wrapt .ObjectProxy .__init__ (self , wrapped )
54- self ._self_tracer = tracer
55- self ._self_max_nframes = max_nframes
56- self ._self_capture_sampler = capture_sampler
57- self ._self_endpoint_collection_enabled = endpoint_collection_enabled
58- frame = sys ._getframe (2 if WRAPT_C_EXT else 3 )
59- code = frame .f_code
60- self ._self_init_loc = "%s:%d" % (os .path .basename (code .co_filename ), frame .f_lineno )
61- self ._self_name : typing .Optional [str ] = None
62-
63- def __aenter__ (self , * args : typing .Any , ** kwargs : typing .Any ) -> typing .Any :
62+ self ._self_tracer : Optional [Tracer ] = tracer
63+ self ._self_max_nframes : int = max_nframes
64+ self ._self_capture_sampler : collector .CaptureSampler = capture_sampler
65+ self ._self_endpoint_collection_enabled : bool = endpoint_collection_enabled
66+ frame : FrameType = sys ._getframe (2 if WRAPT_C_EXT else 3 )
67+ code : CodeType = frame .f_code
68+ self ._self_init_loc : str = "%s:%d" % (os .path .basename (code .co_filename ), frame .f_lineno )
69+ self ._self_acquired_at : int = 0
70+ self ._self_name : Optional [str ] = None
71+
72+ def __aenter__ (self , * args : Any , ** kwargs : Any ) -> Any :
6473 return self ._acquire (self .__wrapped__ .__aenter__ , * args , ** kwargs )
6574
66- def __aexit__ (self , * args : typing . Any , ** kwargs : typing . Any ) -> None :
75+ def __aexit__ (self , * args : Any , ** kwargs : Any ) -> Any :
6776 return self ._release (self .__wrapped__ .__aexit__ , * args , ** kwargs )
6877
69- def _acquire (self , inner_func : typing . Callable [..., T ], * args : typing . Any , ** kwargs : typing . Any ) -> T :
78+ def _acquire (self , inner_func : Callable [..., Any ], * args : Any , ** kwargs : Any ) -> Any :
7079 if not self ._self_capture_sampler .capture ():
7180 return inner_func (* args , ** kwargs )
7281
73- start = time .monotonic_ns ()
82+ start : int = time .monotonic_ns ()
7483 try :
7584 return inner_func (* args , ** kwargs )
7685 finally :
7786 try :
78- end = self ._self_acquired_at = time .monotonic_ns ()
87+ end : int = time .monotonic_ns ()
88+ self ._self_acquired_at = end
89+
90+ thread_id : int
91+ thread_name : str
7992 thread_id , thread_name = _current_thread ()
93+
94+ task_id : Optional [int ]
95+ task_name : Optional [str ]
96+ task_frame : Optional [FrameType ]
8097 task_id , task_name , task_frame = _task .get_task (thread_id )
98+
8199 self ._maybe_update_self_name ()
82- lock_name = "%s:%s" % (self ._self_init_loc , self ._self_name ) if self ._self_name else self ._self_init_loc
100+ lock_name : str = (
101+ "%s:%s" % (self ._self_init_loc , self ._self_name ) if self ._self_name else self ._self_init_loc
102+ )
83103
104+ frame : FrameType
84105 if task_frame is None :
85106 # If we can't get the task frame, we use the caller frame. We expect acquire/release or
86107 # __enter__/__exit__ to be on the stack, so we go back 2 frames.
87108 frame = sys ._getframe (2 )
88109 else :
89110 frame = task_frame
90111
112+ frames : List [DDFrame ]
91113 frames , _ = _traceback .pyframe_to_frames (frame , self ._self_max_nframes )
92114
93- thread_native_id = _threading .get_thread_native_id (thread_id )
115+ thread_native_id : int = _threading .get_thread_native_id (thread_id )
94116
95- handle = ddup .SampleHandle ()
117+ handle : ddup . SampleHandle = ddup .SampleHandle ()
96118 handle .push_monotonic_ns (end )
97119 handle .push_lock_name (lock_name )
98120 handle .push_acquire (end - start , 1 ) # AFAICT, capture_pct does not adjust anything here
@@ -102,26 +124,22 @@ def _acquire(self, inner_func: typing.Callable[..., T], *args: typing.Any, **kwa
102124
103125 if self ._self_tracer is not None :
104126 handle .push_span (self ._self_tracer .current_span ())
105-
106- for f in frames :
107- handle .push_frame (f .function_name , f .file_name , 0 , f .lineno )
108-
127+ for ddframe in frames :
128+ handle .push_frame (ddframe .function_name , ddframe .file_name , 0 , ddframe .lineno )
109129 handle .flush_sample ()
110130 except Exception :
111131 pass # nosec
112132
113- def acquire (self , * args : typing . Any , ** kwargs : typing . Any ) -> typing . Any :
133+ def acquire (self , * args : Any , ** kwargs : Any ) -> Any :
114134 return self ._acquire (self .__wrapped__ .acquire , * args , ** kwargs )
115135
116- def _release (self , inner_func : typing .Callable [..., typing .Any ], * args : typing .Any , ** kwargs : typing .Any ) -> None :
117- # type (typing.Any, typing.Any) -> None
118-
136+ def _release (self , inner_func : Callable [..., Any ], * args : Any , ** kwargs : Any ) -> None :
119137 # The underlying threading.Lock class is implemented using C code, and
120138 # it doesn't have the __dict__ attribute. So we can't do
121139 # self.__dict__.pop("_self_acquired_at", None) to remove the attribute.
122140 # Instead, we need to use the following workaround to retrieve and
123141 # remove the attribute.
124- start = getattr (self , "_self_acquired_at" , None )
142+ start : Optional [ int ] = getattr (self , "_self_acquired_at" , None )
125143 try :
126144 # Though it should generally be avoided to call release() from
127145 # multiple threads, it is possible to do so. In that scenario, the
@@ -137,22 +155,34 @@ def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.A
137155 return inner_func (* args , ** kwargs )
138156 finally :
139157 if start is not None :
140- end = time .monotonic_ns ()
158+ end : int = time .monotonic_ns ()
159+
160+ thread_id : int
161+ thread_name : str
141162 thread_id , thread_name = _current_thread ()
163+
164+ task_id : Optional [int ]
165+ task_name : Optional [str ]
166+ task_frame : Optional [FrameType ]
142167 task_id , task_name , task_frame = _task .get_task (thread_id )
143- lock_name = "%s:%s" % (self ._self_init_loc , self ._self_name ) if self ._self_name else self ._self_init_loc
144168
169+ lock_name : str = (
170+ "%s:%s" % (self ._self_init_loc , self ._self_name ) if self ._self_name else self ._self_init_loc
171+ )
172+
173+ frame : FrameType
145174 if task_frame is None :
146175 # See the comments in _acquire
147176 frame = sys ._getframe (2 )
148177 else :
149178 frame = task_frame
150179
180+ frames : List [DDFrame ]
151181 frames , _ = _traceback .pyframe_to_frames (frame , self ._self_max_nframes )
152182
153- thread_native_id = _threading .get_thread_native_id (thread_id )
183+ thread_native_id : int = _threading .get_thread_native_id (thread_id )
154184
155- handle = ddup .SampleHandle ()
185+ handle : ddup . SampleHandle = ddup .SampleHandle ()
156186 handle .push_monotonic_ns (end )
157187 handle .push_lock_name (lock_name )
158188 handle .push_release (end - start , 1 ) # AFAICT, capture_pct does not adjust anything here
@@ -162,25 +192,22 @@ def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.A
162192
163193 if self ._self_tracer is not None :
164194 handle .push_span (self ._self_tracer .current_span ())
165-
166- for f in frames :
167- handle .push_frame (f .function_name , f .file_name , 0 , f .lineno )
195+ for ddframe in frames :
196+ handle .push_frame (ddframe .function_name , ddframe .file_name , 0 , ddframe .lineno )
168197 handle .flush_sample ()
169198
170- def release (self , * args : typing . Any , ** kwargs : typing . Any ) -> None :
199+ def release (self , * args : Any , ** kwargs : Any ) -> Any :
171200 return self ._release (self .__wrapped__ .release , * args , ** kwargs )
172201
173- acquire_lock = acquire
174-
175- def __enter__ (self , * args : typing .Any , ** kwargs : typing .Any ) -> typing .Any :
202+ def __enter__ (self , * args : Any , ** kwargs : Any ) -> Any :
176203 return self ._acquire (self .__wrapped__ .__enter__ , * args , ** kwargs )
177204
178- def __exit__ (self , * args : typing . Any , ** kwargs : typing . Any ) -> None :
205+ def __exit__ (self , * args : Any , ** kwargs : Any ) -> None :
179206 self ._release (self .__wrapped__ .__exit__ , * args , ** kwargs )
180207
181- def _find_self_name (self , var_dict : typing . Dict ) -> typing . Optional [str ]:
208+ def _find_self_name (self , var_dict : Dict [ str , Any ] ) -> Optional [str ]:
182209 for name , value in var_dict .items ():
183- if name .startswith ("__" ) or isinstance (value , types . ModuleType ):
210+ if name .startswith ("__" ) or isinstance (value , ModuleType ):
184211 continue
185212 if value is self :
186213 return name
@@ -202,7 +229,8 @@ def _maybe_update_self_name(self) -> None:
202229 # 2: acquire/release (or __enter__/__exit__)
203230 # 3: caller frame
204231 if config .enable_asserts :
205- frame = sys ._getframe (1 )
232+ frame : FrameType = sys ._getframe (1 )
233+ # TODO: replace dict with list
206234 if frame .f_code .co_name not in {"_acquire" , "_release" }:
207235 raise AssertionError ("Unexpected frame %s" % frame .f_code .co_name )
208236 frame = sys ._getframe (2 )
@@ -228,43 +256,36 @@ class FunctionWrapper(wrapt.FunctionWrapper):
228256 # Override the __get__ method: whatever happens, _allocate_lock is always considered by Python like a "static"
229257 # method, even when used as a class attribute. Python never tried to "bind" it to a method, because it sees it is a
230258 # builtin function. Override default wrapt behavior here that tries to detect bound method.
231- def __get__ (
232- self ,
233- instance : typing .Optional [typing .Any ],
234- owner : typing .Optional [typing .Any ] = None ,
235- ) -> "FunctionWrapper" :
259+ def __get__ (self , instance : Any , owner : Optional [Type ] = None ) -> FunctionWrapper :
236260 return self
237261
238262
239263class LockCollector (collector .CaptureSamplerCollector ):
240264 """Record lock usage."""
241265
242- PROFILED_LOCK_CLASS : typing . Type [typing . Any ]
266+ PROFILED_LOCK_CLASS : Type [Any ]
243267
244268 def __init__ (
245269 self ,
246270 nframes : int = config .max_frames ,
247271 endpoint_collection_enabled : bool = config .endpoint_collection ,
248- tracer : typing . Optional [Tracer ] = None ,
249- * args : typing . Any ,
250- ** kwargs : typing . Any ,
272+ tracer : Optional [Tracer ] = None ,
273+ * args : Any ,
274+ ** kwargs : Any ,
251275 ) -> None :
252276 super ().__init__ (* args , ** kwargs )
253- self .nframes = nframes
254- self .endpoint_collection_enabled = endpoint_collection_enabled
255- self .tracer = tracer
256- self ._original : typing . Optional [typing . Any ] = None
277+ self .nframes : int = nframes
278+ self .endpoint_collection_enabled : bool = endpoint_collection_enabled
279+ self .tracer : Optional [ Tracer ] = tracer
280+ self ._original : Optional [Any ] = None
257281
258282 @abc .abstractmethod
259- def _get_patch_target (self ) -> typing . Type [ typing . Any ]:
260- raise NotImplementedError
283+ def _get_patch_target (self ) -> Callable [..., Any ]:
284+ ...
261285
262286 @abc .abstractmethod
263- def _set_patch_target (
264- self ,
265- value : typing .Any ,
266- ) -> None :
267- raise NotImplementedError
287+ def _set_patch_target (self , value : Any ) -> None :
288+ ...
268289
269290 def _start_service (self ) -> None :
270291 """Start collecting lock usage."""
@@ -282,13 +303,9 @@ def patch(self) -> None:
282303 # Nobody should use locks from `_thread`; if they do so, then it's deliberate and we don't profile.
283304 self ._original = self ._get_patch_target ()
284305
285- def _allocate_lock (
286- wrapped : typing .Any ,
287- instance : typing .Any ,
288- args : typing .Tuple [typing .Any , ...],
289- kwargs : typing .Dict [str , typing .Any ],
290- ) -> typing .Any :
291- lock = wrapped (* args , ** kwargs )
306+ # TODO: `instance` is unused
307+ def _allocate_lock (wrapped : Any , instance : Any , args : Any , kwargs : Any ) -> _ProfiledLock :
308+ lock : Any = wrapped (* args , ** kwargs )
292309 return self .PROFILED_LOCK_CLASS (
293310 lock ,
294311 self .tracer ,
0 commit comments