Skip to content

Commit 3c4abf2

Browse files
perf: Fall back to ordering by bq pk when possible (#1350)
* perf: Fall back to ordering by bq pk when possible * use pk before index, fix unit test * Apply suggestions from code review * Update bigframes/session/_io/bigquery/read_gbq_table.py * fix null index case --------- Co-authored-by: Tim Sweña (Swast) <[email protected]>
1 parent f433ecf commit 3c4abf2

File tree

4 files changed

+35
-75
lines changed

4 files changed

+35
-75
lines changed

bigframes/session/_io/bigquery/read_gbq_table.py

Lines changed: 16 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -152,24 +152,28 @@ def validate_table(
152152
return False
153153

154154

155-
def are_index_cols_unique(
155+
def infer_unique_columns(
156156
bqclient: bigquery.Client,
157157
table: bigquery.table.Table,
158158
index_cols: List[str],
159159
api_name: str,
160160
metadata_only: bool = False,
161-
) -> bool:
162-
if len(index_cols) == 0:
163-
return False
161+
) -> Tuple[str, ...]:
162+
"""Return a set of columns that can provide a unique row key or empty if none can be inferred.
163+
164+
Note: primary keys are not enforced, but these are assumed to be unique
165+
by the query engine, so we make the same assumption here.
166+
"""
164167
# If index_cols contain the primary_keys, the query engine assumes they are
165168
# provide a unique index.
166-
primary_keys = frozenset(_get_primary_keys(table))
167-
if (len(primary_keys) > 0) and primary_keys <= frozenset(index_cols):
168-
return True
169+
primary_keys = tuple(_get_primary_keys(table))
170+
if (len(primary_keys) > 0) and frozenset(primary_keys) <= frozenset(index_cols):
171+
# Essentially, just reordering the primary key to match the index col order
172+
return tuple(index_col for index_col in index_cols if index_col in primary_keys)
169173

170-
if metadata_only:
174+
if primary_keys or metadata_only or (not index_cols):
171175
# Sometimes not worth scanning data to check uniqueness
172-
return False
176+
return primary_keys
173177
# TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring
174178
# table_expression only selects just index_cols.
175179
is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table.reference)
@@ -178,7 +182,9 @@ def are_index_cols_unique(
178182
results = bqclient.query_and_wait(is_unique_sql, job_config=job_config)
179183
row = next(iter(results))
180184

181-
return row["total_count"] == row["distinct_count"]
185+
if row["total_count"] == row["distinct_count"]:
186+
return tuple(index_cols)
187+
return ()
182188

183189

184190
def _get_primary_keys(
@@ -279,54 +285,3 @@ def get_index_cols(
279285
index_cols = primary_keys
280286

281287
return index_cols
282-
283-
284-
def get_time_travel_datetime_and_table_metadata(
285-
bqclient: bigquery.Client,
286-
table_ref: bigquery.TableReference,
287-
*,
288-
api_name: str,
289-
cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
290-
use_cache: bool = True,
291-
) -> Tuple[datetime.datetime, bigquery.Table]:
292-
cached_table = cache.get(table_ref)
293-
if use_cache and cached_table is not None:
294-
snapshot_timestamp, _ = cached_table
295-
296-
# Cache hit could be unexpected. See internal issue 329545805.
297-
# Raise a warning with more information about how to avoid the
298-
# problems with the cache.
299-
msg = (
300-
f"Reading cached table from {snapshot_timestamp} to avoid "
301-
"incompatibilies with previous reads of this table. To read "
302-
"the latest version, set `use_cache=False` or close the "
303-
"current session with Session.close() or "
304-
"bigframes.pandas.close_session()."
305-
)
306-
# There are many layers before we get to (possibly) the user's code:
307-
# pandas.read_gbq_table
308-
# -> with_default_session
309-
# -> Session.read_gbq_table
310-
# -> _read_gbq_table
311-
# -> _get_snapshot_sql_and_primary_key
312-
# -> get_snapshot_datetime_and_table_metadata
313-
warnings.warn(msg, stacklevel=7)
314-
return cached_table
315-
316-
# TODO(swast): It's possible that the table metadata is changed between now
317-
# and when we run the CURRENT_TIMESTAMP() query to see when we can time
318-
# travel to. Find a way to fetch the table metadata and BQ's current time
319-
# atomically.
320-
table = bqclient.get_table(table_ref)
321-
322-
job_config = bigquery.QueryJobConfig()
323-
job_config.labels["bigframes-api"] = api_name
324-
snapshot_timestamp = list(
325-
bqclient.query(
326-
"SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
327-
job_config=job_config,
328-
).result()
329-
)[0][0]
330-
cached_table = (snapshot_timestamp, table)
331-
cache[table_ref] = cached_table
332-
return cached_table

bigframes/session/loader.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ def read_gbq_table(
424424
# in the query that checks for index uniqueness.
425425
# TODO(b/338065601): Provide a way to assume uniqueness and avoid this
426426
# check.
427-
is_index_unique = bf_read_gbq_table.are_index_cols_unique(
427+
primary_key = bf_read_gbq_table.infer_unique_columns(
428428
bqclient=self._bqclient,
429429
table=table,
430430
index_cols=index_cols,
@@ -440,12 +440,12 @@ def read_gbq_table(
440440
schema=schema,
441441
predicate=filter_str,
442442
at_time=time_travel_timestamp if enable_snapshot else None,
443-
primary_key=index_cols if is_index_unique else (),
443+
primary_key=primary_key,
444444
session=self._session,
445445
)
446446
# if we don't have a unique index, we order by row hash if we are in strict mode
447447
if self._force_total_order:
448-
if not is_index_unique:
448+
if not primary_key:
449449
array_value = array_value.order_by(
450450
[
451451
bigframes.core.ordering.OrderingExpression(

tests/unit/session/test_read_gbq_table.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,28 @@
2727
@pytest.mark.parametrize(
2828
("index_cols", "primary_keys", "values_distinct", "expected"),
2929
(
30-
(["col1", "col2"], ["col1", "col2", "col3"], False, False),
31-
(["col1", "col2", "col3"], ["col1", "col2", "col3"], True, True),
30+
(["col1", "col2"], ["col1", "col2", "col3"], False, ("col1", "col2", "col3")),
31+
(
32+
["col1", "col2", "col3"],
33+
["col1", "col2", "col3"],
34+
True,
35+
("col1", "col2", "col3"),
36+
),
3237
(
3338
["col2", "col3", "col1"],
3439
[
3540
"col3",
3641
"col2",
3742
],
3843
True,
39-
True,
44+
("col2", "col3"),
4045
),
41-
(["col1", "col2"], [], False, False),
42-
([], ["col1", "col2", "col3"], False, False),
43-
([], [], False, False),
46+
(["col1", "col2"], [], False, ()),
47+
([], ["col1", "col2", "col3"], False, ("col1", "col2", "col3")),
48+
([], [], False, ()),
4449
),
4550
)
46-
def test_are_index_cols_unique(index_cols, primary_keys, values_distinct, expected):
51+
def test_infer_unique_columns(index_cols, primary_keys, values_distinct, expected):
4752
"""If a primary key is set on the table, we use that as the index column
4853
by default, no error should be raised in this case.
4954
@@ -87,6 +92,6 @@ def test_are_index_cols_unique(index_cols, primary_keys, values_distinct, expect
8792
)
8893
table._properties["location"] = session._location
8994

90-
result = bf_read_gbq_table.are_index_cols_unique(bqclient, table, index_cols, "")
95+
result = bf_read_gbq_table.infer_unique_columns(bqclient, table, index_cols, "")
9196

9297
assert result == expected

tests/unit/session/test_session.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,10 @@ def test_read_gbq_cached_table():
217217
table,
218218
)
219219

220-
session.bqclient.get_table.return_value = table
221-
session.bqclient.query_and_wait.return_value = (
222-
{"total_count": 3, "distinct_count": 2},
220+
session.bqclient.query_and_wait = mock.MagicMock(
221+
return_value=({"total_count": 3, "distinct_count": 2},)
223222
)
223+
session.bqclient.get_table.return_value = table
224224

225225
with pytest.warns(UserWarning, match=re.escape("use_cache=False")):
226226
df = session.read_gbq("my-project.my_dataset.my_table")

0 commit comments

Comments
 (0)