Skip to content

Commit bc885bd

Browse files
authored
chore: add compile_explode (#1848)
Fixes internal issue 427306238
1 parent 0709f17 commit bc885bd

File tree

6 files changed

+169
-0
lines changed

6 files changed

+169
-0
lines changed

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,14 @@ def compile_concat(
229229
uid_gen=self.uid_gen,
230230
)
231231

232+
@_compile_node.register
def compile_explode(
    self, node: nodes.ExplodeNode, child: ir.SQLGlotIR
) -> ir.SQLGlotIR:
    """Compiles an ExplodeNode by delegating to the child IR's explode().

    Extracts the SQL names of the columns to unnest (and the optional
    offsets column) from the node before handing off.
    """
    # The offsets column is optional; forward its SQL name only when set.
    offsets_col = None if node.offsets_col is None else node.offsets_col.sql
    columns = tuple(ref.id.sql for ref in node.column_ids)
    return child.explode(columns, offsets_col)
239+
232240

233241
def _replace_unsupported_ops(node: nodes.BigFrameNode):
234242
node = nodes.bottom_up(node, rewrite.rewrite_slice)

bigframes/core/compile/sqlglot/sqlglot_ir.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,96 @@ def replace(
290290
).sql(dialect=self.dialect, pretty=self.pretty)
291291
return f"{merge_str}\n{whens_str}"
292292

293+
def explode(
    self,
    column_names: tuple[str, ...],
    offsets_col: typing.Optional[str],
) -> SQLGlotIR:
    """Unnests (explodes) one or more array columns into rows.

    Args:
        column_names: Names of the array-typed columns to explode. When
            several are given they are exploded in lockstep (zipped).
        offsets_col: Optional output column name that receives each
            element's offset within its array.

    Returns:
        A new SQLGlotIR with one row per array element.

    Raises:
        ValueError: If ``column_names`` is empty.
    """
    # Raise rather than assert: asserts are stripped under `python -O`.
    if not column_names:
        raise ValueError("At least one column must be provided for explode.")
    # No need to copy the tuple into a list just to count it.
    if len(column_names) == 1:
        return self._explode_single_column(column_names[0], offsets_col)
    return self._explode_multiple_columns(column_names, offsets_col)
304+
305+
def _explode_single_column(
    self, column_name: str, offsets_col: typing.Optional[str]
) -> SQLGlotIR:
    """Explodes exactly one array column via CROSS JOIN UNNEST.

    The unnested values get a fresh alias, then are swapped back in for
    the original column through a SELECT * REPLACE(...).
    """
    target = sge.to_identifier(column_name, quoted=self.quoted)
    # Fresh, collision-free alias for the unnested scalar values.
    value_alias = sge.to_identifier(
        next(self.uid_gen.get_uid_stream("bfcol_")), quoted=self.quoted
    )
    offset_id = None
    if offsets_col:
        offset_id = sge.to_identifier(offsets_col, quoted=self.quoted)

    unnest = sge.Unnest(
        expressions=[target],
        alias=sge.TableAlias(columns=[value_alias]),
        offset=offset_id,
    )
    replaced_star = sge.Star(replace=[value_alias.as_(target)])
    # TODO: "CROSS" if not keep_empty else "LEFT"
    # TODO: overlaps_with_parent to replace existing column.
    cte = self._encapsulate_as_cte()
    new_expr = cte.select(replaced_star, append=False).join(
        unnest, join_type="CROSS"
    )
    return SQLGlotIR(expr=new_expr, uid_gen=self.uid_gen)
331+
332+
def _explode_multiple_columns(
    self,
    column_names: tuple[str, ...],
    offsets_col: typing.Optional[str],
) -> SQLGlotIR:
    """Explodes several array columns in lockstep ("zipping" them).

    BigQuery cannot positionally UNNEST multiple arrays at once, so we
    UNNEST a generated index array sized to the shortest input and index
    each column with that offset:
    https://cloud.google.com/bigquery/docs/arrays#zipping_arrays

    Args:
        column_names: Names of the array-typed columns to explode together.
        offsets_col: Optional output column name for the element offsets.

    Returns:
        A new SQLGlotIR with one row per shared array index.
    """
    offset = (
        sge.to_identifier(offsets_col, quoted=self.quoted) if offsets_col else None
    )
    columns = [
        sge.to_identifier(column_name, quoted=self.quoted)
        for column_name in column_names
    ]

    # GENERATE_ARRAY(0, LEAST(ARRAY_LENGTH(col_i) - 1, ...)) yields the
    # element indices shared by every column (truncated to the shortest).
    # `columns` already holds Identifiers, so use them directly instead of
    # wrapping them in a redundant second to_identifier() call.
    column_lengths = [
        sge.func("ARRAY_LENGTH", column) - 1 for column in columns
    ]
    generate_array = sge.func(
        "GENERATE_ARRAY",
        sge.convert(0),
        sge.func("LEAST", *column_lengths),
    )
    unnested_offset_alias = sge.to_identifier(
        next(self.uid_gen.get_uid_stream("bfcol_")), quoted=self.quoted
    )
    unnest_expr = sge.Unnest(
        expressions=[generate_array],
        alias=sge.TableAlias(columns=[unnested_offset_alias]),
        offset=offset,
    )
    # Replace each original array column with its element at the unnested
    # offset; safe=True emits SAFE_OFFSET, yielding NULL past the end of a
    # shorter array rather than erroring.
    selection = sge.Star(
        replace=[
            sge.Bracket(
                this=column,
                expressions=[unnested_offset_alias],
                safe=True,
                offset=False,
            ).as_(column)
            for column in columns
        ]
    )
    new_expr = (
        self._encapsulate_as_cte()
        .select(selection, append=False)
        .join(unnest_expr, join_type="CROSS")
    )
    return SQLGlotIR(expr=new_expr, uid_gen=self.uid_gen)
382+
293383
def _encapsulate_as_cte(
294384
self,
295385
) -> sge.Select:
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
WITH `bfcte_0` AS (
2+
SELECT
3+
`rowindex` AS `bfcol_0`,
4+
`int_list_col` AS `bfcol_1`,
5+
`string_list_col` AS `bfcol_2`
6+
FROM `bigframes-dev`.`sqlglot_test`.`repeated_types`
7+
), `bfcte_1` AS (
8+
SELECT
9+
*
10+
REPLACE (`bfcol_1`[SAFE_OFFSET(`bfcol_13`)] AS `bfcol_1`, `bfcol_2`[SAFE_OFFSET(`bfcol_13`)] AS `bfcol_2`)
11+
FROM `bfcte_0`
12+
CROSS JOIN UNNEST(GENERATE_ARRAY(0, LEAST(ARRAY_LENGTH(`bfcol_1`) - 1, ARRAY_LENGTH(`bfcol_2`) - 1))) AS `bfcol_13` WITH OFFSET AS `bfcol_7`
13+
)
14+
SELECT
15+
`bfcol_0` AS `rowindex`,
16+
`bfcol_0` AS `rowindex_1`,
17+
`bfcol_1` AS `int_list_col`,
18+
`bfcol_2` AS `string_list_col`
19+
FROM `bfcte_1`
20+
ORDER BY
21+
`bfcol_7` ASC NULLS LAST
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
WITH `bfcte_0` AS (
2+
SELECT
3+
`rowindex` AS `bfcol_0`,
4+
`int_list_col` AS `bfcol_1`
5+
FROM `bigframes-dev`.`sqlglot_test`.`repeated_types`
6+
), `bfcte_1` AS (
7+
SELECT
8+
*
9+
REPLACE (`bfcol_8` AS `bfcol_1`)
10+
FROM `bfcte_0`
11+
CROSS JOIN UNNEST(`bfcol_1`) AS `bfcol_8` WITH OFFSET AS `bfcol_4`
12+
)
13+
SELECT
14+
`bfcol_0` AS `rowindex`,
15+
`bfcol_1` AS `int_list_col`
16+
FROM `bfcte_1`
17+
ORDER BY
18+
`bfcol_4` ASC NULLS LAST
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pytest
16+
17+
import bigframes.pandas as bpd
18+
19+
# Skip this whole test module when the snapshot plugin is not installed.
pytest.importorskip("pytest_snapshot")
20+
21+
22+
# TODO: check order by with offset
23+
def test_compile_explode_series(repeated_types_df: bpd.DataFrame, snapshot):
24+
s = repeated_types_df["int_list_col"].explode()
25+
snapshot.assert_match(s.to_frame().sql, "out.sql")
26+
27+
28+
def test_compile_explode_dataframe(repeated_types_df: bpd.DataFrame, snapshot):
    """Multi-column (zipped) explode on a DataFrame should match the snapshot."""
    cols = ["int_list_col", "string_list_col"]
    frame = repeated_types_df[["rowindex", *cols]].explode(cols)
    snapshot.assert_match(frame.sql, "out.sql")

third_party/bigframes_vendored/pandas/core/frame.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4124,6 +4124,7 @@ def explode(
41244124
**Examples:**
41254125
41264126
>>> import bigframes.pandas as bpd
4127+
>>> import numpy as np
41274128
>>> bpd.options.display.progress_bar = None
41284129
41294130
>>> df = bpd.DataFrame({'A': [[0, 1, 2], [], [], [3, 4]],

0 commit comments

Comments
 (0)