Skip to content

Commit 2de4129

Browse files
refactor: Reduce compile interface to returning just SQL text (#786)
1 parent 324d93c commit 2de4129

File tree

7 files changed

+137
-238
lines changed

7 files changed

+137
-238
lines changed

bigframes/core/__init__.py

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import pyarrow as pa
2828
import pyarrow.feather as pa_feather
2929

30-
import bigframes.core.compile as compiling
30+
import bigframes.core.compile
3131
import bigframes.core.expression as ex
3232
import bigframes.core.guid
3333
import bigframes.core.join_def as join_def
@@ -142,12 +142,7 @@ def schema(self) -> schemata.ArraySchema:
142142

143143
@functools.cached_property
144144
def _compiled_schema(self) -> schemata.ArraySchema:
145-
compiled = self._compile_unordered()
146-
items = tuple(
147-
schemata.SchemaItem(id, compiled.get_column_type(id))
148-
for id in compiled.column_ids
149-
)
150-
return schemata.ArraySchema(items)
145+
return bigframes.core.compile.test_only_ibis_inferred_schema(self.node)
151146

152147
def as_cached(
153148
self: ArrayValue,
@@ -169,21 +164,11 @@ def as_cached(
169164

170165
def _try_evaluate_local(self):
171166
"""Use only for unit testing paths - not fully featured. Will throw exception if fails."""
172-
import ibis
173-
174-
return ibis.pandas.connect({}).execute(
175-
self._compile_ordered()._to_ibis_expr(ordering_mode="unordered")
176-
)
167+
return bigframes.core.compile.test_only_try_evaluate(self.node)
177168

178169
def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
179170
return self.schema.get_type(key)
180171

181-
def _compile_ordered(self) -> compiling.OrderedIR:
182-
return compiling.compile_ordered_ir(self.node)
183-
184-
def _compile_unordered(self) -> compiling.UnorderedIR:
185-
return compiling.compile_unordered_ir(self.node)
186-
187172
def row_count(self) -> ArrayValue:
188173
"""Get number of rows in ArrayValue as a single-entry ArrayValue."""
189174
return ArrayValue(nodes.RowCountNode(child=self.node))

bigframes/core/compile/__init__.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,22 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
from __future__ import annotations
1415

15-
from bigframes.core.compile.compiled import OrderedIR, UnorderedIR
16-
from bigframes.core.compile.compiler import compile_ordered_ir, compile_unordered_ir
16+
from bigframes.core.compile.api import (
17+
compile_ordered,
18+
compile_peek,
19+
compile_raw,
20+
compile_unordered,
21+
test_only_ibis_inferred_schema,
22+
test_only_try_evaluate,
23+
)
1724

1825
__all__ = [
19-
"compile_ordered_ir",
20-
"compile_unordered_ir",
21-
"OrderedIR",
22-
"UnorderedIR",
26+
"compile_peek",
27+
"compile_unordered",
28+
"compile_ordered",
29+
"compile_raw",
30+
"test_only_try_evaluate",
31+
"test_only_ibis_inferred_schema",
2332
]

bigframes/core/compile/api.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from __future__ import annotations
15+
16+
from typing import Mapping, Tuple, TYPE_CHECKING
17+
18+
import bigframes.core.compile.compiler as compiler
19+
20+
if TYPE_CHECKING:
21+
import bigframes.core.nodes
22+
import bigframes.core.ordering
23+
import bigframes.core.schema
24+
25+
26+
def compile_peek(node: bigframes.core.nodes.BigFrameNode, n_rows: int) -> str:
27+
"""Compile node into sql that selects N arbitrary rows, may not execute deterministically."""
28+
return compiler.compile_unordered_ir(node).peek_sql(n_rows)
29+
30+
31+
def compile_unordered(
32+
node: bigframes.core.nodes.BigFrameNode, *, col_id_overrides: Mapping[str, str] = {}
33+
) -> str:
34+
"""Compile node into sql where rows are unsorted, and no ordering information is preserved."""
35+
return compiler.compile_unordered_ir(node).to_sql(col_id_overrides=col_id_overrides)
36+
37+
38+
def compile_ordered(
39+
node: bigframes.core.nodes.BigFrameNode, *, col_id_overrides: Mapping[str, str] = {}
40+
) -> str:
41+
"""Compile node into sql where rows are sorted with ORDER BY."""
42+
return compiler.compile_ordered_ir(node).to_sql(
43+
col_id_overrides=col_id_overrides, sorted=True
44+
)
45+
46+
47+
def compile_raw(
48+
node: bigframes.core.nodes.BigFrameNode,
49+
) -> Tuple[str, bigframes.core.ordering.ExpressionOrdering]:
50+
"""Compile node into sql that exposes all columns, including hidden ordering-only columns."""
51+
ir = compiler.compile_ordered_ir(node)
52+
sql = ir.raw_sql()
53+
ordering_info = ir._ordering
54+
return sql, ordering_info
55+
56+
57+
def test_only_try_evaluate(node: bigframes.core.nodes.BigFrameNode):
58+
"""Use only for unit testing paths - not fully featured. Will throw exception if fails."""
59+
ibis = compiler.compile_ordered_ir(node)._to_ibis_expr(ordering_mode="unordered")
60+
return ibis.pandas.connect({}).execute(ibis)
61+
62+
63+
def test_only_ibis_inferred_schema(node: bigframes.core.nodes.BigFrameNode):
64+
"""Use only for testing paths to ensure ibis inferred schema does not diverge from bigframes inferred schema."""
65+
import bigframes.core.schema
66+
67+
compiled = compiler.compile_unordered_ir(node)
68+
items = tuple(
69+
bigframes.core.schema.SchemaItem(id, compiled.get_column_type(id))
70+
for id in compiled.column_ids
71+
)
72+
return bigframes.core.schema.ArraySchema(items)

bigframes/core/compile/compiled.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -923,6 +923,15 @@ def to_sql(
923923
)
924924
return typing.cast(str, sql)
925925

926+
def raw_sql(self) -> str:
927+
"""Return sql with all hidden columns. Used to cache with ordering information."""
928+
return ibis_bigquery.Backend().compile(
929+
self._to_ibis_expr(
930+
ordering_mode="unordered",
931+
expose_hidden_cols=True,
932+
)
933+
)
934+
926935
def _to_ibis_expr(
927936
self,
928937
*,

bigframes/core/nodes.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,19 @@ class CachedTableNode(BigFrameNode):
375375

376376
ordering: typing.Optional[orderings.ExpressionOrdering] = field()
377377

378+
def __post_init__(self):
379+
# enforce invariants
380+
physical_names = set(map(lambda i: i.name, self.physical_schema))
381+
logical_names = self.original_node.schema.names
382+
if not set(logical_names).issubset(physical_names):
383+
raise ValueError(
384+
f"Requested schema {logical_names} cannot be derived from table schema {self.physical_schema}"
385+
)
386+
if not set(self.hidden_columns).issubset(physical_names):
387+
raise ValueError(
388+
f"Requested hidden columns {self.hidden_columns} cannot be derived from table schema {self.physical_schema}"
389+
)
390+
378391
@property
379392
def session(self):
380393
return self.original_node.session

bigframes/session/__init__.py

Lines changed: 25 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import google.cloud.storage as storage # type: ignore
6565
import ibis
6666
import ibis.backends.bigquery as ibis_bigquery
67-
import ibis.expr.types as ibis_types
6867
import jellyfish
6968
import numpy as np
7069
import pandas
@@ -248,6 +247,8 @@ def __init__(
248247
# the ibis client has been created
249248
original_default_query_job_config = self.bqclient.default_query_job_config
250249

250+
# Only used to fetch remote function metadata.
251+
# TODO: Remove in favor of raw bq client
251252
self.ibis_client = typing.cast(
252253
ibis_bigquery.Backend,
253254
ibis.bigquery.connect(
@@ -1497,14 +1498,14 @@ def _create_empty_temp_table(
14971498
)
14981499
return bigquery.TableReference.from_string(table)
14991500

1500-
def _ibis_to_temp_table(
1501+
def _sql_to_temp_table(
15011502
self,
1502-
table: ibis_types.Table,
1503+
sql: str,
15031504
cluster_cols: Iterable[str],
15041505
api_name: str,
15051506
) -> bigquery.TableReference:
15061507
destination, _ = self._query_to_destination(
1507-
self.ibis_client.compile(table),
1508+
sql,
15081509
index_cols=list(cluster_cols),
15091510
api_name=api_name,
15101511
)
@@ -1847,17 +1848,15 @@ def _cache_with_cluster_cols(
18471848
# TODO: May want to support some partial ordering info even for non-strict ordering mode
18481849
keep_order_info = self._strictly_ordered
18491850

1850-
compiled_value = self._compile_ordered(array_value)
1851-
1852-
ibis_expr = compiled_value._to_ibis_expr(
1853-
ordering_mode="unordered", expose_hidden_cols=keep_order_info
1851+
sql, ordering_info = bigframes.core.compile.compile_raw(
1852+
self._with_cached_executions(array_value.node)
18541853
)
1855-
tmp_table = self._ibis_to_temp_table(
1856-
ibis_expr, cluster_cols=cluster_cols, api_name="cached"
1854+
tmp_table = self._sql_to_temp_table(
1855+
sql, cluster_cols=cluster_cols, api_name="cached"
18571856
)
18581857
cached_replacement = array_value.as_cached(
18591858
cache_table=self.bqclient.get_table(tmp_table),
1860-
ordering=compiled_value._ordering if keep_order_info else None,
1859+
ordering=ordering_info if keep_order_info else None,
18611860
).node
18621861
self._cached_executions[array_value.node] = cached_replacement
18631862

@@ -1869,13 +1868,14 @@ def _cache_with_offsets(self, array_value: core.ArrayValue):
18691868
raise ValueError(
18701869
"Caching with offsets only supported in strictly ordered mode."
18711870
)
1872-
compiled_value = self._compile_ordered(array_value)
1873-
1874-
ibis_expr = compiled_value._to_ibis_expr(
1875-
ordering_mode="offset_col", order_col_name="bigframes_offsets"
1871+
sql = bigframes.core.compile.compile_unordered(
1872+
self._with_cached_executions(
1873+
array_value.promote_offsets("bigframes_offsets").node
1874+
)
18761875
)
1877-
tmp_table = self._ibis_to_temp_table(
1878-
ibis_expr, cluster_cols=["bigframes_offsets"], api_name="cached"
1876+
1877+
tmp_table = self._sql_to_temp_table(
1878+
sql, cluster_cols=["bigframes_offsets"], api_name="cached"
18791879
)
18801880
cached_replacement = array_value.as_cached(
18811881
cache_table=self.bqclient.get_table(tmp_table),
@@ -1962,7 +1962,9 @@ def _peek(
19621962
"""A 'peek' efficiently accesses a small number of rows in the dataframe."""
19631963
if not tree_properties.peekable(self._with_cached_executions(array_value.node)):
19641964
warnings.warn("Peeking this value cannot be done efficiently.")
1965-
sql = self._compile_unordered(array_value).peek_sql(n_rows)
1965+
sql = bigframes.core.compile.compile_peek(
1966+
self._with_cached_executions(array_value.node), n_rows
1967+
)
19661968

19671969
# TODO(swast): plumb through the api_name of the user-facing api that
19681970
# caused this query.
@@ -1979,26 +1981,13 @@ def _to_sql(
19791981
) -> str:
19801982
if offset_column:
19811983
array_value = array_value.promote_offsets(offset_column)
1984+
node_w_cached = self._with_cached_executions(array_value.node)
19821985
if sorted:
1983-
return self._compile_ordered(array_value).to_sql(
1984-
col_id_overrides=col_id_overrides, sorted=True
1986+
return bigframes.core.compile.compile_ordered(
1987+
node_w_cached, col_id_overrides=col_id_overrides
19851988
)
1986-
return self._compile_unordered(array_value).to_sql(
1987-
col_id_overrides=col_id_overrides
1988-
)
1989-
1990-
def _compile_ordered(
1991-
self, array_value: core.ArrayValue
1992-
) -> bigframes.core.compile.OrderedIR:
1993-
return bigframes.core.compile.compile_ordered_ir(
1994-
self._with_cached_executions(array_value.node)
1995-
)
1996-
1997-
def _compile_unordered(
1998-
self, array_value: core.ArrayValue
1999-
) -> bigframes.core.compile.UnorderedIR:
2000-
return bigframes.core.compile.compile_unordered_ir(
2001-
self._with_cached_executions(array_value.node)
1989+
return bigframes.core.compile.compile_unordered(
1990+
node_w_cached, col_id_overrides=col_id_overrides
20021991
)
20031992

20041993
def _get_table_size(self, destination_table):

0 commit comments

Comments
 (0)