Commit 44f4137

feat: JSON dtype support for read_pandas and Series constructor (#1391)
* feat: JSON dtype support for read_pandas and Series constructor
* add read_pandas tests
* address comments
1 parent 7e9e93a commit 44f4137
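
In practice, this change lets JSON-typed data flow through both entry points. A minimal usage sketch based on the tests in this commit (assumes a configured BigQuery session; the variable and column names are illustrative):

import db_dtypes  # type: ignore
import pandas as pd

import bigframes.pandas as bpd

# Series constructor: build a bigframes Series directly with the JSON dtype.
s = bpd.Series([{"a": 1}, None, ["b", 2]], dtype=db_dtypes.JSONDtype())

# read_pandas: JSON columns in a local pandas DataFrame now load as JSON.
pdf = pd.DataFrame(
    {"json_col": pd.Series([{"c": True}], dtype=db_dtypes.JSONDtype())}
)
df = bpd.read_pandas(pdf)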

File tree

9 files changed: +187 additions, -49 deletions


bigframes/core/compile/compiler.py

Lines changed: 14 additions & 1 deletion
@@ -20,16 +20,17 @@
 
 import bigframes_vendored.ibis.backends.bigquery as ibis_bigquery
 import bigframes_vendored.ibis.expr.api as ibis_api
+import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
 import bigframes_vendored.ibis.expr.types as ibis_types
 import google.cloud.bigquery
 import pandas as pd
 
+from bigframes import dtypes
 from bigframes.core import utils
 import bigframes.core.compile.compiled as compiled
 import bigframes.core.compile.concat as concat_impl
 import bigframes.core.compile.explode
 import bigframes.core.compile.ibis_types
-import bigframes.core.compile.scalar_op_compiler
 import bigframes.core.compile.scalar_op_compiler as compile_scalar
 import bigframes.core.compile.schema_translator
 import bigframes.core.expression as ex
@@ -224,6 +225,18 @@ def compile_read_table_unordered(
         ibis_table = self.read_table_as_unordered_ibis(
             source, scan_cols=[col.source_id for col in scan.items]
         )
+
+        # TODO(b/395912450): Remove this workaround once b/374784249 is resolved.
+        for scan_item in scan.items:
+            if (
+                scan_item.dtype == dtypes.JSON_DTYPE
+                and ibis_table[scan_item.source_id].type() == ibis_dtypes.string
+            ):
+                json_column = compile_scalar.parse_json(
+                    ibis_table[scan_item.source_id]
+                ).name(scan_item.source_id)
+                ibis_table = ibis_table.mutate(json_column)
+
         return compiled.UnorderedIR(
             ibis_table,
             tuple(
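
Context for the workaround above: the load path materializes JSON values as STRING (see replace_json_with_string in bigframes/core/utils.py below), so the compiler re-parses any scanned column that is logically JSON but physically a string. A rough sketch of the resulting query shape (json_col and destination_table are hypothetical names; the SQL ibis actually emits may list columns explicitly instead):

compiled_sql = """
SELECT * REPLACE (PARSE_JSON(json_col) AS json_col)
FROM destination_table
"""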

bigframes/core/utils.py

Lines changed: 22 additions & 0 deletions
@@ -24,6 +24,7 @@
 import pandas.api.types as pdtypes
 import typing_extensions
 
+import bigframes.dtypes as dtypes
 import bigframes.exceptions as bfe
 
 UNNAMED_COLUMN_ID = "bigframes_unnamed_column"
@@ -226,3 +227,24 @@ def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]:
         updated_columns.append(dataframe.index.name)
 
     return updated_columns
+
+
+def replace_json_with_string(dataframe: pd.DataFrame) -> List[str]:
+    """
+    Due to a BigQuery IO limitation with loading JSON from Parquet files
+    (b/374784249), we use a workaround: store JSON values as strings, then
+    parse them back into JSON objects.
+    TODO(b/395912450): Remove this workaround once b/374784249 is resolved.
+    """
+    updated_columns = []
+
+    for col in dataframe.columns:
+        if dataframe[col].dtype == dtypes.JSON_DTYPE:
+            dataframe[col] = dataframe[col].astype(dtypes.STRING_DTYPE)
+            updated_columns.append(col)
+
+    if dataframe.index.dtype == dtypes.JSON_DTYPE:
+        dataframe.index = dataframe.index.astype(dtypes.STRING_DTYPE)
+        updated_columns.append(dataframe.index.name)
+
+    return updated_columns
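
A quick sketch of how the new helper behaves, inferred from the implementation above (the frame and column names are illustrative):

import db_dtypes  # type: ignore
import pandas as pd

from bigframes import dtypes
from bigframes.core import utils

df = pd.DataFrame({"j": pd.Series([{"a": 1}, None], dtype=db_dtypes.JSONDtype())})
json_cols = utils.replace_json_with_string(df)  # mutates df in place

assert json_cols == ["j"]
assert df["j"].dtype == dtypes.STRING_DTYPE  # JSON is now held as strings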

bigframes/dtypes.py

Lines changed: 1 addition & 0 deletions
@@ -301,6 +301,7 @@ def is_object_like(type_: Union[ExpressionType, str]) -> bool:
     return type_ in ("object", "O") or (
         getattr(type_, "kind", None) == "O"
         and getattr(type_, "storage", None) != "pyarrow"
+        and getattr(type_, "name", None) != "dbjson"
     )
 
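This guard exists because db_dtypes.JSONDtype reports kind == "O" without pyarrow storage, which previously made JSON columns look like plain object columns; they are now excluded by the dtype's name, "dbjson". A small illustration (assuming the behavior implied by the check above):

import db_dtypes  # type: ignore

from bigframes import dtypes

assert dtypes.is_object_like("object")  # plain object dtypes still match
assert not dtypes.is_object_like(db_dtypes.JSONDtype())  # JSON is excluded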

bigframes/session/_io/pandas.py

Lines changed: 9 additions & 3 deletions
@@ -14,7 +14,8 @@
 from __future__ import annotations
 
 import dataclasses
-from typing import Collection, List, Union
+import typing
+from typing import Collection, Union
 
 import bigframes_vendored.constants as constants
 import db_dtypes  # type: ignore
@@ -38,7 +39,7 @@ class DataFrameAndLabels:
     column_labels: Collection
     index_labels: Collection
     ordering_col: str
-    timedelta_cols: List[str]
+    col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype]
 
 
 def _arrow_to_pandas_arrowdtype(
@@ -165,11 +166,16 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndLabels:
     pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0])
 
     timedelta_cols = utils.replace_timedeltas_with_micros(pandas_dataframe_copy)
+    json_cols = utils.replace_json_with_string(pandas_dataframe_copy)
+    col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
+        **{col: bigframes.dtypes.TIMEDELTA_DTYPE for col in timedelta_cols},
+        **{col: bigframes.dtypes.JSON_DTYPE for col in json_cols},
+    }
 
     return DataFrameAndLabels(
         df=pandas_dataframe_copy,
         column_labels=col_labels,
         index_labels=idx_labels,
         ordering_col=ordering_col,
-        timedelta_cols=timedelta_cols,
+        col_type_overrides=col_type_overrides,
     )
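
Design note: the single-purpose timedelta_cols field is generalized into a dtype-override map, so the loaders can restore any logical dtype that had to be stored in a different physical form. For a frame with a timedelta column "td" and a JSON column "j" (hypothetical names), the mapping comes out roughly as:

import bigframes.dtypes

# Timedeltas are stored as INT64 microseconds and JSON as STRING; the
# overrides let the loaded table's schema recover the logical dtypes.
col_type_overrides = {
    "td": bigframes.dtypes.TIMEDELTA_DTYPE,
    "j": bigframes.dtypes.JSON_DTYPE,
}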

bigframes/session/loader.py

Lines changed: 2 additions & 11 deletions
@@ -176,15 +176,11 @@ def read_pandas_load_job(
         self._start_generic_job(load_job)
 
         destination_table = self._bqclient.get_table(load_table_destination)
-        col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
-            col: bigframes.dtypes.TIMEDELTA_DTYPE
-            for col in df_and_labels.timedelta_cols
-        }
         array_value = core.ArrayValue.from_table(
             table=destination_table,
             # TODO (b/394156190): Generate this directly from original pandas df.
             schema=schemata.ArraySchema.from_bq_table(
-                destination_table, col_type_overrides
+                destination_table, df_and_labels.col_type_overrides
             ),
             session=self._session,
             offsets_col=ordering_col,
@@ -234,16 +230,11 @@ def read_pandas_streaming(
             raise ValueError(
                 f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
             )
-
-        col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
-            col: bigframes.dtypes.TIMEDELTA_DTYPE
-            for col in df_and_labels.timedelta_cols
-        }
         array_value = (
             core.ArrayValue.from_table(
                 table=destination_table,
                 schema=schemata.ArraySchema.from_bq_table(
-                    destination_table, col_type_overrides
+                    destination_table, df_and_labels.col_type_overrides
                 ),
                 session=self._session,
                 # Don't set the offsets column because we want to group by it.

tests/system/small/bigquery/test_json.py

Lines changed: 31 additions & 33 deletions
@@ -12,8 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
-
+import db_dtypes  # type: ignore
 import geopandas as gpd  # type: ignore
 import pandas as pd
 import pyarrow as pa
@@ -24,19 +23,6 @@
 import bigframes.pandas as bpd
 
 
-def _get_series_from_json(json_data):
-    # Note: converts None to sql "null" and not to json none.
-    values = [
-        f"JSON '{json.dumps(data)}'" if data is not None else "NULL"
-        for data in json_data
-    ]
-    sql = " UNION ALL ".join(
-        [f"SELECT {id} AS id, {value} AS data" for id, value in enumerate(values)]
-    )
-    df = bpd.read_gbq(sql).set_index("id").sort_index()
-    return df["data"]
-
-
 @pytest.mark.parametrize(
     ("json_path", "expected_json"),
     [
@@ -45,10 +31,11 @@ def _get_series_from_json(json_data):
     ],
 )
 def test_json_set_at_json_path(json_path, expected_json):
-    s = _get_series_from_json([{"a": {"b": {"c": "tester", "d": []}}}])
+    original_json = [{"a": {"b": {"c": "tester", "d": []}}}]
+    s = bpd.Series(original_json, dtype=db_dtypes.JSONDtype())
     actual = bbq.json_set(s, json_path_value_pairs=[(json_path, 10)])
 
-    expected = _get_series_from_json(expected_json)
+    expected = bpd.Series(expected_json, dtype=db_dtypes.JSONDtype())
     pd.testing.assert_series_equal(
         actual.to_pandas(),
         expected.to_pandas(),
@@ -65,41 +52,43 @@ def test_json_set_at_json_path(json_path, expected_json):
     ],
 )
 def test_json_set_at_json_value_type(json_value, expected_json):
-    s = _get_series_from_json([{"a": {"b": "dev"}}, {"a": {"b": [1, 2]}}])
+    original_json = [{"a": {"b": "dev"}}, {"a": {"b": [1, 2]}}]
+    s = bpd.Series(original_json, dtype=db_dtypes.JSONDtype())
     actual = bbq.json_set(s, json_path_value_pairs=[("$.a.b", json_value)])
 
-    expected = _get_series_from_json(expected_json)
+    expected = bpd.Series(expected_json, dtype=db_dtypes.JSONDtype())
     pd.testing.assert_series_equal(
         actual.to_pandas(),
         expected.to_pandas(),
     )
 
 
 def test_json_set_w_more_pairs():
-    s = _get_series_from_json([{"a": 2}, {"b": 5}, {"c": 1}])
+    original_json = [{"a": 2}, {"b": 5}, {"c": 1}]
+    s = bpd.Series(original_json, dtype=db_dtypes.JSONDtype())
     actual = bbq.json_set(
         s, json_path_value_pairs=[("$.a", 1), ("$.b", 2), ("$.a", [3, 4, 5])]
     )
-    expected = _get_series_from_json(
-        [{"a": 3, "b": 2}, {"a": 4, "b": 2}, {"a": 5, "b": 2, "c": 1}]
-    )
+
+    expected_json = [{"a": 3, "b": 2}, {"a": 4, "b": 2}, {"a": 5, "b": 2, "c": 1}]
+    expected = bpd.Series(expected_json, dtype=db_dtypes.JSONDtype())
    pd.testing.assert_series_equal(
         actual.to_pandas(),
         expected.to_pandas(),
     )
 
 
 def test_json_set_w_invalid_json_path_value_pairs():
+    s = bpd.Series([{"a": 10}], dtype=db_dtypes.JSONDtype())
     with pytest.raises(ValueError):
-        bbq.json_set(
-            _get_series_from_json([{"a": 10}]), json_path_value_pairs=[("$.a", 1, 100)]  # type: ignore
-        )
+        bbq.json_set(s, json_path_value_pairs=[("$.a", 1, 100)])  # type: ignore
 
 
 def test_json_set_w_invalid_value_type():
+    s = bpd.Series([{"a": 10}], dtype=db_dtypes.JSONDtype())
     with pytest.raises(TypeError):
         bbq.json_set(
-            _get_series_from_json([{"a": 10}]),
+            s,
             json_path_value_pairs=[
                 (
                     "$.a",
@@ -117,19 +106,25 @@ def test_json_set_w_invalid_series_type():
 
 
 def test_json_extract_from_json():
-    s = _get_series_from_json([{"a": {"b": [1, 2]}}, {"a": {"c": 1}}, {"a": {"b": 0}}])
+    s = bpd.Series(
+        [{"a": {"b": [1, 2]}}, {"a": {"c": 1}}, {"a": {"b": 0}}],
+        dtype=db_dtypes.JSONDtype(),
+    )
     actual = bbq.json_extract(s, "$.a.b").to_pandas()
-    expected = _get_series_from_json([[1, 2], None, 0]).to_pandas()
+    expected = bpd.Series([[1, 2], None, 0], dtype=db_dtypes.JSONDtype()).to_pandas()
     pd.testing.assert_series_equal(
         actual,
         expected,
     )
 
 
 def test_json_extract_from_string():
-    s = bpd.Series(['{"a": {"b": [1, 2]}}', '{"a": {"c": 1}}', '{"a": {"b": 0}}'])
+    s = bpd.Series(
+        ['{"a": {"b": [1, 2]}}', '{"a": {"c": 1}}', '{"a": {"b": 0}}'],
+        dtype=pd.StringDtype(storage="pyarrow"),
+    )
     actual = bbq.json_extract(s, "$.a.b")
-    expected = bpd.Series(["[1,2]", None, "0"])
+    expected = bpd.Series(["[1,2]", None, "0"], dtype=pd.StringDtype(storage="pyarrow"))
     pd.testing.assert_series_equal(
         actual.to_pandas(),
         expected.to_pandas(),
@@ -142,8 +137,9 @@ def test_json_extract_w_invalid_series_type():
 
 
 def test_json_extract_array_from_json():
-    s = _get_series_from_json(
-        [{"a": ["ab", "2", "3 xy"]}, {"a": []}, {"a": ["4", "5"]}, {}]
+    s = bpd.Series(
+        [{"a": ["ab", "2", "3 xy"]}, {"a": []}, {"a": ["4", "5"]}, {}],
+        dtype=db_dtypes.JSONDtype(),
     )
     actual = bbq.json_extract_array(s, "$.a")
 
@@ -160,6 +156,8 @@ def test_json_extract_array_from_json():
     """
     df = bpd.read_gbq(sql).set_index("id").sort_index()
     expected = df["data"]
+    expected.index.name = None
+    expected.name = None
 
     pd.testing.assert_series_equal(
         actual.to_pandas(),

tests/system/small/test_dataframe_io.py

Lines changed: 15 additions & 0 deletions
@@ -658,6 +658,21 @@ def test_to_gbq_w_invalid_destination_table(scalars_df_index):
         scalars_df_index.to_gbq("table_id")
 
 
+def test_to_gbq_w_json(bigquery_client):
+    """Test that the `to_gbq` API can handle a JSON column."""
+    s1 = bpd.Series([1, 2, 3, 4])
+    s2 = bpd.Series(
+        ["a", 1, False, ["a", {"b": 1}], {"c": [1, 2, 3]}], dtype=db_dtypes.JSONDtype()
+    )
+
+    df = bpd.DataFrame({"id": s1, "json_col": s2})
+    destination_table = df.to_gbq()
+    table = bigquery_client.get_table(destination_table)
+
+    assert table.schema[1].name == "json_col"
+    assert table.schema[1].field_type == "JSON"
+
+
 @pytest.mark.parametrize(
     ("index"),
     [True, False],

tests/system/small/test_series.py

Lines changed: 21 additions & 1 deletion
@@ -237,7 +237,7 @@ def test_series_construct_geodata():
         pytest.param(pd.StringDtype(storage="pyarrow"), id="string"),
     ],
 )
-def test_series_construct_w_dtype_for_int(dtype):
+def test_series_construct_w_dtype(dtype):
     data = [1, 2, 3]
     expected = pd.Series(data, dtype=dtype)
     expected.index = expected.index.astype("Int64")
@@ -302,6 +302,26 @@ def test_series_construct_w_dtype_for_array_struct():
     )
 
 
+def test_series_construct_w_dtype_for_json():
+    data = [
+        1,
+        "str",
+        False,
+        ["a", {"b": 1}, None],
+        None,
+        {"a": {"b": [1, 2, 3], "c": True}},
+    ]
+    s = bigframes.pandas.Series(data, dtype=db_dtypes.JSONDtype())
+
+    assert s[0] == 1
+    assert s[1] == "str"
+    assert s[2] is False
+    assert s[3][0] == "a"
+    assert s[3][1]["b"] == 1
+    assert pd.isna(s[4])
+    assert s[5]["a"] == {"b": [1, 2, 3], "c": True}
+
+
 def test_series_keys(scalars_dfs):
     scalars_df, scalars_pandas_df = scalars_dfs
     bf_result = scalars_df["int64_col"].keys().to_pandas()
