Commit bdee173

feat: support array output in remote_function (#1057)
* feat: support array output in `remote_function`. This is a feature request to support use cases like creating custom feature vectors, embeddings, etc.
* add multiindex test
* move array type conversion to bigquery module, test multiindex
* add `bigframes.bigquery.json_extract_string_array`, support int and str array outputs
* increase cleanup rate
* update input and output types doc
* support array output in DataFrame.apply
* support read_gbq_function on a remote function created for array output
* fix the json_set after variable renaming
* add tests for output_type in read_gbq_function
* temporarily exclude system 3.9 tests and include 3.10 and 3.11
* Revert "temporarily exclude system 3.9 tests and include 3.10 and 3.11" (reverts commit 2485aa3)
* add more info in the unexpected exception
* more debug info
* use unique routine name across tests
* Revert "more debug info" (reverts commit 86fe316)
* Revert "add more info in the unexpected exception" (reverts commit fe010cb)
* support array output in binary remote function operations
* support array output in nary remote function operations
* preserve array output type in function description to avoid explicit output_type in read_gbq_function
* fix one failing read_gbq_function test
* make test parameterization order deterministic
* fix sorting of types for mypy
* remove test parameterization with sorting inside
* include partial ordering mode testing for read_gbq_function
* add remote function array out test in partial ordering mode
* avoid repr-eval for output type serialization/deserialization
* remove unsupported-scenario system tests, use common exception for unsupported cases
1 parent a05ffa2 commit bdee173
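
In user-facing terms, the feature looks roughly like this (a sketch, assuming a configured BigQuery DataFrames session and BigQuery connection; the function and values are illustrative, not from this commit):

    import bigframes.pandas as bpd

    # hypothetical scalar function that now may declare an array output type
    @bpd.remote_function(input_types=[str], output_type=list[int], reuse=False)
    def char_codes(s: str) -> list[int]:
        return [ord(c) for c in s]

    s = bpd.Series(["ab", "cd"])
    # under the hood the cloud function returns a JSON-serialized string,
    # which bigframes parses back into an ARRAY column (see dataframe.py below)
    result = s.apply(char_codes)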

20 files changed: +859 -61 lines changed

bigframes/clients.py

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@
 from typing import cast, Optional

 import google.api_core.exceptions
+import google.api_core.retry
 from google.cloud import bigquery_connection_v1, resourcemanager_v3
 from google.iam.v1 import iam_policy_pb2, policy_pb2

bigframes/core/compile/ibis_types.py

Lines changed: 15 additions & 2 deletions
@@ -14,6 +14,7 @@
 from __future__ import annotations

 import textwrap
+import typing
 from typing import Any, cast, Dict, Iterable, Optional, Tuple, Union
 import warnings

@@ -22,7 +23,7 @@
 import bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes
 import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
 from bigframes_vendored.ibis.expr.datatypes.core import (
-    dtype as python_type_to_bigquery_type,
+    dtype as python_type_to_ibis_type,
 )
 import bigframes_vendored.ibis.expr.types as ibis_types
 import geopandas as gpd  # type: ignore

@@ -472,12 +473,24 @@ class UnsupportedTypeError(ValueError):
     def __init__(self, type_, supported_types):
         self.type = type_
         self.supported_types = supported_types
+        super().__init__(
+            f"'{type_}' is not one of the supported types {supported_types}"
+        )


 def ibis_type_from_python_type(t: type) -> ibis_dtypes.DataType:
     if t not in bigframes.dtypes.RF_SUPPORTED_IO_PYTHON_TYPES:
         raise UnsupportedTypeError(t, bigframes.dtypes.RF_SUPPORTED_IO_PYTHON_TYPES)
-    return python_type_to_bigquery_type(t)
+    return python_type_to_ibis_type(t)
+
+
+def ibis_array_output_type_from_python_type(t: type) -> ibis_dtypes.DataType:
+    array_of = typing.get_args(t)[0]
+    if array_of not in bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES:
+        raise UnsupportedTypeError(
+            array_of, bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES
+        )
+    return python_type_to_ibis_type(t)


 def ibis_type_from_type_kind(tk: bigquery.StandardSqlTypeNames) -> ibis_dtypes.DataType:
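
For reference, the new helper relies on the standard library's typing introspection; a minimal illustration of how a `list[...]` annotation is taken apart:

    import typing

    assert typing.get_origin(list[float]) is list    # identifies the container
    assert typing.get_args(list[float]) == (float,)  # yields the element type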

bigframes/dataframe.py

Lines changed: 13 additions & 0 deletions
@@ -4017,6 +4017,19 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
                 ops.NaryRemoteFunctionOp(func=func), series_list[1:]
             )
             result_series.name = None
+
+            # if the output is an array, reconstruct it from the json serialized
+            # string form
+            if bigframes.dtypes.is_array_like(func.output_dtype):
+                import bigframes.bigquery as bbq
+
+                result_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
+                    func.output_dtype.pyarrow_dtype.value_type
+                )
+                result_series = bbq.json_extract_string_array(
+                    result_series, value_dtype=result_dtype
+                )
+
             return result_series

         # Per-column apply
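
Conceptually, the reconstruction that `bbq.json_extract_string_array` performs server-side is the analog of this local round trip (a sketch, not the actual implementation):

    import json

    serialized = "[1.5, 2.5]"  # the STRING value returned by the cloud function
    values = [float(v) for v in json.loads(serialized)]  # back to typed values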

bigframes/dtypes.py

Lines changed: 7 additions & 0 deletions
@@ -701,6 +701,13 @@ def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
 # https://cloud.google.com/bigquery/docs/remote-functions#limitations
 RF_SUPPORTED_IO_PYTHON_TYPES = {bool, bytes, float, int, str}

+# Support array output types in BigQuery DataFrames remote functions even though
+# it is not currently (2024-10-06) supported in BigQuery remote functions.
+# https://cloud.google.com/bigquery/docs/remote-functions#limitations
+# TODO(b/284515241): remove this special handling when BigQuery remote functions
+# support array.
+RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES = {bool, float, int, str}
+
 RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS = {
     "BOOLEAN",
     "BOOL",

bigframes/functions/_remote_function_client.py

Lines changed: 11 additions & 17 deletions
@@ -95,6 +95,7 @@ def create_bq_remote_function(
         endpoint,
         bq_function_name,
         max_batching_rows,
+        metadata,
     ):
         """Create a BigQuery remote function given the artifacts of a user defined
         function and the http endpoint of a corresponding cloud function."""

@@ -120,9 +121,14 @@
             "max_batching_rows": max_batching_rows,
         }

+        if metadata:
+            # We are using the description field to store this structured
+            # bigframes specific metadata for the lack of a better option
+            remote_function_options["description"] = metadata
+
         remote_function_options_str = ", ".join(
             [
-                f'{key}="{val}"' if isinstance(val, str) else f"{key}={val}"
+                f"{key}='{val}'" if isinstance(val, str) else f"{key}={val}"
                 for key, val in remote_function_options.items()
                 if val is not None
             ]

@@ -200,14 +206,7 @@
         package_requirements=None,
         is_row_processor=False,
     ):
-        """Generate the cloud function code for a given user defined function.
-
-        Args:
-            input_types (tuple[str]):
-                Types of the input arguments in BigQuery SQL data type names.
-            output_type (str):
-                Types of the output scalar as a BigQuery SQL data type name.
-        """
+        """Generate the cloud function code for a given user defined function."""

         # requirements.txt
         if package_requirements:

@@ -240,14 +239,7 @@ def create_cloud_function(
         memory_mib=1024,
         ingress_settings="all",
     ):
-        """Create a cloud function from the given user defined function.
-
-        Args:
-            input_types (tuple[str]):
-                Types of the input arguments in BigQuery SQL data type names.
-            output_type (str):
-                Types of the output scalar as a BigQuery SQL data type name.
-        """
+        """Create a cloud function from the given user defined function."""

         # Build and deploy folder structure containing cloud function
         with tempfile.TemporaryDirectory() as directory:

@@ -394,6 +386,7 @@ def provision_bq_remote_function(
         cloud_function_vpc_connector,
         cloud_function_memory_mib,
         cloud_function_ingress_settings,
+        bq_metadata,
     ):
         """Provision a BigQuery remote function."""
         # Augment user package requirements with any internal package

@@ -473,6 +466,7 @@
                 cf_endpoint,
                 remote_function_name,
                 max_batching_rows,
+                bq_metadata,
             )

             created_new = True
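
The quote flip from f'{key}="{val}"' to f"{key}='{val}'" is worth noting: presumably the metadata stored in `description` is JSON and therefore full of double quotes, so single-quoting the option values keeps the generated DDL valid. A standalone illustration of the option-string assembly, using hypothetical values:

    remote_function_options = {
        "endpoint": "https://example-endpoint",  # hypothetical value
        "max_batching_rows": 1000,
        "description": '{"value": {"python_array_output_type": "float"}}',
    }
    options_str = ", ".join(
        f"{key}='{val}'" if isinstance(val, str) else f"{key}={val}"
        for key, val in remote_function_options.items()
        if val is not None
    )
    # endpoint='https://example-endpoint', max_batching_rows=1000,
    # description='{"value": {"python_array_output_type": "float"}}'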

bigframes/functions/_remote_function_session.py

Lines changed: 34 additions & 9 deletions
@@ -34,6 +34,7 @@

 import bigframes_vendored.constants as constants
 import bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes
+import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
 import bigframes_vendored.ibis.expr.operations.udf as ibis_udf
 import cloudpickle
 import google.api_core.exceptions

@@ -167,12 +168,19 @@ def remote_function(
         `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`.

     Args:
-        input_types (None, type, or sequence(type)):
+        input_types (type or sequence(type), Optional):
             For scalar user defined function it should be the input type or
-            sequence of input types. For row processing user defined function,
-            type `Series` should be specified.
-        output_type (Optional[type]):
-            Data type of the output in the user defined function.
+            sequence of input types. The supported scalar input types are
+            `bool`, `bytes`, `float`, `int`, `str`. For row processing user
+            defined function (i.e. functions that receive a single input
+            representing a row in form of a Series), type `Series` should be
+            specified.
+        output_type (type, Optional):
+            Data type of the output in the user defined function. If the
+            user defined function returns an array, then `list[type]` should
+            be specified. The supported output types are `bool`, `bytes`,
+            `float`, `int`, `str`, `list[bool]`, `list[float]`, `list[int]`
+            and `list[str]`.
         session (bigframes.Session, Optional):
             BigQuery DataFrames session to use for getting default project,
             dataset and BigQuery connection.

@@ -497,6 +505,24 @@ def try_delattr(attr):
             try_delattr("is_row_processor")
             try_delattr("ibis_node")

+            # resolve the output type that can be supported in the bigframes,
+            # ibis, BQ remote functions and cloud functions integration
+            ibis_output_type_for_bqrf = ibis_signature.output_type
+            bqrf_metadata = None
+            if isinstance(ibis_signature.output_type, ibis_dtypes.Array):
+                # TODO(b/284515241): remove this special handling to support
+                # array output types once BQ remote functions support ARRAY.
+                # Until then, use json serialized strings at the cloud function
+                # and BQ level, and parse that to the intended output type at
+                # the bigframes level.
+                ibis_output_type_for_bqrf = ibis_dtypes.String()
+                bqrf_metadata = _utils.get_bigframes_metadata(
+                    python_output_type=output_type
+                )
+            bqrf_output_type = third_party_ibis_bqtypes.BigQueryType.from_ibis(
+                ibis_output_type_for_bqrf
+            )
+
             (
                 rf_name,
                 cf_name,

@@ -508,9 +534,7 @@
                     for type_ in ibis_signature.input_types
                     if type_ is not None
                 ),
-                output_type=third_party_ibis_bqtypes.BigQueryType.from_ibis(
-                    ibis_signature.output_type
-                ),
+                output_type=bqrf_output_type,
                 reuse=reuse,
                 name=name,
                 package_requirements=packages,

@@ -521,6 +545,7 @@
                 cloud_function_vpc_connector=cloud_function_vpc_connector,
                 cloud_function_memory_mib=cloud_function_memory_mib,
                 cloud_function_ingress_settings=cloud_function_ingress_settings,
+                bq_metadata=bqrf_metadata,
             )

             # TODO(shobs): Find a better way to support udfs with param named "name".

@@ -541,7 +566,7 @@
                 name=rf_name,
                 catalog=dataset_ref.project,
                 database=dataset_ref.dataset_id,
-                signature=(ibis_signature.input_types, ibis_signature.output_type),
+                signature=(ibis_signature.input_types, ibis_output_type_for_bqrf),
             )  # type: ignore
             func.bigframes_cloud_function = (
                 remote_function_client.get_cloud_function_fully_qualified_name(cf_name)
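
The decision in the block above, reduced to its core (a sketch against the vendored ibis dtypes; assumes `Array` and `String` behave as in upstream ibis):

    import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes

    ibis_output = ibis_dtypes.Array(value_type=ibis_dtypes.float64)
    # arrays travel as STRING at the BQ remote function level; the real
    # Python output type is recorded separately in the function description
    bqrf_output = (
        ibis_dtypes.String()
        if isinstance(ibis_output, ibis_dtypes.Array)
        else ibis_output
    )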

bigframes/functions/_utils.py

Lines changed: 75 additions & 7 deletions
@@ -15,6 +15,8 @@

 import hashlib
 import inspect
+import json
+import typing
 from typing import cast, List, NamedTuple, Optional, Sequence, Set

 import bigframes_vendored.ibis.expr.datatypes.core as ibis_dtypes

@@ -26,6 +28,7 @@
 import pyarrow

 import bigframes.core.compile.ibis_types
+import bigframes.dtypes

 # Naming convention for the remote function artifacts
 _BIGFRAMES_REMOTE_FUNCTION_PREFIX = "bigframes"

@@ -194,6 +197,7 @@ class IbisSignature(NamedTuple):
     parameter_names: List[str]
     input_types: List[Optional[ibis_dtypes.DataType]]
     output_type: ibis_dtypes.DataType
+    output_type_override: Optional[ibis_dtypes.DataType] = None


 def ibis_signature_from_python_signature(

@@ -202,13 +206,77 @@
     output_type: type,
 ) -> IbisSignature:

+    ibis_input_types: List[Optional[ibis_dtypes.DataType]] = [
+        bigframes.core.compile.ibis_types.ibis_type_from_python_type(t)
+        for t in input_types
+    ]
+
+    if typing.get_origin(output_type) is list:
+        ibis_output_type = (
+            bigframes.core.compile.ibis_types.ibis_array_output_type_from_python_type(
+                output_type
+            )
+        )
+    else:
+        ibis_output_type = bigframes.core.compile.ibis_types.ibis_type_from_python_type(
+            output_type
+        )
+
     return IbisSignature(
         parameter_names=list(signature.parameters.keys()),
-        input_types=[
-            bigframes.core.compile.ibis_types.ibis_type_from_python_type(t)
-            for t in input_types
-        ],
-        output_type=bigframes.core.compile.ibis_types.ibis_type_from_python_type(
-            output_type
-        ),
+        input_types=ibis_input_types,
+        output_type=ibis_output_type,
     )
+
+
+def get_python_output_type_from_bigframes_metadata(
+    metadata_text: str,
+) -> Optional[type]:
+    try:
+        metadata_dict = json.loads(metadata_text)
+    except (TypeError, json.decoder.JSONDecodeError):
+        return None
+
+    try:
+        output_type = metadata_dict["value"]["python_array_output_type"]
+    except KeyError:
+        return None
+
+    for (
+        python_output_array_type
+    ) in bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES:
+        if python_output_array_type.__name__ == output_type:
+            return list[python_output_array_type]  # type: ignore
+
+    return None
+
+
+def get_bigframes_metadata(*, python_output_type: Optional[type] = None) -> str:
+    # Let's keep the actual metadata inside one level of nesting so that in
+    # future we can use a top level key "version" (parallel to "value"), based
+    # on which "value" can be interpreted according to the "version". The
+    # absence of "version" should be interpreted as default version.
+    inner_metadata = {}
+    if typing.get_origin(python_output_type) is list:
+        python_output_array_type = typing.get_args(python_output_type)[0]
+        if (
+            python_output_array_type
+            in bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES
+        ):
+            inner_metadata[
+                "python_array_output_type"
+            ] = python_output_array_type.__name__
+
+    metadata = {"value": inner_metadata}
+    metadata_ser = json.dumps(metadata)
+
+    # let's make sure the serialized value is deserializable
+    if (
+        get_python_output_type_from_bigframes_metadata(metadata_ser)
+        != python_output_type
+    ):
+        raise ValueError(
+            f"python_output_type {python_output_type} is not serializable."
+        )
+
+    return metadata_ser