
Commit 633bf98

feat: add bpd.read_arrow to convert an Arrow object into a bigframes DataFrame (#1855)
* feat: Add read_arrow methods to Session and pandas

  Adds `read_arrow` methods to `bigframes.session.Session` and `bigframes.pandas.read_arrow` for creating BigQuery DataFrames DataFrames from PyArrow Tables. The implementation refactors existing logic from `bigframes.session._io.bigquery.read_gbq_query` for converting Arrow data into BigFrames DataFrames.

  Includes:

  - New file `bigframes/session/_io/arrow.py` with the core conversion logic.
  - `read_arrow(pa.Table) -> bpd.DataFrame` in the `Session` class.
  - `read_arrow(pa.Table) -> bpd.DataFrame` in the `pandas` module.
  - Unit and system tests for the new functionality.
  - Docstrings for the new methods/functions.

  Note: Unit tests for direct DataFrame operations (shape, to_pandas) on the result of read_arrow are currently failing due to the complexity of mocking the session and executor for LocalDataNode interactions. System tests are recommended for full end-to-end validation.

* rearrange

* fix unit tests

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
1 parent fab3c38 commit 633bf98
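
For context, a minimal usage sketch of the API this commit adds (the example table is illustrative, not taken from the commit):

import pyarrow as pa

import bigframes.pandas as bpd

# Build an in-memory Arrow table with some example data.
pa_table = pa.Table.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Convert it to a BigQuery DataFrames DataFrame via the new top-level helper.
df = bpd.read_arrow(pa_table)
print(df.shape)  # expected: (3, 2)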

File tree

5 files changed: +200 -0 lines changed

- bigframes/core/blocks.py
- bigframes/pandas/__init__.py
- bigframes/pandas/io/api.py
- bigframes/session/__init__.py
- tests/unit/session/test_io_arrow.py

bigframes/core/blocks.py

Lines changed: 31 additions & 0 deletions
@@ -50,6 +50,7 @@
 import bigframes.core.identifiers
 import bigframes.core.join_def as join_defs
 import bigframes.core.ordering as ordering
+import bigframes.core.pyarrow_utils as pyarrow_utils
 import bigframes.core.schema as bf_schema
 import bigframes.core.sql as sql
 import bigframes.core.utils as utils
@@ -156,6 +157,36 @@ def __init__(
         self._view_ref: Optional[bigquery.TableReference] = None
         self._view_ref_dry_run: Optional[bigquery.TableReference] = None

+    @classmethod
+    def from_pyarrow(
+        cls,
+        data: pa.Table,
+        session: bigframes.Session,
+    ) -> Block:
+        column_labels = data.column_names
+
+        # TODO(tswast): Use array_value.promote_offsets() instead once that node is
+        # supported by the local engine.
+        offsets_col = bigframes.core.guid.generate_guid()
+        index_ids = [offsets_col]
+        index_labels = [None]
+
+        # TODO(https://github.com/googleapis/python-bigquery-dataframes/issues/859):
+        # Allow users to specify the "total ordering" column(s) or allow multiple
+        # such columns.
+        data = pyarrow_utils.append_offsets(data, offsets_col=offsets_col)
+
+        # from_pyarrow will normalize the types for us.
+        managed_data = local_data.ManagedArrowTable.from_pyarrow(data)
+        array_value = core.ArrayValue.from_managed(managed_data, session=session)
+        block = cls(
+            array_value,
+            column_labels=column_labels,
+            index_columns=index_ids,
+            index_labels=index_labels,
+        )
+        return block
+
     @classmethod
     def from_local(
         cls,
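
The notable step in Block.from_pyarrow is that it manufactures its own offsets column to serve as the default index and total ordering. pyarrow_utils.append_offsets is internal to bigframes; as a rough sketch of the effect (an assumption, not the actual implementation), it behaves like appending a sequential int64 column:

import pyarrow as pa

def append_offsets_sketch(table: pa.Table, offsets_col: str) -> pa.Table:
    # Append a monotonically increasing int64 column that acts as the
    # default index / total ordering for the local data.
    offsets = pa.array(range(table.num_rows), type=pa.int64())
    return table.append_column(offsets_col, offsets)

table = pa.Table.from_pydict({"x": [10, 20, 30]})
print(append_offsets_sketch(table, "offsets_0").column_names)
# ['x', 'offsets_0']  -- the real column name is a generated GUID, not "offsets_0"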

bigframes/pandas/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -40,6 +40,7 @@
 from bigframes.pandas.io.api import (
     _read_gbq_colab,
     from_glob_path,
+    read_arrow,
     read_csv,
     read_gbq,
     read_gbq_function,
@@ -367,6 +368,7 @@ def reset_session():
     merge,
     qcut,
     read_csv,
+    read_arrow,
     read_gbq,
     _read_gbq_colab,
     read_gbq_function,
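
These re-exports make the helper available from the pandas-compatible namespace, so both access paths below resolve to the same function:

import bigframes.pandas as bpd
from bigframes.pandas import read_arrow

assert read_arrow is bpd.read_arrow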

bigframes/pandas/io/api.py

Lines changed: 16 additions & 0 deletions
@@ -44,6 +44,7 @@
     ReadPickleBuffer,
     StorageOptions,
 )
+import pyarrow as pa

 import bigframes._config as config
 import bigframes.core.global_session as global_session
@@ -72,6 +73,21 @@
 # method and its arguments.


+def read_arrow(pa_table: pa.Table) -> bigframes.dataframe.DataFrame:
+    """Load a PyArrow Table to a BigQuery DataFrames DataFrame.
+
+    Args:
+        pa_table (pyarrow.Table):
+            PyArrow table to load data from.
+
+    Returns:
+        bigframes.dataframe.DataFrame:
+            A new DataFrame representing the data from the PyArrow table.
+    """
+    session = global_session.get_global_session()
+    return session.read_arrow(pa_table=pa_table)
+
+
 def read_csv(
     filepath_or_buffer: str | IO["bytes"],
     *,
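
Because the module-level helper just forwards to the global session, the two calls below are equivalent (example data is illustrative):

import pyarrow as pa

import bigframes.core.global_session as global_session
import bigframes.pandas as bpd

pa_table = pa.Table.from_pydict({"value": [1.5, 2.5]})

df1 = bpd.read_arrow(pa_table)
df2 = global_session.get_global_session().read_arrow(pa_table=pa_table)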

bigframes/session/__init__.py

Lines changed: 18 additions & 0 deletions
@@ -55,12 +55,14 @@
     ReadPickleBuffer,
     StorageOptions,
 )
+import pyarrow as pa

 from bigframes import exceptions as bfe
 from bigframes import version
 import bigframes._config.bigquery_options as bigquery_options
 import bigframes.clients
 import bigframes.constants
+import bigframes.core
 from bigframes.core import blocks, log_adapter, utils
 import bigframes.core.pyformat
@@ -967,6 +969,22 @@ def _read_pandas_inline(
         local_block = blocks.Block.from_local(pandas_dataframe, self)
         return dataframe.DataFrame(local_block)

+    def read_arrow(self, pa_table: pa.Table) -> bigframes.dataframe.DataFrame:
+        """Load a PyArrow Table to a BigQuery DataFrames DataFrame.
+
+        Args:
+            pa_table (pyarrow.Table):
+                PyArrow table to load data from.
+
+        Returns:
+            bigframes.dataframe.DataFrame:
+                A new DataFrame representing the data from the PyArrow table.
+        """
+        import bigframes.dataframe as dataframe
+
+        local_block = blocks.Block.from_pyarrow(pa_table, self)
+        return dataframe.DataFrame(local_block)
+
     def read_csv(
         self,
         filepath_or_buffer: str | IO["bytes"],
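
For callers that manage their own Session rather than relying on the global one, the method can be invoked directly. A sketch, assuming ambient credentials and project configuration are already in place:

import pyarrow as pa

import bigframes

# Session() picks up project/credentials from the environment or options.
session = bigframes.Session()

pa_table = pa.Table.from_pydict({"n": [1, 2, 3]})
df = session.read_arrow(pa_table)
print(df.dtypes)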

tests/unit/session/test_io_arrow.py

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+
+import pyarrow as pa
+import pytest
+
+import bigframes.pandas as bpd
+from bigframes.testing import mocks
+
+
+@pytest.fixture(scope="module")
+def session():
+    # Use the mock session from bigframes.testing
+    return mocks.create_bigquery_session()
+
+
+def test_read_arrow_empty_table(session):
+    empty_table = pa.Table.from_pydict(
+        {
+            "col_a": pa.array([], type=pa.int64()),
+            "col_b": pa.array([], type=pa.string()),
+        }
+    )
+    df = session.read_arrow(empty_table)
+    assert isinstance(df, bpd.DataFrame)
+    assert df.shape == (0, 2)
+    assert list(df.columns) == ["col_a", "col_b"]
+    pd_df = df.to_pandas()
+    assert pd_df.empty
+    assert list(pd_df.columns) == ["col_a", "col_b"]
+    assert pd_df["col_a"].dtype == "Int64"
+    assert pd_df["col_b"].dtype == "string[pyarrow]"
+
+
+@pytest.mark.parametrize(
+    "data,arrow_type,expected_bq_type_kind",
+    [
+        ([1, 2], pa.int8(), "INTEGER"),
+        ([1, 2], pa.int16(), "INTEGER"),
+        ([1, 2], pa.int32(), "INTEGER"),
+        ([1, 2], pa.int64(), "INTEGER"),
+        ([1.0, 2.0], pa.float32(), "FLOAT"),
+        ([1.0, 2.0], pa.float64(), "FLOAT"),
+        ([True, False], pa.bool_(), "BOOLEAN"),
+        (["a", "b"], pa.string(), "STRING"),
+        (["a", "b"], pa.large_string(), "STRING"),
+        ([b"a", b"b"], pa.binary(), "BYTES"),
+        ([b"a", b"b"], pa.large_binary(), "BYTES"),
+        (
+            [
+                pa.scalar(1000, type=pa.duration("s")),
+                pa.scalar(2000, type=pa.duration("s")),
+            ],
+            pa.duration("s"),
+            "INTEGER",
+        ),
+        ([datetime.date(2023, 1, 1)], pa.date32(), "DATE"),
+        (
+            [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)],
+            pa.timestamp("s", tz="UTC"),
+            "TIMESTAMP",
+        ),
+        (
+            [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)],
+            pa.timestamp("ms", tz="UTC"),
+            "TIMESTAMP",
+        ),
+        (
+            [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)],
+            pa.timestamp("us", tz="UTC"),
+            "TIMESTAMP",
+        ),
+        ([datetime.time(12, 34, 56, 789000)], pa.time64("us"), "TIME"),
+    ],
+)
+def test_read_arrow_type_mappings(session, data, arrow_type, expected_bq_type_kind):
+    """
+    Tests that various arrow types are mapped to the expected BigQuery types.
+    This is an indirect check via the resulting DataFrame's schema.
+    """
+    pa_table = pa.Table.from_arrays([pa.array(data, type=arrow_type)], names=["col"])
+    df = session.read_arrow(pa_table)
+
+    bigquery_schema = df._block.expr.schema.to_bigquery()
+    assert len(bigquery_schema) == 2  # offsets + value
+    field = bigquery_schema[-1]
+    assert field.field_type.upper() == expected_bq_type_kind
+
+    # Also check pandas dtype after conversion for good measure
+    pd_df = df.to_pandas()
+    assert pd_df["col"].shape == (len(data),)
+
+
+def test_read_arrow_list_type(session):
+    pa_table = pa.Table.from_arrays(
+        [pa.array([[1, 2], [3, 4, 5]], type=pa.list_(pa.int64()))], names=["list_col"]
+    )
+    df = session.read_arrow(pa_table)
+
+    bigquery_schema = df._block.expr.schema.to_bigquery()
+    assert len(bigquery_schema) == 2  # offsets + value
+    field = bigquery_schema[-1]
+    assert field.mode.upper() == "REPEATED"
+    assert field.field_type.upper() == "INTEGER"
+
+
+def test_read_arrow_struct_type(session):
+    struct_type = pa.struct([("a", pa.int64()), ("b", pa.string())])
+    pa_table = pa.Table.from_arrays(
+        [pa.array([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}], type=struct_type)],
+        names=["struct_col"],
+    )
+    df = session.read_arrow(pa_table)
+
+    bigquery_schema = df._block.expr.schema.to_bigquery()
+    assert len(bigquery_schema) == 2  # offsets + value
+    field = bigquery_schema[-1]
+    assert field.field_type.upper() == "RECORD"
+    assert field.fields[0].name == "a"
+    assert field.fields[1].name == "b"
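
To exercise just this module locally, something like the following should work in a dev checkout with pytest installed:

pytest tests/unit/session/test_io_arrow.py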
