Skip to content

Commit 599f18b

Browse files
authored
chore: create private _read_gbq_colab which will enable partial ordering mode even when session is in strict mode (#1688)
* chore: add private _read_gbq_colab method that uses partial ordering mode, disables progress bars, disables default index, and communicates via callbacks * add colab read gbq * add test for ordering * add ordered argument to to_pandas_batches * add unit test looking for job labels * remove ordered option for to_pandas_batches * ignore type for mock job configs
1 parent 597d817 commit 599f18b

File tree

6 files changed

+135
-4
lines changed

6 files changed

+135
-4
lines changed

bigframes/session/__init__.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,27 @@ def _register_object(
476476
):
477477
self._objects.append(weakref.ref(object))
478478

479+
def _read_gbq_colab(
480+
self,
481+
query: str,
482+
# TODO: Add a callback parameter that takes some kind of Event object.
483+
# TODO: Add parameter for variables for string formatting.
484+
# TODO: Add dry_run parameter.
485+
) -> dataframe.DataFrame:
486+
"""A version of read_gbq that has the necessary default values for use in colab integrations.
487+
488+
This includes, no ordering, no index, no progress bar, always use string
489+
formatting for embedding local variables / dataframes.
490+
"""
491+
492+
# TODO: Allow for a table ID to avoid queries like read_gbq?
493+
return self._loader.read_gbq_query(
494+
query=query,
495+
index_col=bigframes.enums.DefaultIndexKind.NULL,
496+
api_name="read_gbq_colab",
497+
force_total_order=False,
498+
)
499+
479500
@overload
480501
def read_gbq_query( # type: ignore[overload-overlap]
481502
self,

bigframes/session/loader.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ def read_gbq_table( # type: ignore[overload-overlap]
373373
filters: third_party_pandas_gbq.FiltersType = ...,
374374
enable_snapshot: bool = ...,
375375
dry_run: Literal[False] = ...,
376+
force_total_order: Optional[bool] = ...,
376377
) -> dataframe.DataFrame:
377378
...
378379

@@ -394,6 +395,7 @@ def read_gbq_table(
394395
filters: third_party_pandas_gbq.FiltersType = ...,
395396
enable_snapshot: bool = ...,
396397
dry_run: Literal[True] = ...,
398+
force_total_order: Optional[bool] = ...,
397399
) -> pandas.Series:
398400
...
399401

@@ -414,6 +416,7 @@ def read_gbq_table(
414416
filters: third_party_pandas_gbq.FiltersType = (),
415417
enable_snapshot: bool = True,
416418
dry_run: bool = False,
419+
force_total_order: Optional[bool] = None,
417420
) -> dataframe.DataFrame | pandas.Series:
418421
import bigframes._tools.strings
419422
import bigframes.dataframe as dataframe
@@ -608,7 +611,14 @@ def read_gbq_table(
608611
session=self._session,
609612
)
610613
# if we don't have a unique index, we order by row hash if we are in strict mode
611-
if self._force_total_order:
614+
if (
615+
# If the user has explicitly selected or disabled total ordering for
616+
# this API call, respect that choice.
617+
(force_total_order is not None and force_total_order)
618+
# If the user has not explicitly selected or disabled total ordering
619+
# for this API call, respect the default choice for the session.
620+
or (force_total_order is None and self._force_total_order)
621+
):
612622
if not primary_key:
613623
array_value = array_value.order_by(
614624
[
@@ -712,6 +722,7 @@ def read_gbq_query( # type: ignore[overload-overlap]
712722
use_cache: Optional[bool] = ...,
713723
filters: third_party_pandas_gbq.FiltersType = ...,
714724
dry_run: Literal[False] = ...,
725+
force_total_order: Optional[bool] = ...,
715726
) -> dataframe.DataFrame:
716727
...
717728

@@ -728,6 +739,7 @@ def read_gbq_query(
728739
use_cache: Optional[bool] = ...,
729740
filters: third_party_pandas_gbq.FiltersType = ...,
730741
dry_run: Literal[True] = ...,
742+
force_total_order: Optional[bool] = ...,
731743
) -> pandas.Series:
732744
...
733745

@@ -743,6 +755,7 @@ def read_gbq_query(
743755
use_cache: Optional[bool] = None,
744756
filters: third_party_pandas_gbq.FiltersType = (),
745757
dry_run: bool = False,
758+
force_total_order: Optional[bool] = None,
746759
) -> dataframe.DataFrame | pandas.Series:
747760
import bigframes.dataframe as dataframe
748761

@@ -833,6 +846,7 @@ def read_gbq_query(
833846
columns=columns,
834847
use_cache=configuration["query"]["useQueryCache"],
835848
api_name=api_name,
849+
force_total_order=force_total_order,
836850
# max_results and filters are omitted because they are already
837851
# handled by to_query(), above.
838852
)

bigframes/testing/mocks.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ def create_bigquery_session(
5151
google.auth.credentials.Credentials, instance=True
5252
)
5353

54+
bq_time = datetime.datetime.now()
55+
table_time = bq_time + datetime.timedelta(minutes=1)
56+
5457
if anonymous_dataset is None:
5558
anonymous_dataset = google.cloud.bigquery.DatasetReference(
5659
"test-project",
@@ -65,6 +68,8 @@ def create_bigquery_session(
6568
# Mock the location.
6669
table = mock.create_autospec(google.cloud.bigquery.Table, instance=True)
6770
table._properties = {}
71+
# TODO(tswast): support tables created before and after the session started.
72+
type(table).created = mock.PropertyMock(return_value=table_time)
6873
type(table).location = mock.PropertyMock(return_value=location)
6974
type(table).schema = mock.PropertyMock(return_value=table_schema)
7075
type(table).reference = mock.PropertyMock(
@@ -73,7 +78,10 @@ def create_bigquery_session(
7378
type(table).num_rows = mock.PropertyMock(return_value=1000000000)
7479
bqclient.get_table.return_value = table
7580

76-
def query_mock(query, *args, **kwargs):
81+
job_configs = []
82+
83+
def query_mock(query, *args, job_config=None, **kwargs):
84+
job_configs.append(job_config)
7785
query_job = mock.create_autospec(google.cloud.bigquery.QueryJob)
7886
type(query_job).destination = mock.PropertyMock(
7987
return_value=anonymous_dataset.table("test_table"),
@@ -83,15 +91,16 @@ def query_mock(query, *args, **kwargs):
8391
)
8492

8593
if query.startswith("SELECT CURRENT_TIMESTAMP()"):
86-
query_job.result = mock.MagicMock(return_value=[[datetime.datetime.now()]])
94+
query_job.result = mock.MagicMock(return_value=[[bq_time]])
8795
else:
8896
type(query_job).schema = mock.PropertyMock(return_value=table_schema)
8997

9098
return query_job
9199

92100
existing_query_and_wait = bqclient.query_and_wait
93101

94-
def query_and_wait_mock(query, *args, **kwargs):
102+
def query_and_wait_mock(query, *args, job_config=None, **kwargs):
103+
job_configs.append(job_config)
95104
if query.startswith("SELECT CURRENT_TIMESTAMP()"):
96105
return iter([[datetime.datetime.now()]])
97106
else:
@@ -109,6 +118,7 @@ def query_and_wait_mock(query, *args, **kwargs):
109118
session._bq_connection_manager = mock.create_autospec(
110119
bigframes.clients.BqConnectionManager, instance=True
111120
)
121+
session._job_configs = job_configs # type: ignore
112122
return session
113123

114124

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""System tests for read_gbq_colab helper functions."""
16+
17+
18+
def test_read_gbq_colab_to_pandas_batches_preserves_order_by(maybe_ordered_session):
19+
df = maybe_ordered_session._read_gbq_colab(
20+
"""
21+
SELECT
22+
name,
23+
SUM(number) AS total
24+
FROM
25+
`bigquery-public-data.usa_names.usa_1910_2013`
26+
WHERE state LIKE 'W%'
27+
GROUP BY name
28+
ORDER BY total DESC
29+
LIMIT 300
30+
"""
31+
)
32+
batches = df.to_pandas_batches(
33+
page_size=100,
34+
)
35+
36+
total_rows = 0
37+
for batch in batches:
38+
assert batch["total"].is_monotonic_decreasing
39+
total_rows += len(batch.index)
40+
41+
assert total_rows > 0
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Unit tests for read_gbq_colab helper functions."""
16+
17+
from bigframes.testing import mocks
18+
19+
20+
def test_read_gbq_colab_includes_label():
21+
"""Make sure we can tell direct colab usage apart from regular read_gbq usage."""
22+
session = mocks.create_bigquery_session()
23+
_ = session._read_gbq_colab("SELECT 'read-gbq-colab-test'")
24+
configs = session._job_configs # type: ignore
25+
26+
label_values = []
27+
for config in configs:
28+
if config is None:
29+
continue
30+
label_values.extend(config.labels.values())
31+
32+
assert "read_gbq_colab" in label_values

0 commit comments

Comments
 (0)