Skip to content

Commit cf852a2

Browse files
refactor: Make row hash ordering explicit in tree (#1284)
1 parent 5cac5c8 commit cf852a2

File tree

16 files changed

+183
-51
lines changed

16 files changed

+183
-51
lines changed

bigframes/bigquery/_operations/search.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,12 @@ def vector_search(
122122
... base_table="bigframes-dev.bigframes_tests_sys.base_table",
123123
... column_to_search="my_embedding",
124124
... query=search_query,
125-
... top_k=2)
125+
... top_k=2).sort_values("id")
126126
query_id embedding id my_embedding distance
127-
1 cat [3. 5.2] 5 [5. 5.4] 2.009975
128127
0 dog [1. 2.] 1 [1. 2.] 0.0
129-
0 dog [1. 2.] 4 [1. 3.2] 1.2
130128
1 cat [3. 5.2] 2 [2. 4.] 1.56205
129+
0 dog [1. 2.] 4 [1. 3.2] 1.2
130+
1 cat [3. 5.2] 5 [5. 5.4] 2.009975
131131
<BLANKLINE>
132132
[4 rows x 5 columns]
133133
@@ -141,12 +141,12 @@ def vector_search(
141141
... column_to_search="my_embedding",
142142
... query=search_query,
143143
... top_k=2,
144-
... use_brute_force=True)
144+
... use_brute_force=True).sort_values("id")
145145
embedding id my_embedding distance
146146
dog [1. 2.] 1 [1. 2.] 0.0
147-
cat [3. 5.2] 5 [5. 5.4] 2.009975
148-
dog [1. 2.] 4 [1. 3.2] 1.2
149147
cat [3. 5.2] 2 [2. 4.] 1.56205
148+
dog [1. 2.] 4 [1. 3.2] 1.2
149+
cat [3. 5.2] 5 [5. 5.4] 2.009975
150150
<BLANKLINE>
151151
[4 rows x 4 columns]
152152

bigframes/core/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,14 @@ def filter_by_id(self, predicate_id: str, keep_null: bool = False) -> ArrayValue
220220
def filter(self, predicate: ex.Expression):
221221
return ArrayValue(nodes.FilterNode(child=self.node, predicate=predicate))
222222

223-
def order_by(self, by: Sequence[OrderingExpression]) -> ArrayValue:
224-
return ArrayValue(nodes.OrderByNode(child=self.node, by=tuple(by)))
223+
def order_by(
224+
self, by: Sequence[OrderingExpression], is_total_order: bool = False
225+
) -> ArrayValue:
226+
return ArrayValue(
227+
nodes.OrderByNode(
228+
child=self.node, by=tuple(by), is_total_order=is_total_order
229+
)
230+
)
225231

226232
def reversed(self) -> ArrayValue:
227233
return ArrayValue(nodes.ReversedNode(child=self.node))

