
Commit eadfbb2

Merge branch 'main' into fix-df-where
2 parents: 757c2e2 + 164c481

File tree: 33 files changed (+936 / −161 lines)


bigframes/core/bigframe_node.py

Lines changed: 20 additions & 32 deletions
@@ -20,17 +20,7 @@
 import functools
 import itertools
 import typing
-from typing import (
-    Callable,
-    Dict,
-    Generator,
-    Iterable,
-    Mapping,
-    Sequence,
-    Set,
-    Tuple,
-    Union,
-)
+from typing import Callable, Dict, Generator, Iterable, Mapping, Sequence, Tuple, Union

 from bigframes.core import expression, field, identifiers
 import bigframes.core.schema as schemata
@@ -309,33 +299,31 @@ def unique_nodes(
             seen.add(item)
             stack.extend(item.child_nodes)

-    def edges(
+    def iter_nodes_topo(
         self: BigFrameNode,
-    ) -> Generator[Tuple[BigFrameNode, BigFrameNode], None, None]:
-        for item in self.unique_nodes():
-            for child in item.child_nodes:
-                yield (item, child)
-
-    def iter_nodes_topo(self: BigFrameNode) -> Generator[BigFrameNode, None, None]:
-        """Returns nodes from bottom up."""
-        queue = collections.deque(
-            [node for node in self.unique_nodes() if not node.child_nodes]
-        )
-
+    ) -> Generator[BigFrameNode, None, None]:
+        """Returns nodes in reverse topological order, using Kahn's algorithm."""
         child_to_parents: Dict[
-            BigFrameNode, Set[BigFrameNode]
-        ] = collections.defaultdict(set)
-        for parent, child in self.edges():
-            child_to_parents[child].add(parent)
-
-        yielded = set()
+            BigFrameNode, list[BigFrameNode]
+        ] = collections.defaultdict(list)
+        out_degree: Dict[BigFrameNode, int] = collections.defaultdict(int)
+
+        queue: collections.deque["BigFrameNode"] = collections.deque()
+        for node in list(self.unique_nodes()):
+            num_children = len(node.child_nodes)
+            out_degree[node] = num_children
+            if num_children == 0:
+                queue.append(node)
+            for child in node.child_nodes:
+                child_to_parents[child].append(node)

         while queue:
             item = queue.popleft()
             yield item
-            yielded.add(item)
-            for parent in child_to_parents[item]:
-                if set(parent.child_nodes).issubset(yielded):
+            parents = child_to_parents.get(item, [])
+            for parent in parents:
+                out_degree[parent] -= 1
+                if out_degree[parent] == 0:
                     queue.append(parent)

     def top_down(
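
Note: the rewrite replaces the old per-edge `issubset` checks against a `yielded` set with classic Kahn's-algorithm bookkeeping (an explicit out-degree counter per node). A minimal standalone sketch of the same traversal over a toy DAG (node names here are illustrative, not from the codebase):

    import collections

    # children: node -> child nodes, a tiny stand-in for BigFrameNode.child_nodes
    children = {"join": ["scan_a", "filter"], "filter": ["scan_b"], "scan_a": [], "scan_b": []}

    def iter_nodes_topo(nodes):
        child_to_parents = collections.defaultdict(list)
        out_degree = {}
        queue = collections.deque()
        for node in nodes:
            out_degree[node] = len(children[node])
            if out_degree[node] == 0:
                queue.append(node)  # leaves are ready immediately
            for child in children[node]:
                child_to_parents[child].append(node)
        while queue:
            item = queue.popleft()
            yield item
            for parent in child_to_parents[item]:
                out_degree[parent] -= 1
                if out_degree[parent] == 0:  # every child already yielded
                    queue.append(parent)

    print(list(iter_nodes_topo(children)))  # ['scan_a', 'scan_b', 'filter', 'join']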

bigframes/core/blocks.py

Lines changed: 4 additions & 40 deletions
@@ -1232,46 +1232,10 @@ def aggregate_all_and_stack(
                 index_labels=[None],
             ).transpose(original_row_index=pd.Index([None]), single_row_mode=True)
         else:  # axis_n == 1
-            # using offsets as identity to group on.
-            # TODO: Allow to promote identity/total_order columns instead for better perf
-            expr_with_offsets, offset_col = self.expr.promote_offsets()
-            stacked_expr, (_, value_col_ids, passthrough_cols,) = unpivot(
-                expr_with_offsets,
-                row_labels=self.column_labels,
-                unpivot_columns=[tuple(self.value_columns)],
-                passthrough_columns=[*self.index_columns, offset_col],
-            )
-            # these corresponed to passthrough_columns provided to unpivot
-            index_cols = passthrough_cols[:-1]
-            og_offset_col = passthrough_cols[-1]
-            index_aggregations = [
-                (
-                    ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.deref(col_id)),
-                    col_id,
-                )
-                for col_id in index_cols
-            ]
-            # TODO: may need add NullaryAggregation in main_aggregation
-            # when agg add support for axis=1, needed for agg("size", axis=1)
-            assert isinstance(
-                operation, agg_ops.UnaryAggregateOp
-            ), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)."
-            main_aggregation = (
-                ex.UnaryAggregation(operation, ex.deref(value_col_ids[0])),
-                value_col_ids[0],
-            )
-            # Drop row identity after aggregating over it
-            result_expr = stacked_expr.aggregate(
-                [*index_aggregations, main_aggregation],
-                by_column_ids=[og_offset_col],
-                dropna=dropna,
-            ).drop_columns([og_offset_col])
-            return Block(
-                result_expr,
-                index_columns=index_cols,
-                column_labels=[None],
-                index_labels=self.index.names,
-            )
+            as_array = ops.ToArrayOp().as_expr(*(col for col in self.value_columns))
+            reduced = ops.ArrayReduceOp(operation).as_expr(as_array)
+            block, id = self.project_expr(reduced, None)
+            return block.select_column(id)

     def aggregate_size(
         self,
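
Note: an axis=1 aggregation now compiles to "pack each row into an array, then reduce the array" as a single projected expression, instead of the old promote-offsets + unpivot + group-by pipeline. A hedged usage sketch of the public API this path serves:

    import bigframes.pandas as bpd

    df = bpd.DataFrame({"a": [1, 2], "b": [3, 4]})
    # Per row: ToArrayOp builds [a, b], then ArrayReduceOp(SumOp) reduces it.
    row_sums = df.sum(axis=1)  # 4, 6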

bigframes/core/compile/ibis_compiler/aggregate_compiler.py

Lines changed: 1 addition & 1 deletion
@@ -165,7 +165,7 @@ def _(
 ) -> ibis_types.NumericValue:
     # Will be null if all inputs are null. Pandas defaults to zero sum though.
     bq_sum = _apply_window_if_present(column.sum(), window)
-    return bq_sum.fill_null(ibis_types.literal(0))
+    return bq_sum.coalesce(ibis_types.literal(0))


 @compile_unary_agg.register
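
Note: `coalesce` and `fill_null` agree on this null-to-zero substitution, and `coalesce` also applies cleanly when the sum is a scalar reduction rather than a column. A minimal ibis sketch of the intended semantics (assuming a local backend, for illustration only):

    import ibis

    t = ibis.memtable({"x": [None, None]}, schema={"x": "int64"})
    # SUM over all-null input yields NULL in BigQuery; pandas expects 0.
    null_safe_sum = t.x.sum().coalesce(ibis.literal(0))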

bigframes/core/compile/ibis_compiler/scalar_op_registry.py

Lines changed: 22 additions & 0 deletions
@@ -1201,6 +1201,28 @@ def array_slice_op_impl(x: ibis_types.Value, op: ops.ArraySliceOp):
     return res


+@scalar_op_compiler.register_nary_op(ops.ToArrayOp, pass_op=False)
+def to_arry_op_impl(*values: ibis_types.Value):
+    do_upcast_bool = any(t.type().is_numeric() for t in values)
+    if do_upcast_bool:
+        values = tuple(
+            val.cast(ibis_dtypes.int64) if val.type().is_boolean() else val
+            for val in values
+        )
+    return ibis_api.array(values)
+
+
+@scalar_op_compiler.register_unary_op(ops.ArrayReduceOp, pass_op=True)
+def array_reduce_op_impl(x: ibis_types.Value, op: ops.ArrayReduceOp):
+    import bigframes.core.compile.ibis_compiler.aggregate_compiler as agg_compilers
+
+    return typing.cast(ibis_types.ArrayValue, x).reduce(
+        lambda arr_vals: agg_compilers.compile_unary_agg(
+            op.aggregation, typing.cast(ibis_types.Column, arr_vals)
+        )
+    )
+
+
 # JSON Ops
 @scalar_op_compiler.register_binary_op(ops.JSONSet, pass_op=True)
 def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet):
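
Note: the boolean upcast in `to_arry_op_impl` exists because an array needs a single element type, so BOOL inputs are widened to INT64 whenever any input is numeric. A plain-Python illustration of the same rule (values are illustrative):

    vals = [True, 2.5, False]
    # bool is a subclass of int in Python, so exclude it from the numeric test.
    if any(isinstance(v, (int, float)) and not isinstance(v, bool) for v in vals):
        vals = [int(v) if isinstance(v, bool) else v for v in vals]
    assert vals == [1, 2.5, 0]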

bigframes/core/compile/polars/compiler.py

Lines changed: 84 additions & 0 deletions
@@ -31,9 +31,12 @@
 import bigframes.dtypes
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
+import bigframes.operations.array_ops as arr_ops
 import bigframes.operations.bool_ops as bool_ops
 import bigframes.operations.comparison_ops as comp_ops
+import bigframes.operations.date_ops as date_ops
 import bigframes.operations.datetime_ops as dt_ops
+import bigframes.operations.frequency_ops as freq_ops
 import bigframes.operations.generic_ops as gen_ops
 import bigframes.operations.json_ops as json_ops
 import bigframes.operations.numeric_ops as num_ops
@@ -74,6 +77,20 @@ def decorator(func):


 if polars_installed:
+    _FREQ_MAPPING = {
+        "Y": "1y",
+        "Q": "1q",
+        "M": "1mo",
+        "W": "1w",
+        "D": "1d",
+        "h": "1h",
+        "min": "1m",
+        "s": "1s",
+        "ms": "1ms",
+        "us": "1us",
+        "ns": "1ns",
+    }
+
     _DTYPE_MAPPING = {
         # Direct mappings
         bigframes.dtypes.INT_DTYPE: pl.Int64(),
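
Note: `_FREQ_MAPPING` translates pandas-style frequency aliases into the `every` strings polars expects (e.g. pandas "M" versus polars "1mo"). A quick sketch of the polars side of that mapping:

    import polars as pl
    from datetime import datetime

    s = pl.Series([datetime(2025, 3, 17, 10, 30)])
    # FloorDtOp(freq="M") compiles to dt.truncate(every="1mo") via the mapping.
    s.dt.truncate(every="1mo")  # [2025-03-01 00:00:00]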
@@ -329,11 +346,48 @@ def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
             else:
                 return pl.any_horizontal(*(input.str.ends_with(pat) for pat in op.pat))

+        @compile_op.register(freq_ops.FloorDtOp)
+        def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
+            assert isinstance(op, freq_ops.FloorDtOp)
+            return input.dt.truncate(every=_FREQ_MAPPING[op.freq])
+
         @compile_op.register(dt_ops.StrftimeOp)
         def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
             assert isinstance(op, dt_ops.StrftimeOp)
             return input.dt.strftime(op.date_format)

+        @compile_op.register(date_ops.YearOp)
+        def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
+            return input.dt.year()
+
+        @compile_op.register(date_ops.QuarterOp)
+        def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
+            return input.dt.quarter()
+
+        @compile_op.register(date_ops.MonthOp)
+        def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
+            return input.dt.month()
+
+        @compile_op.register(date_ops.DayOfWeekOp)
+        def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
+            return input.dt.weekday() - 1
+
+        @compile_op.register(date_ops.DayOp)
+        def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
+            return input.dt.day()
+
+        @compile_op.register(date_ops.IsoYearOp)
+        def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
+            return input.dt.iso_year()
+
+        @compile_op.register(date_ops.IsoWeekOp)
+        def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
+            return input.dt.week()
+
+        @compile_op.register(date_ops.IsoDayOp)
+        def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
+            return input.dt.weekday()
+
         @compile_op.register(dt_ops.ParseDatetimeOp)
         def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
             assert isinstance(op, dt_ops.ParseDatetimeOp)
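
Note the `- 1` in the DayOfWeekOp compilation: polars' `weekday()` is ISO-numbered (Monday == 1), while pandas' `dayofweek` is zero-based (Monday == 0); IsoDayOp keeps the ISO numbering unchanged. For example:

    import polars as pl
    from datetime import date

    s = pl.Series([date(2025, 3, 17)])  # a Monday
    s.dt.weekday()      # [1]  ISO weekday, as used by IsoDayOp
    s.dt.weekday() - 1  # [0]  pandas-style dayofweek, as used by DayOfWeekOp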
@@ -353,6 +407,36 @@ def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
             assert isinstance(op, json_ops.JSONDecode)
             return input.str.json_decode(_DTYPE_MAPPING[op.to_type])

+        @compile_op.register(arr_ops.ToArrayOp)
+        def _(self, op: ops.ToArrayOp, *inputs: pl.Expr) -> pl.Expr:
+            return pl.concat_list(*inputs)
+
+        @compile_op.register(arr_ops.ArrayReduceOp)
+        def _(self, op: ops.ArrayReduceOp, input: pl.Expr) -> pl.Expr:
+            # TODO: Unify this with general aggregation compilation?
+            if isinstance(op.aggregation, agg_ops.MinOp):
+                return input.list.min()
+            if isinstance(op.aggregation, agg_ops.MaxOp):
+                return input.list.max()
+            if isinstance(op.aggregation, agg_ops.SumOp):
+                return input.list.sum()
+            if isinstance(op.aggregation, agg_ops.MeanOp):
+                return input.list.mean()
+            if isinstance(op.aggregation, agg_ops.CountOp):
+                return input.list.len()
+            if isinstance(op.aggregation, agg_ops.StdOp):
+                return input.list.std()
+            if isinstance(op.aggregation, agg_ops.VarOp):
+                return input.list.var()
+            if isinstance(op.aggregation, agg_ops.AnyOp):
+                return input.list.any()
+            if isinstance(op.aggregation, agg_ops.AllOp):
+                return input.list.all()
+            else:
+                raise NotImplementedError(
+                    f"Haven't implemented array aggregation: {op.aggregation}"
+                )
+
     @dataclasses.dataclass(frozen=True)
     class PolarsAggregateCompiler:
         scalar_compiler = PolarsExpressionCompiler()
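
Note: on the polars backend, the same two ops map onto native list expressions. A small sketch of the generated pattern:

    import polars as pl

    df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
    # ToArrayOp -> concat_list, ArrayReduceOp(SumOp) -> .list.sum()
    df.select(pl.concat_list("a", "b").list.sum())  # [4, 6]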

bigframes/dataframe.py

Lines changed: 55 additions & 18 deletions
@@ -77,6 +77,7 @@
 import bigframes.exceptions as bfe
 import bigframes.formatting_helpers as formatter
 import bigframes.functions
+from bigframes.functions import function_typing
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
 import bigframes.operations.ai
@@ -4835,37 +4836,73 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
             )

             # Apply the function
-            result_series = rows_as_json_series._apply_unary_op(
-                ops.RemoteFunctionOp(function_def=func.udf_def, apply_on_null=True)
-            )
+            if args:
+                result_series = rows_as_json_series._apply_nary_op(
+                    ops.NaryRemoteFunctionOp(function_def=func.udf_def),
+                    list(args),
+                )
+            else:
+                result_series = rows_as_json_series._apply_unary_op(
+                    ops.RemoteFunctionOp(
+                        function_def=func.udf_def, apply_on_null=True
+                    )
+                )
         else:
             # This is a special case where we are providing not-pandas-like
             # extension. If the bigquery function can take one or more
-            # params then we assume that here the user intention is to use
-            # the column values of the dataframe as arguments to the
-            # function. For this to work the following condition must be
-            # true:
-            # 1. The number or input params in the function must be same
-            #    as the number of columns in the dataframe
+            # params (excluding the args) then we assume that here the user
+            # intention is to use the column values of the dataframe as
+            # arguments to the function. For this to work the following
+            # condition must be true:
+            # 1. The number or input params (excluding the args) in the
+            #    function must be same as the number of columns in the
+            #    dataframe.
             # 2. The dtypes of the columns in the dataframe must be
-            #    compatible with the data types of the input params
+            #    compatible with the data types of the input params.
             # 3. The order of the columns in the dataframe must correspond
-            #    to the order of the input params in the function
+            #    to the order of the input params in the function.
             udf_input_dtypes = func.udf_def.signature.bf_input_types
-            if len(udf_input_dtypes) != len(self.columns):
+            if not args and len(udf_input_dtypes) != len(self.columns):
                 raise ValueError(
-                    f"BigFrames BigQuery function takes {len(udf_input_dtypes)}"
-                    f" arguments but DataFrame has {len(self.columns)} columns."
+                    f"Parameter count mismatch: BigFrames BigQuery function"
+                    f" expected {len(udf_input_dtypes)} parameters but"
+                    f" received {len(self.columns)} DataFrame columns."
                 )
-            if udf_input_dtypes != tuple(self.dtypes.to_list()):
+            if args and len(udf_input_dtypes) != len(self.columns) + len(args):
                 raise ValueError(
-                    f"BigFrames BigQuery function takes arguments of types "
-                    f"{udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}."
+                    f"Parameter count mismatch: BigFrames BigQuery function"
+                    f" expected {len(udf_input_dtypes)} parameters but"
+                    f" received {len(self.columns) + len(args)} values"
+                    f" ({len(self.columns)} DataFrame columns and"
+                    f" {len(args)} args)."
                 )
+            end_slice = -len(args) if args else None
+            if udf_input_dtypes[:end_slice] != tuple(self.dtypes.to_list()):
+                raise ValueError(
+                    f"Data type mismatch for DataFrame columns:"
+                    f" Expected {udf_input_dtypes[:end_slice]}"
+                    f" Received {tuple(self.dtypes)}."
+                )
+            if args:
+                bq_types = (
+                    function_typing.sdk_type_from_python_type(type(arg))
+                    for arg in args
+                )
+                args_dtype = tuple(
+                    function_typing.sdk_type_to_bf_type(bq_type)
+                    for bq_type in bq_types
+                )
+                if udf_input_dtypes[end_slice:] != args_dtype:
+                    raise ValueError(
+                        f"Data type mismatch for 'args' parameter:"
+                        f" Expected {udf_input_dtypes[end_slice:]}"
+                        f" Received {args_dtype}."
+                    )

             series_list = [self[col] for col in self.columns]
+            op_list = series_list[1:] + list(args)
             result_series = series_list[0]._apply_nary_op(
-                ops.NaryRemoteFunctionOp(function_def=func.udf_def), series_list[1:]
+                ops.NaryRemoteFunctionOp(function_def=func.udf_def), op_list
             )
             result_series.name = None
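
Note: with this change, scalars passed via `args` are appended after the DataFrame columns when the BigQuery function is invoked, and the two halves of the signature are type-checked separately (columns against the leading parameters, `args` against the trailing ones). A hedged usage sketch; the function and its deployment options are hypothetical, and the exact `remote_function` parameters depend on your GCP setup:

    import bigframes.pandas as bpd

    @bpd.remote_function()  # hypothetical deployment; options omitted
    def add_scaled(x: int, y: int, factor: int) -> int:
        return (x + y) * factor

    df = bpd.DataFrame({"x": [1, 2], "y": [10, 20]})
    # Columns bind to (x, y); the trailing 3 is forwarded as `factor`.
    result = df.apply(add_scaled, axis=1, args=(3,))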
