diff --git a/bigframes/bigquery/_operations/json.py b/bigframes/bigquery/_operations/json.py
index 152c93186a..843991807e 100644
--- a/bigframes/bigquery/_operations/json.py
+++ b/bigframes/bigquery/_operations/json.py
@@ -47,7 +47,7 @@ def json_set(
         >>> s = bpd.read_gbq("SELECT JSON '{\\\"a\\\": 1}' AS data")["data"]
         >>> bbq.json_set(s, json_path_value_pairs=[("$.a", 100), ("$.b", "hi")])
         0    {"a":100,"b":"hi"}
-        Name: data, dtype: string
+        Name: data, dtype: large_string[pyarrow]
 
     Args:
         input (bigframes.series.Series):
diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index ac79ec8625..5e3f6df355 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -107,7 +107,7 @@ def from_table(
         raise ValueError("must set at most one of 'offests', 'primary_key'")
     if any(i.field_type == "JSON" for i in table.schema if i.name in schema.names):
         warnings.warn(
-            "Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.",
+            "Interpreting JSON column(s) as pyarrow.large_string. This behavior may change in future versions.",
             bigframes.exceptions.PreviewWarning,
         )
     # define data source only for needed columns, this makes row-hashing cheaper
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index d1b2f91d60..ca860612f8 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -2564,13 +2564,13 @@ def _get_rows_as_json_values(self) -> Block:
             ),
             T1 AS (
                 SELECT *,
-                JSON_OBJECT(
+                TO_JSON_STRING(JSON_OBJECT(
                     "names", [{column_names_csv}],
                     "types", [{column_types_csv}],
                     "values", [{column_references_csv}],
                     "indexlength", {index_columns_count},
                     "dtype", {pandas_row_dtype}
-                ) AS {googlesql.identifier(row_json_column_name)} FROM T0
+                )) AS {googlesql.identifier(row_json_column_name)} FROM T0
             )
             SELECT {select_columns_csv} FROM T1
             """
diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py
index bc3f9fffda..d4c814145b 100644
--- a/bigframes/core/compile/compiled.py
+++ b/bigframes/core/compile/compiled.py
@@ -464,7 +464,7 @@ def _set_or_replace_by_id(
         builder.columns = [*self.columns, new_value.name(id)]
         return builder.build()
 
-    def _select(self, values: typing.Tuple[ibis_types.Value]) -> UnorderedIR:
+    def _select(self, values: typing.Tuple[ibis_types.Value, ...]) -> UnorderedIR:
         builder = self.builder()
         builder.columns = values
         return builder.build()
@@ -1129,7 +1129,7 @@ def _set_or_replace_by_id(self, id: str, new_value: ibis_types.Value) -> Ordered
         builder.columns = [*self.columns, new_value.name(id)]
         return builder.build()
 
-    def _select(self, values: typing.Tuple[ibis_types.Value]) -> OrderedIR:
+    def _select(self, values: typing.Tuple[ibis_types.Value, ...]) -> OrderedIR:
         """Safely assign by id while maintaining ordering integrity."""
         # TODO: Split into explicit set and replace methods
         ordering_col_ids = set(
diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py
index 295e323843..c28dbdfee0 100644
--- a/bigframes/core/compile/compiler.py
+++ b/bigframes/core/compile/compiler.py
@@ -59,11 +59,11 @@ def compile_sql(
         node = self.set_output_names(node, output_ids)
         if ordered:
             node, limit = rewrites.pullup_limit_from_slice(node)
-            return self.compile_ordered_ir(self._preprocess(node)).to_sql(
-                ordered=True, limit=limit
-            )
+            ir = self.compile_ordered_ir(self._preprocess(node))
+            return ir.to_sql(ordered=True, limit=limit)
         else:
-            return self.compile_unordered_ir(self._preprocess(node)).to_sql()
+            ir = self.compile_unordered_ir(self._preprocess(node))  # type: ignore
+            return ir.to_sql()
 
     def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str:
         return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows)
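Reviewer note: the doctest output in `json.py` above changes because `pd.StringDtype(storage="pyarrow")` and `pd.ArrowDtype(pa.large_string())` are distinct dtypes with different string representations. A minimal sketch of that difference, using only pandas and pyarrow (no BigQuery session needed):

```python
import pandas as pd
import pyarrow as pa

# The old and new JSON column dtypes compare unequal and render
# differently, which is why the expected doctest output moves from
# "string" to "large_string[pyarrow]".
old_dtype = pd.StringDtype(storage="pyarrow")
new_dtype = pd.ArrowDtype(pa.large_string())

print(str(old_dtype))  # string
print(str(new_dtype))  # large_string[pyarrow]
assert old_dtype != new_dtype
```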
diff --git a/bigframes/core/compile/ibis_types.py b/bigframes/core/compile/ibis_types.py
index 27dfee8b0d..544af69091 100644
--- a/bigframes/core/compile/ibis_types.py
+++ b/bigframes/core/compile/ibis_types.py
@@ -24,7 +24,6 @@
 from bigframes_vendored.ibis.expr.datatypes.core import (
     dtype as python_type_to_bigquery_type,
 )
-import bigframes_vendored.ibis.expr.operations as ibis_ops
 import bigframes_vendored.ibis.expr.types as ibis_types
 import geopandas as gpd  # type: ignore
 import google.cloud.bigquery as bigquery
@@ -46,6 +45,7 @@
     ibis_dtypes.Binary,
     ibis_dtypes.Decimal,
     ibis_dtypes.GeoSpatial,
+    ibis_dtypes.JSON,
 ]
 
 
@@ -74,6 +74,7 @@
         ibis_dtypes.GeoSpatial(geotype="geography", srid=4326, nullable=True),
         gpd.array.GeometryDtype(),
     ),
+    (ibis_dtypes.json, pd.ArrowDtype(pa.large_string())),
 )
 
 BIGFRAMES_TO_IBIS: Dict[bigframes.dtypes.Dtype, ibis_dtypes.DataType] = {
@@ -219,12 +220,6 @@ def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value:
     """
     ibis_type = value.type()
     name = value.get_name()
-    if ibis_type.is_json():
-        value = ibis_ops.ToJsonString(value).to_expr()  # type: ignore
-        value = (
-            value.case().when("null", bigframes_vendored.ibis.null()).else_(value).end()
-        )
-        return value.name(name)
     # Allow REQUIRED fields to be joined with NULLABLE fields.
     nullable_type = ibis_type.copy(nullable=True)
     return value.cast(nullable_type).name(name)
@@ -314,7 +309,7 @@ def ibis_dtype_to_bigframes_dtype(
             "Interpreting JSON as string. This behavior may change in future versions.",
             bigframes.exceptions.PreviewWarning,
         )
-        return bigframes.dtypes.STRING_DTYPE
+        return bigframes.dtypes.JSON_DTYPE
 
     if ibis_dtype in IBIS_TO_BIGFRAMES:
         return IBIS_TO_BIGFRAMES[ibis_dtype]
diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py
index 23b3d1b906..e1aeb5b2b9 100644
--- a/bigframes/core/compile/scalar_op_compiler.py
+++ b/bigframes/core/compile/scalar_op_compiler.py
@@ -1181,7 +1181,10 @@ def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet):
 @scalar_op_compiler.register_unary_op(ops.JSONExtract, pass_op=True)
 def json_extract_op_impl(x: ibis_types.Value, op: ops.JSONExtract):
-    return json_extract(json_obj=x, json_path=op.json_path)
+    if x.type().is_json():
+        return json_extract(json_obj=x, json_path=op.json_path)
+    # Input is a JSON-formatted string; use the STRING overload of JSON_EXTRACT.
+    return json_extract_string(json_obj=x, json_path=op.json_path)
 
 
 @scalar_op_compiler.register_unary_op(ops.JSONExtractArray, pass_op=True)
@@ -1845,7 +1848,7 @@ def float_ceil(a: float) -> float:
 
 
 @ibis_udf.scalar.builtin(name="parse_json")
-def parse_json(a: str) -> ibis_dtypes.JSON:  # type: ignore[empty-body]
+def parse_json(json_str: str) -> ibis_dtypes.JSON:  # type: ignore[empty-body]
     """Converts a JSON-formatted STRING value to a JSON value."""
 
 
@@ -1860,7 +1863,14 @@ def json_set(  # type: ignore[empty-body]
 def json_extract(  # type: ignore[empty-body]
     json_obj: ibis_dtypes.JSON, json_path: ibis_dtypes.String
 ) -> ibis_dtypes.JSON:
-    """Extracts a JSON value and converts it to a SQL JSON-formatted STRING or JSON value."""
+    """Extracts a JSON value and returns it as a JSON value."""
+
+
+@ibis_udf.scalar.builtin(name="json_extract")
+def json_extract_string(  # type: ignore[empty-body]
+    json_obj: ibis_dtypes.String, json_path: ibis_dtypes.String
+) -> ibis_dtypes.String:
+    """Extracts a JSON value from a JSON-formatted STRING and converts it to a SQL JSON-formatted STRING."""
 
 
 @ibis_udf.scalar.builtin(name="json_extract_array")
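Reviewer note: BigQuery's `JSON_EXTRACT` is overloaded: applied to a `JSON` value it returns `JSON`, while applied to a `STRING` it returns a JSON-formatted `STRING`. The `is_json()` check above picks between the two ibis stubs accordingly. A rough sketch of the observable behavior through the public wrapper, assuming a configured BigQuery session:

```python
import bigframes.bigquery as bbq
import bigframes.pandas as bpd

# JSON-typed input compiles to the JSON overload and stays JSON-typed.
json_series = bpd.read_gbq("SELECT JSON '{\"a\": {\"b\": 1}}' AS data")["data"]
print(bbq.json_extract(json_series, "$.a").dtype)  # large_string[pyarrow]

# Plain-string input compiles to the STRING overload and stays a string,
# which is what test_json_extract_from_string below relies on.
string_series = bpd.Series(['{"a": {"b": 1}}'])
print(bbq.json_extract(string_series, "$.a").dtype)  # string
```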
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index 69faa056fe..8581e5ff84 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -55,6 +55,8 @@
 BIGNUMERIC_DTYPE = pd.ArrowDtype(pa.decimal256(76, 38))
 # No arrow equivalent
 GEO_DTYPE = gpd.array.GeometryDtype()
+# JSON
+JSON_DTYPE = pd.ArrowDtype(pa.large_string())
 
 # Used when storing Null expressions
 DEFAULT_DTYPE = FLOAT_DTYPE
@@ -132,6 +134,13 @@ class SimpleDtypeInfo:
         orderable=True,
         clusterable=True,
     ),
+    SimpleDtypeInfo(
+        dtype=JSON_DTYPE,
+        arrow_dtype=pa.large_string(),
+        type_kind=("JSON",),
+        orderable=False,
+        clusterable=False,
+    ),
     SimpleDtypeInfo(
         dtype=DATE_DTYPE,
         arrow_dtype=pa.date32(),
@@ -281,7 +290,7 @@ def is_struct_like(type_: ExpressionType) -> bool:
 
 def is_json_like(type_: ExpressionType) -> bool:
     # TODO: Add JSON type support
-    return type_ == STRING_DTYPE
+    return type_ == JSON_DTYPE or type_ == STRING_DTYPE  # Including JSON string
 
 
 def is_json_encoding_type(type_: ExpressionType) -> bool:
@@ -455,8 +464,6 @@ def infer_literal_arrow_type(literal) -> typing.Optional[pa.DataType]:
     return bigframes_dtype_to_arrow_dtype(infer_literal_type(literal))
 
 
-# Don't have dtype for json, so just end up interpreting as STRING
-_REMAPPED_TYPEKINDS = {"JSON": "STRING"}
 _TK_TO_BIGFRAMES = {
     type_kind: mapping.dtype
     for mapping in SIMPLE_TYPES
@@ -480,16 +487,13 @@ def convert_schema_field(
             pa_struct = pa.struct(fields)
             pa_type = pa.list_(pa_struct) if is_repeated else pa_struct
         return field.name, pd.ArrowDtype(pa_type)
-    elif (
-        field.field_type in _TK_TO_BIGFRAMES or field.field_type in _REMAPPED_TYPEKINDS
-    ):
-        singular_type = _TK_TO_BIGFRAMES[
-            _REMAPPED_TYPEKINDS.get(field.field_type, field.field_type)
-        ]
+    elif field.field_type in _TK_TO_BIGFRAMES:
         if is_repeated:
-            pa_type = pa.list_(bigframes_dtype_to_arrow_dtype(singular_type))
+            pa_type = pa.list_(
+                bigframes_dtype_to_arrow_dtype(_TK_TO_BIGFRAMES[field.field_type])
+            )
             return field.name, pd.ArrowDtype(pa_type)
-        return field.name, singular_type
+        return field.name, _TK_TO_BIGFRAMES[field.field_type]
     else:
         raise ValueError(f"Cannot handle type: {field.field_type}")
@@ -639,7 +643,7 @@ def can_coerce(source_type: ExpressionType, target_type: ExpressionType) -> bool:
         return True  # None can be coerced to any supported type
     else:
         return (source_type == STRING_DTYPE) and (
-            target_type in TEMPORAL_BIGFRAMES_TYPES
+            target_type in TEMPORAL_BIGFRAMES_TYPES + [JSON_DTYPE]
         )
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index d19ec23501..1c6c8d46a5 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -393,6 +393,7 @@ def peek(
     def head(
         self, array_value: bigframes.core.ArrayValue, n_rows: int
     ) -> ExecuteResult:
+
         maybe_row_count = self._local_get_row_count(array_value)
         if (maybe_row_count is not None) and (maybe_row_count <= n_rows):
             return self.execute(array_value, ordered=True)
@@ -452,7 +453,7 @@ def cached(
         # use a heuristic for whether something needs to be cached
         if (not force) and self._is_trivially_executable(array_value):
             return
-        elif use_session:
+        if use_session:
             self._cache_with_session_awareness(array_value)
         else:
             self._cache_with_cluster_cols(array_value, cluster_cols=cluster_cols)
@@ -656,7 +657,7 @@ def _sql_as_cached_temp_table(
     def _validate_result_schema(
         self,
         array_value: bigframes.core.ArrayValue,
-        bq_schema: list[bigquery.schema.SchemaField],
+        bq_schema: list[bigquery.SchemaField],
     ):
         actual_schema = tuple(bq_schema)
         ibis_schema = bigframes.core.compile.test_only_ibis_inferred_schema(
@@ -665,6 +666,7 @@
         internal_schema = array_value.schema
         if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable:
             return
+
         if internal_schema.to_bigquery() != actual_schema:
             raise ValueError(
                 f"This error should only occur while testing. BigFrames internal schema: {internal_schema.to_bigquery()} does not match actual schema: {actual_schema}"
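Reviewer note: the `dtypes.py` changes above define a small, purely local contract that can be checked without a session. A sketch using only names introduced or touched in this diff:

```python
import pandas as pd
import pyarrow as pa

import bigframes.dtypes as dtypes

# JSON now has a first-class (preview) dtype backed by pa.large_string(),
# registered in SIMPLE_TYPES as neither orderable nor clusterable.
assert dtypes.JSON_DTYPE == pd.ArrowDtype(pa.large_string())

# Both the JSON dtype and plain strings count as JSON-like.
assert dtypes.is_json_like(dtypes.JSON_DTYPE)
assert dtypes.is_json_like(dtypes.STRING_DTYPE)

# STRING may now coerce to JSON, alongside the temporal types.
assert dtypes.can_coerce(dtypes.STRING_DTYPE, dtypes.JSON_DTYPE)
```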
list[bigquery.schema.SchemaField], + bq_schema: list[bigquery.SchemaField], ): actual_schema = tuple(bq_schema) ibis_schema = bigframes.core.compile.test_only_ibis_inferred_schema( @@ -665,6 +666,7 @@ def _validate_result_schema( internal_schema = array_value.schema if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable: return + if internal_schema.to_bigquery() != actual_schema: raise ValueError( f"This error should only occur while testing. BigFrames internal schema: {internal_schema.to_bigquery()} does not match actual schema: {actual_schema}" diff --git a/tests/system/small/bigquery/test_json.py b/tests/system/small/bigquery/test_json.py index 75b9345107..3096897c80 100644 --- a/tests/system/small/bigquery/test_json.py +++ b/tests/system/small/bigquery/test_json.py @@ -134,6 +134,7 @@ def test_json_extract_from_string(): actual.to_pandas(), expected.to_pandas(), check_names=False, + check_dtype=False, # json_extract returns string type. While _get_series_from_json gives a JSON series (pa.large_string). ) @@ -200,3 +201,11 @@ def test_json_extract_string_array_as_float_array_from_array_strings(): def test_json_extract_string_array_w_invalid_series_type(): with pytest.raises(TypeError): bbq.json_extract_string_array(bpd.Series([1, 2])) + + +# b/381148539 +def test_json_in_struct(): + df = bpd.read_gbq( + "SELECT STRUCT(JSON '{\\\"a\\\": 1}' AS data, 1 AS number) as struct_col" + ) + assert df["struct_col"].struct.field("data")[0] == '{"a":1}' diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index ab1fdceae5..848e21f6bd 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -259,7 +259,7 @@ def test_load_json(session): { "json_column": ['{"bar":true,"foo":10}'], }, - dtype=pd.StringDtype(storage="pyarrow"), + dtype=pd.ArrowDtype(pa.large_string()), ) expected.index = expected.index.astype("Int64") pd.testing.assert_series_equal(result.dtypes, expected.dtypes) diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 692b221a19..670828f616 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -281,7 +281,7 @@ def test_get_column(scalars_dfs, col_name, expected_dtype): def test_get_column_w_json(json_df, json_pandas_df): series = json_df["json_col"] series_pandas = series.to_pandas() - assert series.dtype == pd.StringDtype(storage="pyarrow") + assert series.dtype == pd.ArrowDtype(pa.large_string()) assert series_pandas.shape[0] == json_pandas_df.shape[0]