Skip to content

Commit f2ed29c

Browse files
refactor: Distinguish between range and row windows (#672)
1 parent 2218c21 commit f2ed29c

File tree

9 files changed

+249
-106
lines changed

9 files changed

+249
-106
lines changed

bigframes/core/block_transforms.py

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -71,21 +71,19 @@ def indicate_duplicates(
7171
if keep == "first":
7272
# Count how many copies occur up to current copy of value
7373
# Discard this value if there are copies BEFORE
74-
window_spec = windows.WindowSpec(
74+
window_spec = windows.cumulative_rows(
7575
grouping_keys=tuple(columns),
76-
following=0,
7776
)
7877
elif keep == "last":
7978
# Count how many copies occur up to current copy of values
8079
# Discard this value if there are copies AFTER
81-
window_spec = windows.WindowSpec(
80+
window_spec = windows.inverse_cumulative_rows(
8281
grouping_keys=tuple(columns),
83-
preceding=0,
8482
)
8583
else: # keep == False
8684
# Count how many copies of the value occur in entire series.
8785
# Discard this value if there are copies ANYWHERE
88-
window_spec = windows.WindowSpec(grouping_keys=tuple(columns))
86+
window_spec = windows.unbound(grouping_keys=tuple(columns))
8987
block, dummy = block.create_constant(1)
9088
block, val_count_col_id = block.apply_window_op(
9189
dummy,
@@ -114,7 +112,7 @@ def quantile(
114112
dropna: bool = False,
115113
) -> blocks.Block:
116114
# TODO: handle windowing and more interpolation methods
117-
window = core.WindowSpec(
115+
window = windows.unbound(
118116
grouping_keys=tuple(grouping_column_ids),
119117
)
120118
quantile_cols = []
@@ -212,8 +210,8 @@ def _interpolate_column(
212210
if interpolate_method not in ["linear", "nearest", "ffill"]:
213211
raise ValueError("interpolate method not supported")
214212
window_ordering = (ordering.OrderingExpression(ex.free_var(x_values)),)
215-
backwards_window = windows.WindowSpec(following=0, ordering=window_ordering)
216-
forwards_window = windows.WindowSpec(preceding=0, ordering=window_ordering)
213+
backwards_window = windows.rows(following=0, ordering=window_ordering)
214+
forwards_window = windows.rows(preceding=0, ordering=window_ordering)
217215

218216
# Note, this method may
219217
block, notnull = block.apply_unary_op(column, ops.notnull_op)
@@ -364,7 +362,7 @@ def value_counts(
364362
)
365363
count_id = agg_ids[0]
366364
if normalize:
367-
unbound_window = windows.WindowSpec()
365+
unbound_window = windows.unbound()
368366
block, total_count_id = block.apply_window_op(
369367
count_id, agg_ops.sum_op, unbound_window
370368
)
@@ -388,7 +386,7 @@ def value_counts(
388386

389387
def pct_change(block: blocks.Block, periods: int = 1) -> blocks.Block:
390388
column_labels = block.column_labels
391-
window_spec = windows.WindowSpec(
389+
window_spec = windows.rows(
392390
preceding=periods if periods > 0 else None,
393391
following=-periods if periods < 0 else None,
394392
)
@@ -430,23 +428,22 @@ def rank(
430428
ops.isnull_op,
431429
)
432430
nullity_col_ids.append(nullity_col_id)
433-
window = windows.WindowSpec(
434-
# BigQuery has syntax to reorder nulls with "NULLS FIRST/LAST", but that is unavailable through ibis presently, so must order on a separate nullity expression first.
435-
ordering=(
436-
ordering.OrderingExpression(
437-
ex.free_var(col),
438-
ordering.OrderingDirection.ASC
439-
if ascending
440-
else ordering.OrderingDirection.DESC,
441-
na_last=(na_option in ["bottom", "keep"]),
442-
),
431+
window_ordering = (
432+
ordering.OrderingExpression(
433+
ex.free_var(col),
434+
ordering.OrderingDirection.ASC
435+
if ascending
436+
else ordering.OrderingDirection.DESC,
437+
na_last=(na_option in ["bottom", "keep"]),
443438
),
444439
)
445440
# Count_op ignores nulls, so if na_option is "top" or "bottom", we instead count the nullity columns, where nulls have been mapped to bools
446441
block, rownum_id = block.apply_window_op(
447442
col if na_option == "keep" else nullity_col_id,
448443
agg_ops.dense_rank_op if method == "dense" else agg_ops.count_op,
449-
window_spec=window,
444+
window_spec=windows.unbound(ordering=window_ordering)
445+
if method == "dense"
446+
else windows.rows(following=0, ordering=window_ordering),
450447
skip_reproject_unsafe=(col != columns[-1]),
451448
)
452449
rownum_col_ids.append(rownum_id)
@@ -464,7 +461,7 @@ def rank(
464461
block, result_id = block.apply_window_op(
465462
rownum_col_ids[i],
466463
agg_op,
467-
window_spec=windows.WindowSpec(grouping_keys=(columns[i],)),
464+
window_spec=windows.unbound(grouping_keys=(columns[i],)),
468465
skip_reproject_unsafe=(i < (len(columns) - 1)),
469466
)
470467
post_agg_rownum_col_ids.append(result_id)
@@ -528,7 +525,7 @@ def nsmallest(
528525
block, counter = block.apply_window_op(
529526
column_ids[0],
530527
agg_ops.rank_op,
531-
window_spec=windows.WindowSpec(ordering=tuple(order_refs)),
528+
window_spec=windows.unbound(ordering=tuple(order_refs)),
532529
)
533530
block, condition = block.project_expr(ops.le_op.as_expr(counter, ex.const(n)))
534531
block = block.filter_by_id(condition)
@@ -558,7 +555,7 @@ def nlargest(
558555
block, counter = block.apply_window_op(
559556
column_ids[0],
560557
agg_ops.rank_op,
561-
window_spec=windows.WindowSpec(ordering=tuple(order_refs)),
558+
window_spec=windows.unbound(ordering=tuple(order_refs)),
562559
)
563560
block, condition = block.project_expr(ops.le_op.as_expr(counter, ex.const(n)))
564561
block = block.filter_by_id(condition)
@@ -653,7 +650,7 @@ def _mean_delta_to_power(
653650
grouping_column_ids: typing.Sequence[str],
654651
) -> typing.Tuple[blocks.Block, typing.Sequence[str]]:
655652
"""Calculate (x-mean(x))^n. Useful for calculating moment statistics such as skew and kurtosis."""
656-
window = windows.WindowSpec(grouping_keys=tuple(grouping_column_ids))
653+
window = windows.unbound(grouping_keys=tuple(grouping_column_ids))
657654
block, mean_ids = block.multi_apply_window_op(column_ids, agg_ops.mean_op, window)
658655
delta_ids = []
659656
for val_id, mean_val_id in zip(column_ids, mean_ids):
@@ -845,7 +842,7 @@ def _idx_extrema(
845842
for idx_col in original_block.index_columns
846843
],
847844
]
848-
window_spec = windows.WindowSpec(ordering=tuple(order_refs))
845+
window_spec = windows.unbound(ordering=tuple(order_refs))
849846
idx_col = original_block.index_columns[0]
850847
block, result_col = block.apply_window_op(
851848
idx_col, agg_ops.first_op, window_spec

bigframes/core/blocks.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import bigframes.core.tree_properties as tree_properties
4848
import bigframes.core.utils
4949
import bigframes.core.utils as utils
50+
import bigframes.core.window_spec as window_specs
5051
import bigframes.dtypes
5152
import bigframes.features
5253
import bigframes.operations as ops
@@ -816,7 +817,7 @@ def multi_apply_window_op(
816817
self,
817818
columns: typing.Sequence[str],
818819
op: agg_ops.WindowOp,
819-
window_spec: core.WindowSpec,
820+
window_spec: window_specs.WindowSpec,
820821
*,
821822
skip_null_groups: bool = False,
822823
never_skip_nulls: bool = False,
@@ -875,7 +876,7 @@ def apply_window_op(
875876
self,
876877
column: str,
877878
op: agg_ops.WindowOp,
878-
window_spec: core.WindowSpec,
879+
window_spec: window_specs.WindowSpec,
879880
*,
880881
result_label: Label = None,
881882
skip_null_groups: bool = False,
@@ -2029,7 +2030,7 @@ def _is_monotonic(
20292030
return self._stats_cache[column_name][op_name]
20302031

20312032
period = 1
2032-
window = bigframes.core.WindowSpec(
2033+
window = window_specs.rows(
20332034
preceding=period,
20342035
following=None,
20352036
)

bigframes/core/compile/compiled.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
OrderingExpression,
4141
)
4242
import bigframes.core.schema as schemata
43-
from bigframes.core.window_spec import WindowSpec
43+
from bigframes.core.window_spec import RangeWindowBounds, RowsWindowBounds, WindowSpec
4444
import bigframes.dtypes
4545
import bigframes.operations.aggregations as agg_ops
4646

@@ -735,7 +735,9 @@ def project_window_op(
735735
skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
736736
"""
737737
column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name))
738-
window = self._ibis_window_from_spec(window_spec, allow_ties=op.handles_ties)
738+
window = self._ibis_window_from_spec(
739+
window_spec, require_total_order=op.uses_total_row_ordering
740+
)
739741
bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
740742

741743
window_op = agg_compiler.compile_analytic(
@@ -1162,7 +1164,9 @@ def _create_string_ordering_column(self) -> ibis_types.StringColumn:
11621164
def _compile_expression(self, expr: ex.Expression):
11631165
return op_compiler.compile_expression(expr, self._ibis_bindings)
11641166

1165-
def _ibis_window_from_spec(self, window_spec: WindowSpec, allow_ties: bool = False):
1167+
def _ibis_window_from_spec(
1168+
self, window_spec: WindowSpec, require_total_order: bool
1169+
):
11661170
group_by: typing.List[ibis_types.Value] = (
11671171
[
11681172
typing.cast(
@@ -1175,26 +1179,40 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec, allow_ties: bool = Fal
11751179
)
11761180
if self._reduced_predicate is not None:
11771181
group_by.append(self._reduced_predicate)
1182+
1183+
# Construct ordering. There are basically 3 main cases
1184+
# 1. Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed
1185+
# 2. Order-independent op (aggregation, cut, rank) with range window - use ordering clause, ties allowed
1186+
# 3. Order-dependent op (navigation functions, array_agg) or rows bounds - use total row order to break ties.
11781187
if window_spec.ordering:
11791188
order_by = _convert_ordering_to_table_values(
11801189
{**self._column_names, **self._hidden_ordering_column_names},
11811190
window_spec.ordering,
11821191
)
1183-
if not allow_ties:
1184-
# Most operator need an unambiguous ordering, so the table's total ordering is appended
1192+
if require_total_order or isinstance(window_spec.bounds, RowsWindowBounds):
1193+
# Some operators need an unambiguous ordering, so the table's total ordering is appended
11851194
order_by = tuple([*order_by, *self._ibis_order])
1186-
elif (window_spec.following is not None) or (window_spec.preceding is not None):
1195+
elif isinstance(window_spec.bounds, RowsWindowBounds):
11871196
# If window spec has following or preceding bounds, we need to apply an unambiguous ordering.
11881197
order_by = tuple(self._ibis_order)
11891198
else:
11901199
# Unbound grouping window. Suitable for aggregations but not for analytic function application.
11911200
order_by = None
1192-
return ibis.window(
1193-
preceding=window_spec.preceding,
1194-
following=window_spec.following,
1195-
order_by=order_by,
1196-
group_by=group_by,
1197-
)
1201+
1202+
bounds = window_spec.bounds
1203+
window = ibis.window(order_by=order_by, group_by=group_by)
1204+
if bounds is not None:
1205+
if isinstance(bounds, RangeWindowBounds):
1206+
window = window.preceding_following(
1207+
bounds.preceding, bounds.following, how="range"
1208+
)
1209+
if isinstance(bounds, RowsWindowBounds):
1210+
window = window.preceding_following(
1211+
bounds.preceding, bounds.following, how="rows"
1212+
)
1213+
else:
1214+
raise ValueError(f"unrecognized window bounds {bounds}")
1215+
return window
11981216

11991217
class Builder:
12001218
def __init__(

bigframes/core/groupby/__init__.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import bigframes.core.ordering as order
2929
import bigframes.core.utils as utils
3030
import bigframes.core.window as windows
31+
import bigframes.core.window_spec as window_specs
3132
import bigframes.dataframe as df
3233
import bigframes.dtypes as dtypes
3334
import bigframes.operations.aggregations as agg_ops
@@ -217,15 +218,15 @@ def cumprod(self, *args, **kwargs) -> df.DataFrame:
217218
return self._apply_window_op(agg_ops.product_op, numeric_only=True)
218219

219220
def shift(self, periods=1) -> series.Series:
220-
window = core.WindowSpec(
221+
window = window_specs.rows(
221222
grouping_keys=tuple(self._by_col_ids),
222223
preceding=periods if periods > 0 else None,
223224
following=-periods if periods < 0 else None,
224225
)
225226
return self._apply_window_op(agg_ops.ShiftOp(periods), window=window)
226227

227228
def diff(self, periods=1) -> series.Series:
228-
window = core.WindowSpec(
229+
window = window_specs.rows(
229230
grouping_keys=tuple(self._by_col_ids),
230231
preceding=periods if periods > 0 else None,
231232
following=-periods if periods < 0 else None,
@@ -234,7 +235,7 @@ def diff(self, periods=1) -> series.Series:
234235

235236
def rolling(self, window: int, min_periods=None) -> windows.Window:
236237
# To get n size window, need current row and n-1 preceding rows.
237-
window_spec = core.WindowSpec(
238+
window_spec = window_specs.rows(
238239
grouping_keys=tuple(self._by_col_ids),
239240
preceding=window - 1,
240241
following=0,
@@ -248,9 +249,8 @@ def rolling(self, window: int, min_periods=None) -> windows.Window:
248249
)
249250

250251
def expanding(self, min_periods: int = 1) -> windows.Window:
251-
window_spec = core.WindowSpec(
252+
window_spec = window_specs.cumulative_rows(
252253
grouping_keys=tuple(self._by_col_ids),
253-
following=0,
254254
min_periods=min_periods,
255255
)
256256
block = self._block.order_by(
@@ -424,8 +424,8 @@ def _apply_window_op(
424424
numeric_only: bool = False,
425425
):
426426
"""Apply window op to groupby. Defaults to grouped cumulative window."""
427-
window_spec = window or core.WindowSpec(
428-
grouping_keys=tuple(self._by_col_ids), following=0
427+
window_spec = window or window_specs.cumulative_rows(
428+
grouping_keys=tuple(self._by_col_ids)
429429
)
430430
columns = self._aggregated_columns(numeric_only=numeric_only)
431431
block, result_ids = self._block.multi_apply_window_op(
@@ -594,15 +594,15 @@ def cumcount(self, *args, **kwargs) -> series.Series:
594594

595595
def shift(self, periods=1) -> series.Series:
596596
"""Shift index by desired number of periods."""
597-
window = core.WindowSpec(
597+
window = window_specs.rows(
598598
grouping_keys=tuple(self._by_col_ids),
599599
preceding=periods if periods > 0 else None,
600600
following=-periods if periods < 0 else None,
601601
)
602602
return self._apply_window_op(agg_ops.ShiftOp(periods), window=window)
603603

604604
def diff(self, periods=1) -> series.Series:
605-
window = core.WindowSpec(
605+
window = window_specs.rows(
606606
grouping_keys=tuple(self._by_col_ids),
607607
preceding=periods if periods > 0 else None,
608608
following=-periods if periods < 0 else None,
@@ -611,7 +611,7 @@ def diff(self, periods=1) -> series.Series:
611611

612612
def rolling(self, window: int, min_periods=None) -> windows.Window:
613613
# To get n size window, need current row and n-1 preceding rows.
614-
window_spec = core.WindowSpec(
614+
window_spec = window_specs.rows(
615615
grouping_keys=tuple(self._by_col_ids),
616616
preceding=window - 1,
617617
following=0,
@@ -629,9 +629,8 @@ def rolling(self, window: int, min_periods=None) -> windows.Window:
629629
)
630630

631631
def expanding(self, min_periods: int = 1) -> windows.Window:
632-
window_spec = core.WindowSpec(
632+
window_spec = window_specs.cumulative_rows(
633633
grouping_keys=tuple(self._by_col_ids),
634-
following=0,
635634
min_periods=min_periods,
636635
)
637636
block = self._block.order_by(
@@ -661,8 +660,8 @@ def _apply_window_op(
661660
window: typing.Optional[core.WindowSpec] = None,
662661
):
663662
"""Apply window op to groupby. Defaults to grouped cumulative window."""
664-
window_spec = window or core.WindowSpec(
665-
grouping_keys=tuple(self._by_col_ids), following=0
663+
window_spec = window or window_specs.cumulative_rows(
664+
grouping_keys=tuple(self._by_col_ids)
666665
)
667666

668667
label = self._value_name if not discard_name else None

bigframes/core/reshape/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import pandas as pd
2020

2121
import bigframes.constants as constants
22-
import bigframes.core as core
2322
import bigframes.core.expression as ex
2423
import bigframes.core.ordering as order
2524
import bigframes.core.utils as utils
25+
import bigframes.core.window_spec as window_specs
2626
import bigframes.dataframe
2727
import bigframes.operations as ops
2828
import bigframes.operations.aggregations as agg_ops
@@ -159,7 +159,7 @@ def cut(
159159
)
160160

161161
return x._apply_window_op(
162-
agg_ops.CutOp(bins, labels=labels), window_spec=core.WindowSpec()
162+
agg_ops.CutOp(bins, labels=labels), window_spec=window_specs.unbound()
163163
)
164164

165165

@@ -189,7 +189,7 @@ def qcut(
189189
block, result = block.apply_window_op(
190190
x._value_column,
191191
agg_ops.QcutOp(q), # type: ignore
192-
window_spec=core.WindowSpec(
192+
window_spec=window_specs.unbound(
193193
grouping_keys=(nullity_id,),
194194
ordering=(order.ascending_over(x._value_column),),
195195
),

0 commit comments

Comments
 (0)