Skip to content

Commit cf34585

Browse files
committed
Revert "fix(core): Make log_adapter state thread-local"
This reverts commit e2c38d3.
1 parent e2c38d3 commit cf34585

File tree

2 files changed

+26
-47
lines changed

2 files changed

+26
-47
lines changed

bigframes/core/log_adapter.py

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@
1515
import functools
1616
import inspect
1717
import threading
18-
from typing import Optional
18+
from typing import List, Optional
1919

2020
from google.cloud import bigquery
2121
import pandas
2222

23-
_thread_local_data = threading.local()
24-
_thread_local_data._api_methods = []
25-
_thread_local_data._call_stack = []
23+
_lock = threading.Lock()
2624

2725
# The limit is 64 (https://cloud.google.com/bigquery/docs/labels-intro#requirements),
2826
# but leave a few spare for internal labels to be added.
@@ -32,8 +30,12 @@
3230
PANDAS_PARAM_TRACKING_TASK = "pandas_param_tracking"
3331
LOG_OVERRIDE_NAME = "__log_override_name__"
3432

33+
_api_methods: List = []
3534
_excluded_methods = ["__setattr__", "__getattr__"]
3635

36+
# Stack to track method calls
37+
_call_stack: List = []
38+
3739

3840
def submit_pandas_labels(
3941
bq_client: Optional[bigquery.Client],
@@ -170,14 +172,11 @@ def wrapper(*args, **kwargs):
170172
base_name = custom_base_name
171173

172174
full_method_name = f"{base_name.lower()}-{api_method_name}"
173-
if not hasattr(_thread_local_data, "_call_stack"):
174-
_thread_local_data._call_stack = []
175-
176175
# Track directly called methods
177-
if len(_thread_local_data._call_stack) == 0:
176+
if len(_call_stack) == 0:
178177
add_api_method(full_method_name)
179178

180-
_thread_local_data._call_stack.append(full_method_name)
179+
_call_stack.append(full_method_name)
181180

182181
try:
183182
return method(*args, **kwargs)
@@ -186,7 +185,7 @@ def wrapper(*args, **kwargs):
186185
# or not fully supported (NotImplementedError) in BigFrames.
187186
# Logging is currently supported only when we can access the bqclient through
188187
# _block.session.bqclient.
189-
if len(_thread_local_data._call_stack) == 1:
188+
if len(_call_stack) == 1:
190189
submit_pandas_labels(
191190
_get_bq_client(*args, **kwargs),
192191
base_name,
@@ -197,7 +196,7 @@ def wrapper(*args, **kwargs):
197196
)
198197
raise e
199198
finally:
200-
_thread_local_data._call_stack.pop()
199+
_call_stack.pop()
201200

202201
return wrapper
203202

@@ -215,21 +214,19 @@ def property_logger(prop):
215214
def shared_wrapper(prop):
216215
@functools.wraps(prop)
217216
def wrapped(*args, **kwargs):
218-
if not hasattr(_thread_local_data, "_call_stack"):
219-
_thread_local_data._call_stack = []
220217
qualname_parts = getattr(prop, "__qualname__", prop.__name__).split(".")
221218
class_name = qualname_parts[-2] if len(qualname_parts) > 1 else ""
222219
property_name = prop.__name__
223220
full_property_name = f"{class_name.lower()}-{property_name.lower()}"
224221

225-
if len(_thread_local_data._call_stack) == 0:
222+
if len(_call_stack) == 0:
226223
add_api_method(full_property_name)
227224

228-
_thread_local_data._call_stack.append(full_property_name)
225+
_call_stack.append(full_property_name)
229226
try:
230227
return prop(*args, **kwargs)
231228
finally:
232-
_thread_local_data._call_stack.pop()
229+
_call_stack.pop()
233230

234231
return wrapped
235232

@@ -254,26 +251,23 @@ def wrapper(func):
254251

255252

256253
def add_api_method(api_method_name):
257-
if not hasattr(_thread_local_data, "_api_methods"):
258-
_thread_local_data._api_methods = []
259-
260-
# Push the method to the front of the _api_methods list
261-
_thread_local_data._api_methods.insert(
262-
0, api_method_name.replace("<", "").replace(">", "")
263-
)
264-
# Keep the list length within the maximum limit
265-
_thread_local_data._api_methods = _thread_local_data._api_methods[:MAX_LABELS_COUNT]
254+
global _lock
255+
global _api_methods
256+
with _lock:
257+
# Push the method to the front of the _api_methods list
258+
_api_methods.insert(0, api_method_name.replace("<", "").replace(">", ""))
259+
# Keep the list length within the maximum limit (adjust MAX_LABELS_COUNT as needed)
260+
_api_methods = _api_methods[:MAX_LABELS_COUNT]
266261

267262

268263
def get_and_reset_api_methods(dry_run: bool = False):
269-
if not hasattr(_thread_local_data, "_api_methods"):
270-
_thread_local_data._api_methods = []
271-
272-
previous_api_methods = list(_thread_local_data._api_methods)
264+
global _lock
265+
with _lock:
266+
previous_api_methods = list(_api_methods)
273267

274-
# dry_run might not make a job resource, so only reset the log on real queries.
275-
if not dry_run:
276-
_thread_local_data._api_methods.clear()
268+
# dry_run might not make a job resource, so only reset the log on real queries.
269+
if not dry_run:
270+
_api_methods.clear()
277271
return previous_api_methods
278272

279273

bigframes/testing/mocks.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,7 @@
2727
import bigframes
2828
import bigframes.clients
2929
import bigframes.core.global_session
30-
import bigframes.core.log_adapter
3130
import bigframes.dataframe
32-
import bigframes.session._io.bigquery
33-
from bigframes.session._io.bigquery import create_job_configs_labels
3431
import bigframes.session.clients
3532

3633
"""Utilities for creating test resources."""
@@ -93,18 +90,6 @@ def query_mock(
9390
job_config: Optional[google.cloud.bigquery.QueryJobConfig] = None,
9491
**kwargs,
9592
):
96-
job_config = (
97-
job_config
98-
if job_config is not None
99-
else google.cloud.bigquery.QueryJobConfig()
100-
)
101-
api_methods = bigframes.core.log_adapter.get_and_reset_api_methods(
102-
dry_run=job_config.dry_run
103-
)
104-
job_config.labels = create_job_configs_labels(
105-
job_configs_labels=job_config.labels,
106-
api_methods=api_methods,
107-
)
10893
queries.append(query)
10994
job_configs.append(copy.deepcopy(job_config))
11095
query_job = mock.create_autospec(google.cloud.bigquery.QueryJob, instance=True)

0 commit comments

Comments
 (0)