Skip to content

Commit daf0c3b

Browse files
feat: Add experimental polars execution (#1747)
1 parent 9fb3cb4 commit daf0c3b

File tree

10 files changed

+157
-11
lines changed

10 files changed

+157
-11
lines changed

bigframes/_config/bigquery_options.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import google.auth.credentials
2323
import requests.adapters
2424

25+
import bigframes._importing
2526
import bigframes.enums
2627
import bigframes.exceptions as bfe
2728

@@ -94,6 +95,7 @@ def __init__(
9495
requests_transport_adapters: Sequence[
9596
Tuple[str, requests.adapters.BaseAdapter]
9697
] = (),
98+
enable_polars_execution: bool = False,
9799
):
98100
self._credentials = credentials
99101
self._project = project
@@ -113,6 +115,9 @@ def __init__(
113115
client_endpoints_override = {}
114116

115117
self._client_endpoints_override = client_endpoints_override
118+
if enable_polars_execution:
119+
bigframes._importing.import_polars()
120+
self._enable_polars_execution = enable_polars_execution
116121

117122
@property
118123
def application_name(self) -> Optional[str]:
@@ -424,3 +429,22 @@ def requests_transport_adapters(
424429
SESSION_STARTED_MESSAGE.format(attribute="requests_transport_adapters")
425430
)
426431
self._requests_transport_adapters = value
432+
433+
@property
def enable_polars_execution(self) -> bool:
    """If True, will use polars to execute some simple query plans locally.

    This is an experimental (preview) feature and requires polars to be
    installed.
    """
    return self._enable_polars_execution

@enable_polars_execution.setter
def enable_polars_execution(self, value: bool):
    """Enable or disable local polars execution.

    Args:
        value: Whether some simple query plans should be executed locally
            with polars.

    Raises:
        ValueError: If the session has already started and ``value``
            differs from the current setting.
        ImportError: Propagated from ``bigframes._importing.import_polars()``
            when enabling and polars cannot be imported or is too old.
    """
    if self._session_started and self._enable_polars_execution != value:
        raise ValueError(
            SESSION_STARTED_MESSAGE.format(attribute="enable_polars_execution")
        )
    # Test truthiness rather than identity with ``True``: a truthy flag that
    # is not the ``True`` singleton (e.g. ``1`` or ``numpy.bool_``) must not
    # enable polars execution while skipping the warning and import check.
    if value:
        msg = bfe.format_message(
            "Polars execution is an experimental feature, and may not be stable. Must have polars installed."
        )
        warnings.warn(msg, category=bfe.PreviewWarning)
        # Fail fast here instead of at execution time if polars is unusable.
        bigframes._importing.import_polars()
    self._enable_polars_execution = value

bigframes/_importing.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import importlib
15+
from types import ModuleType
16+
17+
from packaging import version
18+
19+
# Keep this in sync with setup.py
20+
POLARS_MIN_VERSION = version.Version("1.7.0")
21+
22+
23+
def import_polars() -> ModuleType:
    """Import polars and check that it meets the minimum supported version.

    Any error raised by ``importlib.import_module`` (for example when polars
    is not installed) propagates to the caller unchanged.

    Returns:
        The imported ``polars`` module.

    Raises:
        ImportError: If the imported polars version is below
            ``POLARS_MIN_VERSION``.
    """
    polars = importlib.import_module("polars")
    imported_version = version.Version(polars.build_info()["version"])
    if imported_version >= POLARS_MIN_VERSION:
        return polars
    raise ImportError(
        f"Imported polars version: {imported_version} is below the minimum version: {POLARS_MIN_VERSION}"
    )

bigframes/core/compile/polars/compiler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,15 +393,15 @@ class PolarsCompiler:
393393
expr_compiler = PolarsExpressionCompiler()
394394
agg_compiler = PolarsAggregateCompiler()
395395

396-
def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame:
396+
def compile(self, plan: nodes.BigFrameNode) -> pl.LazyFrame:
397397
if not polars_installed:
398398
raise ValueError(
399399
"Polars is not installed, cannot compile to polars engine."
400400
)
401401

402402
# TODO: Create standard way to configure BFET -> BFET rewrites
403403
# Polars has incomplete slice support in lazy mode
404-
node = array_value.node
404+
node = plan
405405
node = bigframes.core.rewrite.column_pruning(node)
406406
node = nodes.bottom_up(node, bigframes.core.rewrite.rewrite_slice)
407407
node = bigframes.core.rewrite.pull_out_window_order(node)

bigframes/session/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ def __init__(
255255
storage_manager=self._temp_storage_manager,
256256
strictly_ordered=self._strictly_ordered,
257257
metrics=self._metrics,
258+
enable_polars_execution=context.enable_polars_execution,
258259
)
259260

260261
def __del__(self):

bigframes/session/bq_caching_executor.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,13 @@
4141
import bigframes.core.tree_properties as tree_properties
4242
import bigframes.dtypes
4343
import bigframes.features
44-
from bigframes.session import executor, loader, local_scan_executor, read_api_execution
44+
from bigframes.session import (
45+
executor,
46+
loader,
47+
local_scan_executor,
48+
read_api_execution,
49+
semi_executor,
50+
)
4551
import bigframes.session._io.bigquery as bq_io
4652
import bigframes.session.metrics
4753
import bigframes.session.planner
@@ -147,6 +153,7 @@ def __init__(
147153
*,
148154
strictly_ordered: bool = True,
149155
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
156+
enable_polars_execution: bool = False,
150157
):
151158
self.bqclient = bqclient
152159
self.storage_manager = storage_manager
@@ -155,14 +162,21 @@ def __init__(
155162
self.metrics = metrics
156163
self.loader = loader
157164
self.bqstoragereadclient = bqstoragereadclient
158-
# Simple left-to-right precedence for now
159-
self._semi_executors = (
165+
self._enable_polars_execution = enable_polars_execution
166+
self._semi_executors: Sequence[semi_executor.SemiExecutor] = (
160167
read_api_execution.ReadApiSemiExecutor(
161168
bqstoragereadclient=bqstoragereadclient,
162169
project=self.bqclient.project,
163170
),
164171
local_scan_executor.LocalScanExecutor(),
165172
)
173+
if enable_polars_execution:
174+
from bigframes.session import polars_executor
175+
176+
self._semi_executors = (
177+
*self._semi_executors,
178+
polars_executor.PolarsExecutor(),
179+
)
166180
self._upload_lock = threading.Lock()
167181

168182
def to_sql(
@@ -637,8 +651,8 @@ def _execute_plan(
637651
"""Just execute whatever plan as is, without further caching or decomposition."""
638652
# First try to execute fast-paths
639653
if not output_spec.require_bq_table:
640-
for semi_executor in self._semi_executors:
641-
maybe_result = semi_executor.execute(plan, ordered=ordered, peek=peek)
654+
for exec in self._semi_executors:
655+
maybe_result = exec.execute(plan, ordered=ordered, peek=peek)
642656
if maybe_result:
643657
return maybe_result
644658

bigframes/session/polars_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def execute(
7373
# Note: Ignoring ordered flag, as just executing totally ordered is fine.
7474
try:
7575
lazy_frame: pl.LazyFrame = self._compiler.compile(
76-
array_value.ArrayValue(plan)
76+
array_value.ArrayValue(plan).node
7777
)
7878
except Exception:
7979
return None

bigframes/testing/polars_session.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def peek(
4141
"""
4242
A 'peek' efficiently accesses a small number of rows in the dataframe.
4343
"""
44-
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
44+
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node)
4545
pa_table = lazy_frame.collect().limit(n_rows).to_arrow()
4646
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
4747
# Nullability may be different, and might use large versions of list, string datatypes.
@@ -64,7 +64,7 @@ def execute(
6464
"""
6565
Execute the ArrayValue, storing the result to a temporary session-owned table.
6666
"""
67-
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
67+
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node)
6868
pa_table = lazy_frame.collect().to_arrow()
6969
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
7070
# Nullability may be different, and might use large versions of list, string datatypes.

noxfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@
108108
SYSTEM_TEST_EXTRAS: List[str] = []
109109
SYSTEM_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {
110110
"3.9": ["tests", "anywidget"],
111-
"3.10": ["tests"],
111+
"3.10": ["tests", "polars"],
112112
"3.12": ["tests", "scikit-learn", "polars", "anywidget"],
113113
"3.13": ["tests", "polars"],
114114
}

testing/constraints-3.10.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ matplotlib==3.7.1
1515
psutil==5.9.5
1616
seaborn==0.13.1
1717
traitlets==5.7.1
18+
polars==1.7.0
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import pytest
15+
16+
import bigframes
17+
from bigframes.testing.utils import assert_pandas_df_equal
18+
19+
polars = pytest.importorskip("polars", reason="polars is required for this test")
20+
21+
22+
@pytest.fixture(scope="module")
def session_w_polars():
    """Yield a module-scoped session created with polars execution enabled."""
    polars_session = bigframes.Session(
        context=bigframes.BigQueryOptions(
            location="US", enable_polars_execution=True
        )
    )
    yield polars_session
    # Close the generated session at cleanup time.
    polars_session.close()
28+
29+
30+
def test_polar_execution_sorted(session_w_polars, scalars_pandas_df_index):
    """Sort plus column selection must not change the session's execution count."""
    columns = ["int64_too", "bool_col"]
    initial_count = session_w_polars._metrics.execution_count
    bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

    bf_sorted = bf_df.sort_index(ascending=False)
    bf_result = bf_sorted[columns].to_pandas()
    pd_result = scalars_pandas_df_index.sort_index(ascending=False)[columns]

    assert session_w_polars._metrics.execution_count == initial_count
    assert_pandas_df_equal(bf_result, pd_result)
41+
42+
43+
def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_index):
    """Sort followed by dropna must add exactly one to the execution count."""
    dropna_columns = ["int64_col", "string_col"]
    initial_count = session_w_polars._metrics.execution_count
    bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

    pd_sorted = scalars_pandas_df_index.sort_index(ascending=False)
    pd_result = pd_sorted.dropna(subset=dropna_columns)

    bf_sorted = bf_df.sort_index(ascending=False)
    bf_filtered = bf_sorted.dropna(subset=dropna_columns)
    bf_result = bf_filtered.to_pandas()

    # Filter and isnull are not supported by the polars engine yet, so this
    # falls back to BigQuery execution.
    assert session_w_polars._metrics.execution_count == (initial_count + 1)
    assert_pandas_df_equal(bf_result, pd_result)
59+
60+
61+
def test_polar_execution_unsupported_sql_fallback(
    session_w_polars, scalars_pandas_df_index
):
    """Adding a string-length column must add exactly one to the execution count."""
    initial_count = session_w_polars._metrics.execution_count
    bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

    # Expected result, built with pandas on a copy so the fixture is untouched.
    pd_result = scalars_pandas_df_index.copy()
    pd_result["str_len_col"] = pd_result.string_col.str.len()

    bf_df["str_len_col"] = bf_df.string_col.str.len()
    bf_result = bf_df.to_pandas()

    # str len is not supported by the polars engine yet, so this falls back
    # to BigQuery execution.
    assert session_w_polars._metrics.execution_count == (initial_count + 1)
    assert_pandas_df_equal(bf_result, pd_result)

0 commit comments

Comments
 (0)