Skip to content

Commit 1b40a11

Browse files
authored
chore: update comments and tests for label counts. (#1182)
* chore: update comments and tests for label counts. * update docstring * update logic and tests * update to use named param * metric update * update logic * fix * update value and test
1 parent c4bffc3 commit 1b40a11

File tree

7 files changed

+160
-64
lines changed

7 files changed

+160
-64
lines changed

bigframes/session/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1592,7 +1592,7 @@ def _start_query_ml_ddl(
15921592
job_config.destination_encryption_configuration = None
15931593

15941594
return bf_io_bigquery.start_query_with_client(
1595-
self.bqclient, sql, job_config, metrics=self._metrics
1595+
self.bqclient, sql, job_config=job_config, metrics=self._metrics
15961596
)
15971597

15981598
def _create_object_table(self, path: str, connection: str) -> str:

bigframes/session/_io/bigquery/__init__.py

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141

4242
IO_ORDERING_ID = "bqdf_row_nums"
43-
MAX_LABELS_COUNT = 64
43+
MAX_LABELS_COUNT = 64 - 8
4444
_LIST_TABLES_LIMIT = 10000 # calls to bqclient.list_tables
4545
# will be limited to this many tables
4646

@@ -204,7 +204,12 @@ def format_option(key: str, value: Union[bool, str]) -> str:
204204
return f"{key}={repr(value)}"
205205

206206

207-
def add_labels(job_config, api_name: Optional[str] = None):
207+
def add_and_trim_labels(job_config, api_name: Optional[str] = None):
208+
"""
209+
Add additional labels to the job configuration and trim the total number of labels
210+
to ensure they do not exceed the maximum limit allowed by BigQuery, which is 64
211+
labels per job.
212+
"""
208213
api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
209214
job_config.labels = create_job_configs_labels(
210215
job_configs_labels=job_config.labels,
@@ -217,18 +222,28 @@ def start_query_with_client(
217222
bq_client: bigquery.Client,
218223
sql: str,
219224
job_config: bigquery.job.QueryJobConfig,
225+
location: Optional[str] = None,
226+
project: Optional[str] = None,
220227
max_results: Optional[int] = None,
228+
page_size: Optional[int] = None,
221229
timeout: Optional[float] = None,
222230
api_name: Optional[str] = None,
223231
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
224232
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
225233
"""
226234
Starts query job and waits for results.
227235
"""
228-
add_labels(job_config, api_name=api_name)
229-
230236
try:
231-
query_job = bq_client.query(sql, job_config=job_config, timeout=timeout)
237+
# Note: Ensure no additional labels are added to job_config after this point,
238+
# as `add_and_trim_labels` ensures the label count does not exceed 64.
239+
add_and_trim_labels(job_config, api_name=api_name)
240+
query_job = bq_client.query(
241+
sql,
242+
job_config=job_config,
243+
location=location,
244+
project=project,
245+
timeout=timeout,
246+
)
232247
except google.api_core.exceptions.Forbidden as ex:
233248
if "Drive credentials" in ex.message:
234249
ex.message += CHECK_DRIVE_PERMISSIONS
@@ -237,10 +252,15 @@ def start_query_with_client(
237252
opts = bigframes.options.display
238253
if opts.progress_bar is not None and not query_job.configuration.dry_run:
239254
results_iterator = formatting_helpers.wait_for_query_job(
240-
query_job, max_results=max_results, progress_bar=opts.progress_bar
255+
query_job,
256+
max_results=max_results,
257+
progress_bar=opts.progress_bar,
258+
page_size=page_size,
241259
)
242260
else:
243-
results_iterator = query_job.result(max_results=max_results)
261+
results_iterator = query_job.result(
262+
max_results=max_results, page_size=page_size
263+
)
244264

245265
if metrics is not None:
246266
metrics.count_job_stats(query_job)
@@ -304,11 +324,15 @@ def create_bq_dataset_reference(
304324
bigquery.DatasetReference: The constructed reference to the anonymous dataset.
305325
"""
306326
job_config = google.cloud.bigquery.QueryJobConfig()
307-
add_labels(job_config, api_name=api_name)
308-
query_job = bq_client.query(
309-
"SELECT 1", location=location, project=project, job_config=job_config
327+
328+
_, query_job = start_query_with_client(
329+
bq_client,
330+
"SELECT 1",
331+
location=location,
332+
job_config=job_config,
333+
project=project,
334+
api_name=api_name,
310335
)
311-
query_job.result() # blocks until finished
312336

313337
# The anonymous dataset is used by BigQuery to write query results and
314338
# session tables. BigQuery DataFrames also writes temp tables directly

bigframes/session/executor.py

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import bigframes.core.schema
4949
import bigframes.core.tree_properties as tree_properties
5050
import bigframes.features
51-
import bigframes.formatting_helpers as formatting_helpers
5251
import bigframes.session._io.bigquery as bq_io
5352
import bigframes.session.metrics
5453
import bigframes.session.planner
@@ -347,20 +346,22 @@ def export_gcs(
347346
format=format,
348347
export_options=dict(export_options),
349348
)
350-
job_config = bigquery.QueryJobConfig()
351-
bq_io.add_labels(job_config, api_name=f"dataframe-to_{format.lower()}")
352-
export_job = self.bqclient.query(export_data_statement, job_config=job_config)
353-
self._wait_on_job(export_job)
349+
350+
bq_io.start_query_with_client(
351+
self.bqclient,
352+
export_data_statement,
353+
job_config=bigquery.QueryJobConfig(),
354+
api_name=f"dataframe-to_{format.lower()}",
355+
metrics=self.metrics,
356+
)
354357
return query_job
355358

356359
def dry_run(
357360
self, array_value: bigframes.core.ArrayValue, ordered: bool = True
358361
) -> bigquery.QueryJob:
359362
sql = self.to_sql(array_value, ordered=ordered)
360363
job_config = bigquery.QueryJobConfig(dry_run=True)
361-
bq_io.add_labels(job_config)
362364
query_job = self.bqclient.query(sql, job_config=job_config)
363-
_ = query_job.result()
364365
return query_job
365366

366367
def peek(
@@ -487,15 +488,19 @@ def _run_execute_query(
487488
if not self.strictly_ordered:
488489
job_config.labels["bigframes-mode"] = "unordered"
489490

490-
# Note: add_labels is global scope which may have unexpected effects
491-
bq_io.add_labels(job_config, api_name=api_name)
491+
# Note: add_and_trim_labels is global scope which may have unexpected effects
492+
# Ensure no additional labels are added to job_config after this point,
493+
# as `add_and_trim_labels` ensures the label count does not exceed 64.
494+
bq_io.add_and_trim_labels(job_config, api_name=api_name)
492495
try:
493-
query_job = self.bqclient.query(sql, job_config=job_config)
494-
return (
495-
self._wait_on_job(
496-
query_job, max_results=max_results, page_size=page_size
497-
),
498-
query_job,
496+
return bq_io.start_query_with_client(
497+
self.bqclient,
498+
sql,
499+
job_config=job_config,
500+
api_name=api_name,
501+
max_results=max_results,
502+
page_size=page_size,
503+
metrics=self.metrics,
499504
)
500505

501506
except google.api_core.exceptions.BadRequest as e:
@@ -506,29 +511,6 @@ def _run_execute_query(
506511
else:
507512
raise
508513

509-
def _wait_on_job(
510-
self,
511-
query_job: bigquery.QueryJob,
512-
page_size: Optional[int] = None,
513-
max_results: Optional[int] = None,
514-
) -> bq_table.RowIterator:
515-
opts = bigframes.options.display
516-
if opts.progress_bar is not None and not query_job.configuration.dry_run:
517-
results_iterator = formatting_helpers.wait_for_query_job(
518-
query_job,
519-
progress_bar=opts.progress_bar,
520-
max_results=max_results,
521-
page_size=page_size,
522-
)
523-
else:
524-
results_iterator = query_job.result(
525-
max_results=max_results, page_size=page_size
526-
)
527-
528-
if self.metrics is not None:
529-
self.metrics.count_job_stats(query_job)
530-
return results_iterator
531-
532514
def replace_cached_subtrees(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode:
533515
return nodes.top_down(
534516
node, lambda x: self._cached_executions.get(x, x), memoize=True

bigframes/session/loader.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -707,9 +707,9 @@ def _start_query(
707707
return bf_io_bigquery.start_query_with_client(
708708
self._bqclient,
709709
sql,
710-
job_config,
711-
max_results,
712-
timeout,
710+
job_config=job_config,
711+
max_results=max_results,
712+
timeout=timeout,
713713
api_name=api_name,
714714
)
715715

tests/system/small/test_session.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -574,16 +574,10 @@ def test_read_gbq_with_custom_global_labels(
574574
bigframes.options.compute.extra_query_labels["test3"] = False
575575

576576
query_job = session.read_gbq(scalars_table_id).query_job
577-
job_labels = query_job.labels # type:ignore
578-
expected_labels = {"test1": "1", "test2": "abc", "test3": "false"}
579-
580-
# All jobs should include a bigframes-api key. See internal issue 336521938.
581-
assert "bigframes-api" in job_labels
582-
583-
assert all(
584-
job_labels.get(key) == value for key, value in expected_labels.items()
585-
)
586577

578+
# No real job created from read_gbq, so we should expect 0 labels
579+
assert query_job is not None
580+
assert query_job.labels == {}
587581
# No labels outside of the option_context.
588582
assert len(bigframes.options.compute.extra_query_labels) == 0
589583

tests/unit/session/test_io_bigquery.py

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,24 @@
2626
from tests.unit import resources
2727

2828

29+
@pytest.fixture(scope="function")
30+
def mock_bq_client(mocker):
31+
mock_client = mocker.Mock(spec=bigquery.Client)
32+
mock_query_job = mocker.Mock(spec=bigquery.QueryJob)
33+
mock_row_iterator = mocker.Mock(spec=bigquery.table.RowIterator)
34+
35+
mock_query_job.result.return_value = mock_row_iterator
36+
37+
mock_destination = bigquery.DatasetReference(
38+
project="mock_project", dataset_id="mock_dataset"
39+
)
40+
mock_query_job.destination = mock_destination
41+
42+
mock_client.query.return_value = mock_query_job
43+
44+
return mock_client
45+
46+
2947
def test_create_job_configs_labels_is_none():
3048
api_methods = ["agg", "series-mode"]
3149
labels = io_bq.create_job_configs_labels(
@@ -124,7 +142,7 @@ def test_create_job_configs_labels_length_limit_met():
124142
"bigframes-api": "read_pandas",
125143
"source": "bigquery-dataframes-temp",
126144
}
127-
for i in range(61):
145+
for i in range(53):
128146
key = f"bigframes-api-test-{i}"
129147
value = f"test{i}"
130148
cur_labels[key] = value
@@ -141,13 +159,89 @@ def test_create_job_configs_labels_length_limit_met():
141159
job_configs_labels=cur_labels, api_methods=api_methods
142160
)
143161
assert labels is not None
144-
assert len(labels) == 64
162+
assert len(labels) == 56
145163
assert "dataframe-max" in labels.values()
146164
assert "dataframe-head" not in labels.values()
147165
assert "bigframes-api" in labels.keys()
148166
assert "source" in labels.keys()
149167

150168

169+
def test_add_and_trim_labels_length_limit_met():
170+
log_adapter.get_and_reset_api_methods()
171+
cur_labels = {
172+
"bigframes-api": "read_pandas",
173+
"source": "bigquery-dataframes-temp",
174+
}
175+
for i in range(10):
176+
key = f"bigframes-api-test-{i}"
177+
value = f"test{i}"
178+
cur_labels[key] = value
179+
180+
df = bpd.DataFrame(
181+
{"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
182+
)
183+
184+
job_config = bigquery.job.QueryJobConfig()
185+
job_config.labels = cur_labels
186+
187+
df.max()
188+
for _ in range(52):
189+
df.head()
190+
191+
io_bq.add_and_trim_labels(job_config=job_config)
192+
assert job_config.labels is not None
193+
assert len(job_config.labels) == 56
194+
assert "dataframe-max" not in job_config.labels.values()
195+
assert "dataframe-head" in job_config.labels.values()
196+
assert "bigframes-api" in job_config.labels.keys()
197+
assert "source" in job_config.labels.keys()
198+
199+
200+
@pytest.mark.parametrize(
201+
("max_results", "timeout", "api_name"),
202+
[(None, None, None), (100, 30.0, "test_api")],
203+
)
204+
def test_start_query_with_client_labels_length_limit_met(
205+
mock_bq_client, max_results, timeout, api_name
206+
):
207+
sql = "select * from abc"
208+
cur_labels = {
209+
"bigframes-api": "read_pandas",
210+
"source": "bigquery-dataframes-temp",
211+
}
212+
for i in range(10):
213+
key = f"bigframes-api-test-{i}"
214+
value = f"test{i}"
215+
cur_labels[key] = value
216+
217+
df = bpd.DataFrame(
218+
{"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
219+
)
220+
221+
job_config = bigquery.job.QueryJobConfig()
222+
job_config.labels = cur_labels
223+
224+
df.max()
225+
for _ in range(52):
226+
df.head()
227+
228+
io_bq.start_query_with_client(
229+
mock_bq_client,
230+
sql,
231+
job_config,
232+
max_results=max_results,
233+
timeout=timeout,
234+
api_name=api_name,
235+
)
236+
237+
assert job_config.labels is not None
238+
assert len(job_config.labels) == 56
239+
assert "dataframe-max" not in job_config.labels.values()
240+
assert "dataframe-head" in job_config.labels.values()
241+
assert "bigframes-api" in job_config.labels.keys()
242+
assert "source" in job_config.labels.keys()
243+
244+
151245
def test_create_temp_table_default_expiration():
152246
"""Make sure the created table has an expiration."""
153247
expiration = datetime.datetime(

third_party/bigframes_vendored/pandas/core/frame.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6771,6 +6771,7 @@ def iat(self):
67716771
**Examples:**
67726772
67736773
>>> import bigframes.pandas as bpd
6774+
>>> bpd.options.display.progress_bar = None
67746775
>>> df = bpd.DataFrame([[0, 2, 3], [0, 4, 1], [10, 20, 30]],
67756776
... columns=['A', 'B', 'C'])
67766777
>>> bpd.options.display.progress_bar = None
@@ -6804,6 +6805,7 @@ def at(self):
68046805
**Examples:**
68056806
68066807
>>> import bigframes.pandas as bpd
6808+
>>> bpd.options.display.progress_bar = None
68076809
>>> df = bpd.DataFrame([[0, 2, 3], [0, 4, 1], [10, 20, 30]],
68086810
... index=[4, 5, 6], columns=['A', 'B', 'C'])
68096811
>>> bpd.options.display.progress_bar = None

0 commit comments

Comments
 (0)