bigframes/core/compile/compiled.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,24 @@ def to_sql(
278278
sql = ibis_bigquery.Backend().compile(self._to_ibis_expr())
279279
return typing.cast(str, sql)
280280

281+
def with_total_order(self, by: Sequence[OrderingExpression]) -> OrderedIR:
282+
return OrderedIR(
283+
table=self._table,
284+
columns=self._columns,
285+
predicates=self._predicates,
286+
ordering=TotalOrdering(
287+
ordering_value_columns=tuple(by),
288+
total_ordering_columns=frozenset(
289+
map(
290+
ex.DerefOp,
291+
itertools.chain.from_iterable(
292+
col.referenced_columns for col in by
293+
),
294+
)
295+
),
296+
),
297+
)
298+
281299
def row_count(self, name: str) -> OrderedIR:
282300
original_table = self._to_ibis_expr()
283301
ibis_table = original_table.agg(
@@ -576,6 +594,13 @@ def __init__(
576594
def is_ordered_ir(self) -> bool:
577595
return True
578596

597+
@property
598+
def order_non_deterministic(self) -> bool:
599+
# ordering suffix non-determinism is ok, as rand() is used as suffix for auto-generated order keys.
600+
# but must be resolved before or explode, otherwise the engine might pull the rand() evaluation above the join,
601+
# creating inconsistencies
602+
return not all(col.deterministic for col in self._ordering.all_ordering_columns)
603+
579604
@property
580605
def has_total_order(self) -> bool:
581606
return isinstance(self._ordering, TotalOrdering)
@@ -722,6 +747,9 @@ def _uniform_sampling(self, fraction: float) -> OrderedIR:
722747
)
723748

724749
def explode(self, columns: typing.Sequence[ex.DerefOp]) -> OrderedIR:
750+
if self.order_non_deterministic:
751+
id = bigframes.core.guid.generate_guid()
752+
return self.promote_offsets(id)
725753
table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True)
726754
column_ids = tuple(ref.id.sql for ref in columns)
727755

@@ -1229,7 +1257,14 @@ def _bake_ordering(self) -> OrderedIR:
12291257
tuple(new_exprs),
12301258
self._ordering.integer_encoding,
12311259
self._ordering.string_encoding,
1232-
self._ordering.total_ordering_columns,
1260+
total_ordering_columns=frozenset(
1261+
map(
1262+
ex.DerefOp,
1263+
itertools.chain.from_iterable(
1264+
col.referenced_columns for col in new_exprs
1265+
),
1266+
)
1267+
),
12331268
)
12341269
else:
12351270
new_ordering = RowOrdering(

bigframes/core/compile/compiler.py

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import bigframes.core.compile.compiled as compiled
2828
import bigframes.core.compile.concat as concat_impl
29-
import bigframes.core.compile.default_ordering as default_ordering
3029
import bigframes.core.compile.ibis_types
3130
import bigframes.core.compile.scalar_op_compiler
3231
import bigframes.core.compile.scalar_op_compiler as compile_scalar
@@ -104,10 +103,7 @@ def set_output_names(
104103
)
105104

106105
def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR:
107-
ir = typing.cast(compiled.OrderedIR, self.compile_node(node, True))
108-
if self.strict:
109-
assert ir.has_total_order
110-
return ir
106+
return typing.cast(compiled.OrderedIR, self.compile_node(node, True))
111107

112108
def compile_unordered_ir(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR:
113109
return typing.cast(compiled.UnorderedIR, self.compile_node(node, False))
@@ -274,17 +270,6 @@ def compile_read_table_ordered(
274270
for source_id, out_id in full_mapping.items()
275271
if source_id not in visible_column_mapping
276272
)
277-
elif self.strict: # In strict mode, we fallback to ordering by row hash
278-
order_values = [
279-
col.name(guids.generate_guid())
280-
for col in default_ordering.gen_default_ordering(
281-
ibis_table, use_double_hash=True
282-
)
283-
]
284-
ordering = bf_ordering.TotalOrdering.from_primary_key(
285-
[value.get_name() for value in order_values]
286-
)
287-
hidden_columns = tuple(order_values)
288273
else:
289274
# In unstrict mode, don't generate total ordering from hashing as this is
290275
# expensive (prevent removing any columns from table scan)
@@ -316,7 +301,11 @@ def compile_filter(self, node: nodes.FilterNode, ordered: bool = True):
316301
@_compile_node.register
317302
def compile_orderby(self, node: nodes.OrderByNode, ordered: bool = True):
318303
if ordered:
319-
return self.compile_ordered_ir(node.child).order_by(node.by)
304+
if node.is_total_order:
305+
# more efficient, can just discard any previous ordering and get same result
306+
return self.compile_unordered_ir(node.child).with_total_order(node.by)
307+
else:
308+
return self.compile_ordered_ir(node.child).order_by(node.by)
320309
else:
321310
return self.compile_unordered_ir(node.child)
322311

bigframes/core/compile/default_ordering.py

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from __future__ import annotations
2020

21-
from typing import cast
21+
from typing import cast, Sequence
2222

2323
import bigframes_vendored.ibis
2424
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
@@ -28,7 +28,7 @@
2828
import bigframes.core.guid as guid
2929

3030

31-
def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringValue:
31+
def _convert_to_nonnull_string(column: ibis_types.Value) -> ibis_types.StringValue:
3232
col_type = column.type()
3333
if (
3434
col_type.is_numeric()
@@ -60,29 +60,35 @@ def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringVa
6060
)
6161

6262

63-
def gen_default_ordering(
64-
table: ibis_types.Table, use_double_hash: bool = True
65-
) -> list[bigframes_vendored.ibis.Value]:
63+
def gen_row_key(
64+
columns: Sequence[ibis_types.Value],
65+
) -> bigframes_vendored.ibis.Value:
6666
ordering_hash_part = guid.generate_guid("bigframes_ordering_")
6767
ordering_hash_part2 = guid.generate_guid("bigframes_ordering_")
6868
ordering_rand_part = guid.generate_guid("bigframes_ordering_")
6969

7070
# All inputs into hash must be non-null or resulting hash will be null
71-
str_values = list(
72-
map(lambda col: _convert_to_nonnull_string(table[col]), table.columns)
73-
)
71+
str_values = list(map(_convert_to_nonnull_string, columns))
7472
full_row_str = (
7573
str_values[0].concat(*str_values[1:]) if len(str_values) > 1 else str_values[0]
7674
)
77-
full_row_hash = full_row_str.hash().name(ordering_hash_part)
75+
full_row_hash = (
76+
full_row_str.hash()
77+
.name(ordering_hash_part)
78+
.cast(ibis_dtypes.String(nullable=True))
79+
)
7880
# By modifying value slightly, we get another hash uncorrelated with the first
79-
full_row_hash_p2 = (full_row_str + "_").hash().name(ordering_hash_part2)
81+
full_row_hash_p2 = (
82+
(full_row_str + "_")
83+
.hash()
84+
.name(ordering_hash_part2)
85+
.cast(ibis_dtypes.String(nullable=True))
86+
)
8087
# Used to disambiguate between identical rows (which will have identical hash)
81-
random_value = bigframes_vendored.ibis.random().name(ordering_rand_part)
82-
83-
order_values = (
84-
[full_row_hash, full_row_hash_p2, random_value]
85-
if use_double_hash
86-
else [full_row_hash, random_value]
88+
random_value = (
89+
bigframes_vendored.ibis.random()
90+
.name(ordering_rand_part)
91+
.cast(ibis_dtypes.String(nullable=True))
8792
)
88-
return order_values
93+
94+
return full_row_hash.concat(full_row_hash_p2, random_value)

bigframes/core/compile/scalar_op_compiler.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import numpy as np
2727
import pandas as pd
2828

29+
import bigframes.core.compile.default_ordering
2930
import bigframes.core.compile.ibis_types
3031
import bigframes.core.expression as ex
3132
import bigframes.dtypes
@@ -1850,6 +1851,11 @@ def struct_op_impl(
18501851
return ibis_types.struct(data)
18511852

18521853

1854+
@scalar_op_compiler.register_nary_op(ops.RowKey, pass_op=True)
1855+
def rowkey_op_impl(*values: ibis_types.Value, op: ops.RowKey) -> ibis_types.Value:
1856+
return bigframes.core.compile.default_ordering.gen_row_key(values)
1857+
1858+
18531859
# Helpers
18541860
def is_null(value) -> bool:
18551861
# float NaN/inf should be treated as distinct from 'true' null values

bigframes/core/compile/single_column.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ def join_by_column_ordered(
5050
first the coalesced join keys, then, all the left columns, and
5151
finally, all the right columns.
5252
"""
53+
if type == "right":
54+
if left.order_non_deterministic:
55+
right = right._bake_ordering()
56+
else:
57+
if left.order_non_deterministic:
58+
left = left._bake_ordering()
5359

5460
# Do not reset the generator
5561
l_value_mapping = dict(zip(left.column_ids, left.column_ids))

bigframes/core/expression.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,10 @@ def bind_variables(
205205
def is_bijective(self) -> bool:
206206
return False
207207

208+
@property
209+
def deterministic(self) -> bool:
210+
return True
211+
208212
@property
209213
def is_identity(self) -> bool:
210214
"""True for identity operation that does not transform input."""
@@ -409,4 +413,10 @@ def bind_refs(
409413
@property
410414
def is_bijective(self) -> bool:
411415
# TODO: Mark individual functions as bijective?
412-
return False
416+
return all(input.is_bijective for input in self.inputs) and self.op.is_bijective
417+
418+
@property
419+
def deterministic(self) -> bool:
420+
return (
421+
all(input.deterministic for input in self.inputs) and self.op.deterministic
422+
)

bigframes/core/nodes.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -976,6 +976,9 @@ def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):
976976
@dataclasses.dataclass(frozen=True, eq=False)
977977
class OrderByNode(UnaryNode):
978978
by: Tuple[OrderingExpression, ...]
979+
# This is an optimization, if true, can discard previous orderings.
980+
# might be a total ordering even if false
981+
is_total_order: bool = False
979982

980983
@property
981984
def variables_introduced(self) -> int:

bigframes/core/ordering.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ class OrderingExpression:
5959
def referenced_columns(self) -> Set[ids.ColumnId]:
6060
return set(self.scalar_expression.column_references)
6161

62+
@property
63+
def deterministic(self) -> bool:
64+
return self.scalar_expression.deterministic
65+
6266
def remap_column_refs(
6367
self,
6468
mapping: Mapping[ids.ColumnId, ids.ColumnId],

0 commit comments

Comments
 (0)