Skip to content

Commit da37742

Browse files
chore: update log adapter to support session-scoped api method logging
Updates `log_adapter` to optionally store API method names in a `Session` instance instead of the global list. This improves label accuracy when running tests in parallel. - Updates `Session` to initialize `_api_methods` list and lock. - Updates `log_adapter.add_api_method` and `get_and_reset_api_methods` to handle session-scoped logging. - Updates `log_adapter.method_logger` and `property_logger` to identify the session from arguments. - Propagates `session` through `start_query_with_client` and its callers to ensure labels are correctly associated with the session.
1 parent 7d152d3 commit da37742

File tree

6 files changed

+69
-14
lines changed

6 files changed

+69
-14
lines changed

bigframes/core/log_adapter.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ def wrapper(*args, **kwargs):
174174
full_method_name = f"{base_name.lower()}-{api_method_name}"
175175
# Track directly called methods
176176
if len(_call_stack) == 0:
177-
add_api_method(full_method_name)
177+
session = _find_session(*args, **kwargs)
178+
add_api_method(full_method_name, session=session)
178179

179180
_call_stack.append(full_method_name)
180181

@@ -220,7 +221,8 @@ def wrapped(*args, **kwargs):
220221
full_property_name = f"{class_name.lower()}-{property_name.lower()}"
221222

222223
if len(_call_stack) == 0:
223-
add_api_method(full_property_name)
224+
session = _find_session(*args, **kwargs)
225+
add_api_method(full_property_name, session=session)
224226

225227
_call_stack.append(full_property_name)
226228
try:
@@ -250,25 +252,40 @@ def wrapper(func):
250252
return wrapper
251253

252254

253-
def add_api_method(api_method_name):
255+
def add_api_method(api_method_name, session=None):
254256
global _lock
255257
global _api_methods
256-
with _lock:
257-
# Push the method to the front of the _api_methods list
258-
_api_methods.insert(0, api_method_name.replace("<", "").replace(">", ""))
259-
# Keep the list length within the maximum limit (adjust MAX_LABELS_COUNT as needed)
260-
_api_methods = _api_methods[:MAX_LABELS_COUNT]
258+
259+
if session is not None:
260+
with session._api_methods_lock:
261+
session._api_methods.insert(
262+
0, api_method_name.replace("<", "").replace(">", "")
263+
)
264+
session._api_methods = session._api_methods[:MAX_LABELS_COUNT]
265+
else:
266+
with _lock:
267+
# Push the method to the front of the _api_methods list
268+
_api_methods.insert(0, api_method_name.replace("<", "").replace(">", ""))
269+
# Keep the list length within the maximum limit (adjust MAX_LABELS_COUNT as needed)
270+
_api_methods = _api_methods[:MAX_LABELS_COUNT]
261271

262272

263-
def get_and_reset_api_methods(dry_run: bool = False):
273+
def get_and_reset_api_methods(dry_run: bool = False, session=None):
264274
global _lock
275+
methods = []
276+
if session is not None:
277+
with session._api_methods_lock:
278+
methods.extend(session._api_methods)
279+
if not dry_run:
280+
session._api_methods.clear()
281+
265282
with _lock:
266-
previous_api_methods = list(_api_methods)
283+
methods.extend(_api_methods)
267284

268285
# dry_run might not make a job resource, so only reset the log on real queries.
269286
if not dry_run:
270287
_api_methods.clear()
271-
return previous_api_methods
288+
return methods
272289

273290

274291
def _get_bq_client(*args, **kwargs):
@@ -283,3 +300,19 @@ def _get_bq_client(*args, **kwargs):
283300
return kwargv._block.session.bqclient
284301

285302
return None
303+
304+
305+
def _find_session(*args, **kwargs):
306+
# This function cannot import Session at the top level because Session
307+
# imports log_adapter.
308+
# We can't import bigframes.session in type checking block either.
309+
from bigframes.session import Session
310+
311+
if args and isinstance(args[0], Session):
312+
return args[0]
313+
314+
session = kwargs.get("session")
315+
if session is not None and isinstance(session, Session):
316+
return session
317+
318+
return None

