Commit 9a9f2bc

refactor: split read_gbq_table implementation into functions and move to separate module (#642)
* refactor: split `read_gbq_table` implementation into functions and move to separate module; add todos
* refactor progress
* add index_cols function
* maybe ready for review
* Update bigframes/session/__init__.py
1 parent ac8f40c commit 9a9f2bc
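
This PR moves the `read_gbq_table` helper logic out of `bigframes/session/__init__.py` into a new module, `bigframes/session/_io/bigquery/read_gbq_table.py`, imported in the diff below as `bf_read_gbq_table`. The following sketch of the new module's surface is inferred from the call sites in this diff; it is illustrative, not the module's literal source, and exact signatures may differ.

# Surface of bigframes/session/_io/bigquery/read_gbq_table.py, inferred
# from the call sites in this diff (illustrative only).
def get_table_metadata(bqclient, *, table_ref, api_name, cache, use_cache):
    """Returns (time_travel_timestamp, table), with snapshot metadata caching."""

def get_ibis_time_travel_table(ibis_client, table_ref, time_travel_timestamp):
    """Returns an Ibis table expression pinned to the snapshot timestamp."""

def get_index_cols_and_uniqueness(
    *, bqclient, ibis_client, table, table_expression, index_col, api_name
):
    """Returns (index_cols, is_index_unique)."""

def to_array_value_with_total_ordering(*, session, table_expression, total_ordering_cols):
    """Builds an ArrayValue using the unique index columns as the total ordering."""

def to_array_value_with_default_ordering(*, session, table, table_rows):
    """Builds an ArrayValue with a synthesized, deterministic default ordering."""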

5 files changed: +493 −308 lines changed
bigframes/session/__init__.py

Lines changed: 69 additions & 219 deletions
@@ -18,7 +18,6 @@
 
 import copy
 import datetime
-import itertools
 import logging
 import os
 import re
@@ -43,7 +42,6 @@
 # Even though the ibis.backends.bigquery import is unused, it's needed
 # to register new and replacement ops with the Ibis BigQuery backend.
 import bigframes_vendored.ibis.backends.bigquery  # noqa
-import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops
 import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
 import bigframes_vendored.pandas.io.parquet as third_party_pandas_parquet
 import bigframes_vendored.pandas.io.parsers.readers as third_party_pandas_readers
@@ -62,7 +60,6 @@
 import google.cloud.storage as storage  # type: ignore
 import ibis
 import ibis.backends.bigquery as ibis_bigquery
-import ibis.expr.datatypes as ibis_dtypes
 import ibis.expr.types as ibis_types
 import numpy as np
 import pandas
@@ -80,7 +77,6 @@
 import bigframes.core as core
 import bigframes.core.blocks as blocks
 import bigframes.core.compile
-import bigframes.core.guid as guid
 import bigframes.core.nodes as nodes
 from bigframes.core.ordering import IntegerEncoding
 import bigframes.core.ordering as order
@@ -92,6 +88,7 @@
 from bigframes.functions.remote_function import read_gbq_function as bigframes_rgf
 from bigframes.functions.remote_function import remote_function as bigframes_rf
 import bigframes.session._io.bigquery as bigframes_io
+import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table
 import bigframes.session.clients
 import bigframes.version
 
@@ -692,59 +689,6 @@ def read_gbq_table(
             use_cache=use_cache,
         )
 
-    def _get_snapshot_sql_and_primary_key(
-        self,
-        table: google.cloud.bigquery.table.Table,
-        *,
-        api_name: str,
-        use_cache: bool = True,
-    ) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]:
-        """Create a read-only Ibis table expression representing a table.
-
-        If we can get a total ordering from the table, such as via primary key
-        column(s), then return those too so that ordering generation can be
-        avoided.
-        """
-        (
-            snapshot_timestamp,
-            table,
-        ) = bigframes_io.get_snapshot_datetime_and_table_metadata(
-            self.bqclient,
-            table_ref=table.reference,
-            api_name=api_name,
-            cache=self._df_snapshot,
-            use_cache=use_cache,
-        )
-
-        if table.location.casefold() != self._location.casefold():
-            raise ValueError(
-                f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}"
-            )
-
-        # If there are primary keys defined, the query engine assumes these
-        # columns are unique, even if the constraint is not enforced. We make
-        # the same assumption and use these columns as the total ordering keys.
-        primary_keys = None
-        if (
-            (table_constraints := getattr(table, "table_constraints", None)) is not None
-            and (primary_key := table_constraints.primary_key) is not None
-            # This will be False for either None or empty list.
-            # We want primary_keys = None if no primary keys are set.
-            and (columns := primary_key.columns)
-        ):
-            primary_keys = columns
-
-        try:
-            table_expression = self.ibis_client.sql(
-                bigframes_io.create_snapshot_sql(table.reference, snapshot_timestamp)
-            )
-        except google.api_core.exceptions.Forbidden as ex:
-            if "Drive credentials" in ex.message:
-                ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
-            raise
-
-        return table_expression, primary_keys
-
     def _read_gbq_table(
         self,
         query: str,
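
Context for the removal above: `_get_snapshot_sql_and_primary_key` did three things at once (fetched snapshot metadata, validated the table location, and treated any declared primary key as a ready-made total ordering), and the new module splits those concerns across `get_table_metadata` and `get_ibis_time_travel_table`. The snapshot itself relies on BigQuery time travel; a minimal sketch of the query shape, with a hypothetical table name and timestamp:

# Illustrative only: the time-travel query shape that pins a read to a
# snapshot timestamp. The table name and timestamp here are hypothetical.
snapshot_sql = """
SELECT *
FROM `my-project.my_dataset.my_table`
FOR SYSTEM_TIME AS OF TIMESTAMP('2024-04-30 12:00:00+00')
"""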
@@ -757,95 +701,104 @@ def _read_gbq_table(
     ) -> dataframe.DataFrame:
         import bigframes.dataframe as dataframe
 
+        # ---------------------------------
+        # Validate and transform parameters
+        # ---------------------------------
+
         if max_results and max_results <= 0:
-            raise ValueError("`max_results` should be a positive number.")
+            raise ValueError(
+                f"`max_results` should be a positive number, got {max_results}."
+            )
 
         table_ref = bigquery.table.TableReference.from_string(
             query, default_project=self.bqclient.project
         )
 
-        table = self.bqclient.get_table(table_ref)
-        (table_expression, primary_keys,) = self._get_snapshot_sql_and_primary_key(
-            table, api_name=api_name, use_cache=use_cache
+        # ---------------------------------
+        # Fetch table metadata and validate
+        # ---------------------------------
+
+        (time_travel_timestamp, table,) = bf_read_gbq_table.get_table_metadata(
+            self.bqclient,
+            table_ref=table_ref,
+            api_name=api_name,
+            cache=self._df_snapshot,
+            use_cache=use_cache,
         )
-        total_ordering_cols = primary_keys
 
-        if not index_col and primary_keys is not None:
-            index_col = primary_keys
+        if table.location.casefold() != self._location.casefold():
+            raise ValueError(
+                f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}"
+            )
+
+        # -----------------------------------------
+        # Create Ibis table expression and validate
+        # -----------------------------------------
+
+        # Use a time travel to make sure the DataFrame is deterministic, even
+        # if the underlying table changes.
+        table_expression = bf_read_gbq_table.get_ibis_time_travel_table(
+            self.ibis_client,
+            table_ref,
+            time_travel_timestamp,
+        )
 
         for key in columns:
             if key not in table_expression.columns:
                 raise ValueError(
                     f"Column '{key}' of `columns` not found in this table."
                 )
 
-        if isinstance(index_col, str):
-            index_cols: List[str] = [index_col]
-        else:
-            index_cols = list(index_col)
+        # ---------------------------------------
+        # Create a non-default index and validate
+        # ---------------------------------------
+
+        # TODO(b/337925142): Move index_cols creation to before we create the
+        # Ibis table expression so we don't have a "SELECT *" subquery in the
+        # query that checks for index uniqueness.
+
+        index_cols, is_index_unique = bf_read_gbq_table.get_index_cols_and_uniqueness(
+            bqclient=self.bqclient,
+            ibis_client=self.ibis_client,
+            table=table,
+            table_expression=table_expression,
+            index_col=index_col,
+            api_name=api_name,
+        )
 
         for key in index_cols:
             if key not in table_expression.columns:
                 raise ValueError(
                     f"Column `{key}` of `index_col` not found in this table."
                 )
 
+        # TODO(b/337925142): We should push down column filters when we get the time
+        # travel table to avoid "SELECT *" subqueries.
         if columns:
            table_expression = table_expression.select([*index_cols, *columns])
 
-        # If the index is unique and sortable, then we don't need to generate
-        # an ordering column.
-        ordering = None
-        if total_ordering_cols is not None:
-            # Note: currently, a table has a total ordering only when the
-            # primary key(s) are set on a table. The query engine assumes such
-            # columns are unique, even if not enforced.
-            ordering = order.ExpressionOrdering(
-                ordering_value_columns=tuple(
-                    order.ascending_over(column_id) for column_id in total_ordering_cols
-                ),
-                total_ordering_columns=frozenset(total_ordering_cols),
-            )
-            column_values = [table_expression[col] for col in table_expression.columns]
-            array_value = core.ArrayValue.from_ibis(
-                self,
-                table_expression,
-                columns=column_values,
-                hidden_ordering_columns=[],
-                ordering=ordering,
-            )
+        # ----------------------------
+        # Create ordering and validate
+        # ----------------------------
 
-        elif len(index_cols) != 0:
-            # We have index columns, lets see if those are actually total_order_columns
-            ordering = order.ExpressionOrdering(
-                ordering_value_columns=tuple(
-                    [order.ascending_over(column_id) for column_id in index_cols]
-                ),
-                total_ordering_columns=frozenset(index_cols),
-            )
-            is_total_ordering = self._check_index_uniqueness(
-                table_expression, index_cols
+        if is_index_unique:
+            array_value = bf_read_gbq_table.to_array_value_with_total_ordering(
+                session=self,
+                table_expression=table_expression,
+                total_ordering_cols=index_cols,
             )
-            if is_total_ordering:
-                column_values = [
-                    table_expression[col] for col in table_expression.columns
-                ]
-                array_value = core.ArrayValue.from_ibis(
-                    self,
-                    table_expression,
-                    columns=column_values,
-                    hidden_ordering_columns=[],
-                    ordering=ordering,
-                )
-            else:
-                array_value = self._create_total_ordering(
-                    table_expression, table_rows=table.num_rows
-                )
         else:
-            array_value = self._create_total_ordering(
-                table_expression, table_rows=table.num_rows
+            # Note: Even though we're adding a default ordering here, that's
+            # just so we have a deterministic total ordering. If the user
+            # specified a non-unique index, we still sort by that later.
+            array_value = bf_read_gbq_table.to_array_value_with_default_ordering(
+                session=self, table=table_expression, table_rows=table.num_rows
            )
 
+        # ----------------------------------------------------
+        # Create Block & default index if len(index_cols) == 0
+        # ----------------------------------------------------
+
         value_columns = [col for col in array_value.column_ids if col not in index_cols]
         block = blocks.Block(
             array_value,
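
The user-visible effect of the new `is_index_unique` branch: a unique index (for example, columns BigQuery reports as a primary key) is used directly as the total ordering, so no hidden ordering columns need to be generated, while a non-unique or absent index falls back to a synthesized default ordering. A usage sketch against a hypothetical table and column:

import bigframes.pandas as bpd

# `id` is a hypothetical column; if it is a declared primary key or the
# uniqueness probe passes, it doubles as the total ordering.
df = bpd.read_gbq_table("my-project.my_dataset.my_table", index_col="id")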
@@ -862,27 +815,6 @@ def _read_gbq_table(
             df.sort_index()
         return df
 
-    def _check_index_uniqueness(
-        self, table: ibis_types.Table, index_cols: List[str]
-    ) -> bool:
-        distinct_table = table.select(*index_cols).distinct()
-        is_unique_sql = f"""WITH full_table AS (
-            {self.ibis_client.compile(table)}
-        ),
-        distinct_table AS (
-            {self.ibis_client.compile(distinct_table)}
-        )
-
-        SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`,
-        (SELECT COUNT(*) FROM distinct_table) AS `distinct_count`
-        """
-        results, _ = self._start_query(is_unique_sql)
-        row = next(iter(results))
-
-        total_count = row["total_count"]
-        distinct_count = row["distinct_count"]
-        return total_count == distinct_count
-
     def _read_bigquery_load_job(
         self,
         filepath_or_buffer: str | IO["bytes"],
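
The uniqueness probe removed here now lives behind `get_index_cols_and_uniqueness`. For a hypothetical table `proj.ds.t` with index column `id`, the template above renders to roughly the following; the index is unique iff the two counts match:

# Illustrative rendering of the uniqueness probe for a hypothetical table.
is_unique_sql = """WITH full_table AS (
    SELECT * FROM `proj.ds.t`
),
distinct_table AS (
    SELECT DISTINCT id FROM `proj.ds.t`
)
SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`,
       (SELECT COUNT(*) FROM distinct_table) AS `distinct_count`
"""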
@@ -1462,66 +1394,6 @@ def _create_empty_temp_table(
         )
         return bigquery.TableReference.from_string(table)
 
-    def _create_total_ordering(
-        self,
-        table: ibis_types.Table,
-        table_rows: Optional[int],
-    ) -> core.ArrayValue:
-        # Since this might also be used as the index, don't use the default
-        # "ordering ID" name.
-
-        # For small tables, 64 bits is enough to avoid collisions, 128 bits will never ever collide no matter what
-        # Assume table is large if table row count is unknown
-        use_double_hash = (
-            (table_rows is None) or (table_rows == 0) or (table_rows > 100000)
-        )
-
-        ordering_hash_part = guid.generate_guid("bigframes_ordering_")
-        ordering_hash_part2 = guid.generate_guid("bigframes_ordering_")
-        ordering_rand_part = guid.generate_guid("bigframes_ordering_")
-
-        # All inputs into hash must be non-null or resulting hash will be null
-        str_values = list(
-            map(lambda col: _convert_to_nonnull_string(table[col]), table.columns)
-        )
-        full_row_str = (
-            str_values[0].concat(*str_values[1:])
-            if len(str_values) > 1
-            else str_values[0]
-        )
-        full_row_hash = full_row_str.hash().name(ordering_hash_part)
-        # By modifying value slightly, we get another hash uncorrelated with the first
-        full_row_hash_p2 = (full_row_str + "_").hash().name(ordering_hash_part2)
-        # Used to disambiguate between identical rows (which will have identical hash)
-        random_value = ibis.random().name(ordering_rand_part)
-
-        order_values = (
-            [full_row_hash, full_row_hash_p2, random_value]
-            if use_double_hash
-            else [full_row_hash, random_value]
-        )
-
-        original_column_ids = table.columns
-        table_with_ordering = table.select(
-            itertools.chain(original_column_ids, order_values)
-        )
-
-        ordering = order.ExpressionOrdering(
-            ordering_value_columns=tuple(
-                order.ascending_over(col.get_name()) for col in order_values
-            ),
-            total_ordering_columns=frozenset(col.get_name() for col in order_values),
-        )
-        columns = [table_with_ordering[col] for col in original_column_ids]
-        hidden_columns = [table_with_ordering[col.get_name()] for col in order_values]
-        return core.ArrayValue.from_ibis(
-            self,
-            table_with_ordering,
-            columns,
-            hidden_ordering_columns=hidden_columns,
-            ordering=ordering,
-        )
-
     def _ibis_to_temp_table(
         self,
         table: ibis_types.Table,
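
`_create_total_ordering` moves to the new module as `to_array_value_with_default_ordering`. Its core trick, two uncorrelated hashes of the serialized row plus a random tiebreaker, with the second hash skipped for small tables, is easiest to see stripped of the Ibis plumbing. A standalone pure-Python sketch, illustrative only, not bigframes code:

import hashlib
import random

def ordering_keys(row_str: str, use_double_hash: bool = True):
    # First 64-bit hash of the serialized row.
    h1 = int.from_bytes(hashlib.md5(row_str.encode()).digest()[:8], "big")
    # Perturbing the input slightly yields a second hash uncorrelated with
    # the first, pushing collision resistance from ~64 toward ~128 bits.
    h2 = int.from_bytes(hashlib.md5((row_str + "_").encode()).digest()[:8], "big")
    # A random value disambiguates rows that are byte-for-byte identical.
    tiebreak = random.random()
    return (h1, h2, tiebreak) if use_double_hash else (h1, tiebreak)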
@@ -2056,28 +1928,6 @@ def _can_cluster_bq(field: bigquery.SchemaField):
     )
 
 
-def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringValue:
-    col_type = column.type()
-    if (
-        col_type.is_numeric()
-        or col_type.is_boolean()
-        or col_type.is_binary()
-        or col_type.is_temporal()
-    ):
-        result = column.cast(ibis_dtypes.String(nullable=True))
-    elif col_type.is_geospatial():
-        result = typing.cast(ibis_types.GeoSpatialColumn, column).as_text()
-    elif col_type.is_string():
-        result = column
-    else:
-        # TO_JSON_STRING works with all data types, but isn't the most efficient
-        # Needed for JSON, STRUCT and ARRAY datatypes
-        result = vendored_ibis_ops.ToJsonString(column).to_expr()  # type: ignore
-    # Escape backslashes and use backslash as delineator
-    escaped = typing.cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\")  # type: ignore
-    return typing.cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped)
-
-
 def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
     """
     For backwards-compatibility, convert any previously client-side only
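
On `_convert_to_nonnull_string`, which moves along with the ordering code: the backslash prefix acts as a field delimiter so that concatenated column values don't collide across column boundaries, and doubling embedded backslashes guards against most collisions from values that themselves contain the delimiter. A pure-Python analogue of the idea, illustrative only:

def nonnull_string(value) -> str:
    # NULLs become "", and every value gets a leading backslash delimiter
    # with embedded backslashes doubled.
    s = "" if value is None else str(value)
    return "\\" + s.replace("\\", "\\\\")

# Bare concatenation confuses column boundaries...
assert "ab" + "c" == "a" + "bc"
# ...while the prefixed delimiter keeps the rows distinct.
assert nonnull_string("ab") + nonnull_string("c") != nonnull_string("a") + nonnull_string("bc")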
