Commit 570a40b
test: Add ReadLocalNode tests (#1794)
* test: Add ReadLocalNode tests
* adapt to canonical output types
* fix sql snapshot expectation
* comments
1 parent ba7c313 commit 570a40b

File tree: 10 files changed, +415 −10 lines

bigframes/core/local_data.py

Lines changed: 17 additions & 4 deletions
@@ -265,7 +265,13 @@ def _adapt_pandas_series(
 ) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
     # Mostly rely on pyarrow conversions, but have to convert geo without its help.
     if series.dtype == bigframes.dtypes.GEO_DTYPE:
-        series = geopandas.GeoSeries(series).to_wkt(rounding_precision=-1)
+        # GeoSeries produces e.g. "POINT (1 2)", while BigQuery uses the style "POINT(1 2)";
+        # we normalize to the BigQuery style for consistency.
+        series = (
+            geopandas.GeoSeries(series)
+            .to_wkt(rounding_precision=-1)
+            .str.replace(r"(\w+) \(", repl=r"\1(", regex=True)
+        )
         return pa.array(series, type=pa.string()), bigframes.dtypes.GEO_DTYPE
     try:
         return _adapt_arrow_array(pa.array(series))
@@ -326,7 +332,7 @@ def _adapt_arrow_array(array: pa.Array) -> tuple[pa.Array, bigframes.dtypes.Dtype]:
         return new_value.fill_null([]), bigframes.dtypes.list_type(values_type)
     if array.type == bigframes.dtypes.JSON_ARROW_TYPE:
         return _canonicalize_json(array), bigframes.dtypes.JSON_DTYPE
-    target_type = _logical_type_replacements(array.type)
+    target_type = logical_type_replacements(array.type)
     if target_type != array.type:
         # TODO: Maybe warn if lossy conversion?
         array = array.cast(target_type)
@@ -372,6 +378,10 @@ def recursive_f(type: pa.DataType) -> pa.DataType:
         if new_field_t != type.value_type:
             return pa.list_(new_field_t)
         return type
+    # polars can produce large lists; map these down to regular lists
+    if pa.types.is_large_list(type):
+        new_field_t = recursive_f(type.value_type)
+        return pa.list_(new_field_t)
     if pa.types.is_struct(type):
         struct_type = cast(pa.StructType, type)
         new_fields: list[pa.Field] = []
@@ -385,7 +395,7 @@ def recursive_f(type: pa.DataType) -> pa.DataType:


 @_recursive_map_types
-def _logical_type_replacements(type: pa.DataType) -> pa.DataType:
+def logical_type_replacements(type: pa.DataType) -> pa.DataType:
     if pa.types.is_timestamp(type):
         # This is potentially lossy, but BigFrames doesn't support ns
         new_tz = "UTC" if (type.tz is not None) else None
@@ -403,8 +413,11 @@ def _logical_type_replacements(type: pa.DataType) -> pa.DataType:
     if pa.types.is_large_string(type):
         # simple string type can handle the largest strings needed
         return pa.string()
+    if pa.types.is_large_binary(type):
+        # simple binary type can handle the largest values needed
+        return pa.binary()
     if pa.types.is_dictionary(type):
-        return _logical_type_replacements(type.value_type)
+        return logical_type_replacements(type.value_type)
     if pa.types.is_null(type):
         # null as a type not allowed, default type is float64 for bigframes
         return pa.float64()
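The geo change above is a plain string rewrite, so its effect is easy to check in isolation. A minimal sketch of the same normalization outside of bigframes (the sample WKT strings are illustrative):

    import re

    # GeoSeries-style WKT puts a space between the geometry type and the
    # coordinates; BigQuery-style WKT does not.
    _WKT_SPACE = re.compile(r"(\w+) \(")

    def normalize_wkt(wkt: str) -> str:
        """Rewrite 'POINT (1 2)' into BigQuery-style 'POINT(1 2)'."""
        return _WKT_SPACE.sub(r"\1(", wkt)

    assert normalize_wkt("POINT (1 2)") == "POINT(1 2)"
    assert normalize_wkt("LINESTRING (0 0, 1 1)") == "LINESTRING(0 0, 1 1)"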

bigframes/core/nodes.py

Lines changed: 4 additions & 0 deletions
@@ -601,6 +601,10 @@ class ScanList:

     items: typing.Tuple[ScanItem, ...]

+    @classmethod
+    def from_items(cls, items: Iterable[ScanItem]) -> ScanList:
+        return cls(tuple(items))
+
     def filter_cols(
         self,
         ids: AbstractSet[identifiers.ColumnId],

bigframes/core/pyarrow_utils.py

Lines changed: 6 additions & 0 deletions
@@ -94,3 +94,9 @@ def append_offsets(
     return pa_table.append_column(
         offsets_col, pa.array(range(pa_table.num_rows), type=pa.int64())
     )
+
+
+def as_nullable(pa_table: pa.Table):
+    """Normalizes schema to nullable for value-wise comparisons."""
+    nullable_schema = pa.schema(field.with_nullable(True) for field in pa_table.schema)
+    return pa_table.cast(nullable_schema)
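as_nullable exists because pyarrow table equality compares schemas as well as values, and nullability flags routinely differ between engines even when every value matches. A small illustration of the problem and the fix (pyarrow only; table contents are arbitrary):

    import pyarrow as pa

    strict = pa.schema([pa.field("a", pa.int64(), nullable=False)])
    t1 = pa.table({"a": [1, 2]}).cast(strict)
    t2 = pa.table({"a": [1, 2]})  # fields default to nullable

    # Same values, but equals() is False: the schemas disagree on nullability.
    assert not t1.equals(t2)

    def as_nullable(pa_table: pa.Table) -> pa.Table:
        nullable_schema = pa.schema(f.with_nullable(True) for f in pa_table.schema)
        return pa_table.cast(nullable_schema)

    assert as_nullable(t1).equals(as_nullable(t2))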
bigframes/session/direct_gbq_execution.py

Lines changed: 76 additions & 0 deletions

@@ -0,0 +1,76 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+from typing import Optional, Tuple
+
+from google.cloud import bigquery
+import google.cloud.bigquery.job as bq_job
+import google.cloud.bigquery.table as bq_table
+
+from bigframes.core import compile, nodes
+from bigframes.session import executor, semi_executor
+import bigframes.session._io.bigquery as bq_io
+
+
+# Used only in testing right now; BigQueryCachingExecutor is the fully featured engine.
+# Simplified: does not handle large (>10 GB) results, error handling, global config,
+# or metrics recording. Also avoids caching and most pre-compile rewrites, to better
+# serve as a reference for validating more complex executors.
+class DirectGbqExecutor(semi_executor.SemiExecutor):
+    def __init__(self, bqclient: bigquery.Client):
+        self.bqclient = bqclient
+
+    def execute(
+        self,
+        plan: nodes.BigFrameNode,
+        ordered: bool,
+        peek: Optional[int] = None,
+    ) -> executor.ExecuteResult:
+        """Just execute whatever plan as is, without further caching or decomposition."""
+        # TODO(swast): plumb through the api_name of the user-facing api that
+        # caused this query.
+
+        compiled = compile.compile_sql(
+            compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek)
+        )
+        iterator, query_job = self._run_execute_query(
+            sql=compiled.sql,
+        )
+
+        return executor.ExecuteResult(
+            arrow_batches=iterator.to_arrow_iterable(),
+            schema=plan.schema,
+            query_job=query_job,
+            total_rows=iterator.total_rows,
+        )
+
+    def _run_execute_query(
+        self,
+        sql: str,
+        job_config: Optional[bq_job.QueryJobConfig] = None,
+    ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
+        """Starts BigQuery query job and waits for results."""
+        return bq_io.start_query_with_client(
+            self.bqclient,
+            sql,
+            job_config=job_config or bq_job.QueryJobConfig(),
+            project=None,
+            location=None,
+            timeout=None,
+            metrics=None,
+            query_with_job=False,
+        )
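Stripped of the bigframes plumbing, this executor amounts to "compile the plan to SQL, run it, stream Arrow batches back". A rough standalone equivalent using only the public BigQuery client API (the SQL and project id are placeholders; query_and_wait requires a recent google-cloud-bigquery release):

    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")  # placeholder project id

    # DirectGbqExecutor gets its SQL from compile.compile_sql; here it is hardcoded.
    sql = "SELECT 1 AS a UNION ALL SELECT 2"

    row_iterator = client.query_and_wait(sql)  # start the query, wait for results
    for batch in row_iterator.to_arrow_iterable():
        print(batch.num_rows, batch.schema.names)
    print("total rows:", row_iterator.total_rows)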

bigframes/session/polars_executor.py

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+from typing import Optional, TYPE_CHECKING
+
+import pyarrow as pa
+
+from bigframes.core import array_value, bigframe_node, local_data, nodes
+from bigframes.session import executor, semi_executor
+
+if TYPE_CHECKING:
+    import polars as pl
+
+
+_COMPATIBLE_NODES = (
+    nodes.ReadLocalNode,
+    nodes.OrderByNode,
+    nodes.ReversedNode,
+    nodes.SelectionNode,
+    nodes.FilterNode,  # partial support
+    nodes.ProjectionNode,  # partial support
+)
+
+
+class PolarsExecutor(semi_executor.SemiExecutor):
+    def __init__(self):
+        # This will error out if polars is not installed
+        from bigframes.core.compile.polars import PolarsCompiler
+
+        self._compiler = PolarsCompiler()
+
+    def execute(
+        self,
+        plan: bigframe_node.BigFrameNode,
+        ordered: bool,
+        peek: Optional[int] = None,
+    ) -> Optional[executor.ExecuteResult]:
+        if not self._can_execute(plan):
+            return None
+        # Note: Ignoring ordered flag, as just executing totally ordered is fine.
+        try:
+            lazy_frame: pl.LazyFrame = self._compiler.compile(
+                array_value.ArrayValue(plan)
+            )
+        except Exception:
+            return None
+        if peek is not None:
+            lazy_frame = lazy_frame.limit(peek)
+        pa_table = lazy_frame.collect().to_arrow()
+        return executor.ExecuteResult(
+            arrow_batches=iter(map(self._adapt_batch, pa_table.to_batches())),
+            schema=plan.schema,
+            total_bytes=pa_table.nbytes,
+            total_rows=pa_table.num_rows,
+        )
+
+    def _can_execute(self, plan: bigframe_node.BigFrameNode):
+        return all(isinstance(node, _COMPATIBLE_NODES) for node in plan.unique_nodes())
+
+    def _adapt_array(self, array: pa.Array) -> pa.Array:
+        target_type = local_data.logical_type_replacements(array.type)
+        if target_type != array.type:
+            return array.cast(target_type)
+        return array
+
+    def _adapt_batch(self, batch: pa.RecordBatch) -> pa.RecordBatch:
+        new_arrays = [self._adapt_array(arr) for arr in batch.columns]
+        return pa.RecordBatch.from_arrays(new_arrays, names=batch.column_names)
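Note the contract here: a SemiExecutor returns None for plans it cannot handle (unsupported node types, or a compile failure), which lets a caller chain engines and fall back. A hypothetical sketch of that pattern; execute_with_fallback is not part of the commit:

    from typing import Optional, Sequence

    from bigframes.core import bigframe_node
    from bigframes.session import executor, semi_executor

    def execute_with_fallback(
        engines: Sequence[semi_executor.SemiExecutor],
        plan: bigframe_node.BigFrameNode,
    ) -> Optional[executor.ExecuteResult]:
        # Try each engine in order; the first one that accepts the plan wins.
        for engine in engines:
            result = engine.execute(plan, ordered=True)
            if result is not None:
                return result
        return None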

noxfile.py

Lines changed: 2 additions & 2 deletions
@@ -108,8 +108,8 @@
 SYSTEM_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {
     "3.9": ["tests"],
     "3.10": ["tests"],
-    "3.12": ["tests", "scikit-learn"],
-    "3.13": ["tests"],
+    "3.12": ["tests", "scikit-learn", "polars"],
+    "3.13": ["tests", "polars"],
 }

 LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pathlib
+from typing import Generator
+
+from google.cloud import bigquery
+import pandas as pd
+import pytest
+
+import bigframes
+from bigframes.core import local_data
+from bigframes.session import (
+    direct_gbq_execution,
+    local_scan_executor,
+    polars_executor,
+    semi_executor,
+)
+
+CURRENT_DIR = pathlib.Path(__file__).parent
+DATA_DIR = CURRENT_DIR.parent.parent.parent / "data"
+
+
+@pytest.fixture(scope="module")
+def fake_session() -> Generator[bigframes.Session, None, None]:
+    import bigframes.core.global_session
+
+    # It's a "polars session", but we are bypassing session-provided execution;
+    # we just want a minimal placeholder session without expensive setup.
+    from bigframes.testing import polars_session
+
+    session = polars_session.TestSession()
+    with bigframes.core.global_session._GlobalSessionContext(session):
+        yield session
+
+
+@pytest.fixture(scope="session", params=["pyarrow", "polars", "bq"])
+def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecutor:
+    if request.param == "pyarrow":
+        return local_scan_executor.LocalScanExecutor()
+    if request.param == "polars":
+        return polars_executor.PolarsExecutor()
+    if request.param == "bq":
+        return direct_gbq_execution.DirectGbqExecutor(bigquery_client)
+    raise ValueError(f"Unrecognized param: {request.param}")
+
+
+@pytest.fixture(scope="module")
+def managed_data_source(
+    scalars_pandas_df_index: pd.DataFrame,
+) -> local_data.ManagedArrowTable:
+    return local_data.ManagedArrowTable.from_pandas(scalars_pandas_df_index)
+
+
+@pytest.fixture(scope="module")
+def zero_row_source() -> local_data.ManagedArrowTable:
+    return local_data.ManagedArrowTable.from_pandas(pd.DataFrame({"a": [], "b": []}))
+
+
+@pytest.fixture(scope="module")
+def nested_data_source(
+    nested_pandas_df: pd.DataFrame,
+) -> local_data.ManagedArrowTable:
+    return local_data.ManagedArrowTable.from_pandas(nested_pandas_df)
+
+
+@pytest.fixture(scope="module")
+def repeated_data_source(
+    repeated_pandas_df: pd.DataFrame,
+) -> local_data.ManagedArrowTable:
+    return local_data.ManagedArrowTable.from_pandas(repeated_pandas_df)
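With these fixtures, a test is written once against the SemiExecutor interface and runs on all three engines via the parametrized engine fixture. An illustrative comparison helper in that style (not part of the commit; it assumes ExecuteResult exposes its arrow_batches and that the plan, e.g. a ReadLocalNode over one of the fixture tables, is built elsewhere):

    import pyarrow as pa

    from bigframes.core import bigframe_node, pyarrow_utils
    from bigframes.session import semi_executor

    def run_and_compare(
        engine: semi_executor.SemiExecutor,
        plan: bigframe_node.BigFrameNode,
        expected: pa.Table,
    ) -> None:
        result = engine.execute(plan, ordered=True)
        assert result is not None  # the engine claims support for this plan
        # Assumes at least one batch; pass a schema to from_batches if it can be empty.
        actual = pa.Table.from_batches(result.arrow_batches)
        # Engines disagree on nullability flags, so normalize both sides first.
        assert pyarrow_utils.as_nullable(actual).equals(
            pyarrow_utils.as_nullable(expected)
        )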
