Commit 8baa912

feat: support params max_batching_rows, container_cpu, and container_memory for udf (#1897)
* feat: support new runtime options for udf
* fix
* fix test
* fix test
* resolve the comments
1 parent 40e7638 commit 8baa912

5 files changed: +155 −0 lines changed


bigframes/functions/_function_client.py

Lines changed: 9 additions & 0 deletions

@@ -202,6 +202,9 @@ def provision_bq_managed_function(
     output_type: str,
     name: Optional[str],
     packages: Optional[Sequence[str]],
+    max_batching_rows: Optional[int],
+    container_cpu: Optional[float],
+    container_memory: Optional[str],
     is_row_processor: bool,
     bq_connection_id,
     *,
@@ -234,6 +237,12 @@ def provision_bq_managed_function(
         "runtime_version": _MANAGED_FUNC_PYTHON_VERSION,
         "entry_point": "bigframes_handler",
     }
+    if max_batching_rows:
+        managed_function_options["max_batching_rows"] = max_batching_rows
+    if container_cpu:
+        managed_function_options["container_cpu"] = container_cpu
+    if container_memory:
+        managed_function_options["container_memory"] = container_memory

     # Augment user package requirements with any internal package
     # requirements.
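
Because each new option is guarded by a truthiness check, an unset (None) value is never written into managed_function_options, so it never reaches the routine's OPTIONS clause and BigQuery's server-side defaults apply. A minimal standalone sketch of that behavior (the function and names here are illustrative, not library internals):

# Illustrative sketch only; mirrors the guards added above, not library code.
def build_options(max_batching_rows=None, container_cpu=None, container_memory=None):
    options = {"entry_point": "bigframes_handler"}
    if max_batching_rows:
        options["max_batching_rows"] = max_batching_rows
    if container_cpu:
        options["container_cpu"] = container_cpu
    if container_memory:
        options["container_memory"] = container_memory
    return options

# Unset options are omitted, so BigQuery applies its defaults
# (0.33 vCPU and 512 MiB per container).
assert "container_cpu" not in build_options()
assert build_options(container_cpu=2.0)["container_cpu"] == 2.0

One side effect of the truthy guard worth noting: falsy values such as 0 are treated the same as None and fall back to the server-side defaults.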

bigframes/functions/_function_session.py

Lines changed: 21 additions & 0 deletions

@@ -702,6 +702,9 @@ def udf(
     bigquery_connection: Optional[str] = None,
     name: Optional[str] = None,
     packages: Optional[Sequence[str]] = None,
+    max_batching_rows: Optional[int] = None,
+    container_cpu: Optional[float] = None,
+    container_memory: Optional[str] = None,
 ):
     """Decorator to turn a Python user defined function (udf) into a
     BigQuery managed function.
@@ -769,6 +772,21 @@ def udf(
             dependency is added to the `requirements.txt` as is, and can be
             of the form supported in
             https://pip.pypa.io/en/stable/reference/requirements-file-format/.
+        max_batching_rows (int, Optional):
+            The maximum number of rows in each batch. If you specify
+            max_batching_rows, BigQuery determines the number of rows in a
+            batch, up to the max_batching_rows limit. If max_batching_rows
+            is not specified, the number of rows to batch is determined
+            automatically.
+        container_cpu (float, Optional):
+            The CPU limits for containers that run Python UDFs. By default,
+            the CPU allocated is 0.33 vCPU. See details at
+            https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits.
+        container_memory (str, Optional):
+            The memory limits for containers that run Python UDFs. By
+            default, the memory allocated to each container instance is
+            512 MiB. See details at
+            https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits.
     """

     warnings.warn("udf is in preview.", category=bfe.PreviewWarning, stacklevel=5)
@@ -854,6 +872,9 @@ def wrapper(func):
             output_type=udf_sig.sql_output_type,
             name=name,
             packages=packages,
+            max_batching_rows=max_batching_rows,
+            container_cpu=container_cpu,
+            container_memory=container_memory,
             is_row_processor=is_row_processor,
             bq_connection_id=bq_connection_id,
         )
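
A usage sketch of the session-bound decorator with the new options, modeled on the system test added in this commit; the dataset and routine names are hypothetical, and an already-constructed Session object `session` is assumed:

# Sketch only: assumes an existing bigframes Session `session`;
# "my_dataset" and "multiply_five" are hypothetical names.
def multiply_five(x: int) -> int:
    return x * 5

mf_multiply_five = session.udf(
    dataset="my_dataset",
    name="multiply_five",
    max_batching_rows=100,   # BigQuery batches at most this many rows per call
    container_cpu=2.0,       # default is 0.33 vCPU when omitted
    container_memory="2Gi",  # default is 512 MiB when omitted
)(multiply_five)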

bigframes/pandas/__init__.py

Lines changed: 6 additions & 0 deletions

@@ -142,6 +142,9 @@ def udf(
     bigquery_connection: Optional[str] = None,
     name: str,
     packages: Optional[Sequence[str]] = None,
+    max_batching_rows: Optional[int] = None,
+    container_cpu: Optional[float] = None,
+    container_memory: Optional[str] = None,
 ):
     return global_session.with_default_session(
         bigframes.session.Session.udf,
@@ -151,6 +154,9 @@ def udf(
         bigquery_connection=bigquery_connection,
         name=name,
         packages=packages,
+        max_batching_rows=max_batching_rows,
+        container_cpu=container_cpu,
+        container_memory=container_memory,
     )
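The module-level wrapper forwards the same options to the default session, so the decorator form works there too. A brief hedged sketch; the dataset and routine names are hypothetical, and the `dataset` parameter is assumed from the session-level signature (it is not shown in this hunk):

# Sketch of the bigframes.pandas path, which delegates to the default session.
import bigframes.pandas as bpd

@bpd.udf(
    dataset="my_dataset",    # hypothetical dataset
    name="multiply_five",    # hypothetical routine name; `name` is required here
    max_batching_rows=100,
    container_cpu=2.0,
    container_memory="2Gi",
)
def multiply_five(x: int) -> int:
    return x * 5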
bigframes/session/__init__.py

Lines changed: 21 additions & 0 deletions

@@ -1686,6 +1686,9 @@ def udf(
     bigquery_connection: Optional[str] = None,
     name: str,
     packages: Optional[Sequence[str]] = None,
+    max_batching_rows: Optional[int] = None,
+    container_cpu: Optional[float] = None,
+    container_memory: Optional[str] = None,
 ):
     """Decorator to turn a Python user defined function (udf) into a
     [BigQuery managed user-defined function](https://cloud.google.com/bigquery/docs/user-defined-functions-python).
@@ -1807,6 +1810,21 @@ def udf(
             dependency is added to the `requirements.txt` as is, and can be
             of the form supported in
             https://pip.pypa.io/en/stable/reference/requirements-file-format/.
+        max_batching_rows (int, Optional):
+            The maximum number of rows in each batch. If you specify
+            max_batching_rows, BigQuery determines the number of rows in a
+            batch, up to the max_batching_rows limit. If max_batching_rows
+            is not specified, the number of rows to batch is determined
+            automatically.
+        container_cpu (float, Optional):
+            The CPU limits for containers that run Python UDFs. By default,
+            the CPU allocated is 0.33 vCPU. See details at
+            https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits.
+        container_memory (str, Optional):
+            The memory limits for containers that run Python UDFs. By
+            default, the memory allocated to each container instance is
+            512 MiB. See details at
+            https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits.
     Returns:
         collections.abc.Callable:
             A managed function object pointing to the cloud assets created
@@ -1828,6 +1846,9 @@ def udf(
         bigquery_connection=bigquery_connection,
         name=name,
         packages=packages,
+        max_batching_rows=max_batching_rows,
+        container_cpu=container_cpu,
+        container_memory=container_memory,
     )

     def read_gbq_function(
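
As the trailing context hints, a managed UDF created with these options can later be reattached through read_gbq_function and applied like any other function. A short sketch, assuming an active Session `session` and a hypothetical routine ID:

# Sketch: reattach an existing managed UDF by its fully qualified routine
# name (hypothetical identifier) and apply it to an INT64 column.
import bigframes.pandas as bpd

df = bpd.read_gbq("SELECT 5 AS int64_col")  # any table with an INT64 column
fn = session.read_gbq_function(
    function_name="my-project.my_dataset.multiply_five",
)
result = df["int64_col"].apply(fn).to_pandas()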

tests/system/large/functions/test_managed_function.py

Lines changed: 98 additions & 0 deletions

@@ -549,3 +549,101 @@ def foo(x: int) -> int:
     finally:
         # Clean up the gcp assets created for the managed function.
         cleanup_function_assets(foo, session.bqclient, ignore_failures=False)
+
+
+def test_managed_function_options(session, dataset_id, scalars_dfs):
+    try:
+
+        def multiply_five(x: int) -> int:
+            return x * 5
+
+        mf_multiply_five = session.udf(
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+            max_batching_rows=100,
+            container_cpu=2,
+            container_memory="2Gi",
+        )(multiply_five)
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_int64_df = scalars_df["int64_col"]
+        bf_int64_df_filtered = bf_int64_df.dropna()
+        bf_result = bf_int64_df_filtered.apply(mf_multiply_five).to_pandas()
+
+        pd_int64_df = scalars_pandas_df["int64_col"]
+        pd_int64_df_filtered = pd_int64_df.dropna()
+        pd_result = pd_int64_df_filtered.apply(multiply_five)
+
+        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+
+        # Make sure the read_gbq_function path works for this function.
+        multiply_five_ref = session.read_gbq_function(
+            function_name=mf_multiply_five.bigframes_bigquery_function,  # type: ignore
+        )
+        assert mf_multiply_five.bigframes_bigquery_function == multiply_five_ref.bigframes_bigquery_function  # type: ignore
+
+        bf_result = bf_int64_df_filtered.apply(multiply_five_ref).to_pandas()
+        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+
+        # Retrieve the routine and validate its runtime configuration.
+        routine = session.bqclient.get_routine(
+            mf_multiply_five.bigframes_bigquery_function
+        )
+
+        # TODO(jialuo): Use the newly exposed class properties instead of
+        # accessing the hidden _properties after resolve of this issue:
+        # https://github.com/googleapis/python-bigquery/issues/2240.
+        assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "100"
+        assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 2
+        assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi"
+
+    finally:
+        # Clean up the gcp assets created for the managed function.
+        cleanup_function_assets(
+            mf_multiply_five, session.bqclient, ignore_failures=False
+        )
+
+
+def test_managed_function_options_errors(session, dataset_id):
+    def foo(x: int) -> int:
+        return 0
+
+    with pytest.raises(
+        google.api_core.exceptions.BadRequest,
+        # For CPU value >= 1.0, the value must be one of [1, 2, ...].
+        match="Invalid container_cpu function OPTIONS value",
+    ):
+        session.udf(
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+            max_batching_rows=100,
+            container_cpu=2.5,
+            container_memory="2Gi",
+        )(foo)
+
+    with pytest.raises(
+        google.api_core.exceptions.BadRequest,
+        # For less than 1.0 CPU, the value must be no less than 0.33.
+        match="Invalid container_cpu function OPTIONS value",
+    ):
+        session.udf(
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+            max_batching_rows=100,
+            container_cpu=0.10,
+            container_memory="512Mi",
+        )(foo)
+
+    with pytest.raises(
+        google.api_core.exceptions.BadRequest,
+        # For 2.00 CPU, the memory must be in the range of [256Mi, 8Gi].
+        match="Invalid container_memory function OPTIONS value",
+    ):
+        session.udf(
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+            max_batching_rows=100,
+            container_cpu=2,
+            container_memory="64Mi",
+        )(foo)