Skip to content

Commit 2bbf53f

Browse files
feat: support interface for BigQuery managed functions (#1373)
* feat: support non vectorized managed function * fix mf tests * fix dataframe apply * fix series apply * fix the decorator in tests * fix test remote func * add more tests * remove unused import * refactor rf in bff session * del udf args * fix docstring * resolve the comments * fix the attribute naming * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * resolve all comments * resolve comments * resolve comments * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix lint * type ignore * resolve comments --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 03a3a56 commit 2bbf53f

File tree

14 files changed

+1231
-365
lines changed

14 files changed

+1231
-365
lines changed

bigframes/_config/experiment_options.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class ExperimentOptions:
2525
def __init__(self):
2626
self._semantic_operators: bool = False
2727
self._blob: bool = False
28+
self._udf: bool = False
2829

2930
@property
3031
def semantic_operators(self) -> bool:
@@ -53,3 +54,17 @@ def blob(self, value: bool):
5354
)
5455
warnings.warn(msg, category=bfe.PreviewWarning)
5556
self._blob = value
57+
58+
@property
59+
def udf(self) -> bool:
60+
return self._udf
61+
62+
@udf.setter
63+
def udf(self, value: bool):
64+
if value is True:
65+
msg = (
66+
"BigFrames managed function (udf) is still under experiments. "
67+
"It may not work and subject to change in the future."
68+
)
69+
warnings.warn(msg, category=bfe.PreviewWarning)
70+
self._udf = value

bigframes/dataframe.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4098,9 +4098,16 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
40984098
msg = "axis=1 scenario is in preview."
40994099
warnings.warn(msg, category=bfe.PreviewWarning)
41004100

4101-
# Check if the function is a remote function
4102-
if not hasattr(func, "bigframes_remote_function"):
4103-
raise ValueError("For axis=1 a remote function must be used.")
4101+
# TODO(jialuo): Deprecate the "bigframes_remote_function" attribute.
4102+
# We have some tests using pre-defined remote_function that were
4103+
# defined based on "bigframes_remote_function" instead of
4104+
# "bigframes_bigquery_function". So we need to fix those pre-defined
4105+
# remote functions before deprecating the "bigframes_remote_function"
4106+
# attribute. Check if the function is a remote function.
4107+
if not hasattr(func, "bigframes_remote_function") and not hasattr(
4108+
func, "bigframes_bigquery_function"
4109+
):
4110+
raise ValueError("For axis=1 a bigframes function must be used.")
41044111

41054112
is_row_processor = getattr(func, "is_row_processor")
41064113
if is_row_processor:

bigframes/functions/_function_client.py

Lines changed: 131 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import random
2222
import shutil
2323
import string
24-
import sys
2524
import tempfile
25+
import textwrap
2626
import types
2727
from typing import cast, Tuple, TYPE_CHECKING
2828

@@ -55,39 +55,86 @@
5555

5656

5757
class FunctionClient:
58-
# Wait time (in seconds) for an IAM binding to take effect after creation
58+
# Wait time (in seconds) for an IAM binding to take effect after creation.
5959
_iam_wait_seconds = 120
6060

