From e96db9e8d9718ce27430bdd32f2494bc9df23fb1 Mon Sep 17 00:00:00 2001
From: Garrett Wu
Date: Wed, 4 Dec 2024 01:39:31 +0000
Subject: [PATCH 01/11] fix: json in struct destination type

---
 bigframes/core/__init__.py               |  2 +-
 bigframes/dtypes.py                      | 31 +++++++++++++++---------
 bigframes/session/executor.py            |  8 +++++-
 tests/system/small/bigquery/test_json.py |  7 ++++++
 4 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index 3b1bf48558..b22fef9fac 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -107,7 +107,7 @@ def from_table(
             raise ValueError("must set at most one of 'offests', 'primary_key'")
         if any(i.field_type == "JSON" for i in table.schema if i.name in schema.names):
             warnings.warn(
-                "Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.",
+                "Interpreting JSON column(s) as StringDtype and pyarrow.large_string. This behavior may change in future versions.",
                 bigframes.exceptions.PreviewWarning,
             )
         # define data source only for needed columns, this makes row-hashing cheaper
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index c71531f9f3..340095da78 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -55,6 +55,12 @@
 BIGNUMERIC_DTYPE = pd.ArrowDtype(pa.decimal256(76, 38))
 # No arrow equivalent
 GEO_DTYPE = gpd.array.GeometryDtype()
+# JSON
+JSON_DTYPE = pd.StringDtype(storage="pyarrow")
+# Monkey patching as JSON workaround
+JSON_DTYPE.is_json = True  # type: ignore
+# To be updated when pa.json is supported. Using large_string as a workaround.
+JSON_PA_DTYPE = pa.large_string()
 
 # Used when storing Null expressions
 DEFAULT_DTYPE = FLOAT_DTYPE
@@ -272,7 +278,7 @@ def is_struct_like(type_: ExpressionType) -> bool:
 
 def is_json_like(type_: ExpressionType) -> bool:
     # TODO: Add JSON type support
-    return type_ == STRING_DTYPE
+    return type_ == JSON_DTYPE  # Including JSON string
 
 
 def is_json_encoding_type(type_: ExpressionType) -> bool:
@@ -356,6 +362,8 @@ def dtype_for_etype(etype: ExpressionType) -> Dtype:
 
 
 def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
+    if arrow_dtype == JSON_PA_DTYPE:
+        return JSON_DTYPE
     if arrow_dtype in _ARROW_TO_BIGFRAMES:
         return _ARROW_TO_BIGFRAMES[arrow_dtype]
     if pa.types.is_list(arrow_dtype):
@@ -382,6 +390,8 @@ def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
 def bigframes_dtype_to_arrow_dtype(
     bigframes_dtype: Dtype,
 ) -> pa.DataType:
+    if getattr(bigframes_dtype, "is_json", False):
+        return JSON_PA_DTYPE
     if bigframes_dtype in _BIGFRAMES_TO_ARROW:
         return _BIGFRAMES_TO_ARROW[bigframes_dtype]
     if isinstance(bigframes_dtype, pd.ArrowDtype):
@@ -446,8 +456,6 @@ def infer_literal_arrow_type(literal) -> typing.Optional[pa.DataType]:
     return bigframes_dtype_to_arrow_dtype(infer_literal_type(literal))
 
 
-# Don't have dtype for json, so just end up interpreting as STRING
-_REMAPPED_TYPEKINDS = {"JSON": "STRING"}
 _TK_TO_BIGFRAMES = {
     type_kind: mapping.dtype
     for mapping in SIMPLE_TYPES
@@ -471,16 +479,15 @@ def convert_schema_field(
         pa_struct = pa.struct(fields)
         pa_type = pa.list_(pa_struct) if is_repeated else pa_struct
         return field.name, pd.ArrowDtype(pa_type)
-    elif (
-        field.field_type in _TK_TO_BIGFRAMES or field.field_type in _REMAPPED_TYPEKINDS
-    ):
-        singular_type = _TK_TO_BIGFRAMES[
-            _REMAPPED_TYPEKINDS.get(field.field_type, field.field_type)
-        ]
+    elif field.field_type == "JSON":
+        return field.name, JSON_DTYPE
+    elif field.field_type in _TK_TO_BIGFRAMES:
         if is_repeated:
-            pa_type = pa.list_(bigframes_dtype_to_arrow_dtype(singular_type))
+            pa_type = pa.list_(
+                bigframes_dtype_to_arrow_dtype(_TK_TO_BIGFRAMES[field.field_type])
+            )
             return field.name, pd.ArrowDtype(pa_type)
-        return field.name, singular_type
+        return field.name, _TK_TO_BIGFRAMES[field.field_type]
     else:
         raise ValueError(f"Cannot handle type: {field.field_type}")
@@ -489,6 +496,8 @@ def convert_to_schema_field(
     name: str,
     bigframes_dtype: Dtype,
 ) -> google.cloud.bigquery.SchemaField:
+    if getattr(bigframes_dtype, "is_json", False):
+        return google.cloud.bigquery.SchemaField(name, "JSON")
     if bigframes_dtype in _BIGFRAMES_TO_TK:
         return google.cloud.bigquery.SchemaField(
             name, _BIGFRAMES_TO_TK[bigframes_dtype]
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index d19ec23501..3387bb1134 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -656,7 +656,7 @@ def _sql_as_cached_temp_table(
     def _validate_result_schema(
         self,
         array_value: bigframes.core.ArrayValue,
-        bq_schema: list[bigquery.schema.SchemaField],
+        bq_schema: list[bigquery.SchemaField],
     ):
         actual_schema = tuple(bq_schema)
         ibis_schema = bigframes.core.compile.test_only_ibis_inferred_schema(
@@ -665,6 +665,12 @@ def _validate_result_schema(
         internal_schema = array_value.schema
         if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable:
             return
+
+        # The actual schema for JSON columns can be STRING.
+        for schema_field in internal_schema.to_bigquery():
+            if schema_field.field_type == "JSON":
+                return
+
         if internal_schema.to_bigquery() != actual_schema:
             raise ValueError(
                 f"This error should only occur while testing. BigFrames internal schema: {internal_schema.to_bigquery()} does not match actual schema: {actual_schema}"
diff --git a/tests/system/small/bigquery/test_json.py b/tests/system/small/bigquery/test_json.py
index 75b9345107..fc33e2c413 100644
--- a/tests/system/small/bigquery/test_json.py
+++ b/tests/system/small/bigquery/test_json.py
@@ -200,3 +200,10 @@ def test_json_extract_string_array_as_float_array_from_array_strings():
 def test_json_extract_string_array_w_invalid_series_type():
     with pytest.raises(TypeError):
         bbq.json_extract_string_array(bpd.Series([1, 2]))
+
+
+def test_json_in_struct():
+    df = bpd.read_gbq(
+        "SELECT STRUCT(JSON '{\\\"a\\\": 1}' AS data, 1 AS number) as struct_col"
+    )
+    assert df["struct_col"].struct.field("data")[0] == '{"a":1}'

From e5ed6c5015eca62fb6a7ad106c30e5feed27e2b1 Mon Sep 17 00:00:00 2001
From: Garrett Wu
Date: Thu, 5 Dec 2024 21:59:22 +0000
Subject: [PATCH 02/11] patch output json_str to json

---
 bigframes/core/blocks.py                     |  1 +
 bigframes/core/compile/compiler.py           | 27 +++++++++++++++++---
 bigframes/core/compile/scalar_op_compiler.py |  7 ++++-
 bigframes/operations/__init__.py             |  4 +++
 bigframes/operations/type.py                 |  5 ++++
 bigframes/session/executor.py                |  3 ++-
 6 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 574bed00eb..27287ca231 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -1489,6 +1489,7 @@ def retrieve_repr_request_results(
 
         Returns a tuple of the dataframe and the overall number of rows of the query.
""" + # self.apply_unary_op("ml_generate_embedding_statistics", ops.parse_json) # head caches full underlying expression, so row_count will be free after head_result = self.session._executor.head(self.expr, max_results) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 295e323843..d87d285dc3 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -59,11 +59,30 @@ def compile_sql( node = self.set_output_names(node, output_ids) if ordered: node, limit = rewrites.pullup_limit_from_slice(node) - return self.compile_ordered_ir(self._preprocess(node)).to_sql( - ordered=True, limit=limit - ) + ir = self.compile_ordered_ir(self._preprocess(node)) + ir = self._patch_json_type(node, ir) + return ir.to_sql(ordered=True, limit=limit) else: - return self.compile_unordered_ir(self._preprocess(node)).to_sql() + ir = self.compile_unordered_ir(self._preprocess(node)) # type: ignore + ir = self._patch_json_type(node, ir) + return ir.to_sql() + + def _patch_json_type( + self, node: nodes.BigFrameNode, ir: compiled.OrderedIR | compiled.UnorderedIR + ): + # Patch back to json type + json_col_ids = set() + for schema in node.schema.items: + if getattr(schema.dtype, "is_json", False): + json_col_ids.add(schema.column) + value_cols = tuple( + compile_scalar.parse_json(value).name(value.get_name()) + if (value.type().is_string() and value.get_name() in json_col_ids) + else value + for value in ir.columns + ) + + return ir._select(value_cols) def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str: return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows) diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index 23b3d1b906..a72353203b 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ -1196,6 +1196,11 @@ def json_extract_string_array_op_impl( return json_extract_string_array(json_obj=x, json_path=op.json_path) +@scalar_op_compiler.register_unary_op(ops.parse_json_op) +def parse_json_impl(x: ibis_types.Value): + return parse_json(json_str=x) + + # Blob Ops @scalar_op_compiler.register_unary_op(ops.obj_fetch_metadata_op) def obj_fetch_metadata_op_impl(x: ibis_types.Value): @@ -1845,7 +1850,7 @@ def float_ceil(a: float) -> float: @ibis_udf.scalar.builtin(name="parse_json") -def parse_json(a: str) -> ibis_dtypes.JSON: # type: ignore[empty-body] +def parse_json(json_str: str) -> ibis_dtypes.JSON: # type: ignore[empty-body] """Converts a JSON-formatted STRING value to a JSON value.""" diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index e7c65c6ead..b773a41169 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -727,6 +727,10 @@ def output_type(self, *input_types): ) +parse_json_op = create_unary_op( + name="parse_json", type_signature=op_typing.STRING_TO_JSON +) + # Binary Ops fillna_op = create_binary_op(name="fillna", type_signature=op_typing.COERCE) maximum_op = create_binary_op(name="maximum", type_signature=op_typing.COERCE) diff --git a/bigframes/operations/type.py b/bigframes/operations/type.py index 441134aff5..593ba4db03 100644 --- a/bigframes/operations/type.py +++ b/bigframes/operations/type.py @@ -225,6 +225,11 @@ def output_type( bigframes.dtypes.BOOL_DTYPE, description="string-like", ) +STRING_TO_JSON = FixedOutputType( + bigframes.dtypes.is_string_like, + bigframes.dtypes.JSON_DTYPE, + description="string-like", +) 
DATELIKE_ACCESSOR = FixedOutputType( bigframes.dtypes.is_date_like, bigframes.dtypes.INT_DTYPE, description="date-like" ) diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 3387bb1134..0390d82c34 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -393,6 +393,7 @@ def peek( def head( self, array_value: bigframes.core.ArrayValue, n_rows: int ) -> ExecuteResult: + maybe_row_count = self._local_get_row_count(array_value) if (maybe_row_count is not None) and (maybe_row_count <= n_rows): return self.execute(array_value, ordered=True) @@ -452,7 +453,7 @@ def cached( # use a heuristic for whether something needs to be cached if (not force) and self._is_trivially_executable(array_value): return - elif use_session: + if use_session: self._cache_with_session_awareness(array_value) else: self._cache_with_cluster_cols(array_value, cluster_cols=cluster_cols) From d7b2be1a12dc7a7e05fd84ce66ba0517fb2385b1 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Fri, 6 Dec 2024 01:16:35 +0000 Subject: [PATCH 03/11] fix type hint --- bigframes/core/compile/compiler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index d87d285dc3..40e8d252af 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -76,7 +76,10 @@ def _patch_json_type( if getattr(schema.dtype, "is_json", False): json_col_ids.add(schema.column) value_cols = tuple( - compile_scalar.parse_json(value).name(value.get_name()) + typing.cast( + ibis_types.Value, + compile_scalar.parse_json(value).name(value.get_name()), + ) if (value.type().is_string() and value.get_name() in json_col_ids) else value for value in ir.columns From c9b7410af09f23892a23d15779d62ae120a6f7ff Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Fri, 6 Dec 2024 01:29:23 +0000 Subject: [PATCH 04/11] fix type hint --- bigframes/core/compile/compiled.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index bc3f9fffda..d4c814145b 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -464,7 +464,7 @@ def _set_or_replace_by_id( builder.columns = [*self.columns, new_value.name(id)] return builder.build() - def _select(self, values: typing.Tuple[ibis_types.Value]) -> UnorderedIR: + def _select(self, values: typing.Tuple[ibis_types.Value, ...]) -> UnorderedIR: builder = self.builder() builder.columns = values return builder.build() @@ -1129,7 +1129,7 @@ def _set_or_replace_by_id(self, id: str, new_value: ibis_types.Value) -> Ordered builder.columns = [*self.columns, new_value.name(id)] return builder.build() - def _select(self, values: typing.Tuple[ibis_types.Value]) -> OrderedIR: + def _select(self, values: typing.Tuple[ibis_types.Value, ...]) -> OrderedIR: """Safely assign by id while maintaining ordering integrity.""" # TODO: Split into explicit set and replace methods ordering_col_ids = set( From 02948379e039c3508ed1f42adcae11ddae4066f8 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Fri, 6 Dec 2024 18:37:41 +0000 Subject: [PATCH 05/11] remove draft --- bigframes/core/blocks.py | 1 - bigframes/core/compile/scalar_op_compiler.py | 5 ----- bigframes/operations/__init__.py | 4 ---- bigframes/operations/type.py | 5 ----- tests/system/small/bigquery/test_json.py | 1 + 5 files changed, 1 insertion(+), 15 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py 
index 27287ca231..574bed00eb 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -1489,7 +1489,6 @@ def retrieve_repr_request_results(
 
         Returns a tuple of the dataframe and the overall number of rows of the query.
         """
-        # self.apply_unary_op("ml_generate_embedding_statistics", ops.parse_json)
 
         # head caches full underlying expression, so row_count will be free after
         head_result = self.session._executor.head(self.expr, max_results)
diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py
index a72353203b..9fb8a2f69f 100644
--- a/bigframes/core/compile/scalar_op_compiler.py
+++ b/bigframes/core/compile/scalar_op_compiler.py
@@ -1196,11 +1196,6 @@ def json_extract_string_array_op_impl(
     return json_extract_string_array(json_obj=x, json_path=op.json_path)
 
 
-@scalar_op_compiler.register_unary_op(ops.parse_json_op)
-def parse_json_impl(x: ibis_types.Value):
-    return parse_json(json_str=x)
-
-
 # Blob Ops
 @scalar_op_compiler.register_unary_op(ops.obj_fetch_metadata_op)
 def obj_fetch_metadata_op_impl(x: ibis_types.Value):
diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py
index b773a41169..e7c65c6ead 100644
--- a/bigframes/operations/__init__.py
+++ b/bigframes/operations/__init__.py
@@ -727,10 +727,6 @@ def output_type(self, *input_types):
 )
 
 
-parse_json_op = create_unary_op(
-    name="parse_json", type_signature=op_typing.STRING_TO_JSON
-)
-
 # Binary Ops
 fillna_op = create_binary_op(name="fillna", type_signature=op_typing.COERCE)
 maximum_op = create_binary_op(name="maximum", type_signature=op_typing.COERCE)
diff --git a/bigframes/operations/type.py b/bigframes/operations/type.py
index 593ba4db03..441134aff5 100644
--- a/bigframes/operations/type.py
+++ b/bigframes/operations/type.py
@@ -225,11 +225,6 @@ def output_type(
     bigframes.dtypes.BOOL_DTYPE,
     description="string-like",
 )
-STRING_TO_JSON = FixedOutputType(
-    bigframes.dtypes.is_string_like,
-    bigframes.dtypes.JSON_DTYPE,
-    description="string-like",
-)
 DATELIKE_ACCESSOR = FixedOutputType(
     bigframes.dtypes.is_date_like, bigframes.dtypes.INT_DTYPE, description="date-like"
 )
diff --git a/tests/system/small/bigquery/test_json.py b/tests/system/small/bigquery/test_json.py
index fc33e2c413..95cef3a5d8 100644
--- a/tests/system/small/bigquery/test_json.py
+++ b/tests/system/small/bigquery/test_json.py
@@ -202,6 +202,7 @@ def test_json_extract_string_array_w_invalid_series_type():
         bbq.json_extract_string_array(bpd.Series([1, 2]))
 
 
+# b/381148539
 def test_json_in_struct():
     df = bpd.read_gbq(
         "SELECT STRUCT(JSON '{\\\"a\\\": 1}' AS data, 1 AS number) as struct_col"

From 02948379e039c3508ed1f42adcae11ddae4066f8 Mon Sep 17 00:00:00 2001
From: Garrett Wu
Date: Tue, 10 Dec 2024 03:12:59 +0000
Subject: [PATCH 06/11] use pd.ArrowDtype(pa.large_string) as bf dtype

---
 bigframes/core/compile/compiler.py       |  6 ++++--
 bigframes/core/compile/ibis_types.py     |  4 +++-
 bigframes/dtypes.py                      | 23 +++++++++--------------
 bigframes/session/executor.py            |  5 ++++-
 tests/system/small/bigquery/test_json.py |  1 +
 5 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py
index 40e8d252af..0a501f1df0 100644
--- a/bigframes/core/compile/compiler.py
+++ b/bigframes/core/compile/compiler.py
@@ -70,10 +70,12 @@ def compile_sql(
     def _patch_json_type(
         self, node: nodes.BigFrameNode, ir: compiled.OrderedIR | compiled.UnorderedIR
    ):
-        # Patch back to json type
+        # Patch back to json type by applying parse_json on json_str columns
+        import bigframes.dtypes
+
         json_col_ids = set()
         for schema in node.schema.items:
-            if getattr(schema.dtype, "is_json", False):
+            if schema.dtype == bigframes.dtypes.JSON_DTYPE:
                 json_col_ids.add(schema.column)
         value_cols = tuple(
             typing.cast(
diff --git a/bigframes/core/compile/ibis_types.py b/bigframes/core/compile/ibis_types.py
index 27dfee8b0d..d763fc6017 100644
--- a/bigframes/core/compile/ibis_types.py
+++ b/bigframes/core/compile/ibis_types.py
@@ -46,6 +46,7 @@
     ibis_dtypes.Binary,
     ibis_dtypes.Decimal,
     ibis_dtypes.GeoSpatial,
+    ibis_dtypes.JSON,
 ]
 
 
@@ -74,6 +75,7 @@
         ibis_dtypes.GeoSpatial(geotype="geography", srid=4326, nullable=True),
         gpd.array.GeometryDtype(),
     ),
+    (ibis_dtypes.json, pd.ArrowDtype(pa.large_string())),
 )
 
 BIGFRAMES_TO_IBIS: Dict[bigframes.dtypes.Dtype, ibis_dtypes.DataType] = {
@@ -314,7 +316,7 @@ def ibis_dtype_to_bigframes_dtype(
             "Interpreting JSON as string. This behavior may change in future versions.",
             bigframes.exceptions.PreviewWarning,
         )
-        return bigframes.dtypes.STRING_DTYPE
+        return bigframes.dtypes.JSON_DTYPE
 
     if ibis_dtype in IBIS_TO_BIGFRAMES:
         return IBIS_TO_BIGFRAMES[ibis_dtype]
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index 340095da78..6b9b7da565 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -56,11 +56,7 @@
 # No arrow equivalent
 GEO_DTYPE = gpd.array.GeometryDtype()
 # JSON
-JSON_DTYPE = pd.StringDtype(storage="pyarrow")
-# Monkey patching as JSON workaround
-JSON_DTYPE.is_json = True  # type: ignore
-# To be updated when pa.json is supported. Using large_string as a workaround.
-JSON_PA_DTYPE = pa.large_string()
+JSON_DTYPE = pd.ArrowDtype(pa.large_string())
 
 # Used when storing Null expressions
 DEFAULT_DTYPE = FLOAT_DTYPE
@@ -138,6 +134,13 @@ class SimpleDtypeInfo:
         orderable=True,
         clusterable=True,
     ),
+    SimpleDtypeInfo(
+        dtype=JSON_DTYPE,
+        arrow_dtype=pa.large_string(),
+        type_kind=("JSON",),
+        orderable=False,
+        clusterable=False,
+    ),
     SimpleDtypeInfo(
         dtype=DATE_DTYPE,
         arrow_dtype=pa.date32(),
@@ -278,7 +281,7 @@ def is_struct_like(type_: ExpressionType) -> bool:
 
 def is_json_like(type_: ExpressionType) -> bool:
     # TODO: Add JSON type support
-    return type_ == JSON_DTYPE  # Including JSON string
+    return type_ == JSON_DTYPE or type_ == STRING_DTYPE  # Including JSON string
 
 
 def is_json_encoding_type(type_: ExpressionType) -> bool:
@@ -362,8 +365,6 @@ def dtype_for_etype(etype: ExpressionType) -> Dtype:
 
 
 def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
-    if arrow_dtype == JSON_PA_DTYPE:
-        return JSON_DTYPE
     if arrow_dtype in _ARROW_TO_BIGFRAMES:
         return _ARROW_TO_BIGFRAMES[arrow_dtype]
     if pa.types.is_list(arrow_dtype):
@@ -390,8 +391,6 @@ def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
 def bigframes_dtype_to_arrow_dtype(
     bigframes_dtype: Dtype,
 ) -> pa.DataType:
-    if getattr(bigframes_dtype, "is_json", False):
-        return JSON_PA_DTYPE
     if bigframes_dtype in _BIGFRAMES_TO_ARROW:
         return _BIGFRAMES_TO_ARROW[bigframes_dtype]
     if isinstance(bigframes_dtype, pd.ArrowDtype):
@@ -479,8 +478,6 @@ def convert_schema_field(
         pa_struct = pa.struct(fields)
         pa_type = pa.list_(pa_struct) if is_repeated else pa_struct
         return field.name, pd.ArrowDtype(pa_type)
-    elif field.field_type == "JSON":
-        return field.name, JSON_DTYPE
     elif field.field_type in _TK_TO_BIGFRAMES:
         if is_repeated:
             pa_type = pa.list_(
@@ -496,8 +493,6 @@ def convert_to_schema_field(
     name: str,
     bigframes_dtype: Dtype,
 ) -> google.cloud.bigquery.SchemaField:
-    if getattr(bigframes_dtype, "is_json", False):
-        return google.cloud.bigquery.SchemaField(name, "JSON")
     if bigframes_dtype in _BIGFRAMES_TO_TK:
         return google.cloud.bigquery.SchemaField(
             name, _BIGFRAMES_TO_TK[bigframes_dtype]
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index 0390d82c34..dc349c9d6b 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -667,10 +667,13 @@ def _validate_result_schema(
         if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable:
             return
 
-        # The actual schema for JSON columns can be STRING.
+        # Since we are patching JSON to/from STRING in the ibis compiler, the schemas don't match.
         for schema_field in internal_schema.to_bigquery():
             if schema_field.field_type == "JSON":
                 return
+        for schema_field in ibis_schema.to_bigquery():
+            if schema_field.field_type == "JSON":
+                return
 
         if internal_schema.to_bigquery() != actual_schema:
diff --git a/tests/system/small/bigquery/test_json.py b/tests/system/small/bigquery/test_json.py
index 95cef3a5d8..3096897c80 100644
--- a/tests/system/small/bigquery/test_json.py
+++ b/tests/system/small/bigquery/test_json.py
@@ -134,6 +134,7 @@ def test_json_extract_from_string():
         actual.to_pandas(),
         expected.to_pandas(),
         check_names=False,
+        check_dtype=False,  # json_extract returns string type, while _get_series_from_json gives a JSON series (pa.large_string).
     )

From e426ab6b937d5dc04c910b6f40a817f3ac3d8584 Mon Sep 17 00:00:00 2001
From: Garrett Wu
Date: Tue, 10 Dec 2024 03:13:44 +0000
Subject: [PATCH 07/11] wording

---
 bigframes/session/executor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index dc349c9d6b..c085b04e71 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -667,7 +667,7 @@ def _validate_result_schema(
         if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable:
             return
 
-        # Since we are patching JSON to/from STRING in the ibis compiler, the schemas don't match.
+        # Since we are patching JSON to/from STRING in the ibis compiler, the schemas don't match. This will be resolved once pa.json is supported.
for schema_field in internal_schema.to_bigquery(): if schema_field.field_type == "JSON": return From 9bf25bd8fc24ecf35207366a08362f07efedfaec Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Tue, 10 Dec 2024 18:25:38 +0000 Subject: [PATCH 08/11] fix tests --- bigframes/dtypes.py | 3 ++- tests/system/small/test_dataframe_io.py | 2 +- tests/system/small/test_series.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 6b9b7da565..7af24eac3a 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -634,7 +634,8 @@ def can_coerce(source_type: ExpressionType, target_type: ExpressionType) -> bool return True # None can be coerced to any supported type else: return (source_type == STRING_DTYPE) and ( - target_type in (DATETIME_DTYPE, TIMESTAMP_DTYPE, TIME_DTYPE, DATE_DTYPE) + target_type + in (DATETIME_DTYPE, TIMESTAMP_DTYPE, TIME_DTYPE, DATE_DTYPE, JSON_DTYPE) ) diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index ab1fdceae5..848e21f6bd 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -259,7 +259,7 @@ def test_load_json(session): { "json_column": ['{"bar":true,"foo":10}'], }, - dtype=pd.StringDtype(storage="pyarrow"), + dtype=pd.ArrowDtype(pa.large_string()), ) expected.index = expected.index.astype("Int64") pd.testing.assert_series_equal(result.dtypes, expected.dtypes) diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 692b221a19..670828f616 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -281,7 +281,7 @@ def test_get_column(scalars_dfs, col_name, expected_dtype): def test_get_column_w_json(json_df, json_pandas_df): series = json_df["json_col"] series_pandas = series.to_pandas() - assert series.dtype == pd.StringDtype(storage="pyarrow") + assert series.dtype == pd.ArrowDtype(pa.large_string()) assert series_pandas.shape[0] == json_pandas_df.shape[0] From d978358dc78408cafdf4771aa0d9315a86f09050 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Tue, 10 Dec 2024 22:48:28 +0000 Subject: [PATCH 09/11] fix tests --- bigframes/bigquery/_operations/json.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/bigquery/_operations/json.py b/bigframes/bigquery/_operations/json.py index 152c93186a..843991807e 100644 --- a/bigframes/bigquery/_operations/json.py +++ b/bigframes/bigquery/_operations/json.py @@ -47,7 +47,7 @@ def json_set( >>> s = bpd.read_gbq("SELECT JSON '{\\\"a\\\": 1}' AS data")["data"] >>> bbq.json_set(s, json_path_value_pairs=[("$.a", 100), ("$.b", "hi")]) 0 {"a":100,"b":"hi"} - Name: data, dtype: string + Name: data, dtype: large_string[pyarrow] Args: input (bigframes.series.Series): From 6d8fce27238395c2c66c16135cd22330d27fbe41 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Thu, 12 Dec 2024 01:16:54 +0000 Subject: [PATCH 10/11] remove json conversions --- bigframes/core/compile/compiler.py | 24 -------------------- bigframes/core/compile/ibis_types.py | 7 ------ bigframes/core/compile/scalar_op_compiler.py | 14 ++++++++++-- bigframes/session/executor.py | 8 ------- 4 files changed, 12 insertions(+), 41 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 0a501f1df0..c28dbdfee0 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -60,35 +60,11 @@ def compile_sql( if ordered: node, limit = rewrites.pullup_limit_from_slice(node) ir 
= self.compile_ordered_ir(self._preprocess(node)) - ir = self._patch_json_type(node, ir) return ir.to_sql(ordered=True, limit=limit) else: ir = self.compile_unordered_ir(self._preprocess(node)) # type: ignore - ir = self._patch_json_type(node, ir) return ir.to_sql() - def _patch_json_type( - self, node: nodes.BigFrameNode, ir: compiled.OrderedIR | compiled.UnorderedIR - ): - # Patch back to json type by applying parse_json on json_str columns - import bigframes.dtypes - - json_col_ids = set() - for schema in node.schema.items: - if schema.dtype == bigframes.dtypes.JSON_DTYPE: - json_col_ids.add(schema.column) - value_cols = tuple( - typing.cast( - ibis_types.Value, - compile_scalar.parse_json(value).name(value.get_name()), - ) - if (value.type().is_string() and value.get_name() in json_col_ids) - else value - for value in ir.columns - ) - - return ir._select(value_cols) - def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str: return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows) diff --git a/bigframes/core/compile/ibis_types.py b/bigframes/core/compile/ibis_types.py index d763fc6017..544af69091 100644 --- a/bigframes/core/compile/ibis_types.py +++ b/bigframes/core/compile/ibis_types.py @@ -24,7 +24,6 @@ from bigframes_vendored.ibis.expr.datatypes.core import ( dtype as python_type_to_bigquery_type, ) -import bigframes_vendored.ibis.expr.operations as ibis_ops import bigframes_vendored.ibis.expr.types as ibis_types import geopandas as gpd # type: ignore import google.cloud.bigquery as bigquery @@ -221,12 +220,6 @@ def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value: """ ibis_type = value.type() name = value.get_name() - if ibis_type.is_json(): - value = ibis_ops.ToJsonString(value).to_expr() # type: ignore - value = ( - value.case().when("null", bigframes_vendored.ibis.null()).else_(value).end() - ) - return value.name(name) # Allow REQUIRED fields to be joined with NULLABLE fields. 
nullable_type = ibis_type.copy(nullable=True)
     return value.cast(nullable_type).name(name)
diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py
index 9fb8a2f69f..e1aeb5b2b9 100644
--- a/bigframes/core/compile/scalar_op_compiler.py
+++ b/bigframes/core/compile/scalar_op_compiler.py
@@ -1181,7 +1181,10 @@ def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet):
 
 @scalar_op_compiler.register_unary_op(ops.JSONExtract, pass_op=True)
 def json_extract_op_impl(x: ibis_types.Value, op: ops.JSONExtract):
-    return json_extract(json_obj=x, json_path=op.json_path)
+    if x.type().is_json():
+        return json_extract(json_obj=x, json_path=op.json_path)
+    # json string
+    return json_extract_string(json_obj=x, json_path=op.json_path)
 
 
 @scalar_op_compiler.register_unary_op(ops.JSONExtractArray, pass_op=True)
@@ -1860,7 +1863,14 @@ def json_set(  # type: ignore[empty-body]
 def json_extract(  # type: ignore[empty-body]
     json_obj: ibis_dtypes.JSON, json_path: ibis_dtypes.String
 ) -> ibis_dtypes.JSON:
-    """Extracts a JSON value and converts it to a SQL JSON-formatted STRING or JSON value."""
+    """Extracts a JSON value and converts it to a JSON value."""
+
+
+@ibis_udf.scalar.builtin(name="json_extract")
+def json_extract_string(  # type: ignore[empty-body]
+    json_obj: ibis_dtypes.String, json_path: ibis_dtypes.String
+) -> ibis_dtypes.String:
+    """Extracts a JSON STRING value and converts it to a SQL JSON-formatted STRING."""
 
 
 @ibis_udf.scalar.builtin(name="json_extract_array")
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index c085b04e71..1c6c8d46a5 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -667,14 +667,6 @@ def _validate_result_schema(
         if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable:
             return
 
-        # Since we are patching JSON to/from STRING in the ibis compiler, the schemas don't match. This will be resolved once pa.json is supported.
-        for schema_field in internal_schema.to_bigquery():
-            if schema_field.field_type == "JSON":
-                return
-        for schema_field in ibis_schema.to_bigquery():
-            if schema_field.field_type == "JSON":
-                return
-
         if internal_schema.to_bigquery() != actual_schema:
             raise ValueError(
                 f"This error should only occur while testing. BigFrames internal schema: {internal_schema.to_bigquery()} does not match actual schema: {actual_schema}"

From 70df7caa6ab371d2d11038f34133bac531609f11 Mon Sep 17 00:00:00 2001
From: Garrett Wu
Date: Thu, 12 Dec 2024 21:23:23 +0000
Subject: [PATCH 11/11] fix remote func

---
 bigframes/core/blocks.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index d1b2f91d60..ca860612f8 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -2564,13 +2564,13 @@ def _get_rows_as_json_values(self) -> Block:
             ), T1 AS (
                 SELECT *,
-                JSON_OBJECT(
+                TO_JSON_STRING(JSON_OBJECT(
                     "names", [{column_names_csv}],
                     "types", [{column_types_csv}],
                     "values", [{column_references_csv}],
                     "indexlength", {index_columns_count},
                     "dtype", {pandas_row_dtype}
-                ) AS {googlesql.identifier(row_json_column_name)} FROM T0
+                )) AS {googlesql.identifier(row_json_column_name)} FROM T0
             )
             SELECT {select_columns_csv} FROM T1
             """
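
Usage sketch (illustrative; not part of the committed patches above). Assuming
a configured BigQuery session and bigframes as patched by this series, the
JSON dtype behavior can be exercised as below. The dtype and value assertions
mirror tests/system/small/test_series.py, tests/system/small/test_dataframe_io.py,
and tests/system/small/bigquery/test_json.py; the SQL literals and variable
names are examples only.

    import pandas as pd
    import pyarrow as pa

    import bigframes.pandas as bpd

    # A top-level JSON column is read back with the interim JSON dtype,
    # pd.ArrowDtype(pa.large_string()), until a pyarrow JSON type exists.
    json_series = bpd.read_gbq("""SELECT JSON '{"a": 1}' AS data""")["data"]
    assert json_series.dtype == pd.ArrowDtype(pa.large_string())

    # JSON nested inside a STRUCT (b/381148539) round-trips as a
    # JSON-formatted string value.
    struct_df = bpd.read_gbq(
        """SELECT STRUCT(JSON '{"a": 1}' AS data, 1 AS number) AS struct_col"""
    )
    assert struct_df["struct_col"].struct.field("data")[0] == '{"a":1}'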