Skip to content

Commit 4915424

Browse files
authored
feat: support bytes type in remote_function (#761)
* feat: support bytes type in `remote_function`
* Update bigframes/functions/remote_function_template.py
* Update dataframe.py
* print more information about the current exception
* fix axis=1
* restore test retries
1 parent dad66fd commit 4915424

File tree

8 files changed

+362
-59
lines changed

8 files changed

+362
-59
lines changed

bigframes/core/blocks.py

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -2339,12 +2339,19 @@ def _get_rows_as_json_values(self) -> Block:
23392339
index_columns_count = len(self.index_columns)
23402340

23412341
# column references to form the array of values for the row
2342-
column_references_csv = sql.csv(
2343-
[sql.cast_as_string(col) for col in self.expr.column_ids]
2344-
)
2342+
column_types = list(self.index.dtypes) + list(self.dtypes)
2343+
column_references = []
2344+
for type_, col in zip(column_types, self.expr.column_ids):
2345+
if isinstance(type_, pd.ArrowDtype) and pa.types.is_binary(
2346+
type_.pyarrow_dtype
2347+
):
2348+
column_references.append(sql.to_json_string(col))
2349+
else:
2350+
column_references.append(sql.cast_as_string(col))
2351+
2352+
column_references_csv = sql.csv(column_references)
23452353

23462354
# types of the columns to serialize for the row
2347-
column_types = list(self.index.dtypes) + list(self.dtypes)
23482355
column_types_csv = sql.csv(
23492356
[sql.simple_literal(str(typ)) for typ in column_types]
23502357
)

bigframes/core/sql.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -96,6 +96,12 @@ def cast_as_string(column_name: str) -> str:
9696
return f"CAST({identifier(column_name)} AS STRING)"
9797

9898

99+
def to_json_string(column_name: str) -> str:
100+
"""Return a string representing JSON version of a column."""
101+
102+
return f"TO_JSON_STRING({identifier(column_name)})"
103+
104+
99105
def csv(values: Iterable[str]) -> str:
100106
"""Return a string of comma separated values."""
101107
return ", ".join(values)

bigframes/dataframe.py

Lines changed: 24 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3313,22 +3313,43 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
33133313
# Early check whether the dataframe dtypes are currently supported
33143314
# in the remote function
33153315
# NOTE: Keep in sync with the value converters used in the gcf code
3316-
# generated in generate_cloud_function_main_code in remote_function.py
3316+
# generated in remote_function_template.py
33173317
remote_function_supported_dtypes = (
33183318
bigframes.dtypes.INT_DTYPE,
33193319
bigframes.dtypes.FLOAT_DTYPE,
33203320
bigframes.dtypes.BOOL_DTYPE,
3321+
bigframes.dtypes.BYTES_DTYPE,
33213322
bigframes.dtypes.STRING_DTYPE,
33223323
)
33233324
supported_dtypes_types = tuple(
3324-
type(dtype) for dtype in remote_function_supported_dtypes
3325+
type(dtype)
3326+
for dtype in remote_function_supported_dtypes
3327+
if not isinstance(dtype, pandas.ArrowDtype)
3328+
)
3329+
# Check ArrowDtype separately since multiple BigQuery types map to
3330+
# ArrowDtype, including BYTES and TIMESTAMP.
3331+
supported_arrow_types = tuple(
3332+
dtype.pyarrow_dtype
3333+
for dtype in remote_function_supported_dtypes
3334+
if isinstance(dtype, pandas.ArrowDtype)
33253335
)
33263336
supported_dtypes_hints = tuple(
33273337
str(dtype) for dtype in remote_function_supported_dtypes
33283338
)
33293339

33303340
for dtype in self.dtypes:
3331-
if not isinstance(dtype, supported_dtypes_types):
3341+
if (
3342+
# Not one of the pandas/numpy types.
3343+
not isinstance(dtype, supported_dtypes_types)
3344+
# And not one of the arrow types.
3345+
and not (
3346+
isinstance(dtype, pandas.ArrowDtype)
3347+
and any(
3348+
dtype.pyarrow_dtype.equals(arrow_type)
3349+
for arrow_type in supported_arrow_types
3350+
)
3351+
)
3352+
):
33323353
raise NotImplementedError(
33333354
f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1."
33343355
f" Supported dtypes are {supported_dtypes_hints}."

bigframes/dtypes.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -383,11 +383,12 @@ def bf_type_from_type_kind(bf_schema) -> Dict[str, Dtype]:
383383
# Input and output types supported by BigQuery DataFrames remote functions.
384384
# TODO(shobs): Extend the support to all types supported by BQ remote functions
385385
# https://cloud.google.com/bigquery/docs/remote-functions#limitations
386-
RF_SUPPORTED_IO_PYTHON_TYPES = {bool, float, int, str}
386+
RF_SUPPORTED_IO_PYTHON_TYPES = {bool, bytes, float, int, str}
387387

388388
RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS = {
389389
"BOOLEAN",
390390
"BOOL",
391+
"BYTES",
391392
"FLOAT",
392393
"FLOAT64",
393394
"INT64",

bigframes/functions/remote_function.py

Lines changed: 66 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -32,13 +32,15 @@
3232
NamedTuple,
3333
Optional,
3434
Sequence,
35+
Tuple,
3536
TYPE_CHECKING,
3637
Union,
3738
)
3839
import warnings
3940

4041
import ibis
4142
import pandas
43+
import pyarrow
4244
import requests
4345

4446
if TYPE_CHECKING:
@@ -182,15 +184,11 @@ def create_bq_remote_function(
182184
# Create BQ function
183185
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2
184186
bq_function_args = []
185-
bq_function_return_type = third_party_ibis_bqtypes.BigQueryType.from_ibis(
186-
output_type
187-
)
187+
bq_function_return_type = output_type
188188

189189
# We are expecting the input type annotations to be 1:1 with the input args
190-
for idx, name in enumerate(input_args):
191-
bq_function_args.append(
192-
f"{name} {third_party_ibis_bqtypes.BigQueryType.from_ibis(input_types[idx])}"
193-
)
190+
for name, type_ in zip(input_args, input_types):
191+
bq_function_args.append(f"{name} {type_}")
194192

195193
remote_function_options = {
196194
"endpoint": endpoint,
@@ -259,16 +257,31 @@ def get_cloud_function_endpoint(self, name):
259257
return None
260258

261259
def generate_cloud_function_code(
262-
self, def_, directory, package_requirements=None, is_row_processor=False
260+
self,
261+
def_,
262+
directory,
263+
*,
264+
input_types: Tuple[str],
265+
output_type: str,
266+
package_requirements=None,
267+
is_row_processor=False,
263268
):
264-
"""Generate the cloud function code for a given user defined function."""
269+
"""Generate the cloud function code for a given user defined function.
270+
271+
Args:
272+
input_types (tuple[str]):
273+
Types of the input arguments in BigQuery SQL data type names.
274+
output_type (str):
275+
Types of the output scalar as a BigQuery SQL data type name.
276+
"""
265277

266278
# requirements.txt
267279
requirements = ["cloudpickle >= 2.1.0"]
268280
if is_row_processor:
269281
# bigframes remote function will send an entire row of data as json,
270282
# which would be converted to a pandas series and processed
271283
requirements.append(f"pandas=={pandas.__version__}")
284+
requirements.append(f"pyarrow=={pyarrow.__version__}")
272285
if package_requirements:
273286
requirements.extend(package_requirements)
274287
requirements = sorted(requirements)
@@ -278,26 +291,45 @@ def generate_cloud_function_code(
278291

279292
# main.py
280293
entry_point = bigframes.functions.remote_function_template.generate_cloud_function_main_code(
281-
def_, directory, is_row_processor
294+
def_,
295+
directory,
296+
input_types=input_types,
297+
output_type=output_type,
298+
is_row_processor=is_row_processor,
282299
)
283300
return entry_point
284301

285302
def create_cloud_function(
286303
self,
287304
def_,
288305
cf_name,
306+
*,
307+
input_types: Tuple[str],
308+
output_type: str,
289309
package_requirements=None,
290310
timeout_seconds=600,
291311
max_instance_count=None,
292312
is_row_processor=False,
293313
vpc_connector=None,
294314
):
295-
"""Create a cloud function from the given user defined function."""
315+
"""Create a cloud function from the given user defined function.
316+
317+
Args:
318+
input_types (tuple[str]):
319+
Types of the input arguments in BigQuery SQL data type names.
320+
output_type (str):
321+
Types of the output scalar as a BigQuery SQL data type name.
322+
"""
296323

297324
# Build and deploy folder structure containing cloud function
298325
with tempfile.TemporaryDirectory() as directory:
299326
entry_point = self.generate_cloud_function_code(
300-
def_, directory, package_requirements, is_row_processor
327+
def_,
328+
directory,
329+
package_requirements=package_requirements,
330+
input_types=input_types,
331+
output_type=output_type,
332+
is_row_processor=is_row_processor,
301333
)
302334
archive_path = shutil.make_archive(directory, "zip", directory)
303335

@@ -444,11 +476,13 @@ def provision_bq_remote_function(
444476
cf_endpoint = self.create_cloud_function(
445477
def_,
446478
cloud_function_name,
447-
package_requirements,
448-
cloud_function_timeout,
449-
cloud_function_max_instance_count,
450-
is_row_processor,
451-
cloud_function_vpc_connector,
479+
input_types=input_types,
480+
output_type=output_type,
481+
package_requirements=package_requirements,
482+
timeout_seconds=cloud_function_timeout,
483+
max_instance_count=cloud_function_max_instance_count,
484+
is_row_processor=is_row_processor,
485+
vpc_connector=cloud_function_vpc_connector,
452486
)
453487
else:
454488
logger.info(f"Cloud function {cloud_function_name} already exists.")
@@ -957,16 +991,21 @@ def try_delattr(attr):
957991

958992
rf_name, cf_name = remote_function_client.provision_bq_remote_function(
959993
func,
960-
ibis_signature.input_types,
961-
ibis_signature.output_type,
962-
reuse,
963-
name,
964-
packages,
965-
max_batching_rows,
966-
cloud_function_timeout,
967-
cloud_function_max_instances,
968-
is_row_processor,
969-
cloud_function_vpc_connector,
994+
input_types=tuple(
995+
third_party_ibis_bqtypes.BigQueryType.from_ibis(type_)
996+
for type_ in ibis_signature.input_types
997+
),
998+
output_type=third_party_ibis_bqtypes.BigQueryType.from_ibis(
999+
ibis_signature.output_type
1000+
),
1001+
reuse=reuse,
1002+
name=name,
1003+
package_requirements=packages,
1004+
max_batching_rows=max_batching_rows,
1005+
cloud_function_timeout=cloud_function_timeout,
1006+
cloud_function_max_instance_count=cloud_function_max_instances,
1007+
is_row_processor=is_row_processor,
1008+
cloud_function_vpc_connector=cloud_function_vpc_connector,
9701009
)
9711010

9721011
# TODO: Move ibis logic to compiler step

0 commit comments

Comments (0)