6161
# TODO(b/392707725): Convert all necessary parameters for cloud function
6262
# deployment into method parameters.
6363
def __init__(
6464
self,
6565
gcp_project_id,
66-
cloud_function_region,
67-
cloud_functions_client,
6866
bq_location,
6967
bq_dataset,
7068
bq_client,
7169
bq_connection_id,
7270
bq_connection_manager,
73-
cloud_function_service_account,
74-
cloud_function_kms_key_name,
75-
cloud_function_docker_repository,
71+
cloud_function_region=None,
72+
cloud_functions_client=None,
73+
cloud_function_service_account=None,
74+
cloud_function_kms_key_name=None,
75+
cloud_function_docker_repository=None,
7676
*,
7777
session: Session,
7878
):
7979
self._gcp_project_id = gcp_project_id
80-
self._cloud_function_region = cloud_function_region
81-
self._cloud_functions_client = cloud_functions_client
8280
self._bq_location = bq_location
8381
self._bq_dataset = bq_dataset
8482
self._bq_client = bq_client
8583
self._bq_connection_id = bq_connection_id
8684
self._bq_connection_manager = bq_connection_manager
85+
self._session = session
86+
87+
# Optional attributes only for remote functions.
88+
self._cloud_function_region = cloud_function_region
89+
self._cloud_functions_client = cloud_functions_client
8790
self._cloud_function_service_account = cloud_function_service_account
8891
self._cloud_function_kms_key_name = cloud_function_kms_key_name
8992
self._cloud_function_docker_repository = cloud_function_docker_repository
90-
self._session = session
93+
94+
def _create_bq_connection(self) -> None:
95+
if self._bq_connection_manager:
96+
self._bq_connection_manager.create_bq_connection(
97+
self._gcp_project_id,
98+
self._bq_location,
99+
self._bq_connection_id,
100+
"run.invoker",
101+
)
102+
103+
def _ensure_dataset_exists(self) -> None:
104+
# Make sure the dataset exists, i.e. if it doesn't exist, go ahead and
105+
# create it.
106+
dataset = bigquery.Dataset(
107+
bigquery.DatasetReference.from_string(
108+
self._bq_dataset, default_project=self._gcp_project_id
109+
)
110+
)
111+
dataset.location = self._bq_location
112+
try:
113+
# This check does not require bigquery.datasets.create IAM
114+
# permission. So, if the data set already exists, then user can work
115+
# without having that permission.
116+
self._bq_client.get_dataset(dataset)
117+
except google.api_core.exceptions.NotFound:
118+
# This requires bigquery.datasets.create IAM permission.
119+
self._bq_client.create_dataset(dataset, exists_ok=True)
120+
121+
def _create_bq_function(self, create_function_ddl: str) -> None:
122+
# TODO(swast): plumb through the original, user-facing api_name.
123+
_, query_job = bigframes.session._io.bigquery.start_query_with_client(
124+
self._session.bqclient,
125+
create_function_ddl,
126+
job_config=bigquery.QueryJobConfig(),
127+
)
128+
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")
129+
130+
def _format_function_options(self, function_options: dict) -> str:
131+
return ", ".join(
132+
[
133+
f"{key}='{val}'" if isinstance(val, str) else f"{key}={val}"
134+
for key, val in function_options.items()
135+
if val is not None
136+
]
137+
)
91138

