Commit a5ad033

chore: internal version of resample. (#1004)

* chore: internal version of resample.
* skip legacy pandas
* replace generate array with range.
* update casts
* update to avoid ibis 9 aggregation bug.
* re-order the code to avoid ibis 9 'must be qualified with a dataset' bug.
* update comments
* add unordered mode test
* test case with duplicate
* updates
* doc updates
* node updates
* fix doc
* fix doc

1 parent 0359bc8 · commit a5ad033
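
For context, the parameters this commit plumbs through (`rule`, `closed`, `label`, `origin`) mirror pandas' resampling API. A minimal pandas illustration of the behavior being reproduced internally (illustrative only, not code from this commit):

```python
import pandas as pd

# Six points spaced 20 minutes apart, resampled into hourly bins.
idx = pd.date_range("2024-01-01", periods=6, freq="20min")
s = pd.Series(range(6), index=idx)

# rule, closed, label, and origin are the same knobs that
# _generate_resample_label accepts in this commit.
print(s.resample("1h", closed="left", label="left", origin="start_day").sum())
```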

File tree

11 files changed: +811 -0 lines changed


bigframes/core/__init__.py

Lines changed: 10 additions & 0 deletions
```diff
@@ -75,6 +75,16 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
         )
         return cls(node)
 
+    @classmethod
+    def from_range(cls, start, end, step):
+        return cls(
+            nodes.FromRangeNode(
+                start=start.node,
+                end=end.node,
+                step=step,
+            )
+        )
+
     @classmethod
     def from_table(
         cls,
```
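
A hedged sketch of how `from_range` is intended to be called (variable names are illustrative; the real call site is `_generate_sequence` in `blocks.py` below). `start` and `end` are themselves single-row `ArrayValue`s whose `.node` fields feed the `FromRangeNode`, while `step` is a plain integer:

```python
# Illustrative only, mirroring _generate_sequence in blocks.py.
# label_start and label_stop are assumed to be single-value ArrayValues
# produced by min/max aggregations over an integer label column.
labels = ArrayValue.from_range(
    start=label_start,  # its .node becomes FromRangeNode.start
    end=label_stop,     # its .node becomes FromRangeNode.end
    step=1,             # plain int, stored on the node as-is
)
```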

bigframes/core/blocks.py

Lines changed: 175 additions & 0 deletions
```diff
@@ -23,6 +23,7 @@
 
 import ast
 import dataclasses
+import datetime
 import functools
 import itertools
 import random
@@ -43,6 +44,7 @@
 
 import bigframes_vendored.constants as constants
 import google.cloud.bigquery as bigquery
+import numpy
 import pandas as pd
 import pyarrow as pa
 
@@ -1834,6 +1836,179 @@ def transpose(
             .with_transpose_cache(self)
         )
 
+    def _generate_sequence(
+        self,
+        start,
+        stop,
+        step: int = 1,
+    ):
+        range_expr = self.expr.from_range(
+            start,
+            stop,
+            step,
+        )
+
+        return Block(
+            range_expr,
+            column_labels=["min"],
+            index_columns=[],
+        )
+
+    def _generate_resample_label(
+        self,
+        rule: str,
+        closed: Optional[Literal["right", "left"]] = None,
+        label: Optional[Literal["right", "left"]] = None,
+        on: Optional[Label] = None,
+        level: typing.Union[LevelType, typing.Sequence[LevelType]] = None,
+        origin: Union[
+            Union[pd.Timestamp, datetime.datetime, numpy.datetime64, int, float, str],
+            Literal["epoch", "start", "start_day", "end", "end_day"],
+        ] = "start_day",
+    ) -> Block:
+        # Validate and resolve the index or column to use for grouping
+        if on is None:
+            if len(self.index_columns) == 0:
+                raise ValueError(
+                    f"No index for resampling. Expected {bigframes.dtypes.DATETIME_DTYPE} or "
+                    f"{bigframes.dtypes.TIMESTAMP_DTYPE} index or 'on' parameter specifying a column."
+                )
+            if len(self.index_columns) > 1 and (level is None):
+                raise ValueError(
+                    "Multiple indices are not supported for this operation"
+                    " when 'level' is not set."
+                )
+            level = level or 0
+            col_id = self.index.resolve_level(level)[0]
+            # Reset index to make the resampling level a column, then drop all other index columns.
+            # This simplifies processing by focusing solely on the column required for resampling.
+            block = self.reset_index(drop=False)
+            block = block.drop_columns(
+                [col for col in self.index.column_ids if col != col_id]
+            )
+        elif level is not None:
+            raise ValueError("The Grouper cannot specify both a key and a level!")
+        else:
+            matches = self.label_to_col_id.get(on, [])
+            if len(matches) > 1:
+                raise ValueError(
+                    f"Multiple columns matching id {on} were found. {constants.FEEDBACK_LINK}"
+                )
+            if len(matches) == 0:
+                raise KeyError(f"The grouper name {on} is not found")
+
+            col_id = matches[0]
+            block = self
+        if level is None:
+            dtype = self._column_type(col_id)
+        elif isinstance(level, int):
+            dtype = self.index.dtypes[level]
+        else:
+            dtype = self.index.dtypes[self.index.names.index(level)]
+
+        if dtype not in (
+            bigframes.dtypes.DATETIME_DTYPE,
+            bigframes.dtypes.TIMESTAMP_DTYPE,
+        ):
+            raise TypeError(
+                f"Invalid column type: {dtype}. Expected types are "
+                f"{bigframes.dtypes.DATETIME_DTYPE}, or "
+                f"{bigframes.dtypes.TIMESTAMP_DTYPE}."
+            )
+
+        freq = pd.tseries.frequencies.to_offset(rule)
+        assert freq is not None
+
+        if origin not in ("epoch", "start", "start_day"):
+            raise ValueError(
+                "'origin' should be equal to 'epoch', 'start' or 'start_day'"
+                f". Got '{origin}' instead."
+            )
+
+        agg_specs = [
+            (
+                ex.UnaryAggregation(agg_ops.min_op, ex.deref(col_id)),
+                guid.generate_guid(),
+            ),
+        ]
+        origin_block = Block(
+            block.expr.aggregate(agg_specs, dropna=True),
+            column_labels=["origin"],
+            index_columns=[],
+        )
+
+        col_level = block.value_columns.index(col_id)
+
+        block = block.merge(
+            origin_block, how="cross", left_join_ids=[], right_join_ids=[], sort=True
+        )
+
+        # After merging, the original column ids are altered. 'col_level' is the index of
+        # the datetime column used for resampling. 'block.value_columns[-1]' is the
+        # 'origin' column, which is the minimum datetime value.
+        block, label_col_id = block.apply_binary_op(
+            block.value_columns[col_level],
+            block.value_columns[-1],
+            op=ops.DatetimeToIntegerLabelOp(freq=freq, closed=closed, origin=origin),
+        )
+        block = block.drop_columns([block.value_columns[-2]])
+
+        # Generate integer label sequence.
+        min_agg_specs = [
+            (
+                ex.UnaryAggregation(agg_ops.min_op, ex.deref(label_col_id)),
+                guid.generate_guid(),
+            ),
+        ]
+        max_agg_specs = [
+            (
+                ex.UnaryAggregation(agg_ops.max_op, ex.deref(label_col_id)),
+                guid.generate_guid(),
+            ),
+        ]
+        label_start = block.expr.aggregate(min_agg_specs, dropna=True)
+        label_stop = block.expr.aggregate(max_agg_specs, dropna=True)
+
+        label_block = block._generate_sequence(
+            start=label_start,
+            stop=label_stop,
+        )
+
+        label_block = label_block.merge(
+            origin_block, how="cross", left_join_ids=[], right_join_ids=[], sort=True
+        )
+
+        block = label_block.merge(
+            block,
+            how="left",
+            left_join_ids=[label_block.value_columns[0]],
+            right_join_ids=[label_col_id],
+            sort=True,
+        )
+
+        block, resample_label_id = block.apply_binary_op(
+            block.value_columns[0],
+            block.value_columns[1],
+            op=ops.IntegerLabelToDatetimeOp(freq=freq, label=label, origin=origin),
+        )
+
+        # After multiple merges, the columns:
+        # - block.value_columns[0] is the integer label sequence,
+        # - block.value_columns[1] is the origin column (minimum datetime value),
+        # - col_level+2 represents the datetime column used for resampling,
+        # - block.value_columns[-2] is the integer label column derived from the datetime column.
+        # These columns are no longer needed.
+        block = block.drop_columns(
+            [
+                block.value_columns[0],
+                block.value_columns[1],
+                block.value_columns[col_level + 2],
+                block.value_columns[-2],
+            ]
+        )
+
+        return block.set_index([resample_label_id])
+
     def _create_stack_column(self, col_label: typing.Tuple, stack_labels: pd.Index):
         dtype = None
         input_columns: list[Optional[str]] = []
```
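
To make the dataflow above concrete, here is a rough, self-contained pandas sketch (not from this commit) of the labeling round trip that `DatetimeToIntegerLabelOp` and `IntegerLabelToDatetimeOp` appear to implement for a fixed frequency with `origin="start"`; `closed`/`label` edge handling and non-fixed offsets are ignored. Note the gap at label 2: empty bins are exactly why `_generate_sequence` materializes the full integer label range and the data is left-joined onto it.

```python
import pandas as pd

ts = pd.Series(
    pd.to_datetime(["2024-01-01 00:05", "2024-01-01 00:20", "2024-01-01 00:50"])
)
freq = pd.Timedelta("15min")  # fixed-frequency stand-in for rule="15min"
origin = ts.min()             # corresponds to origin="start"

# DatetimeToIntegerLabelOp (approximate): bucket index relative to origin.
labels = (ts - origin) // freq        # 0, 1, 3 -> label 2 is an empty bin
# IntegerLabelToDatetimeOp (approximate): map bucket index back to a timestamp.
recovered = origin + labels * freq    # 00:05, 00:20, 00:50
print(labels.tolist(), recovered.tolist())
```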

bigframes/core/compile/compiler.py

Lines changed: 34 additions & 0 deletions
```diff
@@ -112,6 +112,40 @@ def compile_join(self, node: nodes.JoinNode, ordered: bool = True):
             conditions=condition_pairs,
         )
 
+    @_compile_node.register
+    def compile_fromrange(self, node: nodes.FromRangeNode, ordered: bool = True):
+        # Both start and end are single elements and do not inherently have an order
+        start = self.compile_unordered_ir(node.start)
+        end = self.compile_unordered_ir(node.end)
+        start_table = start._to_ibis_expr()
+        end_table = end._to_ibis_expr()
+
+        start_column = start_table.schema().names[0]
+        end_column = end_table.schema().names[0]
+
+        # Perform a cross join to avoid errors
+        joined_table = start_table.cross_join(end_table)
+
+        labels_array_table = ibis.range(
+            joined_table[start_column], joined_table[end_column] + node.step, node.step
+        ).name("labels")
+        labels = (
+            typing.cast(ibis.expr.types.ArrayValue, labels_array_table)
+            .unnest()
+            .as_table()
+        )
+        if ordered:
+            return compiled.OrderedIR(
+                labels,
+                columns=[labels[labels.columns[0]]],
+                ordering=bf_ordering.TotalOrdering().from_offset_col(labels.columns[0]),
+            )
+        else:
+            return compiled.UnorderedIR(
+                labels,
+                columns=[labels[labels.columns[0]]],
+            )
+
     @_compile_node.register
     def compile_readlocal(self, node: nodes.ReadLocalNode, ordered: bool = True):
         array_as_pd = pd.read_feather(io.BytesIO(node.feather_bytes))
```