Skip to content

Commit b4b7073

Browse files
authored
feat: Enable time range rolling for DataFrame, DataFrameGroupBy and SeriesGroupBy (#1605)
* feat: Enable range rolling for DataFrame, DataFrameGroupBy and SeriesGroupBy
* update docs
* fix lint
* resolve merge conflict
1 parent eb0bcd3 commit b4b7073

File tree

9 files changed

+279
-80
lines changed

9 files changed

+279
-80
lines changed

bigframes/core/groupby/dataframe_group_by.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414

1515
from __future__ import annotations
1616

17+
import datetime
1718
import typing
1819
from typing import Literal, Sequence, Tuple, Union
1920

2021
import bigframes_vendored.constants as constants
2122
import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
23+
import numpy
2224
import pandas as pd
2325

2426
from bigframes import session
@@ -30,6 +32,7 @@
3032
import bigframes.core.ordering as order
3133
import bigframes.core.utils as utils
3234
import bigframes.core.validations as validations
35+
from bigframes.core.window import rolling
3336
import bigframes.core.window as windows
3437
import bigframes.core.window_spec as window_specs
3538
import bigframes.dataframe as df
@@ -309,28 +312,41 @@ def diff(self, periods=1) -> series.Series:
309312
@validations.requires_ordering()
310313
def rolling(
311314
self,
312-
window: int,
315+
window: int | pd.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
313316
min_periods=None,
314317
on: str | None = None,
315318
closed: Literal["right", "left", "both", "neither"] = "right",
316319
) -> windows.Window:
317-
window_spec = window_specs.WindowSpec(
318-
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
319-
min_periods=min_periods if min_periods is not None else window,
320-
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
321-
)
322-
block = self._block.order_by(
323-
[order.ascending_over(col) for col in self._by_col_ids],
324-
)
325-
skip_agg_col_id = (
326-
None if on is None else self._block.resolve_label_exact_or_error(on)
327-
)
328-
return windows.Window(
329-
block,
330-
window_spec,
331-
self._selected_cols,
320+
if isinstance(window, int):
321+
window_spec = window_specs.WindowSpec(
322+
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
323+
min_periods=min_periods if min_periods is not None else window,
324+
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
325+
)
326+
block = self._block.order_by(
327+
[order.ascending_over(col) for col in self._by_col_ids],
328+
)
329+
skip_agg_col_id = (
330+
None if on is None else self._block.resolve_label_exact_or_error(on)
331+
)
332+
return windows.Window(
333+
block,
334+
window_spec,
335+
self._selected_cols,
336+
drop_null_groups=self._dropna,
337+
skip_agg_column_id=skip_agg_col_id,
338+
)
339+
340+
return rolling.create_range_window(
341+
self._block,
342+
window,
343+
min_periods=min_periods,
344+
value_column_ids=self._selected_cols,
345+
on=on,
346+
closed=closed,
347+
is_series=False,
348+
grouping_keys=self._by_col_ids,
332349
drop_null_groups=self._dropna,
333-
skip_agg_column_id=skip_agg_col_id,
334350
)
335351

336352
@validations.requires_ordering()

bigframes/core/groupby/series_group_by.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414

1515
from __future__ import annotations
1616

17+
import datetime
1718
import typing
1819
from typing import Literal, Sequence, Union
1920

2021
import bigframes_vendored.constants as constants
2122
import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
23+
import numpy
24+
import pandas
2225

2326
from bigframes import session
2427
from bigframes.core import expression as ex
@@ -29,6 +32,7 @@
2932
import bigframes.core.ordering as order
3033
import bigframes.core.utils as utils
3134
import bigframes.core.validations as validations
35+
from bigframes.core.window import rolling
3236
import bigframes.core.window as windows
3337
import bigframes.core.window_spec as window_specs
3438
import bigframes.dataframe as df
@@ -246,24 +250,36 @@ def diff(self, periods=1) -> series.Series:
246250
@validations.requires_ordering()
247251
def rolling(
248252
self,
249-
window: int,
253+
window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
250254
min_periods=None,
251255
closed: Literal["right", "left", "both", "neither"] = "right",
252256
) -> windows.Window:
253-
window_spec = window_specs.WindowSpec(
254-
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
255-
min_periods=min_periods if min_periods is not None else window,
256-
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
257-
)
258-
block = self._block.order_by(
259-
[order.ascending_over(col) for col in self._by_col_ids],
260-
)
261-
return windows.Window(
262-
block,
263-
window_spec,
264-
[self._value_column],
265-
drop_null_groups=self._dropna,
257+
if isinstance(window, int):
258+
window_spec = window_specs.WindowSpec(
259+
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
260+
min_periods=min_periods if min_periods is not None else window,
261+
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
262+
)
263+
block = self._block.order_by(
264+
[order.ascending_over(col) for col in self._by_col_ids],
265+
)
266+
return windows.Window(
267+
block,
268+
window_spec,
269+
[self._value_column],
270+
drop_null_groups=self._dropna,
271+
is_series=True,
272+
)
273+
274+
return rolling.create_range_window(
275+
self._block,
276+
window,
277+
min_periods=min_periods,
278+
value_column_ids=[self._value_column],
279+
closed=closed,
266280
is_series=True,
281+
grouping_keys=self._by_col_ids,
282+
drop_null_groups=self._dropna,
267283
)
268284

269285
@validations.requires_ordering()

bigframes/core/window/ordering.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,9 @@ def _(root: nodes.WindowOpNode, column_id: str):
7878

7979
@find_order_direction.register
8080
def _(root: nodes.ProjectionNode, column_id: str):
81+
for expr, ref in root.assignments:
82+
if ref.name == column_id and isinstance(expr, ex.DerefOp):
83+
# This source column is renamed.
84+
return find_order_direction(root.child, expr.id.name)
85+
8186
return find_order_direction(root.child, column_id)

bigframes/core/window/rolling.py

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,7 @@ def _apply_aggregate(
7676
self,
7777
op: agg_ops.UnaryAggregateOp,
7878
):
79-
agg_col_ids = [
80-
col_id
81-
for col_id in self._value_column_ids
82-
if col_id != self._skip_agg_column_id
83-
]
84-
agg_block = self._aggregate_block(op, agg_col_ids)
85-
86-
if self._skip_agg_column_id is not None:
87-
# Concat the skipped column to the result.
88-
agg_block, _ = agg_block.join(
89-
self._block.select_column(self._skip_agg_column_id), how="outer"
90-
)
79+
agg_block = self._aggregate_block(op)
9180

9281
if self._is_series:
9382
from bigframes.series import Series
@@ -102,9 +91,12 @@ def _apply_aggregate(
10291
]
10392
return DataFrame(agg_block)._reindex_columns(column_labels)
10493

105-
def _aggregate_block(
106-
self, op: agg_ops.UnaryAggregateOp, agg_col_ids: typing.List[str]
107-
) -> blocks.Block:
94+
def _aggregate_block(self, op: agg_ops.UnaryAggregateOp) -> blocks.Block:
95+
agg_col_ids = [
96+
col_id
97+
for col_id in self._value_column_ids
98+
if col_id != self._skip_agg_column_id
99+
]
108100
block, result_ids = self._block.multi_apply_window_op(
109101
agg_col_ids,
110102
op,
@@ -123,39 +115,71 @@ def _aggregate_block(
123115
block = block.set_index(col_ids=index_ids)
124116

125117
labels = [self._block.col_id_to_label[col] for col in agg_col_ids]
118+
if self._skip_agg_column_id is not None:
119+
result_ids = [self._skip_agg_column_id, *result_ids]
120+
labels.insert(0, self._block.col_id_to_label[self._skip_agg_column_id])
121+
126122
return block.select_columns(result_ids).with_column_labels(labels)
127123

128124

129125
def create_range_window(
130126
block: blocks.Block,
131127
window: pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
128+
*,
129+
value_column_ids: typing.Sequence[str] = tuple(),
132130
min_periods: int | None,
131+
on: str | None = None,
133132
closed: typing.Literal["right", "left", "both", "neither"],
134133
is_series: bool,
134+
grouping_keys: typing.Sequence[str] = tuple(),
135+
drop_null_groups: bool = True,
135136
) -> Window:
136137

137-
index_dtypes = block.index.dtypes
138-
if len(index_dtypes) > 1:
139-
raise ValueError("Range rolling on MultiIndex is not supported")
140-
if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE:
141-
raise ValueError("Index type should be timestamps with timezones")
138+
if on is None:
139+
# Rolling on index
140+
index_dtypes = block.index.dtypes
141+
if len(index_dtypes) > 1:
142+
raise ValueError("Range rolling on MultiIndex is not supported")
143+
if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE:
144+
raise ValueError("Index type should be timestamps with timezones")
145+
rolling_key_col_id = block.index_columns[0]
146+
else:
147+
# Rolling on a specific column
148+
rolling_key_col_id = block.resolve_label_exact_or_error(on)
149+
if block.expr.get_column_type(rolling_key_col_id) != dtypes.TIMESTAMP_DTYPE:
150+
raise ValueError(f"Column {on} type should be timestamps with timezones")
142151

143152
order_direction = window_ordering.find_order_direction(
144-
block.expr.node, block.index_columns[0]
153+
block.expr.node, rolling_key_col_id
145154
)
146155
if order_direction is None:
156+
target_str = "index" if on is None else f"column {on}"
147157
raise ValueError(
148-
"The index might not be in a monotonic order. Please sort the index before rolling."
158+
f"The {target_str} might not be in a monotonic order. Please sort by {target_str} before rolling."
149159
)
150160
if isinstance(window, str):
151161
window = pandas.Timedelta(window)
152162
spec = window_spec.WindowSpec(
153163
bounds=window_spec.RangeWindowBounds.from_timedelta_window(window, closed),
154164
min_periods=1 if min_periods is None else min_periods,
155165
ordering=(
156-
ordering.OrderingExpression(
157-
ex.deref(block.index_columns[0]), order_direction
158-
),
166+
ordering.OrderingExpression(ex.deref(rolling_key_col_id), order_direction),
159167
),
168+
grouping_keys=tuple(ex.deref(col) for col in grouping_keys),
169+
)
170+
171+
selected_value_col_ids = (
172+
value_column_ids if value_column_ids else block.value_columns
173+
)
174+
# This step must be done after finding the order direction of the window key.
175+
if grouping_keys:
176+
block = block.order_by([ordering.ascending_over(col) for col in grouping_keys])
177+
178+
return Window(
179+
block,
180+
spec,
181+
value_column_ids=selected_value_col_ids,
182+
is_series=is_series,
183+
skip_agg_column_id=None if on is None else rolling_key_col_id,
184+
drop_null_groups=drop_null_groups,
160185
)
161-
return Window(block, spec, block.value_columns, is_series=is_series)

bigframes/dataframe.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,12 @@
6767
import bigframes.core.utils as utils
6868
import bigframes.core.validations as validations
6969
import bigframes.core.window
70+
from bigframes.core.window import rolling
7071
import bigframes.core.window_spec as windows
7172
import bigframes.dtypes
7273
import bigframes.exceptions as bfe
7374
import bigframes.formatting_helpers as formatter
7475
import bigframes.operations as ops
75-
import bigframes.operations.aggregations
7676
import bigframes.operations.aggregations as agg_ops
7777
import bigframes.operations.ai
7878
import bigframes.operations.plotting as plotting
@@ -3393,23 +3393,33 @@ def _perform_join_by_index(
33933393
@validations.requires_ordering()
33943394
def rolling(
33953395
self,
3396-
window: int,
3396+
window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
33973397
min_periods=None,
33983398
on: str | None = None,
33993399
closed: Literal["right", "left", "both", "neither"] = "right",
34003400
) -> bigframes.core.window.Window:
3401-
window_def = windows.WindowSpec(
3402-
bounds=windows.RowsWindowBounds.from_window_size(window, closed),
3403-
min_periods=min_periods if min_periods is not None else window,
3404-
)
3405-
skip_agg_col_id = (
3406-
None if on is None else self._block.resolve_label_exact_or_error(on)
3407-
)
3408-
return bigframes.core.window.Window(
3401+
if isinstance(window, int):
3402+
window_def = windows.WindowSpec(
3403+
bounds=windows.RowsWindowBounds.from_window_size(window, closed),
3404+
min_periods=min_periods if min_periods is not None else window,
3405+
)
3406+
skip_agg_col_id = (
3407+
None if on is None else self._block.resolve_label_exact_or_error(on)
3408+
)
3409+
return bigframes.core.window.Window(
3410+
self._block,
3411+
window_def,
3412+
self._block.value_columns,
3413+
skip_agg_column_id=skip_agg_col_id,
3414+
)
3415+
3416+
return rolling.create_range_window(
34093417
self._block,
3410-
window_def,
3411-
self._block.value_columns,
3412-
skip_agg_column_id=skip_agg_col_id,
3418+
window,
3419+
min_periods=min_periods,
3420+
on=on,
3421+
closed=closed,
3422+
is_series=False,
34133423
)
34143424

34153425
@validations.requires_ordering()

bigframes/series.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1590,7 +1590,11 @@ def rolling(
15901590
)
15911591

15921592
return rolling.create_range_window(
1593-
self._block, window, min_periods, closed, is_series=True
1593+
block=self._block,
1594+
window=window,
1595+
min_periods=min_periods,
1596+
closed=closed,
1597+
is_series=True,
15941598
)
15951599

15961600
@validations.requires_ordering()

0 commit comments

Comments
 (0)