Skip to content

Commit 200c9bb

Browse files
authored
fix: json in struct destination type (#1187)
* fix: json in struct destination type * patch output json_str to json * fix type hint * fix type hint * remove draft * use pd.ArrowDtype(pa.large_string) as bf dtype * wording * fix tests * fix tests * remove json conversions * fix remote func
1 parent e698dbf commit 200c9bb

File tree

12 files changed

+57
-37
lines changed

12 files changed

+57
-37
lines changed

bigframes/bigquery/_operations/json.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def json_set(
4747
>>> s = bpd.read_gbq("SELECT JSON '{\\\"a\\\": 1}' AS data")["data"]
4848
>>> bbq.json_set(s, json_path_value_pairs=[("$.a", 100), ("$.b", "hi")])
4949
0 {"a":100,"b":"hi"}
50-
Name: data, dtype: string
50+
Name: data, dtype: large_string[pyarrow]
5151
5252
Args:
5353
input (bigframes.series.Series):

bigframes/core/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def from_table(
107107
raise ValueError("must set at most one of 'offests', 'primary_key'")
108108
if any(i.field_type == "JSON" for i in table.schema if i.name in schema.names):
109109
warnings.warn(
110-
"Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.",
110+
"Interpreting JSON column(s) as StringDtype and pyarrow.large_string. This behavior may change in future versions.",
111111
bigframes.exceptions.PreviewWarning,
112112
)
113113
# define data source only for needed columns, this makes row-hashing cheaper

bigframes/core/blocks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2564,13 +2564,13 @@ def _get_rows_as_json_values(self) -> Block:
25642564
),
25652565
T1 AS (
25662566
SELECT *,
2567-
JSON_OBJECT(
2567+
TO_JSON_STRING(JSON_OBJECT(
25682568
"names", [{column_names_csv}],
25692569
"types", [{column_types_csv}],
25702570
"values", [{column_references_csv}],
25712571
"indexlength", {index_columns_count},
25722572
"dtype", {pandas_row_dtype}
2573-
) AS {googlesql.identifier(row_json_column_name)} FROM T0
2573+
)) AS {googlesql.identifier(row_json_column_name)} FROM T0
25742574
)
25752575
SELECT {select_columns_csv} FROM T1
25762576
"""

bigframes/core/compile/compiled.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ def _set_or_replace_by_id(
464464
builder.columns = [*self.columns, new_value.name(id)]
465465
return builder.build()
466466

467-
def _select(self, values: typing.Tuple[ibis_types.Value]) -> UnorderedIR:
467+
def _select(self, values: typing.Tuple[ibis_types.Value, ...]) -> UnorderedIR:
468468
builder = self.builder()
469469
builder.columns = values
470470
return builder.build()
@@ -1129,7 +1129,7 @@ def _set_or_replace_by_id(self, id: str, new_value: ibis_types.Value) -> Ordered
11291129
builder.columns = [*self.columns, new_value.name(id)]
11301130
return builder.build()
11311131

1132-
def _select(self, values: typing.Tuple[ibis_types.Value]) -> OrderedIR:
1132+
def _select(self, values: typing.Tuple[ibis_types.Value, ...]) -> OrderedIR:
11331133
"""Safely assign by id while maintaining ordering integrity."""
11341134
# TODO: Split into explicit set and replace methods
11351135
ordering_col_ids = set(

bigframes/core/compile/compiler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ def compile_sql(
5959
node = self.set_output_names(node, output_ids)
6060
if ordered:
6161
node, limit = rewrites.pullup_limit_from_slice(node)
62-
return self.compile_ordered_ir(self._preprocess(node)).to_sql(
63-
ordered=True, limit=limit
64-
)
62+
ir = self.compile_ordered_ir(self._preprocess(node))
63+
return ir.to_sql(ordered=True, limit=limit)
6564
else:
66-
return self.compile_unordered_ir(self._preprocess(node)).to_sql()
65+
ir = self.compile_unordered_ir(self._preprocess(node)) # type: ignore
66+
return ir.to_sql()
6767

6868
def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str:
6969
return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows)

bigframes/core/compile/ibis_types.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from bigframes_vendored.ibis.expr.datatypes.core import (
2525
dtype as python_type_to_bigquery_type,
2626
)
27-
import bigframes_vendored.ibis.expr.operations as ibis_ops
2827
import bigframes_vendored.ibis.expr.types as ibis_types
2928
import geopandas as gpd # type: ignore
3029
import google.cloud.bigquery as bigquery
@@ -46,6 +45,7 @@
4645
ibis_dtypes.Binary,
4746
ibis_dtypes.Decimal,
4847
ibis_dtypes.GeoSpatial,
48+
ibis_dtypes.JSON,
4949
]
5050

5151

@@ -74,6 +74,7 @@
7474
ibis_dtypes.GeoSpatial(geotype="geography", srid=4326, nullable=True),
7575
gpd.array.GeometryDtype(),
7676
),
77+
(ibis_dtypes.json, pd.ArrowDtype(pa.large_string())),
7778
)
7879

7980
BIGFRAMES_TO_IBIS: Dict[bigframes.dtypes.Dtype, ibis_dtypes.DataType] = {
@@ -219,12 +220,6 @@ def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value:
219220
"""
220221
ibis_type = value.type()
221222
name = value.get_name()
222-
if ibis_type.is_json():
223-
value = ibis_ops.ToJsonString(value).to_expr() # type: ignore
224-
value = (
225-
value.case().when("null", bigframes_vendored.ibis.null()).else_(value).end()
226-
)
227-
return value.name(name)
228223
# Allow REQUIRED fields to be joined with NULLABLE fields.
229224
nullable_type = ibis_type.copy(nullable=True)
230225
return value.cast(nullable_type).name(name)
@@ -314,7 +309,7 @@ def ibis_dtype_to_bigframes_dtype(
314309
"Interpreting JSON as string. This behavior may change in future versions.",
315310
bigframes.exceptions.PreviewWarning,
316311
)
317-
return bigframes.dtypes.STRING_DTYPE
312+
return bigframes.dtypes.JSON_DTYPE
318313

319314
if ibis_dtype in IBIS_TO_BIGFRAMES:
320315
return IBIS_TO_BIGFRAMES[ibis_dtype]

bigframes/core/compile/scalar_op_compiler.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,7 +1181,10 @@ def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet):
11811181

11821182
@scalar_op_compiler.register_unary_op(ops.JSONExtract, pass_op=True)
11831183
def json_extract_op_impl(x: ibis_types.Value, op: ops.JSONExtract):
1184-
return json_extract(json_obj=x, json_path=op.json_path)
1184+
if x.type().is_json():
1185+
return json_extract(json_obj=x, json_path=op.json_path)
1186+
# json string
1187+
return json_extract_string(json_obj=x, json_path=op.json_path)
11851188

11861189

11871190
@scalar_op_compiler.register_unary_op(ops.JSONExtractArray, pass_op=True)
@@ -1845,7 +1848,7 @@ def float_ceil(a: float) -> float:
18451848

18461849

18471850
@ibis_udf.scalar.builtin(name="parse_json")
1848-
def parse_json(a: str) -> ibis_dtypes.JSON: # type: ignore[empty-body]
1851+
def parse_json(json_str: str) -> ibis_dtypes.JSON: # type: ignore[empty-body]
18491852
"""Converts a JSON-formatted STRING value to a JSON value."""
18501853

18511854

@@ -1860,7 +1863,14 @@ def json_set( # type: ignore[empty-body]
18601863
def json_extract( # type: ignore[empty-body]
18611864
json_obj: ibis_dtypes.JSON, json_path: ibis_dtypes.String
18621865
) -> ibis_dtypes.JSON:
1863-
"""Extracts a JSON value and converts it to a SQL JSON-formatted STRING or JSON value."""
1866+
"""Extracts a JSON value and converts it to a JSON value."""
1867+
1868+
1869+
@ibis_udf.scalar.builtin(name="json_extract")
1870+
def json_extract_string( # type: ignore[empty-body]
1871+
json_obj: ibis_dtypes.String, json_path: ibis_dtypes.String
1872+
) -> ibis_dtypes.String:
1873+
"""Extracts a JSON STRING value and converts it to a SQL JSON-formatted STRING."""
18641874

18651875

18661876
@ibis_udf.scalar.builtin(name="json_extract_array")

bigframes/dtypes.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
BIGNUMERIC_DTYPE = pd.ArrowDtype(pa.decimal256(76, 38))
5656
# No arrow equivalent
5757
GEO_DTYPE = gpd.array.GeometryDtype()
58+
# JSON
59+
JSON_DTYPE = pd.ArrowDtype(pa.large_string())
5860

5961
# Used when storing Null expressions
6062
DEFAULT_DTYPE = FLOAT_DTYPE
@@ -132,6 +134,13 @@ class SimpleDtypeInfo:
132134
orderable=True,
133135
clusterable=True,
134136
),
137+
SimpleDtypeInfo(
138+
dtype=JSON_DTYPE,
139+
arrow_dtype=pa.large_string(),
140+
type_kind=("JSON",),
141+
orderable=False,
142+
clusterable=False,
143+
),
135144
SimpleDtypeInfo(
136145
dtype=DATE_DTYPE,
137146
arrow_dtype=pa.date32(),
@@ -281,7 +290,7 @@ def is_struct_like(type_: ExpressionType) -> bool:
281290

282291
def is_json_like(type_: ExpressionType) -> bool:
283292
# TODO: Add JSON type support
284-
return type_ == STRING_DTYPE
293+
return type_ == JSON_DTYPE or type_ == STRING_DTYPE # Including JSON string
285294

286295

287296
def is_json_encoding_type(type_: ExpressionType) -> bool:
@@ -455,8 +464,6 @@ def infer_literal_arrow_type(literal) -> typing.Optional[pa.DataType]:
455464
return bigframes_dtype_to_arrow_dtype(infer_literal_type(literal))
456465

457466

458-
# Don't have dtype for json, so just end up interpreting as STRING
459-
_REMAPPED_TYPEKINDS = {"JSON": "STRING"}
460467
_TK_TO_BIGFRAMES = {
461468
type_kind: mapping.dtype
462469
for mapping in SIMPLE_TYPES
@@ -480,16 +487,13 @@ def convert_schema_field(
480487
pa_struct = pa.struct(fields)
481488
pa_type = pa.list_(pa_struct) if is_repeated else pa_struct
482489
return field.name, pd.ArrowDtype(pa_type)
483-
elif (
484-
field.field_type in _TK_TO_BIGFRAMES or field.field_type in _REMAPPED_TYPEKINDS
485-
):
486-
singular_type = _TK_TO_BIGFRAMES[
487-
_REMAPPED_TYPEKINDS.get(field.field_type, field.field_type)
488-
]
490+
elif field.field_type in _TK_TO_BIGFRAMES:
489491
if is_repeated:
490-
pa_type = pa.list_(bigframes_dtype_to_arrow_dtype(singular_type))
492+
pa_type = pa.list_(
493+
bigframes_dtype_to_arrow_dtype(_TK_TO_BIGFRAMES[field.field_type])
494+
)
491495
return field.name, pd.ArrowDtype(pa_type)
492-
return field.name, singular_type
496+
return field.name, _TK_TO_BIGFRAMES[field.field_type]
493497
else:
494498
raise ValueError(f"Cannot handle type: {field.field_type}")
495499

@@ -639,7 +643,7 @@ def can_coerce(source_type: ExpressionType, target_type: ExpressionType) -> bool
639643
return True # None can be coerced to any supported type
640644
else:
641645
return (source_type == STRING_DTYPE) and (
642-
target_type in TEMPORAL_BIGFRAMES_TYPES
646+
target_type in TEMPORAL_BIGFRAMES_TYPES + [JSON_DTYPE]
643647
)
644648

645649

bigframes/session/executor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@ def peek(
393393
def head(
394394
self, array_value: bigframes.core.ArrayValue, n_rows: int
395395
) -> ExecuteResult:
396+
396397
maybe_row_count = self._local_get_row_count(array_value)
397398
if (maybe_row_count is not None) and (maybe_row_count <= n_rows):
398399
return self.execute(array_value, ordered=True)
@@ -452,7 +453,7 @@ def cached(
452453
# use a heuristic for whether something needs to be cached
453454
if (not force) and self._is_trivially_executable(array_value):
454455
return
455-
elif use_session:
456+
if use_session:
456457
self._cache_with_session_awareness(array_value)
457458
else:
458459
self._cache_with_cluster_cols(array_value, cluster_cols=cluster_cols)
@@ -658,7 +659,7 @@ def _sql_as_cached_temp_table(
658659
def _validate_result_schema(
659660
self,
660661
array_value: bigframes.core.ArrayValue,
661-
bq_schema: list[bigquery.schema.SchemaField],
662+
bq_schema: list[bigquery.SchemaField],
662663
):
663664
actual_schema = tuple(bq_schema)
664665
ibis_schema = bigframes.core.compile.test_only_ibis_inferred_schema(
@@ -667,6 +668,7 @@ def _validate_result_schema(
667668
internal_schema = array_value.schema
668669
if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable:
669670
return
671+
670672
if internal_schema.to_bigquery() != actual_schema:
671673
raise ValueError(
672674
f"This error should only occur while testing. BigFrames internal schema: {internal_schema.to_bigquery()} does not match actual schema: {actual_schema}"

tests/system/small/bigquery/test_json.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ def test_json_extract_from_string():
134134
actual.to_pandas(),
135135
expected.to_pandas(),
136136
check_names=False,
137+
check_dtype=False,  # json_extract returns string type, while _get_series_from_json gives a JSON series (pa.large_string).
137138
)
138139

139140

@@ -200,3 +201,11 @@ def test_json_extract_string_array_as_float_array_from_array_strings():
200201
def test_json_extract_string_array_w_invalid_series_type():
201202
with pytest.raises(TypeError):
202203
bbq.json_extract_string_array(bpd.Series([1, 2]))
204+
205+
206+
# b/381148539
207+
def test_json_in_struct():
208+
df = bpd.read_gbq(
209+
"SELECT STRUCT(JSON '{\\\"a\\\": 1}' AS data, 1 AS number) as struct_col"
210+
)
211+
assert df["struct_col"].struct.field("data")[0] == '{"a":1}'

0 commit comments

Comments
 (0)