Skip to content

Commit 4e7e67b

Browse files
refactor: SQL builder supports partial ordering (#838)
1 parent 8d1a03a commit 4e7e67b

File tree

4 files changed

+101
-32
lines changed

4 files changed

+101
-32
lines changed

bigframes/core/compile/api.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ def compile_raw(
5151
ir = compiler.compile_ordered_ir(node)
5252
sql = ir.raw_sql()
5353
ordering_info = ir._ordering
54-
return sql, ordering_info
54+
assert ir.has_total_order
55+
return sql, ordering_info # type: ignore
5556

5657

5758
def test_only_try_evaluate(node: bigframes.core.nodes.BigFrameNode):

bigframes/core/compile/compiled.py

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
ascending_over,
3838
encode_order_string,
3939
IntegerEncoding,
40+
join_orderings,
4041
OrderingExpression,
42+
RowOrdering,
4143
TotalOrdering,
4244
)
4345
import bigframes.core.schema as schemata
@@ -519,7 +521,7 @@ def __init__(
519521
table: ibis_types.Table,
520522
columns: Sequence[ibis_types.Value],
521523
hidden_ordering_columns: Optional[Sequence[ibis_types.Value]] = None,
522-
ordering: TotalOrdering = TotalOrdering(),
524+
ordering: RowOrdering = RowOrdering(),
523525
predicates: Optional[Collection[ibis_types.BooleanValue]] = None,
524526
):
525527
super().__init__(table, columns, predicates)
@@ -566,6 +568,10 @@ def __init__(
566568
def is_ordered_ir(self) -> bool:
567569
return True
568570

571+
@property
572+
def has_total_order(self) -> bool:
573+
return isinstance(self._ordering, TotalOrdering)
574+
569575
@classmethod
570576
def from_pandas(
571577
cls,
@@ -757,16 +763,13 @@ def explode(self, column_ids: typing.Sequence[str]) -> OrderedIR:
757763
],
758764
table_w_unnest[unnest_offset_id],
759765
]
760-
ordering = TotalOrdering(
761-
ordering_value_columns=tuple(
762-
[
763-
*self._ordering.ordering_value_columns,
764-
ascending_over(unnest_offset_id),
765-
]
766-
),
767-
total_ordering_columns=frozenset(
768-
[*self._ordering.total_ordering_columns, unnest_offset_id]
769-
),
766+
l_mappings = {id: id for id in self._ordering.referenced_columns}
767+
r_mappings = {unnest_offset_id: unnest_offset_id}
768+
ordering = join_orderings(
769+
self._ordering,
770+
TotalOrdering.from_offset_col(unnest_offset_id),
771+
l_mappings,
772+
r_mappings,
770773
)
771774

772775
return OrderedIR(
@@ -1150,12 +1153,19 @@ def _bake_ordering(self) -> OrderedIR:
11501153
self._ibis_bindings[expr.scalar_expression.id]
11511154
)
11521155

1153-
new_ordering = TotalOrdering(
1154-
tuple(new_exprs),
1155-
self._ordering.integer_encoding,
1156-
self._ordering.string_encoding,
1157-
self._ordering.total_ordering_columns,
1158-
)
1156+
if isinstance(self._ordering, TotalOrdering):
1157+
new_ordering: RowOrdering = TotalOrdering(
1158+
tuple(new_exprs),
1159+
self._ordering.integer_encoding,
1160+
self._ordering.string_encoding,
1161+
self._ordering.total_ordering_columns,
1162+
)
1163+
else:
1164+
new_ordering = RowOrdering(
1165+
tuple(new_exprs),
1166+
self._ordering.integer_encoding,
1167+
self._ordering.string_encoding,
1168+
)
11591169
return OrderedIR(
11601170
self._table,
11611171
columns=self.columns,
@@ -1297,7 +1307,7 @@ class Builder:
12971307
def __init__(
12981308
self,
12991309
table: ibis_types.Table,
1300-
ordering: TotalOrdering,
1310+
ordering: RowOrdering,
13011311
columns: Collection[ibis_types.Value] = (),
13021312
hidden_ordering_columns: Collection[ibis_types.Value] = (),
13031313
predicates: Optional[Collection[ibis_types.BooleanValue]] = None,

bigframes/core/compile/compiler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838

3939

4040
def compile_ordered_ir(node: nodes.BigFrameNode) -> compiled.OrderedIR:
41-
return typing.cast(compiled.OrderedIR, compile_node(node, True))
41+
ir = typing.cast(compiled.OrderedIR, compile_node(node, True))
42+
assert ir.has_total_order
43+
return ir
4244

4345

4446
def compile_unordered_ir(node: nodes.BigFrameNode) -> compiled.UnorderedIR:

bigframes/core/ordering.py

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ class RowOrdering:
9898
"""Immutable object that holds information about the ordering of rows in an ArrayValue object. May not be unambiguous."""
9999

100100
ordering_value_columns: typing.Tuple[OrderingExpression, ...] = ()
101+
integer_encoding: IntegerEncoding = IntegerEncoding(False)
102+
string_encoding: StringEncoding = StringEncoding(False)
101103

102104
@property
103105
def all_ordering_columns(self) -> Sequence[OrderingExpression]:
@@ -111,6 +113,20 @@ def referenced_columns(self) -> Set[str]:
111113
for col in part.scalar_expression.unbound_variables
112114
)
113115

116+
@property
117+
def is_string_encoded(self) -> bool:
118+
"""True if ordering is fully defined by a fixed length string column."""
119+
return self.string_encoding.is_encoded
120+
121+
@property
122+
def is_sequential(self) -> bool:
123+
return self.integer_encoding.is_encoded and self.integer_encoding.is_sequential
124+
125+
@property
126+
def total_order_col(self) -> Optional[OrderingExpression]:
127+
"""Returns column id of the column that defines total ordering, if such a column exists"""
128+
return None
129+
114130
def with_reverse(self) -> RowOrdering:
115131
"""Reverses the ordering."""
116132
return RowOrdering(
@@ -121,17 +137,66 @@ def with_column_remap(self, mapping: typing.Mapping[str, str]) -> RowOrdering:
121137
new_value_columns = [
122138
col.remap_names(mapping) for col in self.all_ordering_columns
123139
]
124-
return TotalOrdering(
140+
return RowOrdering(
125141
tuple(new_value_columns),
126142
)
127143

144+
def with_non_sequential(self):
145+
"""Create a copy that is marked as non-sequential.
146+
147+
This is useful when filtering, but not sorting, an expression.
148+
"""
149+
if self.integer_encoding.is_sequential:
150+
return RowOrdering(
151+
self.ordering_value_columns,
152+
integer_encoding=IntegerEncoding(
153+
self.integer_encoding.is_encoded, is_sequential=False
154+
),
155+
)
156+
157+
return self
158+
159+
def with_ordering_columns(
160+
self,
161+
ordering_value_columns: Sequence[OrderingExpression] = (),
162+
) -> RowOrdering:
163+
"""Creates a new ordering that reorders by the given columns.
164+
165+
Args:
166+
ordering_value_columns:
167+
In decreasing precedence order, the values used to sort the ordering
168+
169+
Returns:
170+
Modified RowOrdering
171+
"""
172+
173+
# Truncate to remove any unneeded col references once all total order cols are included
174+
new_ordering = self._truncate_ordering(
175+
(*ordering_value_columns, *self.ordering_value_columns)
176+
)
177+
return RowOrdering(
178+
new_ordering,
179+
)
180+
181+
def _truncate_ordering(
182+
self, order_refs: tuple[OrderingExpression, ...]
183+
) -> tuple[OrderingExpression, ...]:
184+
# Truncate once we refer to a full key in bijective operations
185+
columns_seen: Set[str] = set()
186+
truncated_refs = []
187+
for order_part in order_refs:
188+
expr = order_part.scalar_expression
189+
if not set(expr.unbound_variables).issubset(columns_seen):
190+
if expr.is_bijective:
191+
columns_seen.update(expr.unbound_variables)
192+
truncated_refs.append(order_part)
193+
return tuple(truncated_refs)
194+
128195

129196
@dataclass(frozen=True)
130197
class TotalOrdering(RowOrdering):
131198
"""Immutable object that holds information about the ordering of rows in an ArrayValue object. Guaranteed to be unambiguous."""
132199

133-
integer_encoding: IntegerEncoding = IntegerEncoding(False)
134-
string_encoding: StringEncoding = StringEncoding(False)
135200
# A table has a total ordering defined by the identities of a set of 1 or more columns.
136201
# These columns must always be part of the ordering, in order to guarantee that the ordering is total.
137202
# Therefore, any modifications(or drops) done to these columns must result in hidden copies being made.
@@ -234,15 +299,6 @@ def total_order_col(self) -> Optional[OrderingExpression]:
234299
return None
235300
return order_ref
236301

237-
@property
238-
def is_string_encoded(self) -> bool:
239-
"""True if ordering is fully defined by a fixed length string column."""
240-
return self.string_encoding.is_encoded
241-
242-
@property
243-
def is_sequential(self) -> bool:
244-
return self.integer_encoding.is_encoded and self.integer_encoding.is_sequential
245-
246302

247303
def encode_order_string(
248304
order_id: ibis_types.IntegerColumn, length: int = DEFAULT_ORDERING_ID_LENGTH

0 commit comments

Comments
 (0)