Skip to content

Commit 8922e5e

Browse files
refactor: define scalar expression structs (#309)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> 🦕
1 parent 5cccd36 commit 8922e5e

File tree

10 files changed

+199
-81
lines changed

10 files changed

+199
-81
lines changed

bigframes/core/__init__.py

Lines changed: 4 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323

2424
import bigframes.core.compile.compiled as compiled
2525
import bigframes.core.compile.compiler as compiler
26+
import bigframes.core.expression as expressions
2627
import bigframes.core.guid
2728
import bigframes.core.nodes as nodes
2829
from bigframes.core.ordering import OrderingColumnReference
2930
import bigframes.core.ordering as orderings
3031
import bigframes.core.utils
3132
from bigframes.core.window_spec import WindowSpec
3233
import bigframes.dtypes
33-
import bigframes.operations as ops
3434
import bigframes.operations.aggregations as agg_ops
3535
import bigframes.session._io.bigquery
3636

@@ -152,48 +152,10 @@ def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
152152
nodes.ConcatNode(children=tuple([self.node, *[val.node for val in other]]))
153153
)
154154

155-
def project_unary_op(
156-
self, column_name: str, op: ops.UnaryOp, output_name=None
157-
) -> ArrayValue:
158-
"""Creates a new expression based on this expression with unary operation applied to one column."""
155+
def project(self, expression: expressions.Expression, output_id: str):
159156
return ArrayValue(
160-
nodes.ProjectRowOpNode(
161-
child=self.node, input_ids=(column_name,), op=op, output_id=output_name
162-
)
163-
)
164-
165-
def project_binary_op(
166-
self,
167-
left_column_id: str,
168-
right_column_id: str,
169-
op: ops.BinaryOp,
170-
output_column_id: str,
171-
) -> ArrayValue:
172-
"""Creates a new expression based on this expression with binary operation applied to two columns."""
173-
return ArrayValue(
174-
nodes.ProjectRowOpNode(
175-
child=self.node,
176-
input_ids=(left_column_id, right_column_id),
177-
op=op,
178-
output_id=output_column_id,
179-
)
180-
)
181-
182-
def project_ternary_op(
183-
self,
184-
col_id_1: str,
185-
col_id_2: str,
186-
col_id_3: str,
187-
op: ops.TernaryOp,
188-
output_column_id: str,
189-
) -> ArrayValue:
190-
"""Creates a new expression based on this expression with ternary operation applied to three columns."""
191-
return ArrayValue(
192-
nodes.ProjectRowOpNode(
193-
child=self.node,
194-
input_ids=(col_id_1, col_id_2, col_id_3),
195-
op=op,
196-
output_id=output_column_id,
157+
nodes.ProjectionNode(
158+
child=self.node, assignments=((expression, output_id),)
197159
)
198160
)
199161

bigframes/core/blocks.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ def apply_unary_op(
669669
"""
670670
# TODO(tbergeron): handle labels safely so callers don't need to
671671
result_id = guid.generate_guid()
672-
expr = self._expr.project_unary_op(column, op, result_id)
672+
expr = self._expr.project(op.as_expr(column), result_id)
673673
block = Block(
674674
expr,
675675
index_columns=self.index_columns,
@@ -686,8 +686,8 @@ def apply_binary_op(
686686
result_label: Label = None,
687687
) -> typing.Tuple[Block, str]:
688688
result_id = guid.generate_guid()
689-
expr = self._expr.project_binary_op(
690-
left_column_id, right_column_id, op, result_id
689+
expr = self._expr.project(
690+
op.as_expr(left_column_id, right_column_id), result_id
691691
)
692692
block = Block(
693693
expr,
@@ -706,9 +706,7 @@ def apply_ternary_op(
706706
result_label: Label = None,
707707
) -> typing.Tuple[Block, str]:
708708
result_id = guid.generate_guid()
709-
expr = self._expr.project_ternary_op(
710-
col_id_1, col_id_2, col_id_3, op, result_id
711-
)
709+
expr = self._expr.project(op.as_expr(col_id_1, col_id_2, col_id_3), result_id)
712710
block = Block(
713711
expr,
714712
index_columns=self.index_columns,
@@ -1240,9 +1238,14 @@ def add_prefix(self, prefix: str, axis: str | int | None = None) -> Block:
12401238
if axis_number == 0:
12411239
expr = self._expr
12421240
for index_col in self._index_columns:
1243-
expr = expr.project_unary_op(index_col, ops.AsTypeOp(to_type="string"))
1241+
expr = expr.project(
1242+
expression=ops.AsTypeOp(to_type="string").as_expr(index_col),
1243+
output_id=index_col,
1244+
)
12441245
prefix_op = ops.ApplyLeft(base_op=ops.add_op, left_scalar=prefix)
1245-
expr = expr.project_unary_op(index_col, prefix_op)
1246+
expr = expr.project(
1247+
expression=prefix_op.as_expr(index_col), output_id=index_col
1248+
)
12461249
return Block(
12471250
expr,
12481251
index_columns=self.index_columns,
@@ -1259,9 +1262,14 @@ def add_suffix(self, suffix: str, axis: str | int | None = None) -> Block:
12591262
if axis_number == 0:
12601263
expr = self._expr
12611264
for index_col in self._index_columns:
1262-
expr = expr.project_unary_op(index_col, ops.AsTypeOp(to_type="string"))
1265+
expr = expr.project(
1266+
expression=ops.AsTypeOp(to_type="string").as_expr(index_col),
1267+
output_id=index_col,
1268+
)
12631269
prefix_op = ops.ApplyRight(base_op=ops.add_op, right_scalar=suffix)
1264-
expr = expr.project_unary_op(index_col, prefix_op)
1270+
expr = expr.project(
1271+
expression=prefix_op.as_expr(index_col), output_id=index_col
1272+
)
12651273
return Block(
12661274
expr,
12671275
index_columns=self.index_columns,
@@ -1568,10 +1576,10 @@ def merge(
15681576
coalesced_ids = []
15691577
for left_id, right_id in zip(left_join_ids, right_join_ids):
15701578
coalesced_id = guid.generate_guid()
1571-
joined_expr = joined_expr.project_binary_op(
1572-
get_column_left[left_id],
1573-
get_column_right[right_id],
1574-
ops.coalesce_op,
1579+
joined_expr = joined_expr.project(
1580+
ops.coalesce_op.as_expr(
1581+
get_column_left[left_id], get_column_right[right_id]
1582+
),
15751583
coalesced_id,
15761584
)
15771585
coalesced_ids.append(coalesced_id)

bigframes/core/compile/compiled.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import bigframes.constants as constants
3030
import bigframes.core.compile.scalar_op_compiler as op_compilers
31+
import bigframes.core.expression as expressions
3132
import bigframes.core.guid
3233
from bigframes.core.ordering import (
3334
encode_order_string,
@@ -151,18 +152,19 @@ def _reproject_to_table(self: T) -> T:
151152
"""
152153
...
153154

154-
def project_row_op(
155+
def project_expression(
155156
self: T,
156-
input_column_ids: typing.Sequence[str],
157-
op: ops.RowOp,
157+
expression: expressions.Expression,
158158
output_column_id: typing.Optional[str] = None,
159159
) -> T:
160-
"""Creates a new expression based on this expression with unary operation applied to one column."""
160+
"""Apply an expression to the ArrayValue and assign the output to a column."""
161161
result_id = (
162-
output_column_id or input_column_ids[0]
162+
output_column_id or expression.unbound_variables[0]
163163
) # overwrite input if not output id provided
164-
inputs = tuple(self._get_ibis_column(col) for col in input_column_ids)
165-
value = op_compiler.compile_row_op(op, inputs).name(result_id)
164+
bindings = {
165+
col: self._get_ibis_column(col) for col in expression.unbound_variables
166+
}
167+
value = op_compiler.compile_expression(expression, bindings).name(result_id)
166168
return self._set_or_replace_by_id(result_id, value)
167169

168170
def assign(self: T, source_id: str, destination_id: str) -> T:

bigframes/core/compile/compiler.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,11 @@ def compile_reversed(node: nodes.ReversedNode, ordered: bool = True):
143143

144144

145145
@_compile_node.register
146-
def compile_project(node: nodes.ProjectRowOpNode, ordered: bool = True):
147-
return compile_node(node.child, ordered).project_row_op(
148-
node.input_ids, node.op, node.output_id
149-
)
146+
def compile_projection(node: nodes.ProjectionNode, ordered: bool = True):
147+
result = compile_node(node.child, ordered)
148+
for expr, id in node.assignments:
149+
result = result.project_expression(expr, id)
150+
return result
150151

151152

152153
@_compile_node.register

bigframes/core/compile/scalar_op_compiler.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import pandas as pd
2727

2828
import bigframes.constants as constants
29+
import bigframes.core.expression as expressions
2930
import bigframes.dtypes
3031
import bigframes.dtypes as dtypes
3132
import bigframes.operations as ops
@@ -50,6 +51,45 @@ class ScalarOpCompiler:
5051
],
5152
] = {}
5253

54+
@functools.singledispatchmethod
55+
def compile_expression(
56+
self,
57+
expression: expressions.Expression,
58+
bindings: typing.Dict[str, ibis_types.Value],
59+
) -> ibis_types.Value:
60+
raise NotImplementedError(f"Unrecognized expression: {expression}")
61+
62+
@compile_expression.register
63+
def _(
64+
self,
65+
expression: expressions.ScalarConstantExpression,
66+
bindings: typing.Dict[str, ibis_types.Value],
67+
) -> ibis_types.Value:
68+
return ibis.literal(expression.value)
69+
70+
@compile_expression.register
71+
def _(
72+
self,
73+
expression: expressions.UnboundVariableExpression,
74+
bindings: typing.Dict[str, ibis_types.Value],
75+
) -> ibis_types.Value:
76+
if expression.id not in bindings:
77+
raise ValueError(f"Could not resolve unbound variable {expression.id}")
78+
else:
79+
return bindings[expression.id]
80+
81+
@compile_expression.register
82+
def _(
83+
self,
84+
expression: expressions.OpExpression,
85+
bindings: typing.Dict[str, ibis_types.Value],
86+
) -> ibis_types.Value:
87+
inputs = [
88+
self.compile_expression(sub_expr, bindings)
89+
for sub_expr in expression.inputs
90+
]
91+
return self.compile_row_op(expression.op, inputs)
92+
5393
def compile_row_op(
5494
self, op: ops.RowOp, inputs: typing.Sequence[ibis_types.Value]
5595
) -> ibis_types.Value:

bigframes/core/expression.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import abc
18+
import dataclasses
19+
import itertools
20+
import typing
21+
22+
import bigframes.operations
23+
24+
25+
@dataclasses.dataclass(frozen=True)
26+
class Expression(abc.ABC):
27+
"""An expression represents a computation taking N scalar inputs and producing a single output scalar."""
28+
29+
@property
30+
def unbound_variables(self) -> typing.Tuple[str, ...]:
31+
return ()
32+
33+
34+
@dataclasses.dataclass(frozen=True)
35+
class ScalarConstantExpression(Expression):
36+
"""An expression representing a scalar constant."""
37+
38+
# TODO: Further constrain?
39+
value: typing.Hashable
40+
41+
42+
@dataclasses.dataclass(frozen=True)
43+
class UnboundVariableExpression(Expression):
44+
"""A variable expression representing an unbound variable."""
45+
46+
id: str
47+
48+
@property
49+
def unbound_variables(self) -> typing.Tuple[str, ...]:
50+
return (self.id,)
51+
52+
53+
@dataclasses.dataclass(frozen=True)
54+
class OpExpression(Expression):
55+
"""An expression representing a scalar operation applied to 1 or more argument sub-expressions."""
56+
57+
op: bigframes.operations.RowOp
58+
inputs: typing.Tuple[Expression, ...]
59+
60+
def __post_init__(self):
61+
assert self.op.arguments == len(self.inputs)
62+
63+
@property
64+
def unbound_variables(self) -> typing.Tuple[str, ...]:
65+
return tuple(
66+
itertools.chain.from_iterable(
67+
map(lambda x: x.unbound_variables, self.inputs)
68+
)
69+
)

bigframes/core/indexes/index.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -604,8 +604,8 @@ def coalesce_columns(
604604
expr = expr.drop_columns([left_id])
605605
elif how == "outer":
606606
coalesced_id = bigframes.core.guid.generate_guid()
607-
expr = expr.project_binary_op(
608-
left_id, right_id, ops.coalesce_op, coalesced_id
607+
expr = expr.project(
608+
ops.coalesce_op.as_expr(left_id, right_id), coalesced_id
609609
)
610610
expr = expr.drop_columns([left_id, right_id])
611611
result_ids.append(coalesced_id)

bigframes/core/nodes.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@
1717
from dataclasses import dataclass, field, fields
1818
import functools
1919
import typing
20-
from typing import Optional, Tuple
20+
from typing import Tuple
2121

2222
import pandas
2323

24+
import bigframes.core.expression as expressions
2425
import bigframes.core.guid
2526
from bigframes.core.ordering import OrderingColumnReference
2627
import bigframes.core.window_spec as window
2728
import bigframes.dtypes
28-
import bigframes.operations as ops
2929
import bigframes.operations.aggregations as agg_ops
3030

3131
if typing.TYPE_CHECKING:
@@ -196,10 +196,8 @@ def __hash__(self):
196196

197197

198198
@dataclass(frozen=True)
199-
class ProjectRowOpNode(UnaryNode):
200-
input_ids: typing.Tuple[str, ...]
201-
op: ops.RowOp
202-
output_id: Optional[str] = None
199+
class ProjectionNode(UnaryNode):
200+
assignments: typing.Tuple[typing.Tuple[expressions.Expression, str], ...]
203201

204202
def __hash__(self):
205203
return self._node_hash

0 commit comments

Comments
 (0)