bigframes/session/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ def __init__(
208208
self._session_id: str = "session" + secrets.token_hex(3)
209209
# store table ids and delete them when the session is closed
210210

211+
self._api_methods: list[str] = []
212+
self._api_methods_lock = threading.Lock()
213+
211214
self._objects: list[
212215
weakref.ReferenceType[
213216
Union[
@@ -2160,6 +2163,7 @@ def _start_query_ml_ddl(
21602163
query_with_job=True,
21612164
job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY,
21622165
publisher=self._publisher,
2166+
session=self,
21632167
)
21642168
return iterator, query_job
21652169

@@ -2188,6 +2192,7 @@ def _create_object_table(self, path: str, connection: str) -> str:
21882192
timeout=None,
21892193
query_with_job=True,
21902194
publisher=self._publisher,
2195+
session=self,
21912196
)
21922197

21932198
return table

bigframes/session/_io/bigquery/__init__.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,14 @@ def format_option(key: str, value: Union[bool, str]) -> str:
228228
return f"{key}={repr(value)}"
229229

230230

231-
def add_and_trim_labels(job_config):
231+
def add_and_trim_labels(job_config, session=None):
232232
"""
233233
Add additional labels to the job configuration and trim the total number of labels
234234
to ensure they do not exceed MAX_LABELS_COUNT labels per job.
235235
"""
236-
api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
236+
api_methods = log_adapter.get_and_reset_api_methods(
237+
dry_run=job_config.dry_run, session=session
238+
)
237239
job_config.labels = create_job_configs_labels(
238240
job_configs_labels=job_config.labels,
239241
api_methods=api_methods,
@@ -270,6 +272,7 @@ def start_query_with_client(
270272
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
271273
query_with_job: Literal[True],
272274
publisher: bigframes.core.events.Publisher,
275+
session=None,
273276
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
274277
...
275278

@@ -286,6 +289,7 @@ def start_query_with_client(
286289
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
287290
query_with_job: Literal[False],
288291
publisher: bigframes.core.events.Publisher,
292+
session=None,
289293
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
290294
...
291295

@@ -303,6 +307,7 @@ def start_query_with_client(
303307
query_with_job: Literal[True],
304308
job_retry: google.api_core.retry.Retry,
305309
publisher: bigframes.core.events.Publisher,
310+
session=None,
306311
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
307312
...
308313

@@ -320,6 +325,7 @@ def start_query_with_client(
320325
query_with_job: Literal[False],
321326
job_retry: google.api_core.retry.Retry,
322327
publisher: bigframes.core.events.Publisher,
328+
session=None,
323329
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
324330
...
325331

@@ -340,14 +346,15 @@ def start_query_with_client(
340346
# version 3.36.0 or later.
341347
job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY,
342348
publisher: bigframes.core.events.Publisher,
349+
session=None,
343350
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
344351
"""
345352
Starts query job and waits for results.
346353
"""
347354
# Note: Ensure no additional labels are added to job_config after this
348355
# point, as `add_and_trim_labels` ensures the label count does not
349356
# exceed MAX_LABELS_COUNT.
350-
add_and_trim_labels(job_config)
357+
add_and_trim_labels(job_config, session=session)
351358

352359
try:
353360
if not query_with_job:

bigframes/session/bq_caching_executor.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ def _export_gbq(
323323
iterator, job = self._run_execute_query(
324324
sql=sql,
325325
job_config=job_config,
326+
session=array_value.session,
326327
)
327328

328329
has_timedelta_col = any(
@@ -389,6 +390,7 @@ def _run_execute_query(
389390
sql: str,
390391
job_config: Optional[bq_job.QueryJobConfig] = None,
391392
query_with_job: bool = True,
393+
session=None,
392394
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
393395
"""
394396
Starts BigQuery query job and waits for results.
@@ -415,6 +417,7 @@ def _run_execute_query(
415417
timeout=None,
416418
query_with_job=True,
417419
publisher=self._publisher,
420+
session=session,
418421
)
419422
else:
420423
return bq_io.start_query_with_client(
@@ -427,6 +430,7 @@ def _run_execute_query(
427430
timeout=None,
428431
query_with_job=False,
429432
publisher=self._publisher,
433+
session=session,
430434
)
431435

432436
except google.api_core.exceptions.BadRequest as e:
@@ -661,6 +665,7 @@ def _execute_plan_gbq(
661665
sql=compiled.sql,
662666
job_config=job_config,
663667
query_with_job=(destination_table is not None),
668+
session=plan.session,
664669
)
665670

666671
# we could actually cache even when caching is not explicitly requested, but being conservative for now

bigframes/session/direct_gbq_execution.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def execute(
6060

6161
iterator, query_job = self._run_execute_query(
6262
sql=compiled.sql,
63+
session=plan.session,
6364
)
6465

6566
# just immediately downlaod everything for simplicity
@@ -75,6 +76,7 @@ def _run_execute_query(
7576
self,
7677
sql: str,
7778
job_config: Optional[bq_job.QueryJobConfig] = None,
79+
session=None,
7880
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
7981
"""
8082
Starts BigQuery query job and waits for results.
@@ -89,4 +91,5 @@ def _run_execute_query(
8991
metrics=None,
9092
query_with_job=False,
9193
publisher=self._publisher,
94+
session=session,
9295
)

bigframes/session/loader.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,6 +1284,7 @@ def _start_query_with_job_optional(
12841284
metrics=None,
12851285
query_with_job=False,
12861286
publisher=self._publisher,
1287+
session=self._session,
12871288
)
12881289
return rows
12891290

@@ -1310,6 +1311,7 @@ def _start_query_with_job(
13101311
metrics=None,
13111312
query_with_job=True,
13121313
publisher=self._publisher,
1314+
session=self._session,
13131315
)
13141316
return query_job
13151317

0 commit comments

Comments
 (0)