Commit 92fdb93

refactor: Extract DataFrame execution to class (#899)
1 parent 1053d56 commit 92fdb93

16 files changed: 769 additions, 480 deletions

bigframes/core/blocks.py

Lines changed: 11 additions & 22 deletions
@@ -488,12 +488,7 @@ def to_arrow(
             list(self.value_columns) + list(self.index_columns)
         )

-        _, query_job = self.session._query_to_destination(
-            self.session._to_sql(expr, ordered=ordered),
-            list(self.index_columns),
-            api_name="cached",
-            do_clustering=False,
-        )
+        _, query_job = self.session._execute(expr, ordered=ordered)
         results_iterator = query_job.result()
         pa_table = results_iterator.to_arrow()

@@ -582,12 +577,7 @@ def to_pandas_batches(
         see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result"""
         dtypes = dict(zip(self.index_columns, self.index.dtypes))
         dtypes.update(zip(self.value_columns, self.dtypes))
-        _, query_job = self.session._query_to_destination(
-            self.session._to_sql(self.expr, ordered=True),
-            list(self.index_columns),
-            api_name="cached",
-            do_clustering=False,
-        )
+        _, query_job = self.session._execute(self.expr, ordered=True)
         results_iterator = query_job.result(
             page_size=page_size, max_results=max_results
         )
@@ -617,11 +607,8 @@ def _materialize_local(
     ) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
-        _, query_job = self.session._query_to_destination(
-            self.session._to_sql(self.expr, ordered=materialize_options.ordered),
-            list(self.index_columns),
-            api_name="cached",
-            do_clustering=False,
+        _, query_job = self.session._execute(
+            self.expr, ordered=materialize_options.ordered
         )
         results_iterator = query_job.result()

@@ -797,8 +784,7 @@ def _compute_dry_run(
         self, value_keys: Optional[Iterable[str]] = None
     ) -> bigquery.QueryJob:
         expr = self._apply_value_keys_to_expr(value_keys=value_keys)
-        job_config = bigquery.QueryJobConfig(dry_run=True)
-        _, query_job = self.session._execute(expr, job_config=job_config, dry_run=True)
+        _, query_job = self.session._dry_run(expr)
         return query_job

     def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None):
@@ -2404,12 +2390,15 @@ def to_sql_query(
     def cached(self, *, force: bool = False, session_aware: bool = False) -> None:
         """Write the block to a session table."""
         # use a heuristic for whether something needs to be cached
-        if (not force) and self.session._is_trivially_executable(self.expr):
+        if (not force) and self.session._executor._is_trivially_executable(self.expr):
             return
         elif session_aware:
-            self.session._cache_with_session_awareness(self.expr)
+            bfet_roots = [obj._block._expr.node for obj in self.session.objects]
+            self.session._executor._cache_with_session_awareness(
+                self.expr, session_forest=bfet_roots
+            )
         else:
-            self.session._cache_with_cluster_cols(
+            self.session._executor._cache_with_cluster_cols(
                 self.expr, cluster_cols=self.index_columns
             )
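
The pattern across these hunks is uniform: Block stops hand-assembling SQL for Session._query_to_destination and instead hands its expression to a single Session._execute (or Session._dry_run) entry point, while the caching helpers move behind self.session._executor. The executor itself lives in one of the changed files not shown in this excerpt, so the following is only a minimal sketch of that delegation shape; the class name, constructor, and signatures are assumptions, not the commit's actual code.

from typing import Tuple

import google.cloud.bigquery as bigquery


class BigQueryExecutor:
    """Hypothetical executor facade that owns query submission."""

    def __init__(self, bqclient: bigquery.Client):
        self._bqclient = bqclient

    def execute(self, sql: str, *, dry_run: bool = False) -> bigquery.QueryJob:
        # A dry run validates the query and estimates bytes scanned
        # without executing it.
        config = bigquery.QueryJobConfig(dry_run=dry_run)
        return self._bqclient.query(sql, job_config=config)


class Session:
    """Session keeps thin wrappers so call sites like Block.to_arrow work.

    The real methods take an expression tree plus an `ordered` flag and
    compile it first; raw SQL and a None placeholder are used here only
    to keep the sketch short.
    """

    def __init__(self, bqclient: bigquery.Client):
        self._executor = BigQueryExecutor(bqclient)

    def _execute(self, sql: str) -> Tuple[None, bigquery.QueryJob]:
        return None, self._executor.execute(sql)

    def _dry_run(self, sql: str) -> Tuple[None, bigquery.QueryJob]:
        return None, self._executor.execute(sql, dry_run=True)
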
bigframes/core/compile/api.py

Lines changed: 8 additions & 4 deletions
@@ -13,7 +13,9 @@
 # limitations under the License.
 from __future__ import annotations

-from typing import Mapping, Tuple, TYPE_CHECKING
+from typing import Mapping, Sequence, Tuple, TYPE_CHECKING
+
+import google.cloud.bigquery as bigquery

 import bigframes.core.compile.compiler as compiler

@@ -58,11 +60,13 @@ def compile_ordered(
     def compile_raw(
         self,
         node: bigframes.core.nodes.BigFrameNode,
-    ) -> Tuple[str, bigframes.core.ordering.RowOrdering]:
+    ) -> Tuple[
+        str, Sequence[bigquery.SchemaField], bigframes.core.ordering.RowOrdering
+    ]:
         """Compile node into sql that exposes all columns, including hidden ordering-only columns."""
         ir = self._compiler.compile_ordered_ir(node)
-        sql = ir.raw_sql()
-        return sql, ir._ordering
+        sql, schema = ir.raw_sql_and_schema()
+        return sql, schema, ir._ordering


 def test_only_try_evaluate(node: bigframes.core.nodes.BigFrameNode):
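
Callers of compile_raw now unpack a three-tuple: the BigQuery schema can be returned at compile time because the compiler decides which hidden ordering columns exist. A caller-side sketch, where compiler and node are placeholders for a real compiler instance and a BigFrameNode from this codebase:

# sql is a string, ordering a RowOrdering, and schema a
# Sequence[bigquery.SchemaField], ready for creating the cache table.
sql, schema, ordering = compiler.compile_raw(node)
for field in schema:
    print(field.name, field.field_type, field.is_nullable)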

bigframes/core/compile/compiled.py

Lines changed: 26 additions & 8 deletions
@@ -20,11 +20,14 @@
 from typing import Collection, Literal, Optional, Sequence

 import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops
+import google.cloud.bigquery
 import ibis
 import ibis.backends.bigquery as ibis_bigquery
+import ibis.backends.bigquery.datatypes
 import ibis.common.deferred  # type: ignore
 import ibis.expr.datatypes as ibis_dtypes
 import ibis.expr.operations as ibis_ops
+import ibis.expr.schema as ibis_schema
 import ibis.expr.types as ibis_types
 import pandas

@@ -531,7 +534,8 @@ def __init__(
             for column in self._columns
         }
         self._hidden_ordering_column_names = {
-            column.get_name(): column for column in self._hidden_ordering_columns
+            typing.cast(str, column.get_name()): column
+            for column in self._hidden_ordering_columns
         }
         ### Validation
         value_col_ids = self._column_names.keys()
@@ -947,14 +951,28 @@ def to_sql(
         )
         return typing.cast(str, sql)

-    def raw_sql(self) -> str:
-        """Return sql with all hidden columns. Used to cache with ordering information."""
-        return ibis_bigquery.Backend().compile(
-            self._to_ibis_expr(
-                ordering_mode="unordered",
-                expose_hidden_cols=True,
-            )
+    def raw_sql_and_schema(
+        self,
+    ) -> typing.Tuple[str, typing.Sequence[google.cloud.bigquery.SchemaField]]:
+        """Return sql with all hidden columns. Used to cache with ordering information.
+
+        Also returns schema, as the extra ordering columns are determined compile-time.
+        """
+        all_columns = (*self.column_ids, *self._hidden_ordering_column_names.keys())
+        as_ibis = self._to_ibis_expr(
+            ordering_mode="unordered",
+            expose_hidden_cols=True,
+        ).select(all_columns)
+
+        # Ibis will produce non-nullable schema types, but bigframes should always be nullable
+        fixed_ibis_schema = ibis_schema.Schema.from_tuples(
+            (name, dtype.copy(nullable=True))
+            for (name, dtype) in as_ibis.schema().items()
+        )
+        bq_schema = ibis.backends.bigquery.datatypes.BigQuerySchema.from_ibis(
+            fixed_ibis_schema
         )
+        return ibis_bigquery.Backend().compile(as_ibis), bq_schema

     def _to_ibis_expr(
         self,
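
The nullability fix is the subtle part: ibis can infer non-nullable column types, but BigQuery DataFrames models every column as nullable, so dtypes are rewritten before conversion. A standalone sketch of just that step, reusing the same ibis APIs the diff uses (column names are made up; exact module paths depend on the ibis version bigframes pins):

import ibis.expr.datatypes as dt
import ibis.expr.schema as ibis_schema
from ibis.backends.bigquery.datatypes import BigQuerySchema

# An ibis schema where one column was inferred as non-nullable.
original = ibis_schema.Schema.from_tuples(
    [("id", dt.Int64(nullable=False)), ("name", dt.String())]
)

# Copy every dtype with nullable=True, mirroring raw_sql_and_schema.
fixed = ibis_schema.Schema.from_tuples(
    (name, dtype.copy(nullable=True)) for name, dtype in original.items()
)

# Convert to google.cloud.bigquery SchemaFields; "id" becomes NULLABLE.
bq_schema = BigQuerySchema.from_ibis(fixed)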

bigframes/dataframe.py

Lines changed: 21 additions & 30 deletions
@@ -1214,7 +1214,6 @@ def to_arrow(
             category=bigframes.exceptions.PreviewWarning,
         )

-        self._optimize_query_complexity()
         pa_table, query_job = self._block.to_arrow(ordered=ordered)
         self._set_internal_query_job(query_job)
         return pa_table
@@ -1255,7 +1254,6 @@ def to_pandas(
             downsampled rows and all columns of this DataFrame.
         """
         # TODO(orrbradford): Optimize this in future. Potentially some cases where we can return the stored query job
-        self._optimize_query_complexity()
         df, query_job = self._block.to_pandas(
             max_download_size=max_download_size,
             sampling_method=sampling_method,
@@ -1285,7 +1283,6 @@ def to_pandas_batches(
             form the original dataframe. Results stream from bigquery,
             see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable
         """
-        self._optimize_query_complexity()
         return self._block.to_pandas_batches(
             page_size=page_size, max_results=max_results
         )
@@ -3046,12 +3043,6 @@ def to_gbq(
         ordering_id: Optional[str] = None,
         clustering_columns: Union[pandas.Index, Iterable[typing.Hashable]] = (),
     ) -> str:
-        dispositions = {
-            "fail": bigquery.WriteDisposition.WRITE_EMPTY,
-            "replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
-            "append": bigquery.WriteDisposition.WRITE_APPEND,
-        }
-
         temp_table_ref = None

         if destination_table is None:
@@ -3063,7 +3054,7 @@ def to_gbq(
             )
             if_exists = "replace"

-            temp_table_ref = self._session._random_table(
+            temp_table_ref = self._session._temp_storage_manager._random_table(
                 # The client code owns this table reference now, so skip_cleanup=True
                 # to not clean it up when we close the session.
                 skip_cleanup=True,
@@ -3086,10 +3077,11 @@ def to_gbq(
         if if_exists is None:
             if_exists = "fail"

-        if if_exists not in dispositions:
+        valid_if_exists = ["fail", "replace", "append"]
+        if if_exists not in valid_if_exists:
             raise ValueError(
                 f"Got invalid value {repr(if_exists)} for if_exists. "
-                f"Valid options include None or one of {dispositions.keys()}."
+                f"Valid options include None or one of {valid_if_exists}."
             )

         try:
@@ -3101,16 +3093,25 @@ def to_gbq(
                 clustering_columns, index=index
             )

-        job_config = bigquery.QueryJobConfig(
-            write_disposition=dispositions[if_exists],
-            destination=bigquery.table.TableReference.from_string(
-                destination_table,
-                default_project=default_project,
-            ),
-            clustering_fields=clustering_fields if clustering_fields else None,
+        export_array, id_overrides = self._prepare_export(
+            index=index and self._has_index, ordering_id=ordering_id
+        )
+        destination = bigquery.table.TableReference.from_string(
+            destination_table,
+            default_project=default_project,
+        )
+        _, query_job = self._session._export(
+            export_array,
+            destination=destination,
+            col_id_overrides=id_overrides,
+            cluster_cols=clustering_fields,
+            if_exists=if_exists,
         )
+        self._set_internal_query_job(query_job)

-        self._run_io_query(index=index, ordering_id=ordering_id, job_config=job_config)
+        # The query job should have finished, so there should always be a result table.
+        result_table = query_job.destination
+        assert result_table is not None

         if temp_table_ref:
             bigframes.session._io.bigquery.set_table_expiration(
@@ -3402,19 +3403,16 @@ def _run_io_query(
         self,
         index: bool,
         ordering_id: Optional[str] = None,
-        job_config: Optional[bigquery.job.QueryJobConfig] = None,
     ) -> bigquery.TableReference:
         """Executes a query job presenting this dataframe and returns the destination
         table."""
         session = self._block.expr.session
-        self._optimize_query_complexity()
         export_array, id_overrides = self._prepare_export(
             index=index and self._has_index, ordering_id=ordering_id
         )

         _, query_job = session._execute(
             export_array,
-            job_config=job_config,
             ordered=False,
             col_id_overrides=id_overrides,
         )
@@ -3669,13 +3667,6 @@ def _cached(self, *, force: bool = False) -> DataFrame:
         self._block.cached(force=force)
         return self

-    def _optimize_query_complexity(self):
-        """Reduce query complexity by caching repeated subtrees and recursively materializing maximum-complexity subtrees.
-        May generate many queries and take substantial time to execute.
-        """
-        # TODO: Move all this to session
-        self._session._simplify_with_caching(self._block.expr)
-
 _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries")

 @validations.requires_ordering()
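
From the caller's perspective to_gbq is unchanged; the write disposition, destination, and clustering are simply resolved inside Session._export instead of a hand-built QueryJobConfig. A minimal usage sketch (project, dataset, table, and column names are placeholders):

import bigframes.pandas as bpd

df = bpd.read_gbq("my-project.my_dataset.source_table")

# if_exists must be None or one of "fail", "replace", "append";
# anything else now trips the valid_if_exists check shown above.
df.to_gbq(
    "my-project.my_dataset.dest_table",
    if_exists="replace",
    clustering_columns=["col_a"],
)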

bigframes/series.py

Lines changed: 0 additions & 9 deletions
@@ -188,7 +188,6 @@ def __len__(self):
     __len__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__len__)

     def __iter__(self) -> typing.Iterator:
-        self._optimize_query_complexity()
         return itertools.chain.from_iterable(
             map(lambda x: x.squeeze(axis=1), self._block.to_pandas_batches())
         )
@@ -358,7 +357,6 @@ def to_pandas(
             pandas.Series: A pandas Series with all rows of this Series if the data_sampling_threshold_mb
                 is not exceeded; otherwise, a pandas Series with downsampled rows of the DataFrame.
         """
-        self._optimize_query_complexity()
         df, query_job = self._block.to_pandas(
             max_download_size=max_download_size,
             sampling_method=sampling_method,
@@ -1892,13 +1890,6 @@ def _cached(self, *, force: bool = True, session_aware: bool = True) -> Series:
         self._block.cached(force=force, session_aware=session_aware)
         return self

-    def _optimize_query_complexity(self):
-        """Reduce query complexity by caching repeated subtrees and recursively materializing maximum-complexity subtrees.
-        May generate many queries and take substantial time to execute.
-        """
-        # TODO: Move all this to session
-        self._block.session._simplify_with_caching(self._block.expr)
-

 def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]:
     return pandas.api.types.is_list_like(obj)
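
series.py only loses code here: the public entry points stop pre-optimizing the expression tree themselves, since caching-based optimization is now reachable through the session's executor (see the Block.cached changes above). Caller-visible behavior is unchanged, as in this small usage sketch (table and column names are placeholders):

import bigframes.pandas as bpd

s = bpd.read_gbq("my-project.my_dataset.my_table")["col_a"]

# to_pandas() still compiles and runs a single BigQuery job.
pandas_series = s.to_pandas()

# __iter__ streams pages via to_pandas_batches(), as in the diff above.
for value in s:
    print(value)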