92139
def create_bq_remote_function(
93140
self,
@@ -101,13 +148,7 @@ def create_bq_remote_function(
101148
):
102149
"""Create a BigQuery remote function given the artifacts of a user defined
103150
function and the http endpoint of a corresponding cloud function."""
104-
if self._bq_connection_manager:
105-
self._bq_connection_manager.create_bq_connection(
106-
self._gcp_project_id,
107-
self._bq_location,
108-
self._bq_connection_id,
109-
"run.invoker",
110-
)
151+
self._create_bq_connection()
111152

112153
# Create BQ function
113154
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2
@@ -128,12 +169,8 @@ def create_bq_remote_function(
128169
# bigframes specific metadata for the lack of a better option
129170
remote_function_options["description"] = metadata
130171

131-
remote_function_options_str = ", ".join(
132-
[
133-
f"{key}='{val}'" if isinstance(val, str) else f"{key}={val}"
134-
for key, val in remote_function_options.items()
135-
if val is not None
136-
]
172+
remote_function_options_str = self._format_function_options(
173+
remote_function_options
137174
)
138175

139176
create_function_ddl = f"""
@@ -144,31 +181,78 @@ def create_bq_remote_function(
144181

145182
logger.info(f"Creating BQ remote function: {create_function_ddl}")
146183

147-
# Make sure the dataset exists. I.e. if it doesn't exist, go ahead and
148-
# create it
149-
dataset = bigquery.Dataset(
150-
bigquery.DatasetReference.from_string(
151-
self._bq_dataset, default_project=self._gcp_project_id
152-
)
153-
)
154-
dataset.location = self._bq_location
155-
try:
156-
# This check does not require bigquery.datasets.create IAM
157-
# permission. So, if the data set already exists, then user can work
158-
# without having that permission.
159-
self._bq_client.get_dataset(dataset)
160-
except google.api_core.exceptions.NotFound:
161-
# This requires bigquery.datasets.create IAM permission
162-
self._bq_client.create_dataset(dataset, exists_ok=True)
184+
self._ensure_dataset_exists()
185+
self._create_bq_function(create_function_ddl)
163186

164-
# TODO(swast): plumb through the original, user-facing api_name.
165-
_, query_job = bigframes.session._io.bigquery.start_query_with_client(
166-
self._session.bqclient,
167-
create_function_ddl,
168-
job_config=bigquery.QueryJobConfig(),
187+
def provision_bq_managed_function(
188+
self,
189+
func,
190+
input_types,
191+
output_type,
192+
name,
193+
packages,
194+
is_row_processor,
195+
):
196+
"""Create a BigQuery managed function."""
197+
import cloudpickle
198+
199+
pickled = cloudpickle.dumps(func)
200+
201+
# Create BQ managed function.
202+
bq_function_args = []
203+
bq_function_return_type = output_type
204+
205+
input_args = inspect.getargs(func.__code__).args
206+
# We expect the input type annotations to be 1:1 with the input args.
207+
for name_, type_ in zip(input_args, input_types):
208+
bq_function_args.append(f"{name_} {type_}")
209+
210+
managed_function_options = {
211+
"runtime_version": _utils.get_python_version(),
212+
"entry_point": "bigframes_handler",
213+
}
214+
215+
# Augment user package requirements with any internal package
216+
# requirements.
217+
packages = _utils._get_updated_package_requirements(packages, is_row_processor)
218+
if packages:
219+
managed_function_options["packages"] = packages
220+
managed_function_options_str = self._format_function_options(
221+
managed_function_options
169222
)
170223

171-
logger.info(f"Created remote function {query_job.ddl_target_routine}")
224+
session_id = None if name else self._session.session_id
225+
bq_function_name = name
226+
if not bq_function_name:
227+
# Compute a unique hash representing the user code.
228+
function_hash = _utils._get_hash(func, packages)
229+
bq_function_name = _utils.get_bigframes_function_name(
230+
function_hash,
231+
session_id,
232+
)
233+
234+
persistent_func_id = (
235+
f"`{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}"
236+
)
237+
create_function_ddl = textwrap.dedent(
238+
f"""
239+
CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)})
240+
RETURNS {bq_function_return_type}
241+
LANGUAGE python
242+
OPTIONS ({managed_function_options_str})
243+
AS r'''
244+
import cloudpickle
245+
udf = cloudpickle.loads({pickled})
246+
def bigframes_handler(*args):
247+
return udf(*args)
248+
'''
249+
"""
250+
).strip()
251+
252+
self._ensure_dataset_exists()
253+
self._create_bq_function(create_function_ddl)
254+
255+
return bq_function_name
172256

173257
def get_cloud_function_fully_qualified_parent(self):
174258
"Get the fully qualilfied parent for a cloud function."
@@ -262,9 +346,7 @@ def create_cloud_function(
262346
# TODO(shobs): Figure out how to achieve version compatibility, specially
263347
# when pickle (internally used by cloudpickle) guarantees that:
264348
# https://docs.python.org/3/library/pickle.html#:~:text=The%20pickle%20serialization%20format%20is,unique%20breaking%20change%20language%20boundary.
265-
python_version = "python{}{}".format(
266-
sys.version_info.major, sys.version_info.minor
267-
)
349+
python_version = _utils.get_python_version(is_compat=True)
268350

269351
# Determine an upload URL for user code
270352
upload_url_request = functions_v2.GenerateUploadUrlRequest(
@@ -443,7 +525,7 @@ def provision_bq_remote_function(
443525
# Derive the name of the remote function
444526
remote_function_name = name
445527
if not remote_function_name:
446-
remote_function_name = _utils.get_remote_function_name(
528+
remote_function_name = _utils.get_bigframes_function_name(
447529
function_hash, self._session.session_id, uniq_suffix
448530
)
449531
rf_endpoint, rf_conn = self.get_remote_function_specs(remote_function_name)

0 commit comments

Comments
 (0)