Skip to content

Commit 57746e1

Browse files
refactor: ReadLocal and Explode nodes support offset outputs (#1301)
1 parent d941a84 commit 57746e1

File tree

4 files changed

+191
-122
lines changed

4 files changed

+191
-122
lines changed

bigframes/core/compile/compiled.py

Lines changed: 0 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
from bigframes.core.ordering import (
4141
ascending_over,
4242
encode_order_string,
43-
join_orderings,
4443
OrderingExpression,
4544
RowOrdering,
4645
TotalOrdering,
@@ -420,50 +419,6 @@ def _uniform_sampling(self, fraction: float) -> UnorderedIR:
420419
columns=columns,
421420
)
422421

423-
def explode(self, columns: typing.Sequence[ex.DerefOp]) -> UnorderedIR:
424-
table = self._to_ibis_expr()
425-
column_ids = tuple(ref.id.sql for ref in columns)
426-
427-
# The offset array ensures null represents empty arrays after unnesting.
428-
offset_array_id = bigframes.core.guid.generate_guid("offset_array_")
429-
offset_array = bigframes_vendored.ibis.range(
430-
0,
431-
bigframes_vendored.ibis.greatest(
432-
1, # We always want at least 1 element to fill in NULLs for empty arrays.
433-
bigframes_vendored.ibis.least(
434-
*[table[column_id].length() for column_id in column_ids]
435-
),
436-
),
437-
1,
438-
).name(offset_array_id)
439-
table_w_offset_array = table.select(
440-
offset_array,
441-
*self._column_names,
442-
)
443-
444-
unnest_offset_id = bigframes.core.guid.generate_guid("unnest_offset_")
445-
unnest_offset = (
446-
table_w_offset_array[offset_array_id].unnest().name(unnest_offset_id)
447-
)
448-
table_w_offset = table_w_offset_array.select(
449-
unnest_offset,
450-
*self._column_names,
451-
)
452-
453-
unnested_columns = [
454-
table_w_offset[column_id][table_w_offset[unnest_offset_id]].name(column_id)
455-
if column_id in column_ids
456-
else table_w_offset[column_id]
457-
for column_id in self._column_names
458-
]
459-
table_w_unnest = table_w_offset.select(*unnested_columns)
460-
461-
columns = [table_w_unnest[column_name] for column_name in self._column_names]
462-
return UnorderedIR(
463-
table_w_unnest,
464-
columns=columns, # type: ignore
465-
)
466-
467422
def as_ordered_ir(self) -> OrderedIR:
468423
"""Convert to OrderedIr, but without any definite ordering."""
469424
return OrderedIR(self._table, self._columns, predicates=self._predicates)
@@ -746,77 +701,6 @@ def _uniform_sampling(self, fraction: float) -> OrderedIR:
746701
ordering=self._ordering,
747702
)
748703

