Skip to content

Commit 813624d

Browse files
feat: Add concat pushdown for hybrid engine (#1891)
1 parent e1ebc53 commit 813624d

File tree

4 files changed

+58
-1
lines changed

4 files changed

+58
-1
lines changed

bigframes/core/compile/polars/compiler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,11 @@ def compile_concat(self, node: nodes.ConcatNode):
547547
child_frames = [
548548
frame.rename(
549549
{col: id.sql for col, id in zip(frame.columns, node.output_ids)}
550+
).cast(
551+
{
552+
field.id.sql: _bigframes_dtype_to_polars_dtype(field.dtype)
553+
for field in node.fields
554+
}
550555
)
551556
for frame in child_frames
552557
]

bigframes/core/nodes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ def remap_refs(
424424

425425
@dataclasses.dataclass(frozen=True, eq=False)
426426
class ConcatNode(BigFrameNode):
427-
# TODO: Explcitly map column ids from each child
427+
# TODO: Explicitly map column ids from each child?
428428
children: Tuple[BigFrameNode, ...]
429429
output_ids: Tuple[identifiers.ColumnId, ...]
430430

bigframes/session/polars_executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
nodes.SliceNode,
3737
nodes.AggregateNode,
3838
nodes.FilterNode,
39+
nodes.ConcatNode,
3940
)
4041

4142
_COMPATIBLE_SCALAR_OPS = (
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pytest
16+
17+
from bigframes.core import array_value, ordering
18+
from bigframes.session import polars_executor
19+
from bigframes.testing.engine_utils import assert_equivalence_execution
20+
21+
pytest.importorskip("polars")
22+
23+
# Polars is used as the reference because it's fast and local. Generally, though, prefer the gbq engine where they disagree.
24+
REFERENCE_ENGINE = polars_executor.PolarsExecutor()
25+
26+
27+
@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
28+
def test_engines_concat_self(
29+
scalars_array_value: array_value.ArrayValue,
30+
engine,
31+
):
32+
result = scalars_array_value.concat([scalars_array_value, scalars_array_value])
33+
34+
assert_equivalence_execution(result.node, REFERENCE_ENGINE, engine)
35+
36+
37+
@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
38+
def test_engines_concat_filtered_sorted(
39+
scalars_array_value: array_value.ArrayValue,
40+
engine,
41+
):
42+
input_1 = scalars_array_value.select_columns(["float64_col", "int64_col"]).order_by(
43+
[ordering.ascending_over("int64_col")]
44+
)
45+
input_2 = scalars_array_value.filter_by_id("bool_col").select_columns(
46+
["float64_col", "int64_too"]
47+
)
48+
49+
result = input_1.concat([input_2, input_1, input_2])
50+
51+
assert_equivalence_execution(result.node, REFERENCE_ENGINE, engine)

0 commit comments

Comments
 (0)