Commit 3346494

refactor: Unordered mode supports user partial orders (#842)
1 parent 827007c commit 3346494

File tree

10 files changed: +111 -84 lines changed

bigframes/core/__init__.py
bigframes/core/blocks.py
bigframes/core/compile/__init__.py
bigframes/core/compile/api.py
bigframes/core/compile/compiler.py
bigframes/core/indexes/base.py
bigframes/dataframe.py
bigframes/series.py
bigframes/session/__init__.py
tests/system/small/test_unordered.py


bigframes/core/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -147,7 +147,7 @@ def _compiled_schema(self) -> schemata.ArraySchema:
     def as_cached(
         self: ArrayValue,
         cache_table: google.cloud.bigquery.Table,
-        ordering: Optional[orderings.TotalOrdering],
+        ordering: Optional[orderings.RowOrdering],
     ) -> ArrayValue:
         """
         Replace the node with an equivalent one that references a table where the value has been materialized to.

bigframes/core/blocks.py

Lines changed: 29 additions & 11 deletions
@@ -498,9 +498,33 @@ def to_pandas(
         sampling_method: Optional[str] = None,
         random_state: Optional[int] = None,
         *,
-        ordered: Optional[bool] = None,
+        ordered: bool = True,
     ) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
-        """Run query and download results as a pandas DataFrame."""
+        """Run query and download results as a pandas DataFrame.
+
+        Args:
+            max_download_size (int, default None):
+                Download size threshold in MB. If max_download_size is exceeded when downloading data
+                (e.g., to_pandas()), the data will be downsampled if
+                bigframes.options.sampling.enable_downsampling is True, otherwise, an error will be
+                raised. If set to a value other than None, this will supersede the global config.
+            sampling_method (str, default None):
+                Downsampling algorithms to be chosen from, the choices are: "head": This algorithm
+                returns a portion of the data from the beginning. It is fast and requires minimal
+                computations to perform the downsampling; "uniform": This algorithm returns uniform
+                random samples of the data. If set to a value other than None, this will supersede
+                the global config.
+            random_state (int, default None):
+                The seed for the uniform downsampling algorithm. If provided, the uniform method may
+                take longer to execute and require more computation. If set to a value other than
+                None, this will supersede the global config.
+            ordered (bool, default True):
+                Determines whether the resulting pandas dataframe will be ordered.
+                Whether the row ordering is deterministic depends on whether session ordering is strict.
+
+        Returns:
+            pandas.DataFrame, QueryJob
+        """
         if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
             raise NotImplementedError(
                 f"The downsampling method {sampling_method} is not implemented, "
@@ -517,10 +541,7 @@ def to_pandas(

         df, query_job = self._materialize_local(
             materialize_options=MaterializationOptions(
-                downsampling=sampling,
-                ordered=ordered
-                if ordered is not None
-                else self.session._strictly_ordered,
+                downsampling=sampling, ordered=ordered
             )
         )
         df.set_axis(self.column_labels, axis=1, copy=False)
@@ -547,7 +568,7 @@ def to_pandas_batches(
         dtypes = dict(zip(self.index_columns, self.index.dtypes))
         dtypes.update(zip(self.value_columns, self.dtypes))
         _, query_job = self.session._query_to_destination(
-            self.session._to_sql(self.expr, ordered=self.session._strictly_ordered),
+            self.session._to_sql(self.expr, ordered=True),
             list(self.index_columns),
             api_name="cached",
             do_clustering=False,
@@ -2593,10 +2614,7 @@ def to_pandas(self, *, ordered: Optional[bool] = None) -> pd.Index:
         index_columns = list(self._block.index_columns)
         expr = self._expr.select_columns(index_columns)
         results, _ = self.session._execute(
-            expr,
-            ordered=ordered
-            if (ordered is not None)
-            else self.session._strictly_ordered,
+            expr, ordered=ordered if ordered is not None else True
         )
         df = expr.session._rows_to_dataframe(results)
         df = df.set_index(index_columns)
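
The sampling knobs documented in this hunk surface with the same names on the public DataFrame.to_pandas. A minimal sketch of how they combine, assuming the public BigQuery penguins sample table and an already-configured session (the table choice and threshold values are illustrative only):

import bigframes
import bigframes.pandas as bpd

# Allow automatic downsampling when a download would exceed the size threshold.
bigframes.options.sampling.enable_downsampling = True

df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")

# Cap the download at 10 MB; if exceeded, take a uniform random sample,
# seeded with random_state so repeated runs return the same rows.
pdf = df.to_pandas(max_download_size=10, sampling_method="uniform", random_state=42)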

bigframes/core/compile/__init__.py

Lines changed: 2 additions & 8 deletions
@@ -14,19 +14,13 @@
 from __future__ import annotations

 from bigframes.core.compile.api import (
-    compile_ordered,
-    compile_peek,
-    compile_raw,
-    compile_unordered,
+    SQLCompiler,
     test_only_ibis_inferred_schema,
     test_only_try_evaluate,
 )

 __all__ = [
-    "compile_peek",
-    "compile_unordered",
-    "compile_ordered",
-    "compile_raw",
+    "SQLCompiler",
     "test_only_try_evaluate",
     "test_only_ibis_inferred_schema",
 ]

bigframes/core/compile/api.py

Lines changed: 38 additions & 32 deletions
@@ -25,38 +25,44 @@
 _STRICT_COMPILER = compiler.Compiler(strict=True)


-def compile_peek(node: bigframes.core.nodes.BigFrameNode, n_rows: int) -> str:
-    """Compile node into sql that selects N arbitrary rows, may not execute deterministically."""
-    return _STRICT_COMPILER.compile_unordered_ir(node).peek_sql(n_rows)
-
-
-def compile_unordered(
-    node: bigframes.core.nodes.BigFrameNode, *, col_id_overrides: Mapping[str, str] = {}
-) -> str:
-    """Compile node into sql where rows are unsorted, and no ordering information is preserved."""
-    return _STRICT_COMPILER.compile_unordered_ir(node).to_sql(
-        col_id_overrides=col_id_overrides
-    )
-
-
-def compile_ordered(
-    node: bigframes.core.nodes.BigFrameNode, *, col_id_overrides: Mapping[str, str] = {}
-) -> str:
-    """Compile node into sql where rows are sorted with ORDER BY."""
-    return _STRICT_COMPILER.compile_ordered_ir(node).to_sql(
-        col_id_overrides=col_id_overrides, ordered=True
-    )
-
-
-def compile_raw(
-    node: bigframes.core.nodes.BigFrameNode,
-) -> Tuple[str, bigframes.core.ordering.TotalOrdering]:
-    """Compile node into sql that exposes all columns, including hidden ordering-only columns."""
-    ir = _STRICT_COMPILER.compile_ordered_ir(node)
-    sql = ir.raw_sql()
-    ordering_info = ir._ordering
-    assert ir.has_total_order
-    return sql, ordering_info  # type: ignore
+class SQLCompiler:
+    def __init__(self, strict: bool = True):
+        self._compiler = compiler.Compiler(strict=strict)
+
+    def compile_peek(self, node: bigframes.core.nodes.BigFrameNode, n_rows: int) -> str:
+        """Compile node into sql that selects N arbitrary rows, may not execute deterministically."""
+        return self._compiler.compile_unordered_ir(node).peek_sql(n_rows)
+
+    def compile_unordered(
+        self,
+        node: bigframes.core.nodes.BigFrameNode,
+        *,
+        col_id_overrides: Mapping[str, str] = {},
+    ) -> str:
+        """Compile node into sql where rows are unsorted, and no ordering information is preserved."""
+        return self._compiler.compile_unordered_ir(node).to_sql(
+            col_id_overrides=col_id_overrides
+        )
+
+    def compile_ordered(
+        self,
+        node: bigframes.core.nodes.BigFrameNode,
+        *,
+        col_id_overrides: Mapping[str, str] = {},
+    ) -> str:
+        """Compile node into sql where rows are sorted with ORDER BY."""
+        return self._compiler.compile_ordered_ir(node).to_sql(
+            col_id_overrides=col_id_overrides, ordered=True
+        )
+
+    def compile_raw(
+        self,
+        node: bigframes.core.nodes.BigFrameNode,
+    ) -> Tuple[str, bigframes.core.ordering.RowOrdering]:
+        """Compile node into sql that exposes all columns, including hidden ordering-only columns."""
+        ir = self._compiler.compile_ordered_ir(node)
+        sql = ir.raw_sql()
+        return sql, ir._ordering


 def test_only_try_evaluate(node: bigframes.core.nodes.BigFrameNode):
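
A rough usage sketch of the new wrapper (internal API; the node here is assumed to be an already-built BigFrameNode, for example the array_value.node that the session code below passes in):

from bigframes.core.compile import SQLCompiler

# strict=False mirrors an unordered session: a total ordering is no longer
# asserted, so compiled plans may carry only a partial, user-defined row ordering.
sql_compiler = SQLCompiler(strict=False)

unordered_sql = sql_compiler.compile_unordered(node)    # no ordering preserved
ordered_sql = sql_compiler.compile_ordered(node)        # emits ORDER BY
peek_sql = sql_compiler.compile_peek(node, n_rows=5)    # five arbitrary rows
raw_sql, row_ordering = sql_compiler.compile_raw(node)  # SQL plus RowOrdering metadata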

bigframes/core/compile/compiler.py

Lines changed: 2 additions & 1 deletion
@@ -46,7 +46,8 @@ class Compiler:

     def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR:
         ir = typing.cast(compiled.OrderedIR, self.compile_node(node, True))
-        assert ir.has_total_order
+        if self.strict:
+            assert ir.has_total_order
         return ir

     def compile_unordered_ir(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR:

bigframes/core/indexes/base.py

Lines changed: 1 addition & 3 deletions
@@ -492,9 +492,7 @@ def to_pandas(self) -> pandas.Index:
             pandas.Index:
                 A pandas Index with all of the labels from this Index.
         """
-        return self._block.index.to_pandas(
-            ordered=self._block.session._strictly_ordered
-        )
+        return self._block.index.to_pandas(ordered=True)

     def to_numpy(self, dtype=None, **kwargs) -> np.ndarray:
         return self.to_pandas().to_numpy(dtype, **kwargs)

bigframes/dataframe.py

Lines changed: 10 additions & 14 deletions
@@ -1192,15 +1192,14 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame:
     def to_arrow(
         self,
         *,
-        ordered: Optional[bool] = None,
+        ordered: bool = True,
     ) -> pyarrow.Table:
         """Write DataFrame to an Arrow table / record batch.

         Args:
-            ordered (bool, default None):
-                Determines whether the resulting Arrow table will be deterministically ordered.
-                In some cases, unordered may result in a faster-executing query. If set to a value
-                other than None, will override Session default.
+            ordered (bool, default True):
+                Determines whether the resulting Arrow table will be ordered.
+                In some cases, unordered may result in a faster-executing query.

         Returns:
             pyarrow.Table: A pyarrow Table with all rows and columns of this DataFrame.
@@ -1211,9 +1210,7 @@ def to_arrow(
         )

         self._optimize_query_complexity()
-        pa_table, query_job = self._block.to_arrow(
-            ordered=ordered if ordered is not None else self._session._strictly_ordered,
-        )
+        pa_table, query_job = self._block.to_arrow(ordered=ordered)
         self._set_internal_query_job(query_job)
         return pa_table

@@ -1223,7 +1220,7 @@ def to_pandas(
         sampling_method: Optional[str] = None,
         random_state: Optional[int] = None,
         *,
-        ordered: Optional[bool] = None,
+        ordered: bool = True,
     ) -> pandas.DataFrame:
         """Write DataFrame to pandas DataFrame.

@@ -1243,10 +1240,9 @@ def to_pandas(
                 The seed for the uniform downsampling algorithm. If provided, the uniform method may
                 take longer to execute and require more computation. If set to a value other than
                 None, this will supersede the global config.
-            ordered (bool, default None):
-                Determines whether the resulting pandas dataframe will be deterministically ordered.
-                In some cases, unordered may result in a faster-executing query. If set to a value
-                other than None, will override Session default.
+            ordered (bool, default True):
+                Determines whether the resulting pandas dataframe will be ordered.
+                In some cases, unordered may result in a faster-executing query.

         Returns:
             pandas.DataFrame: A pandas DataFrame with all rows and columns of this DataFrame if the
@@ -1259,7 +1255,7 @@ def to_pandas(
             max_download_size=max_download_size,
             sampling_method=sampling_method,
             random_state=random_state,
-            ordered=ordered if ordered is not None else self._session._strictly_ordered,
+            ordered=ordered,
         )
         self._set_internal_query_job(query_job)
         return df.set_axis(self._block.column_labels, axis=1, copy=False)
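
The user-visible effect is that ordered now defaults to True, and passing False simply requests an unordered download rather than deferring to a session default. A short sketch, reusing the df from the earlier to_pandas example:

pdf_ordered = df.to_pandas()                 # ordered result (the default)
pdf_unordered = df.to_pandas(ordered=False)  # may run faster; row order not guaranteed
arrow_table = df.to_arrow(ordered=False)     # the same flag on the Arrow path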

bigframes/series.py

Lines changed: 5 additions & 6 deletions
@@ -329,7 +329,7 @@ def to_pandas(
         sampling_method: Optional[str] = None,
         random_state: Optional[int] = None,
         *,
-        ordered: Optional[bool] = None,
+        ordered: bool = True,
     ) -> pandas.Series:
         """Writes Series to pandas Series.

@@ -349,10 +349,9 @@ def to_pandas(
                 The seed for the uniform downsampling algorithm. If provided, the uniform method may
                 take longer to execute and require more computation. If set to a value other than
                 None, this will supersede the global config.
-            ordered (bool, default None):
-                Determines whether the resulting pandas series will be deterministically ordered.
-                In some cases, unordered may result in a faster-executing query. If set to a value
-                other than None, will override Session default.
+            ordered (bool, default True):
+                Determines whether the resulting pandas series will be ordered.
+                In some cases, unordered may result in a faster-executing query.


         Returns:
@@ -364,7 +363,7 @@ def to_pandas(
             max_download_size=max_download_size,
             sampling_method=sampling_method,
             random_state=random_state,
-            ordered=ordered if ordered is not None else self._session._strictly_ordered,
+            ordered=ordered,
         )
         self._set_internal_query_job(query_job)
         series = df.squeeze(axis=1)
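
The Series path mirrors the DataFrame one. A minimal sketch, assuming the penguins df from the earlier example (body_mass_g is one of its columns):

s = df["body_mass_g"]
# Unordered download of a single column; internally the one-column
# DataFrame result is squeezed back into a pandas Series.
ps = s.to_pandas(ordered=False)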

bigframes/session/__init__.py

Lines changed: 9 additions & 8 deletions
@@ -304,6 +304,9 @@ def __init__(
             if context._strictly_ordered
             else bigframes.enums.DefaultIndexKind.NULL
         )
+        self._compiler = bigframes.core.compile.SQLCompiler(
+            strict=context._strictly_ordered
+        )

         self._remote_function_session = bigframes_rf._RemoteFunctionSession()

@@ -1893,18 +1896,16 @@ def _cache_with_cluster_cols(
         """Executes the query and uses the resulting table to rewrite future executions."""
         # TODO: Use this for all executions? Problem is that caching materializes extra
         # ordering columns
-        # TODO: May want to support some partial ordering info even for non-strict ordering mode
-        keep_order_info = self._strictly_ordered

-        sql, ordering_info = bigframes.core.compile.compile_raw(
+        sql, ordering_info = self._compiler.compile_raw(
             self._with_cached_executions(array_value.node)
         )
         tmp_table = self._sql_to_temp_table(
             sql, cluster_cols=cluster_cols, api_name="cached"
         )
         cached_replacement = array_value.as_cached(
             cache_table=self.bqclient.get_table(tmp_table),
-            ordering=ordering_info if keep_order_info else None,
+            ordering=ordering_info,
         ).node
         self._cached_executions[array_value.node] = cached_replacement

@@ -1917,7 +1918,7 @@ def _cache_with_offsets(self, array_value: core.ArrayValue):
                 "Caching with offsets only supported in strictly ordered mode."
             )
         offset_column = bigframes.core.guid.generate_guid("bigframes_offsets")
-        sql = bigframes.core.compile.compile_unordered(
+        sql = self._compiler.compile_unordered(
             self._with_cached_executions(
                 array_value.promote_offsets(offset_column).node
             )
@@ -2023,7 +2024,7 @@ def _peek(
         """A 'peek' efficiently accesses a small number of rows in the dataframe."""
         if not tree_properties.peekable(self._with_cached_executions(array_value.node)):
             warnings.warn("Peeking this value cannot be done efficiently.")
-        sql = bigframes.core.compile.compile_peek(
+        sql = self._compiler.compile_peek(
             self._with_cached_executions(array_value.node), n_rows
         )

@@ -2044,10 +2045,10 @@ def _to_sql(
             array_value = array_value.promote_offsets(offset_column)
         node_w_cached = self._with_cached_executions(array_value.node)
         if ordered:
-            return bigframes.core.compile.compile_ordered(
+            return self._compiler.compile_ordered(
                 node_w_cached, col_id_overrides=col_id_overrides
             )
-        return bigframes.core.compile.compile_unordered(
+        return self._compiler.compile_unordered(
             node_w_cached, col_id_overrides=col_id_overrides
         )
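
Taken together, the session now owns a single compiler whose strictness tracks the session's ordering mode, and caching always records whatever ordering metadata the compiler produced. A simplified conceptual sketch of that wiring (not the real Session class; _materialize is a hypothetical stand-in for _sql_to_temp_table plus get_table):

from bigframes.core.compile import SQLCompiler

class _SessionSketch:
    def __init__(self, strictly_ordered: bool):
        # One compiler per session; strict only when total ordering is guaranteed.
        self._compiler = SQLCompiler(strict=strictly_ordered)

    def cache(self, array_value):
        # compile_raw returns RowOrdering metadata even in non-strict mode,
        # so a user-applied sort_values() survives materialization.
        sql, ordering_info = self._compiler.compile_raw(array_value.node)
        table = self._materialize(sql)  # hypothetical helper for this sketch
        return array_value.as_cached(cache_table=table, ordering=ordering_info)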

tests/system/small/test_unordered.py

Lines changed: 14 additions & 0 deletions
@@ -132,3 +132,17 @@ def test_unordered_mode_blocks_windowing(unordered_session, function):
         match=r"Op.*not supported when strict ordering is disabled",
     ):
         function(df)
+
+
+def test_unordered_mode_cache_preserves_order(unordered_session):
+    pd_df = pd.DataFrame(
+        {"a": [1, 2, 3, 4, 5, 6], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype()
+    )
+    pd_df.index = pd_df.index.astype(pd.Int64Dtype())
+    df = bpd.DataFrame(pd_df, session=unordered_session)
+    sorted_df = df.sort_values("b").cache()
+    bf_result = sorted_df.to_pandas()
+    pd_result = pd_df.sort_values("b")
+
+    # B is unique so unstrict order mode result here should be equivalent to strictly ordered
+    assert_pandas_df_equal(bf_result, pd_result, ignore_order=False)