749-
def explode(self, columns: typing.Sequence[ex.DerefOp]) -> OrderedIR:
750-
if self.order_non_deterministic:
751-
id = bigframes.core.guid.generate_guid()
752-
return self.promote_offsets(id)
753-
table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True)
754-
column_ids = tuple(ref.id.sql for ref in columns)
755-
756-
offset_array_id = bigframes.core.guid.generate_guid("offset_array_")
757-
offset_array = bigframes_vendored.ibis.range(
758-
0,
759-
bigframes_vendored.ibis.greatest(
760-
1, # We always want at least 1 element to fill in NULLs for empty arrays.
761-
bigframes_vendored.ibis.least(
762-
*[table[column_id].length() for column_id in column_ids]
763-
),
764-
),
765-
1,
766-
).name(offset_array_id)
767-
table_w_offset_array = table.select(
768-
offset_array,
769-
*self._column_names,
770-
*self._hidden_ordering_column_names,
771-
)
772-
773-
unnest_offset_id = bigframes.core.guid.generate_guid("unnest_offset_")
774-
unnest_offset = (
775-
table_w_offset_array[offset_array_id].unnest().name(unnest_offset_id)
776-
)
777-
table_w_offset = table_w_offset_array.select(
778-
unnest_offset,
779-
*self._column_names,
780-
*self._hidden_ordering_column_names,
781-
)
782-
783-
unnested_columns = [
784-
table_w_offset[column_id][table_w_offset[unnest_offset_id]].name(column_id)
785-
if column_id in column_ids
786-
else table_w_offset[column_id]
787-
for column_id in self._column_names
788-
]
789-
790-
table_w_unnest = table_w_offset.select(
791-
table_w_offset[unnest_offset_id],
792-
*unnested_columns,
793-
*self._hidden_ordering_column_names,
794-
)
795-
796-
columns = [table_w_unnest[column_name] for column_name in self._column_names]
797-
hidden_ordering_columns = [
798-
*[
799-
table_w_unnest[column_name]
800-
for column_name in self._hidden_ordering_column_names
801-
],
802-
table_w_unnest[unnest_offset_id],
803-
]
804-
l_mappings = {id: id for id in self._ordering.referenced_columns}
805-
r_mappings = {ids.ColumnId(unnest_offset_id): ids.ColumnId(unnest_offset_id)}
806-
ordering = join_orderings(
807-
self._ordering,
808-
TotalOrdering.from_offset_col(unnest_offset_id),
809-
l_mappings,
810-
r_mappings,
811-
)
812-
813-
return OrderedIR(
814-
table_w_unnest,
815-
columns=columns, # type: ignore
816-
hidden_ordering_columns=hidden_ordering_columns,
817-
ordering=ordering,
818-
)
819-
820704
def promote_offsets(self, col_id: str) -> OrderedIR:
821705
"""
822706
Convenience function to promote copy of column offsets to a value column. Can be used to reset index.

bigframes/core/compile/compiler.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import bigframes.core.compile.compiled as compiled
2828
import bigframes.core.compile.concat as concat_impl
29+
import bigframes.core.compile.explode
2930
import bigframes.core.compile.ibis_types
3031
import bigframes.core.compile.scalar_op_compiler
3132
import bigframes.core.compile.scalar_op_compiler as compile_scalar
@@ -373,7 +374,15 @@ def compile_window(self, node: nodes.WindowOpNode, ordered: bool = True):
373374

374375
@_compile_node.register
375376
def compile_explode(self, node: nodes.ExplodeNode, ordered: bool = True):
376-
return self.compile_node(node.child, ordered).explode(node.column_ids)
377+
offsets_col = node.offsets_col.sql if (node.offsets_col is not None) else None
378+
if ordered:
379+
return bigframes.core.compile.explode.explode_ordered(
380+
self.compile_ordered_ir(node.child), node.column_ids, offsets_col
381+
)
382+
else:
383+
return bigframes.core.compile.explode.explode_unordered(
384+
self.compile_unordered_ir(node.child), node.column_ids, offsets_col
385+
)
377386

378387
@_compile_node.register
379388
def compile_random_sample(self, node: nodes.RandomSampleNode, ordered: bool = True):

bigframes/core/compile/explode.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from __future__ import annotations
15+
16+
import typing
17+
18+
import bigframes_vendored.ibis
19+
20+
import bigframes.core.compile.compiled as compiled
21+
import bigframes.core.expression as ex
22+
import bigframes.core.guid
23+
import bigframes.core.identifiers as ids
24+
import bigframes.core.ordering
25+
from bigframes.core.ordering import TotalOrdering
26+
27+
28+
def explode_unordered(
29+
input: compiled.UnorderedIR,
30+
columns: typing.Sequence[ex.DerefOp],
31+
offsets_id: typing.Optional[str],
32+
) -> compiled.UnorderedIR:
33+
table = input._to_ibis_expr()
34+
column_ids = tuple(ref.id.sql for ref in columns)
35+
36+
# The offset array ensures null represents empty arrays after unnesting.
37+
offset_array_id = bigframes.core.guid.generate_guid("offset_array_")
38+
offset_array = bigframes_vendored.ibis.range(
39+
0,
40+
bigframes_vendored.ibis.greatest(
41+
1, # We always want at least 1 element to fill in NULLs for empty arrays.
42+
bigframes_vendored.ibis.least(
43+
*[table[column_id].length() for column_id in column_ids]
44+
),
45+
),
46+
1,
47+
).name(offset_array_id)
48+
table_w_offset_array = table.select(
49+
offset_array,
50+
*input._column_names,
51+
)
52+
53+
unnest_offset_id = offsets_id or bigframes.core.guid.generate_guid("unnest_offset_")
54+
unnest_offset = (
55+
table_w_offset_array[offset_array_id].unnest().name(unnest_offset_id)
56+
)
57+
table_w_offset = table_w_offset_array.select(
58+
unnest_offset,
59+
*input._column_names,
60+
)
61+
62+
output_cols = tuple(input.column_ids) + ((offsets_id,) if offsets_id else ())
63+
unnested_columns = [
64+
table_w_offset[column_id][table_w_offset[unnest_offset_id]].name(column_id)
65+
if column_id in column_ids
66+
else table_w_offset[column_id]
67+
for column_id in output_cols
68+
]
69+
table_w_unnest = table_w_offset.select(*unnested_columns)
70+
71+
columns = [table_w_unnest[column_name] for column_name in output_cols]
72+
return compiled.UnorderedIR(
73+
table_w_unnest,
74+
columns=columns, # type: ignore
75+
)
76+
77+
78+
def explode_ordered(
79+
input: compiled.OrderedIR,
80+
columns: typing.Sequence[ex.DerefOp],
81+
offsets_id: typing.Optional[str],
82+
) -> compiled.OrderedIR:
83+
if input.order_non_deterministic:
84+
id = bigframes.core.guid.generate_guid()
85+
return input.promote_offsets(id)
86+
table = input._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True)
87+
column_ids = tuple(ref.id.sql for ref in columns)
88+
89+
offset_array_id = bigframes.core.guid.generate_guid("offset_array_")
90+
offset_array = bigframes_vendored.ibis.range(
91+
0,
92+
bigframes_vendored.ibis.greatest(
93+
1, # We always want at least 1 element to fill in NULLs for empty arrays.
94+
bigframes_vendored.ibis.least(
95+
*[table[column_id].length() for column_id in column_ids]
96+
),
97+
),
98+
1,
99+
).name(offset_array_id)
100+
table_w_offset_array = table.select(
101+
offset_array,
102+
*input._column_names,
103+
*input._hidden_ordering_column_names,
104+
)
105+
106+
unnest_offset_id = offsets_id or bigframes.core.guid.generate_guid("unnest_offset_")
107+
unnest_offset = (
108+
table_w_offset_array[offset_array_id].unnest().name(unnest_offset_id)
109+
)
110+
table_w_offset = table_w_offset_array.select(
111+
unnest_offset,
112+
*input._column_names,
113+
*input._hidden_ordering_column_names,
114+
)
115+
116+
unnested_columns = [
117+
table_w_offset[column_id][table_w_offset[unnest_offset_id]].name(column_id)
118+
if column_id in column_ids
119+
else table_w_offset[column_id]
120+
for column_id in input._column_names
121+
]
122+
123+
table_w_unnest = table_w_offset.select(
124+
table_w_offset[unnest_offset_id],
125+
*unnested_columns,
126+
*input._hidden_ordering_column_names,
127+
)
128+
129+
output_cols = tuple(input.column_ids) + ((offsets_id,) if offsets_id else ())
130+
columns = [table_w_unnest[column_name] for column_name in output_cols]
131+
hidden_ordering_columns = [
132+
table_w_unnest[column_name]
133+
for column_name in input._hidden_ordering_column_names
134+
]
135+
if offsets_id is None:
136+
hidden_ordering_columns.append(table_w_unnest[unnest_offset_id])
137+
l_mappings = {id: id for id in input._ordering.referenced_columns}
138+
r_mappings = {ids.ColumnId(unnest_offset_id): ids.ColumnId(unnest_offset_id)}
139+
ordering = bigframes.core.ordering.join_orderings(
140+
input._ordering,
141+
TotalOrdering.from_offset_col(unnest_offset_id),
142+
l_mappings,
143+
r_mappings,
144+
)
145+
146+
return compiled.OrderedIR(
147+
table_w_unnest,
148+
columns=columns, # type: ignore
149+
hidden_ordering_columns=hidden_ordering_columns,
150+
ordering=ordering,
151+
)

0 commit comments

Comments
 (0)