1515import functools
1616import inspect
1717import threading
18- from typing import Optional
18+ from typing import List , Optional
1919
2020from google .cloud import bigquery
2121import pandas
2222
23- _thread_local_data = threading .local ()
24- _thread_local_data ._api_methods = []
25- _thread_local_data ._call_stack = []
23+ _lock = threading .Lock ()
2624
2725# The limit is 64 (https://cloud.google.com/bigquery/docs/labels-intro#requirements),
2826# but leave a few spare for internal labels to be added.
3230PANDAS_PARAM_TRACKING_TASK = "pandas_param_tracking"
3331LOG_OVERRIDE_NAME = "__log_override_name__"
3432
33+ _api_methods : List = []
3534_excluded_methods = ["__setattr__" , "__getattr__" ]
3635
36+ # Stack to track method calls
37+ _call_stack : List = []
38+
3739
3840def submit_pandas_labels (
3941 bq_client : Optional [bigquery .Client ],
@@ -170,14 +172,11 @@ def wrapper(*args, **kwargs):
170172 base_name = custom_base_name
171173
172174 full_method_name = f"{ base_name .lower ()} -{ api_method_name } "
173- if not hasattr (_thread_local_data , "_call_stack" ):
174- _thread_local_data ._call_stack = []
175-
176175 # Track directly called methods
177- if len (_thread_local_data . _call_stack ) == 0 :
176+ if len (_call_stack ) == 0 :
178177 add_api_method (full_method_name )
179178
180- _thread_local_data . _call_stack .append (full_method_name )
179+ _call_stack .append (full_method_name )
181180
182181 try :
183182 return method (* args , ** kwargs )
@@ -186,7 +185,7 @@ def wrapper(*args, **kwargs):
186185 # or not fully supported (NotImplementedError) in BigFrames.
187186 # Logging is currently supported only when we can access the bqclient through
188187 # _block.session.bqclient.
189- if len (_thread_local_data . _call_stack ) == 1 :
188+ if len (_call_stack ) == 1 :
190189 submit_pandas_labels (
191190 _get_bq_client (* args , ** kwargs ),
192191 base_name ,
@@ -197,7 +196,7 @@ def wrapper(*args, **kwargs):
197196 )
198197 raise e
199198 finally :
200- _thread_local_data . _call_stack .pop ()
199+ _call_stack .pop ()
201200
202201 return wrapper
203202
@@ -215,21 +214,19 @@ def property_logger(prop):
215214 def shared_wrapper (prop ):
216215 @functools .wraps (prop )
217216 def wrapped (* args , ** kwargs ):
218- if not hasattr (_thread_local_data , "_call_stack" ):
219- _thread_local_data ._call_stack = []
220217 qualname_parts = getattr (prop , "__qualname__" , prop .__name__ ).split ("." )
221218 class_name = qualname_parts [- 2 ] if len (qualname_parts ) > 1 else ""
222219 property_name = prop .__name__
223220 full_property_name = f"{ class_name .lower ()} -{ property_name .lower ()} "
224221
225- if len (_thread_local_data . _call_stack ) == 0 :
222+ if len (_call_stack ) == 0 :
226223 add_api_method (full_property_name )
227224
228- _thread_local_data . _call_stack .append (full_property_name )
225+ _call_stack .append (full_property_name )
229226 try :
230227 return prop (* args , ** kwargs )
231228 finally :
232- _thread_local_data . _call_stack .pop ()
229+ _call_stack .pop ()
233230
234231 return wrapped
235232
@@ -254,26 +251,23 @@ def wrapper(func):
254251
255252
256253def add_api_method (api_method_name ):
257- if not hasattr (_thread_local_data , "_api_methods" ):
258- _thread_local_data ._api_methods = []
259-
260- # Push the method to the front of the _api_methods list
261- _thread_local_data ._api_methods .insert (
262- 0 , api_method_name .replace ("<" , "" ).replace (">" , "" )
263- )
264- # Keep the list length within the maximum limit
265- _thread_local_data ._api_methods = _thread_local_data ._api_methods [:MAX_LABELS_COUNT ]
254+ global _lock
255+ global _api_methods
256+ with _lock :
257+ # Push the method to the front of the _api_methods list
258+ _api_methods .insert (0 , api_method_name .replace ("<" , "" ).replace (">" , "" ))
259+ # Keep the list length within the maximum limit (adjust MAX_LABELS_COUNT as needed)
260+ _api_methods = _api_methods [:MAX_LABELS_COUNT ]
266261
267262
268263def get_and_reset_api_methods (dry_run : bool = False ):
269- if not hasattr (_thread_local_data , "_api_methods" ):
270- _thread_local_data ._api_methods = []
271-
272- previous_api_methods = list (_thread_local_data ._api_methods )
264+ global _lock
265+ with _lock :
266+ previous_api_methods = list (_api_methods )
273267
274- # dry_run might not make a job resource, so only reset the log on real queries.
275- if not dry_run :
276- _thread_local_data . _api_methods .clear ()
268+ # dry_run might not make a job resource, so only reset the log on real queries.
269+ if not dry_run :
270+ _api_methods .clear ()
277271 return previous_api_methods
278272
279273
0 commit comments