Skip to content

Commit e228010

Browse files
refactor: Create ordering base class to represent partial ordering (#814)
1 parent bca4ee0 commit e228010

File tree

10 files changed

+116
-76
lines changed

10 files changed

+116
-76
lines changed

bigframes/core/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def from_cached(
7878
cls,
7979
original: ArrayValue,
8080
table: google.cloud.bigquery.Table,
81-
ordering: orderings.ExpressionOrdering,
81+
ordering: orderings.TotalOrdering,
8282
):
8383
node = nodes.CachedTableNode(
8484
original_node=original.node,
@@ -147,7 +147,7 @@ def _compiled_schema(self) -> schemata.ArraySchema:
147147
def as_cached(
148148
self: ArrayValue,
149149
cache_table: google.cloud.bigquery.Table,
150-
ordering: Optional[orderings.ExpressionOrdering],
150+
ordering: Optional[orderings.TotalOrdering],
151151
) -> ArrayValue:
152152
"""
153153
Replace the node with an equivalent one that references a tabel where the value has been materialized to.

bigframes/core/compile/api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def compile_ordered(
4646

4747
def compile_raw(
4848
node: bigframes.core.nodes.BigFrameNode,
49-
) -> Tuple[str, bigframes.core.ordering.ExpressionOrdering]:
49+
) -> Tuple[str, bigframes.core.ordering.TotalOrdering]:
5050
"""Compile node into sql that exposes all columns, including hidden ordering-only columns."""
5151
ir = compiler.compile_ordered_ir(node)
5252
sql = ir.raw_sql()

bigframes/core/compile/compiled.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
from bigframes.core.ordering import (
3737
ascending_over,
3838
encode_order_string,
39-
ExpressionOrdering,
4039
IntegerEncoding,
4140
OrderingExpression,
41+
TotalOrdering,
4242
)
4343
import bigframes.core.schema as schemata
4444
import bigframes.core.sql
@@ -194,7 +194,7 @@ def _aggregate_base(
194194
if by_column_ids:
195195
result = table.group_by(by_column_ids).aggregate(**stats)
196196
# Must have deterministic ordering, so order by the unique "by" column
197-
ordering = ExpressionOrdering(
197+
ordering = TotalOrdering(
198198
tuple([ascending_over(column_id) for column_id in by_column_ids]),
199199
total_ordering_columns=frozenset(by_column_ids),
200200
)
@@ -210,7 +210,7 @@ def _aggregate_base(
210210
# Ordering is irrelevant for single-row output, but set ordering id regardless
211211
# as other ops(join etc.) expect it.
212212
# TODO: Maybe can make completely empty
213-
ordering = ExpressionOrdering(
213+
ordering = TotalOrdering(
214214
ordering_value_columns=tuple([]),
215215
total_ordering_columns=frozenset([]),
216216
)
@@ -279,7 +279,7 @@ def row_count(self) -> OrderedIR:
279279
return OrderedIR(
280280
ibis_table,
281281
(ibis_table["count"],),
282-
ordering=ExpressionOrdering(
282+
ordering=TotalOrdering(
283283
ordering_value_columns=(ascending_over("count"),),
284284
total_ordering_columns=frozenset(["count"]),
285285
),
@@ -519,7 +519,7 @@ def __init__(
519519
table: ibis_types.Table,
520520
columns: Sequence[ibis_types.Value],
521521
hidden_ordering_columns: Optional[Sequence[ibis_types.Value]] = None,
522-
ordering: ExpressionOrdering = ExpressionOrdering(),
522+
ordering: TotalOrdering = TotalOrdering(),
523523
predicates: Optional[Collection[ibis_types.BooleanValue]] = None,
524524
):
525525
super().__init__(table, columns, predicates)
@@ -598,10 +598,7 @@ def from_pandas(
598598
return cls(
599599
keys_memtable,
600600
columns=[keys_memtable[column].name(column) for column in pd_df.columns],
601-
ordering=ExpressionOrdering(
602-
ordering_value_columns=tuple([ascending_over(ORDER_ID_COLUMN)]),
603-
total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
604-
),
601+
ordering=TotalOrdering.from_offset_col(ORDER_ID_COLUMN),
605602
hidden_ordering_columns=(keys_memtable[ORDER_ID_COLUMN],),
606603
)
607604

@@ -760,7 +757,7 @@ def explode(self, column_ids: typing.Sequence[str]) -> OrderedIR:
760757
],
761758
table_w_unnest[unnest_offset_id],
762759
]
763-
ordering = ExpressionOrdering(
760+
ordering = TotalOrdering(
764761
ordering_value_columns=tuple(
765762
[
766763
*self._ordering.ordering_value_columns,
@@ -1153,7 +1150,7 @@ def _bake_ordering(self) -> OrderedIR:
11531150
self._ibis_bindings[expr.scalar_expression.id]
11541151
)
11551152

1156-
new_ordering = ExpressionOrdering(
1153+
new_ordering = TotalOrdering(
11571154
tuple(new_exprs),
11581155
self._ordering.integer_encoding,
11591156
self._ordering.string_encoding,
@@ -1176,7 +1173,7 @@ def _project_offsets(self) -> OrderedIR:
11761173
ordering_mode="offset_col", order_col_name=ORDER_ID_COLUMN
11771174
)
11781175
columns = [table[column_name] for column_name in self._column_names]
1179-
ordering = ExpressionOrdering(
1176+
ordering = TotalOrdering(
11801177
ordering_value_columns=tuple([ascending_over(ORDER_ID_COLUMN)]),
11811178
total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
11821179
integer_encoding=IntegerEncoding(True, is_sequential=True),
@@ -1300,7 +1297,7 @@ class Builder:
13001297
def __init__(
13011298
self,
13021299
table: ibis_types.Table,
1303-
ordering: ExpressionOrdering,
1300+
ordering: TotalOrdering,
13041301
columns: Collection[ibis_types.Value] = (),
13051302
hidden_ordering_columns: Collection[ibis_types.Value] = (),
13061303
predicates: Optional[Collection[ibis_types.BooleanValue]] = None,

bigframes/core/compile/compiler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ def compile_read_table_ordered(node: nodes.ReadTableNode):
195195
)
196196
else:
197197
integer_encoding = bf_ordering.IntegerEncoding()
198-
ordering = bf_ordering.ExpressionOrdering(
198+
ordering = bf_ordering.TotalOrdering(
199199
ordering_value_columns,
200200
integer_encoding=integer_encoding,
201201
total_ordering_columns=frozenset(node.total_order_cols),

bigframes/core/compile/concat.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import bigframes.core.compile.compiled as compiled
2222
from bigframes.core.ordering import (
2323
ascending_over,
24-
ExpressionOrdering,
2524
reencode_order_string,
2625
StringEncoding,
26+
TotalOrdering,
2727
)
2828

2929
ORDER_ID_COLUMN = "bigframes_ordering_id"
@@ -83,7 +83,7 @@ def concat_ordered(
8383
)
8484
tables.append(table)
8585
combined_table = ibis.union(*tables)
86-
ordering = ExpressionOrdering(
86+
ordering = TotalOrdering(
8787
ordering_value_columns=tuple([ascending_over(ORDER_ID_COLUMN)]),
8888
total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
8989
string_encoding=StringEncoding(True, prefix_size + max_encoding_size),

bigframes/core/compile/default_ordering.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def gen_default_ordering(table: ibis.table, use_double_hash: bool = True):
8282
itertools.chain(original_column_ids, order_values)
8383
)
8484

85-
ordering = order.ExpressionOrdering(
85+
ordering = order.TotalOrdering(
8686
ordering_value_columns=tuple(
8787
order.ascending_over(col.get_name()) for col in order_values
8888
),

bigframes/core/compile/single_column.py

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
from __future__ import annotations
1818

19-
from typing import Mapping
20-
2119
import ibis
2220
import ibis.expr.datatypes as ibis_dtypes
2321
import ibis.expr.types as ibis_types
@@ -84,7 +82,7 @@ def join_by_column_ordered(
8482
)
8583

8684
# Preserve ordering accross joins.
87-
ordering = join_orderings(
85+
ordering = orderings.join_orderings(
8886
left._ordering,
8987
right._ordering,
9088
l_mapping,
@@ -173,33 +171,3 @@ def value_to_join_key(value: ibis_types.Value):
173171
if not value.type().is_string():
174172
value = value.cast(ibis_dtypes.str)
175173
return value.fillna(ibis_types.literal("$NULL_SENTINEL$"))
176-
177-
178-
def join_orderings(
179-
left: orderings.ExpressionOrdering,
180-
right: orderings.ExpressionOrdering,
181-
left_id_mapping: Mapping[str, str],
182-
right_id_mapping: Mapping[str, str],
183-
left_order_dominates: bool = True,
184-
) -> orderings.ExpressionOrdering:
185-
left_ordering_refs = [
186-
ref.remap_names(left_id_mapping) for ref in left.all_ordering_columns
187-
]
188-
right_ordering_refs = [
189-
ref.remap_names(right_id_mapping) for ref in right.all_ordering_columns
190-
]
191-
if left_order_dominates:
192-
joined_refs = [*left_ordering_refs, *right_ordering_refs]
193-
else:
194-
joined_refs = [*right_ordering_refs, *left_ordering_refs]
195-
196-
left_total_order_cols = frozenset(
197-
[left_id_mapping[id] for id in left.total_ordering_columns]
198-
)
199-
right_total_order_cols = frozenset(
200-
[right_id_mapping[id] for id in right.total_ordering_columns]
201-
)
202-
return orderings.ExpressionOrdering(
203-
ordering_value_columns=tuple(joined_refs),
204-
total_ordering_columns=left_total_order_cols | right_total_order_cols,
205-
)

bigframes/core/nodes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ class CachedTableNode(BigFrameNode):
373373
table_id: str = field()
374374
physical_schema: Tuple[bq.SchemaField, ...] = field()
375375

376-
ordering: typing.Optional[orderings.ExpressionOrdering] = field()
376+
ordering: typing.Optional[orderings.TotalOrdering] = field()
377377

378378
def __post_init__(self):
379379
# enforce invariants

bigframes/core/ordering.py

Lines changed: 96 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,42 @@ class IntegerEncoding:
9494

9595

9696
@dataclass(frozen=True)
97-
class ExpressionOrdering:
98-
"""Immutable object that holds information about the ordering of rows in a ArrayValue object."""
97+
class RowOrdering:
98+
"""Immutable object that holds information about the ordering of rows in a ArrayValue object. May not be unambiguous."""
9999

100100
ordering_value_columns: typing.Tuple[OrderingExpression, ...] = ()
101+
102+
@property
103+
def all_ordering_columns(self) -> Sequence[OrderingExpression]:
104+
return list(self.ordering_value_columns)
105+
106+
@property
107+
def referenced_columns(self) -> Set[str]:
108+
return set(
109+
col
110+
for part in self.ordering_value_columns
111+
for col in part.scalar_expression.unbound_variables
112+
)
113+
114+
def with_reverse(self) -> RowOrdering:
115+
"""Reverses the ordering."""
116+
return RowOrdering(
117+
tuple([col.with_reverse() for col in self.ordering_value_columns]),
118+
)
119+
120+
def with_column_remap(self, mapping: typing.Mapping[str, str]) -> RowOrdering:
121+
new_value_columns = [
122+
col.remap_names(mapping) for col in self.all_ordering_columns
123+
]
124+
return TotalOrdering(
125+
tuple(new_value_columns),
126+
)
127+
128+
129+
@dataclass(frozen=True)
130+
class TotalOrdering(RowOrdering):
131+
"""Immutable object that holds information about the ordering of rows in a ArrayValue object. Guaranteed to be unambiguous."""
132+
101133
integer_encoding: IntegerEncoding = IntegerEncoding(False)
102134
string_encoding: StringEncoding = StringEncoding(False)
103135
# A table has a total ordering defined by the identities of a set of 1 or more columns.
@@ -106,8 +138,8 @@ class ExpressionOrdering:
106138
total_ordering_columns: frozenset[str] = field(default_factory=frozenset)
107139

108140
@classmethod
109-
def from_offset_col(cls, col: str) -> ExpressionOrdering:
110-
return ExpressionOrdering(
141+
def from_offset_col(cls, col: str) -> TotalOrdering:
142+
return TotalOrdering(
111143
(ascending_over(col),),
112144
integer_encoding=IntegerEncoding(True, is_sequential=True),
113145
total_ordering_columns=frozenset({col}),
@@ -119,7 +151,7 @@ def with_non_sequential(self):
119151
This is useful when filtering, but not sorting, an expression.
120152
"""
121153
if self.integer_encoding.is_sequential:
122-
return ExpressionOrdering(
154+
return TotalOrdering(
123155
self.ordering_value_columns,
124156
integer_encoding=IntegerEncoding(
125157
self.integer_encoding.is_encoded, is_sequential=False
@@ -132,7 +164,7 @@ def with_non_sequential(self):
132164
def with_ordering_columns(
133165
self,
134166
ordering_value_columns: Sequence[OrderingExpression] = (),
135-
) -> ExpressionOrdering:
167+
) -> TotalOrdering:
136168
"""Creates a new ordering that reorders by the given columns.
137169
138170
Args:
@@ -147,7 +179,7 @@ def with_ordering_columns(
147179
new_ordering = self._truncate_ordering(
148180
(*ordering_value_columns, *self.ordering_value_columns)
149181
)
150-
return ExpressionOrdering(
182+
return TotalOrdering(
151183
new_ordering,
152184
total_ordering_columns=self.total_ordering_columns,
153185
)
@@ -173,7 +205,7 @@ def _truncate_ordering(
173205

174206
def with_reverse(self):
175207
"""Reverses the ordering."""
176-
return ExpressionOrdering(
208+
return TotalOrdering(
177209
tuple([col.with_reverse() for col in self.ordering_value_columns]),
178210
total_ordering_columns=self.total_ordering_columns,
179211
)
@@ -185,7 +217,7 @@ def with_column_remap(self, mapping: typing.Mapping[str, str]):
185217
new_total_order = frozenset(
186218
mapping.get(col_id, col_id) for col_id in self.total_ordering_columns
187219
)
188-
return ExpressionOrdering(
220+
return TotalOrdering(
189221
tuple(new_value_columns),
190222
integer_encoding=self.integer_encoding,
191223
string_encoding=self.string_encoding,
@@ -211,18 +243,6 @@ def is_string_encoded(self) -> bool:
211243
def is_sequential(self) -> bool:
212244
return self.integer_encoding.is_encoded and self.integer_encoding.is_sequential
213245

214-
@property
215-
def all_ordering_columns(self) -> Sequence[OrderingExpression]:
216-
return list(self.ordering_value_columns)
217-
218-
@property
219-
def referenced_columns(self) -> Set[str]:
220-
return set(
221-
col
222-
for part in self.ordering_value_columns
223-
for col in part.scalar_expression.unbound_variables
224-
)
225-
226246

227247
def encode_order_string(
228248
order_id: ibis_types.IntegerColumn, length: int = DEFAULT_ORDERING_ID_LENGTH
@@ -257,3 +277,58 @@ def descending_over(id: str, nulls_last: bool = True) -> OrderingExpression:
257277
return OrderingExpression(
258278
expression.free_var(id), direction=OrderingDirection.DESC, na_last=nulls_last
259279
)
280+
281+
282+
@typing.overload
283+
def join_orderings(
284+
left: TotalOrdering,
285+
right: TotalOrdering,
286+
left_id_mapping: Mapping[str, str],
287+
right_id_mapping: Mapping[str, str],
288+
left_order_dominates: bool = True,
289+
) -> TotalOrdering:
290+
...
291+
292+
293+
@typing.overload
294+
def join_orderings(
295+
left: RowOrdering,
296+
right: RowOrdering,
297+
left_id_mapping: Mapping[str, str],
298+
right_id_mapping: Mapping[str, str],
299+
left_order_dominates: bool = True,
300+
) -> RowOrdering:
301+
...
302+
303+
304+
def join_orderings(
305+
left: RowOrdering,
306+
right: RowOrdering,
307+
left_id_mapping: Mapping[str, str],
308+
right_id_mapping: Mapping[str, str],
309+
left_order_dominates: bool = True,
310+
) -> RowOrdering:
311+
left_ordering_refs = [
312+
ref.remap_names(left_id_mapping) for ref in left.all_ordering_columns
313+
]
314+
right_ordering_refs = [
315+
ref.remap_names(right_id_mapping) for ref in right.all_ordering_columns
316+
]
317+
if left_order_dominates:
318+
joined_refs = [*left_ordering_refs, *right_ordering_refs]
319+
else:
320+
joined_refs = [*right_ordering_refs, *left_ordering_refs]
321+
322+
if isinstance(left, TotalOrdering) and isinstance(right, TotalOrdering):
323+
left_total_order_cols = frozenset(
324+
[left_id_mapping[id] for id in left.total_ordering_columns]
325+
)
326+
right_total_order_cols = frozenset(
327+
[right_id_mapping[id] for id in right.total_ordering_columns]
328+
)
329+
return TotalOrdering(
330+
ordering_value_columns=tuple(joined_refs),
331+
total_ordering_columns=left_total_order_cols | right_total_order_cols,
332+
)
333+
else:
334+
return RowOrdering(tuple(joined_refs))

0 commit comments

Comments
 (0)