Merged
71 changes: 63 additions & 8 deletions bigframes/core/array_value.py
@@ -16,7 +16,6 @@
from dataclasses import dataclass
import datetime
import functools
import itertools
import typing
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple

@@ -267,19 +266,75 @@ def compute_values(self, assignments: Sequence[ex.Expression]):
)

def compute_general_expression(self, assignments: Sequence[ex.Expression]):
"""
Applies arbitrary column expressions to the current execution block.

This method transforms the logical plan by applying a sequence of expressions that
preserve the length of the input columns. It supports both scalar operations
and window functions. Each expression is assigned a unique internal column identifier.

Args:
assignments (Sequence[ex.Expression]): A sequence of expression objects
representing the transformations to apply to the columns.

Returns:
Tuple[ArrayValue, Tuple[ids.ColumnId, ...]]: A tuple containing:
- An `ArrayValue` wrapping the new root node of the updated logical plan.
- A tuple of the unique column IDs (`ids.ColumnId`) generated for each
expression in the assignments.
"""
named_exprs = [
nodes.ColumnDef(expr, ids.ColumnId.unique()) for expr in assignments
]
# TODO: Push this to rewrite later to go from block expression to planning form
# TODO: Jointly fragmentize expressions to more efficiently reuse common sub-expressions
fragments = tuple(
itertools.chain.from_iterable(
expression_factoring.fragmentize_expression(expr)
for expr in named_exprs
)
new_root = expression_factoring.apply_col_exprs_to_plan(self.node, named_exprs)

target_ids = tuple(named_expr.id for named_expr in named_exprs)
return (ArrayValue(new_root), target_ids)

def compute_general_reduction(
self,
assignments: Sequence[ex.Expression],
by_column_ids: typing.Sequence[str] = (),
*,
dropna: bool = False,
):
"""
Applies arbitrary aggregation expressions to the block, optionally grouped by keys.

This method handles reduction operations (e.g., sum, mean, count) that collapse
Collaborator:
How do we handle methods like APPROX_TOP_COUNT that can't be used in windows?

SELECT
  pickup_datetime,
  trip_distance,
  APPROX_TOP_COUNT(trip_distance, 10) OVER (ORDER BY pickup_datetime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW ) AS rolling_sum_trip_distance_4_trips
FROM
  `bigquery-public-data`.`new_york`.`tlc_green_trips_2014`
ORDER BY
  pickup_datetime;

Likewise, what about methods like PERCENTILE_CONT that require a window with an order by clause?

Collaborator:
Presumably, these ops are handled at a different layer to avoid such cases? It'd be good to be explicit in the docs here that ops provided need to work as both analytical and aggregate functions. Or, if we can use such functions here, what to do to avoid windows / force group by.

Contributor Author:
Yeah, best to catch these early. Adding a windowizable prop to ops, and will throw if we try to windowize a non-windowizable op.

multiple input rows into a single scalar value per group. If grouping keys are
provided, the operation is performed per group; otherwise, it is a global reduction.

Args:
assignments (Sequence[ex.Expression]): A sequence of aggregation expressions
to be calculated.
by_column_ids (typing.Sequence[str], optional): A sequence of column IDs
to use as grouping keys. Defaults to an empty tuple (global reduction).
dropna (bool, optional): If True, rows containing null values in the
`by_column_ids` columns will be filtered out before the reduction
is applied. Defaults to False.

Returns:
Tuple[ArrayValue, Tuple[ids.ColumnId, ...]]: A tuple containing:
- An `ArrayValue` wrapping the new root node representing the
aggregation/group-by result.
- A tuple of the unique column IDs (`ids.ColumnId`) assigned to the
resulting aggregate columns.
"""
plan = self.node
if dropna:
for col_id in by_column_ids:
plan = nodes.FilterNode(plan, ops.notnull_op.as_expr(col_id))

named_exprs = [
nodes.ColumnDef(expr, ids.ColumnId.unique()) for expr in assignments
]
# TODO: Push this to rewrite later to go from block expression to planning form
new_root = expression_factoring.apply_agg_exprs_to_plan(
plan, named_exprs, grouping_keys=[ex.deref(by) for by in by_column_ids]
)
target_ids = tuple(named_expr.id for named_expr in named_exprs)
new_root = expression_factoring.push_into_tree(self.node, fragments, target_ids)
return (ArrayValue(new_root), target_ids)

def project_to_id(self, expression: ex.Expression):
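The review thread above asks how aggregate-only functions such as APPROX_TOP_COUNT are kept out of window contexts. Below is a minimal sketch of the "windowizable prop plus early throw" approach the author describes; AggregateOp, ApproxTopCountOp, windowize_op, and the windowizable attribute are illustrative assumptions, not the actual bigframes API:

import dataclasses


@dataclasses.dataclass(frozen=True)
class AggregateOp:
    # Most aggregations can also run as analytic (OVER ...) functions.
    windowizable: bool = True


@dataclasses.dataclass(frozen=True)
class ApproxTopCountOp(AggregateOp):
    # Approximate aggregates are aggregate-only in BigQuery.
    windowizable: bool = False


def windowize_op(op: AggregateOp) -> AggregateOp:
    # Fail fast instead of emitting SQL that BigQuery would reject.
    if not op.windowizable:
        raise ValueError(f"{type(op).__name__} cannot be used as a window function")
    return op

Ops with extra window requirements (the PERCENTILE_CONT case raised above) would presumably need an analogous capability check at the same point.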
115 changes: 40 additions & 75 deletions bigframes/core/block_transforms.py
@@ -622,40 +622,28 @@ def skew(
original_columns = skew_column_ids
column_labels = block.select_columns(original_columns).column_labels

block, delta3_ids = _mean_delta_to_power(
block, 3, original_columns, grouping_column_ids
)
# counts, moment3 for each column
aggregations = []
for i, col in enumerate(original_columns):
for col in original_columns:
delta3_expr = _mean_delta_to_power(3, col)
count_agg = agg_expressions.UnaryAggregation(
agg_ops.count_op,
ex.deref(col),
)
moment3_agg = agg_expressions.UnaryAggregation(
agg_ops.mean_op,
ex.deref(delta3_ids[i]),
delta3_expr,
)
variance_agg = agg_expressions.UnaryAggregation(
agg_ops.PopVarOp(),
ex.deref(col),
)
aggregations.extend([count_agg, moment3_agg, variance_agg])
skew_expr = _skew_from_moments_and_count(count_agg, moment3_agg, variance_agg)
aggregations.append(skew_expr)

block, agg_ids = block.aggregate(
by_column_ids=grouping_column_ids, aggregations=aggregations
block, _ = block.reduce_general(
Collaborator:
It'd be worth a comment explaining why it's safe to ignore the second element of the returned tuple.

Contributor Author:
The second element is mostly not used, so just removing it from the method entirely

aggregations, grouping_column_ids, column_labels=column_labels
)

skew_ids = []
for i, col in enumerate(original_columns):
# Corresponds to order of aggregations in preceding loop
count_id, moment3_id, var_id = agg_ids[i * 3 : (i * 3) + 3]
block, skew_id = _skew_from_moments_and_count(
block, count_id, moment3_id, var_id
)
skew_ids.append(skew_id)

block = block.select_columns(skew_ids).with_column_labels(column_labels)
if not grouping_column_ids:
# When ungrouped, transpose result row into a series
# perform transpose last, so as to not invalidate cache
@@ -672,36 +660,23 @@ def kurt(
) -> blocks.Block:
original_columns = skew_column_ids
column_labels = block.select_columns(original_columns).column_labels

block, delta4_ids = _mean_delta_to_power(
block, 4, original_columns, grouping_column_ids
)
# counts, moment4 for each column
aggregations = []
for i, col in enumerate(original_columns):
kurt_exprs = []
for col in original_columns:
delta_4_expr = _mean_delta_to_power(4, col)
count_agg = agg_expressions.UnaryAggregation(agg_ops.count_op, ex.deref(col))
moment4_agg = agg_expressions.UnaryAggregation(
agg_ops.mean_op, ex.deref(delta4_ids[i])
)
moment4_agg = agg_expressions.UnaryAggregation(agg_ops.mean_op, delta_4_expr)
variance_agg = agg_expressions.UnaryAggregation(
agg_ops.PopVarOp(), ex.deref(col)
)
aggregations.extend([count_agg, moment4_agg, variance_agg])

block, agg_ids = block.aggregate(
by_column_ids=grouping_column_ids, aggregations=aggregations
)

kurt_ids = []
for i, col in enumerate(original_columns):
# Corresponds to order of aggregations in preceding loop
count_id, moment4_id, var_id = agg_ids[i * 3 : (i * 3) + 3]
block, kurt_id = _kurt_from_moments_and_count(
block, count_id, moment4_id, var_id
)
kurt_ids.append(kurt_id)
kurt_expr = _kurt_from_moments_and_count(count_agg, moment4_agg, variance_agg)
kurt_exprs.append(kurt_expr)

block = block.select_columns(kurt_ids).with_column_labels(column_labels)
block, _ = block.reduce_general(
Collaborator:
Same here.

kurt_exprs, grouping_column_ids, column_labels=column_labels
)
if not grouping_column_ids:
# When ungrouped, transpose result row into a series
# perform transpose last, so as to not invalidate cache
@@ -712,38 +687,30 @@ def _mean_delta_to_power(


def _mean_delta_to_power(
block: blocks.Block,
n_power: int,
column_ids: typing.Sequence[str],
grouping_column_ids: typing.Sequence[str],
) -> typing.Tuple[blocks.Block, typing.Sequence[str]]:
val_id: str,
) -> ex.Expression:
"""Calculate (x-mean(x))^n. Useful for calculating moment statistics such as skew and kurtosis."""
window = windows.unbound(grouping_keys=tuple(grouping_column_ids))
block, mean_ids = block.multi_apply_window_op(column_ids, agg_ops.mean_op, window)
delta_ids = []
for val_id, mean_val_id in zip(column_ids, mean_ids):
delta = ops.sub_op.as_expr(val_id, mean_val_id)
delta_power = ops.pow_op.as_expr(delta, ex.const(n_power))
block, delta_power_id = block.project_expr(delta_power)
delta_ids.append(delta_power_id)
return block, delta_ids
mean_expr = agg_expressions.UnaryAggregation(agg_ops.mean_op, ex.deref(val_id))
delta = ops.sub_op.as_expr(val_id, mean_expr)
return ops.pow_op.as_expr(delta, ex.const(n_power))


def _skew_from_moments_and_count(
block: blocks.Block, count_id: str, moment3_id: str, moment2_id: str
) -> typing.Tuple[blocks.Block, str]:
count: ex.Expression, moment3: ex.Expression, moment2: ex.Expression
) -> ex.Expression:
# Calculate skew using count, third moment and population variance
# See G1 estimator:
# https://en.wikipedia.org/wiki/Skewness#Sample_skewness
moments_estimator = ops.div_op.as_expr(
moment3_id, ops.pow_op.as_expr(moment2_id, ex.const(3 / 2))
moment3, ops.pow_op.as_expr(moment2, ex.const(3 / 2))
)

countminus1 = ops.sub_op.as_expr(count_id, ex.const(1))
countminus2 = ops.sub_op.as_expr(count_id, ex.const(2))
countminus1 = ops.sub_op.as_expr(count, ex.const(1))
countminus2 = ops.sub_op.as_expr(count, ex.const(2))
adjustment = ops.div_op.as_expr(
ops.unsafe_pow_op.as_expr(
ops.mul_op.as_expr(count_id, countminus1), ex.const(1 / 2)
ops.mul_op.as_expr(count, countminus1), ex.const(1 / 2)
),
countminus2,
)
@@ -752,14 +719,14 @@ def _skew_from_moments_and_count(

# Need to produce NA if have less than 3 data points
cleaned_skew = ops.where_op.as_expr(
skew, ops.ge_op.as_expr(count_id, ex.const(3)), ex.const(None)
skew, ops.ge_op.as_expr(count, ex.const(3)), ex.const(None)
)
return block.project_expr(cleaned_skew)
return cleaned_skew


def _kurt_from_moments_and_count(
block: blocks.Block, count_id: str, moment4_id: str, moment2_id: str
) -> typing.Tuple[blocks.Block, str]:
count: ex.Expression, moment4: ex.Expression, moment2: ex.Expression
) -> ex.Expression:
# Kurtosis is often defined as the fourth standardized moment: moment(4)/moment(2)**2
# Pandas however uses Fisher’s estimator, implemented below
# numerator = (count + 1) * (count - 1) * moment4
@@ -768,28 +735,26 @@ def _kurt_from_moments_and_count(
# kurtosis = (numerator / denominator) - adjustment

numerator = ops.mul_op.as_expr(
moment4_id,
moment4,
ops.mul_op.as_expr(
ops.sub_op.as_expr(count_id, ex.const(1)),
ops.add_op.as_expr(count_id, ex.const(1)),
ops.sub_op.as_expr(count, ex.const(1)),
ops.add_op.as_expr(count, ex.const(1)),
),
)

# Denominator
countminus2 = ops.sub_op.as_expr(count_id, ex.const(2))
countminus3 = ops.sub_op.as_expr(count_id, ex.const(3))
countminus2 = ops.sub_op.as_expr(count, ex.const(2))
countminus3 = ops.sub_op.as_expr(count, ex.const(3))

# Denominator
denominator = ops.mul_op.as_expr(
ops.unsafe_pow_op.as_expr(moment2_id, ex.const(2)),
ops.unsafe_pow_op.as_expr(moment2, ex.const(2)),
ops.mul_op.as_expr(countminus2, countminus3),
)

# Adjustment
adj_num = ops.mul_op.as_expr(
ops.unsafe_pow_op.as_expr(
ops.sub_op.as_expr(count_id, ex.const(1)), ex.const(2)
),
ops.unsafe_pow_op.as_expr(ops.sub_op.as_expr(count, ex.const(1)), ex.const(2)),
ex.const(3),
)
adj_denom = ops.mul_op.as_expr(countminus2, countminus3)
@@ -800,9 +765,9 @@

# Need to produce NA if have less than 4 data points
cleaned_kurt = ops.where_op.as_expr(
kurt, ops.ge_op.as_expr(count_id, ex.const(4)), ex.const(None)
kurt, ops.ge_op.as_expr(count, ex.const(4)), ex.const(None)
)
return block.project_expr(cleaned_kurt)
return cleaned_kurt


def align(
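For reference, the expressions assembled by _skew_from_moments_and_count and _kurt_from_moments_and_count above correspond to the G1 skewness estimator and Fisher's kurtosis estimator. With n the non-null count and m_k the k-th central moment produced by _mean_delta_to_power (m_k = mean((x - mean(x))^k)):

G_1 = \frac{m_3}{m_2^{3/2}} \cdot \frac{\sqrt{n(n-1)}}{n-2},
\qquad
g_2 = \frac{(n+1)(n-1)\, m_4}{(n-2)(n-3)\, m_2^{2}} - \frac{3(n-1)^{2}}{(n-2)(n-3)}

The where_op guards return NULL when n < 3 (skew) or n < 4 (kurtosis), matching the "Need to produce NA" comments above.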
45 changes: 44 additions & 1 deletion bigframes/core/blocks.py
@@ -1146,13 +1146,15 @@ def project_exprs(
index_labels=self._index_labels,
)

# This is a new experimental version of the project_exprs that supports mixing analytic and scalar expressions
def project_block_exprs(
self,
exprs: Sequence[ex.Expression],
labels: Union[Sequence[Label], pd.Index],
drop=False,
) -> Block:
"""
Version of project_exprs that supports mixing analytic and scalar expressions.
"""
new_array, _ = self.expr.compute_general_expression(exprs)
if drop:
new_array = new_array.drop_columns(self.value_columns)
@@ -1167,6 +1169,47 @@ def project_block_exprs(
index_labels=self._index_labels,
)

def reduce_general(
self,
aggregations: typing.Sequence[ex.Expression] = (),
by_column_ids: typing.Sequence[str] = (),
column_labels: Optional[pd.Index] = None,
*,
dropna: bool = True,
) -> typing.Tuple[Block, typing.Sequence[str]]:
"""
Version of aggregate that supports mixing analytic and scalar expressions.
Collaborator:
Since we still have aggregate, aggregate_size, and aggregate_all_and_stack, let's say when to use them over this.

Alternatively, put an internal deprecated comment in the docstrings of those and file an issue for ourselves to clean up uses of them.

Contributor Author:
good point, removing aggregate and aggregate_size in this PR now.

"""
if column_labels is None:
column_labels = pd.Index(range(len(aggregations)))

result_expr, output_col_ids = self.expr.compute_general_reduction(
aggregations, by_column_ids, dropna=dropna
)

names: typing.List[Label] = []
if len(by_column_ids) == 0:
result_expr, label_id = result_expr.create_constant(0, pd.Int64Dtype())
index_columns = (label_id,)
names = [None]
else:
index_columns = tuple(by_column_ids) # type: ignore
for by_col_id in by_column_ids:
if by_col_id in self.value_columns:
names.append(self.col_id_to_label[by_col_id])
else:
names.append(self.col_id_to_index_name[by_col_id])

return (
Block(
result_expr,
index_columns=index_columns,
column_labels=column_labels,
index_labels=names,
),
[id.name for id in output_col_ids],
)

def apply_window_op(
self,
column: str,
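A minimal usage sketch of the two new Block entry points, assuming the usual module aliases (ex, ops, agg_ops, agg_expressions, pd) and an existing `block` with a value column "x" and grouping column "key"; the ids, labels, and expressions are illustrative only, not code from this PR:

count_agg = agg_expressions.UnaryAggregation(agg_ops.count_op, ex.deref("x"))
mean_agg = agg_expressions.UnaryAggregation(agg_ops.mean_op, ex.deref("x"))

# Scalar ops compose directly over aggregation expressions, as in the skew()/kurt() rewrite.
ratio_expr = ops.div_op.as_expr(mean_agg, count_agg)

# Grouped reduction; the second tuple element is rarely needed (see the review thread above).
grouped, _ = block.reduce_general(
    [ratio_expr],
    by_column_ids=["key"],
    column_labels=pd.Index(["mean_per_count"]),
)

# project_block_exprs accepts the same mix of analytic and scalar expressions,
# but preserves the input row count instead of collapsing it.
demeaned = ops.sub_op.as_expr("x", mean_agg)
projected = block.project_block_exprs([demeaned], labels=["x_demeaned"])

When by_column_ids is empty, reduce_general attaches a constant dummy index (the create_constant(0, pd.Int64Dtype()) branch above) so the result still has an index column.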
5 changes: 5 additions & 0 deletions bigframes/core/expression.py
@@ -152,6 +152,11 @@ def bottom_up(self, t: Callable[[Expression], Expression]) -> Expression:
expr = t(expr)
return expr

def top_down(self, t: Callable[[Expression], Expression]) -> Expression:
expr = t(self)
expr = expr.transform_children(lambda child: child.top_down(t))
return expr

def walk(self) -> Generator[Expression, None, None]:
yield self
for child in self.children:
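A small illustration of the new traversal helper: top_down applies the transform to a node before recursing into its children (pre-order), while the existing bottom_up rewrites children first. The record function is a hypothetical identity transform and `expr` is assumed to be an existing Expression instance:

visited = []

def record(node: Expression) -> Expression:
    # Identity transform: note the node type, leave the tree unchanged.
    visited.append(type(node).__name__)
    return node

expr.top_down(record)   # parents are visited before their children
expr.bottom_up(record)  # children are visited before their parents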