Skip to content

Commit 6454aff

Browse files
feat: Add filter pushdown to hybrid engine (#1871)
1 parent 23d6fb4 commit 6454aff

File tree

3 files changed

+69
-2
lines changed

3 files changed

+69
-2
lines changed

bigframes/session/polars_executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
nodes.ProjectionNode,
3636
nodes.SliceNode,
3737
nodes.AggregateNode,
38+
nodes.FilterNode,
3839
)
3940

4041
_COMPATIBLE_SCALAR_OPS = (
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import pytest
15+
16+
from bigframes.core import array_value, expression, nodes
17+
import bigframes.operations as ops
18+
from bigframes.session import polars_executor
19+
from bigframes.testing.engine_utils import assert_equivalence_execution
20+
21+
pytest.importorskip("polars")
22+
23+
# Polars is used as the reference engine because it's fast and local. In general, though, prefer the gbq engine where they disagree.
24+
REFERENCE_ENGINE = polars_executor.PolarsExecutor()
25+
26+
27+
@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
28+
def test_engines_filter_bool_col(
29+
scalars_array_value: array_value.ArrayValue,
30+
engine,
31+
):
32+
node = nodes.FilterNode(
33+
scalars_array_value.node, predicate=expression.deref("bool_col")
34+
)
35+
assert_equivalence_execution(node, REFERENCE_ENGINE, engine)
36+
37+
38+
@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
39+
def test_engines_filter_expr_cond(
40+
scalars_array_value: array_value.ArrayValue,
41+
engine,
42+
):
43+
predicate = ops.gt_op.as_expr(
44+
expression.deref("float64_col"), expression.deref("int64_col")
45+
)
46+
node = nodes.FilterNode(scalars_array_value.node, predicate=predicate)
47+
assert_equivalence_execution(node, REFERENCE_ENGINE, engine)
48+
49+
50+
@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
51+
def test_engines_filter_true(
52+
scalars_array_value: array_value.ArrayValue,
53+
engine,
54+
):
55+
predicate = expression.const(True)
56+
node = nodes.FilterNode(scalars_array_value.node, predicate=predicate)
57+
assert_equivalence_execution(node, REFERENCE_ENGINE, engine)
58+
59+
60+
@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
61+
def test_engines_filter_false(
62+
scalars_array_value: array_value.ArrayValue,
63+
engine,
64+
):
65+
predicate = expression.const(False)
66+
node = nodes.FilterNode(scalars_array_value.node, predicate=predicate)
67+
assert_equivalence_execution(node, REFERENCE_ENGINE, engine)

tests/system/small/test_polars_execution.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@ def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_ind
5353
.to_pandas()
5454
)
5555

56-
# Filter and isnull not supported by polar engine yet, so falls back to bq execution
57-
assert session_w_polars._metrics.execution_count == (execution_count_before + 1)
56+
assert session_w_polars._metrics.execution_count == execution_count_before
5857
assert_pandas_df_equal(bf_result, pd_result)
5958

6059

0 commit comments

Comments
 (0)