Commit 7e8296d

refactor: caching and aggregation don't require ordering (#759)
1 parent 35fc7e4 commit 7e8296d

8 files changed (+102, -16 lines)

bigframes/core/__init__.py

Lines changed: 12 additions & 3 deletions
@@ -177,7 +177,7 @@ def _compiled_schema(self) -> schemata.ArraySchema:
     def as_cached(
         self: ArrayValue,
         cache_table: google.cloud.bigquery.Table,
-        ordering: orderings.ExpressionOrdering,
+        ordering: Optional[orderings.ExpressionOrdering],
     ) -> ArrayValue:
         """
         Replace the node with an equivalent one that references a table where the value has been materialized to.
@@ -234,6 +234,8 @@ def promote_offsets(self, col_id: str) -> ArrayValue:
         """
         Convenience function to promote copy of column offsets to a value column. Can be used to reset index.
         """
+        if not self.session._strictly_ordered:
+            raise ValueError("Generating offsets not supported in unordered mode")
         return ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id))

     def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
@@ -382,6 +384,10 @@ def project_window_op(
         never_skip_nulls: will disable null skipping for operators that would otherwise do so
         skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
         """
+        if not self.session._strictly_ordered:
+            # TODO: Support unbounded windows with aggregate ops and some row-order-independent analytic ops
+            # TODO: Support non-deterministic windowing
+            raise ValueError("Windowed ops not supported in unordered mode")
         return ArrayValue(
             nodes.WindowOpNode(
                 child=self.node,
@@ -433,8 +439,9 @@ def unpivot(
         """
         # There will be N labels, used to disambiguate which of N source columns produced each output row
         explode_offsets_id = bigframes.core.guid.generate_guid("unpivot_offsets_")
-        labels_array = self._create_unpivot_labels_array(row_labels, index_col_ids)
-        labels_array = labels_array.promote_offsets(explode_offsets_id)
+        labels_array = self._create_unpivot_labels_array(
+            row_labels, index_col_ids, explode_offsets_id
+        )

         # Unpivot creates N output rows for each input row, labels disambiguate these N rows
         joined_array = self._cross_join_w_labels(labels_array, join_side)
@@ -500,6 +507,7 @@ def _create_unpivot_labels_array(
         self,
         former_column_labels: typing.Sequence[typing.Hashable],
         col_ids: typing.Sequence[str],
+        offsets_id: str,
     ) -> ArrayValue:
         """Create an ArrayValue from a list of label tuples."""
         rows = []
@@ -510,6 +518,7 @@ def _create_unpivot_labels_array(
                 col_ids[i]: (row_label[i] if pandas.notnull(row_label[i]) else None)
                 for i in range(len(col_ids))
             }
+            row[offsets_id] = row_offset
             rows.append(row)

         return ArrayValue.from_pyarrow(pa.Table.from_pylist(rows), session=self.session)
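The interesting change in unpivot is that label disambiguation no longer goes through promote_offsets, an order-dependent window operation that now raises in unordered mode. Instead, the offset is written into each label row while the small labels table is built locally. A minimal standalone sketch of that construction, with hypothetical stand-in inputs rather than the real ArrayValue internals:

import pandas
import pyarrow as pa

# Hypothetical stand-ins for the method's arguments.
row_labels = [("a",), ("b",), ("c",)]
col_ids = ["label_col"]
offsets_id = "unpivot_offsets_0"

rows = []
for row_offset, row_label in enumerate(row_labels):
    row = {
        col_ids[i]: (row_label[i] if pandas.notnull(row_label[i]) else None)
        for i in range(len(col_ids))
    }
    # The disambiguating offset is assigned locally during construction,
    # so no window function (and hence no total ordering) is needed.
    row[offsets_id] = row_offset
    rows.append(row)

labels_table = pa.Table.from_pylist(rows)
print(labels_table.column_names)  # ['label_col', 'unpivot_offsets_0']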

bigframes/core/blocks.py

Lines changed: 26 additions & 8 deletions
@@ -553,7 +553,7 @@ def _materialize_local(
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
         _, query_job = self.session._query_to_destination(
-            self.session._to_sql(self.expr, sorted=True),
+            self.session._to_sql(self.expr, sorted=materialize_options.ordered),
             list(self.index_columns),
             api_name="cached",
             do_clustering=False,
@@ -1016,7 +1016,7 @@ def aggregate_all_and_stack(
                 index_columns=[index_id],
                 column_labels=self.column_labels,
                 index_labels=[None],
-            ).transpose(original_row_index=pd.Index([None]))
+            ).transpose(original_row_index=pd.Index([None]), single_row_mode=True)
         else:  # axis_n == 1
             # using offsets as identity to group on.
             # TODO: Allow to promote identity/total_order columns instead for better perf
@@ -1659,6 +1659,8 @@ def melt(
         value_vars=typing.Sequence[str],
         var_names=typing.Sequence[typing.Hashable],
         value_name: typing.Hashable = "value",
+        *,
+        create_offsets_index: bool = True,
     ):
         """
         Unpivot columns to produce longer, narrower dataframe.
@@ -1679,20 +1681,31 @@ def melt(
             index_col_ids=var_col_ids,
             join_side="right",
         )
-        index_id = guid.generate_guid()
-        unpivot_expr = unpivot_expr.promote_offsets(index_id)
+
+        if create_offsets_index:
+            index_id = guid.generate_guid()
+            unpivot_expr = unpivot_expr.promote_offsets(index_id)
+            index_cols = [index_id]
+        else:
+            index_cols = []
+
         # Need to reorder to get id_vars before var_col and unpivot_col
         unpivot_expr = unpivot_expr.select_columns(
-            [index_id, *id_vars, *var_col_ids, unpivot_col_id]
+            [*index_cols, *id_vars, *var_col_ids, unpivot_col_id]
         )

         return Block(
             unpivot_expr,
             column_labels=[*id_labels, *var_names, value_name],
-            index_columns=[index_id],
+            index_columns=index_cols,
         )

-    def transpose(self, *, original_row_index: Optional[pd.Index] = None) -> Block:
+    def transpose(
+        self,
+        *,
+        original_row_index: Optional[pd.Index] = None,
+        single_row_mode: bool = False,
+    ) -> Block:
         """Transpose the block. Will fail if dtypes aren't coercible to a common type or too many rows.
         Can provide the original_row_index directly if it is already known, otherwise a query is needed.
         """
@@ -1718,7 +1731,11 @@ def transpose(self, *, original_row_index: Optional[pd.Index] = None) -> Block:
                 block.column_labels, pd.Index(range(len(block.column_labels)))
             )
         )
-        numbered_block, offsets = numbered_block.promote_offsets()
+        # TODO: Determine if single row from expression tree (after aggregation without groupby)
+        if single_row_mode:
+            numbered_block, offsets = numbered_block.create_constant(0)
+        else:
+            numbered_block, offsets = numbered_block.promote_offsets()

         stacked_block = numbered_block.melt(
             id_vars=(offsets,),
@@ -1727,6 +1744,7 @@ def transpose(self, *, original_row_index: Optional[pd.Index] = None) -> Block:
                 "col_offset",
             ),
             value_vars=block.value_columns,
+            create_offsets_index=False,
         )
         col_labels = stacked_block.value_columns[-2 - original_col_index.nlevels : -2]
         col_offset = stacked_block.value_columns[-2]  # disambiguator we created earlier
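The new single_row_mode flag exists because transpose needs a per-row identifier for its melt-and-pivot round trip, and promote_offsets is no longer available in unordered mode. For a block known to hold exactly one row, such as the output of aggregate_all_and_stack, a constant column identifies the row just as well. A rough pandas-only illustration of the idea, not the real Block machinery:

import pandas as pd

one_row = pd.DataFrame({"a": [1], "b": [2], "c": [3]})  # e.g. an aggregation result
numbered = one_row.copy()
numbered["row_id"] = 0  # constant id; equivalent to offsets when there is one row
stacked = numbered.melt(id_vars=["row_id"], var_name="col", value_name="value")
transposed = stacked.pivot(index="col", columns="row_id", values="value")
print(transposed)  # the original columns a, b, c become the rows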

bigframes/core/compile/compiler.py

Lines changed: 5 additions & 0 deletions
@@ -109,6 +109,11 @@ def compile_cached_table(node: nodes.CachedTableNode, ordered: bool = True):
     )
     ibis_table = ibis.table(physical_schema, full_table_name)
     if ordered:
+        if node.ordering is None:
+            # If this happens, session malfunctioned while applying cached results.
+            raise ValueError(
+                "Cannot use unordered cached value. Result requires ordering information."
+            )
         return compiled.OrderedIR(
             ibis_table,
             columns=tuple(

bigframes/core/nodes.py

Lines changed: 4 additions & 1 deletion
@@ -411,6 +411,7 @@ def transform_children(
         return self


+# This node shouldn't be used in the "original" expression tree, only used as replacement for original during planning
 @dataclass(frozen=True)
 class CachedTableNode(BigFrameNode):
     # The original BFET subtree that was cached
@@ -422,7 +423,7 @@ class CachedTableNode(BigFrameNode):
     table_id: str = field()
     physical_schema: Tuple[bq.SchemaField, ...] = field()

-    ordering: orderings.ExpressionOrdering = field()
+    ordering: typing.Optional[orderings.ExpressionOrdering] = field()

     @property
     def session(self):
@@ -446,6 +447,8 @@ def variables_introduced(self) -> int:
     @property
     def hidden_columns(self) -> typing.Tuple[str, ...]:
         """Physical columns used to define ordering but not directly exposed as value columns."""
+        if self.ordering is None:
+            return ()
         return tuple(
             col
             for col in sorted(self.ordering.referenced_columns)
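Taken together, the compiler.py and nodes.py changes establish an invariant: a CachedTableNode may now carry ordering=None (written by a session that kept no order information), and an ordered compilation that encounters such a node must fail loudly rather than invent an order. A condensed sketch of that contract, using stand-in types rather than the real compiler classes:

from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass(frozen=True)
class CachedTable:  # stand-in for nodes.CachedTableNode
    table_id: str
    ordering: Optional[Tuple[str, ...]] = None  # None when cached without order info

def compile_cached(node: CachedTable, ordered: bool = True):
    if ordered:
        if node.ordering is None:
            # Reaching here means an unordered cache entry was applied to an
            # ordered query, which the session is supposed to prevent.
            raise ValueError(
                "Cannot use unordered cached value. Result requires ordering information."
            )
        return ("ordered_ir", node.table_id, node.ordering)
    return ("unordered_ir", node.table_id)

print(compile_cached(CachedTable("tmp_abc", ("row_num",))))   # ordered read works
print(compile_cached(CachedTable("tmp_abc"), ordered=False))  # unordered read works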

bigframes/session/__init__.py

Lines changed: 12 additions & 2 deletions
@@ -294,6 +294,9 @@ def __init__(
         self._bytes_processed_sum = 0
         self._slot_millis_sum = 0
         self._execution_count = 0
+        # Whether this session treats objects as totally ordered.
+        # Will expose as feature later, only False for internal testing
+        self._strictly_ordered = True

     @property
     def bqclient(self):
@@ -1841,24 +1844,31 @@ def _cache_with_cluster_cols(
         """Executes the query and uses the resulting table to rewrite future executions."""
         # TODO: Use this for all executions? Problem is that caching materializes extra
         # ordering columns
+        # TODO: May want to support some partial ordering info even for non-strict ordering mode
+        keep_order_info = self._strictly_ordered
+
         compiled_value = self._compile_ordered(array_value)

         ibis_expr = compiled_value._to_ibis_expr(
-            ordering_mode="unordered", expose_hidden_cols=True
+            ordering_mode="unordered", expose_hidden_cols=keep_order_info
         )
         tmp_table = self._ibis_to_temp_table(
             ibis_expr, cluster_cols=cluster_cols, api_name="cached"
         )
         cached_replacement = array_value.as_cached(
             cache_table=self.bqclient.get_table(tmp_table),
-            ordering=compiled_value._ordering,
+            ordering=compiled_value._ordering if keep_order_info else None,
         ).node
         self._cached_executions[array_value.node] = cached_replacement

     def _cache_with_offsets(self, array_value: core.ArrayValue):
         """Executes the query and uses the resulting table to rewrite future executions."""
         # TODO: Use this for all executions? Problem is that caching materializes extra
         # ordering columns
+        if not self._strictly_ordered:
+            raise ValueError(
+                "Caching with offsets only supported in strictly ordered mode."
+            )
         compiled_value = self._compile_ordered(array_value)

         ibis_expr = compiled_value._to_ibis_expr(
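Caching works by node substitution: _cache_with_cluster_cols materializes the subtree to a temporary table, then records a replacement under the original node in _cached_executions, so later compilations of the same subtree read the table instead of recomputing. With keep_order_info=False, the hidden ordering columns are never written and the replacement records ordering=None. A minimal sketch of the memoization pattern, using a plain dict stand-in rather than the real planner:

# Hypothetical stand-in for the session's node-rewrite map.
cached_executions: dict = {}

def record_cache(original_node, cached_replacement):
    # After materializing to a temp table, remember the swap.
    cached_executions[original_node] = cached_replacement

def resolve(node):
    # During planning, a node with a recorded cache entry is replaced by its
    # CachedTableNode equivalent; anything else passes through unchanged.
    return cached_executions.get(node, node)

record_cache("filter(join(t1, t2))", "cached_table: tmp_abc123")
print(resolve("filter(join(t1, t2))"))  # -> cached_table: tmp_abc123
print(resolve("some_other_subtree"))    # -> some_other_subtree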

tests/system/conftest.py

Lines changed: 11 additions & 0 deletions
@@ -139,6 +139,17 @@ def session() -> Generator[bigframes.Session, None, None]:
     session.close()  # close generated session at cleanup time


+@pytest.fixture(scope="session")
+def unordered_session() -> Generator[bigframes.Session, None, None]:
+    context = bigframes.BigQueryOptions(
+        location="US",
+    )
+    session = bigframes.Session(context=context)
+    session._strictly_ordered = False
+    yield session
+    session.close()  # close generated session at cleanup time
+
+
 @pytest.fixture(scope="session")
 def session_tokyo(tokyo_location: str) -> Generator[bigframes.Session, None, None]:
     context = bigframes.BigQueryOptions(
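A hypothetical sketch of how a test consumes the fixture (this snippet is illustrative only; the real test added by this commit appears in tests/system/small/test_unordered.py below):

import pandas as pd

import bigframes.pandas as bpd

def test_unordered_session_smoke(unordered_session):
    # The fixture flips the internal flag before any queries run.
    assert unordered_session._strictly_ordered is False
    df = bpd.DataFrame(pd.DataFrame({"a": [1, 2, 3]}), session=unordered_session)
    result = df.to_pandas(ordered=False)  # unordered fetch, as in the new test file
    assert set(result["a"]) == {1, 2, 3}  # content check that ignores row order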

tests/system/small/test_dataframe.py

Lines changed: 4 additions & 2 deletions
@@ -3124,9 +3124,9 @@ def test_dataframe_aggregate_int(scalars_df_index, scalars_pandas_df_index, col,

     # Check dtype separately
     assert bf_result.dtype == "Int64"
-
+    # Is otherwise "object" dtype
+    pd_result.index = pd_result.index.astype("string[pyarrow]")
     # Pandas may produce narrower numeric types
-    # Pandas has object index type
     assert_series_equal(pd_result, bf_result, check_dtype=False, check_index_type=False)


@@ -3146,6 +3146,7 @@ def test_dataframe_aggregate_bool(scalars_df_index, scalars_pandas_df_index, col

     # Pandas may produce narrower numeric types
     # Pandas has object index type
+    pd_result.index = pd_result.index.astype("string[pyarrow]")
     assert_series_equal(pd_result, bf_result, check_dtype=False, check_index_type=False)


@@ -3183,6 +3184,7 @@ def test_dataframe_aggregates(

     # Pandas may produce narrower numeric types, but bigframes always produces Float64
     # Pandas has object index type
+    pd_result.index = pd_result.index.astype("string[pyarrow]")
     assert_series_equal(
         pd_result,
         bf_result,
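The added astype calls exist because a pandas column-wise aggregation returns a Series whose index (the column labels) has plain "object" dtype, while BigQuery DataFrames returns labels as pyarrow-backed strings; coercing the pandas side keeps the comparison apples-to-apples. A quick standalone illustration (assumes pyarrow is installed):

import pandas as pd

pd_result = pd.DataFrame({"a": [1, 2], "b": [3, 4]}).sum()
print(pd_result.index.dtype)  # object: plain Python strings
pd_result.index = pd_result.index.astype("string[pyarrow]")
print(pd_result.index.dtype)  # string[pyarrow]: matches the BigFrames label dtype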

tests/system/small/test_unordered.py

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pandas as pd
+
+import bigframes.pandas as bpd
+from tests.system.utils import assert_pandas_df_equal
+
+
+def test_unordered_mode_cache_aggregate(unordered_session):
+    pd_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, dtype=pd.Int64Dtype())
+    df = bpd.DataFrame(pd_df, session=unordered_session)
+    mean_diff = df - df.mean()
+    mean_diff.cache()
+    bf_result = mean_diff.to_pandas(ordered=False)
+    pd_result = pd_df - pd_df.mean()
+
+    assert_pandas_df_equal(bf_result, pd_result, ignore_order=True)
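This test exercises the changed paths end to end: df - df.mean() needs an aggregation with no ordering, .cache() materializes the result (presumably via _cache_with_cluster_cols, where keep_order_info=False drops the ordering metadata), and to_pandas(ordered=False) compiles an unordered read of the cached table.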
