|
14 | 14 |
|
15 | 15 | from __future__ import annotations |
16 | 16 |
|
17 | | -import functools |
18 | 17 | import hashlib |
19 | 18 | import inspect |
20 | 19 | import logging |
|
28 | 27 | import textwrap |
29 | 28 | from typing import List, NamedTuple, Optional, Sequence, TYPE_CHECKING |
30 | 29 |
|
| 30 | +import ibis |
31 | 31 | import requests |
32 | 32 |
|
33 | 33 | if TYPE_CHECKING: |
|
43 | 43 | resourcemanager_v3, |
44 | 44 | ) |
45 | 45 | import google.iam.v1 |
46 | | -from ibis.backends.bigquery.compiler import compiles |
47 | | -from ibis.backends.bigquery.datatypes import BigQueryType |
48 | 46 | from ibis.expr.datatypes.core import DataType as IbisDataType |
49 | | -import ibis.expr.operations as ops |
50 | | -import ibis.expr.rules as rlz |
51 | 47 |
|
52 | 48 | from bigframes import clients |
53 | 49 | import bigframes.constants as constants |
54 | 50 | import bigframes.dtypes |
| 51 | +import third_party.bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes |
55 | 52 |
|
56 | 53 | logger = logging.getLogger(__name__) |
57 | 54 |
|
@@ -173,12 +170,14 @@ def create_bq_remote_function( |
173 | 170 | # Create BQ function |
174 | 171 | # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2 |
175 | 172 | bq_function_args = [] |
176 | | - bq_function_return_type = BigQueryType.from_ibis(output_type) |
| 173 | + bq_function_return_type = third_party_ibis_bqtypes.BigQueryType.from_ibis( |
| 174 | + output_type |
| 175 | + ) |
177 | 176 |
|
178 | 177 | # We are expecting the input type annotations to be 1:1 with the input args |
179 | 178 | for idx, name in enumerate(input_args): |
180 | 179 | bq_function_args.append( |
181 | | - f"{name} {BigQueryType.from_ibis(input_types[idx])}" |
| 180 | + f"{name} {third_party_ibis_bqtypes.BigQueryType.from_ibis(input_types[idx])}" |
182 | 181 | ) |
183 | 182 | create_function_ddl = f""" |
184 | 183 | CREATE OR REPLACE FUNCTION `{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}({','.join(bq_function_args)}) |
@@ -515,33 +514,10 @@ def get_remote_function_specs(self, remote_function_name): |
515 | 514 | return (http_endpoint, bq_connection) |
516 | 515 |
|
517 | 516 |
|
518 | | -def remote_function_node( |
519 | | - routine_ref: bigquery.RoutineReference, ibis_signature: IbisSignature |
520 | | -): |
521 | | - """Creates an Ibis node representing a remote function call.""" |
522 | | - |
523 | | - fields = { |
524 | | - name: rlz.ValueOf(None if type_ == "ANY TYPE" else type_) |
525 | | - for name, type_ in zip( |
526 | | - ibis_signature.parameter_names, ibis_signature.input_types |
527 | | - ) |
528 | | - } |
529 | | - |
530 | | - fields["dtype"] = ibis_signature.output_type # type: ignore |
531 | | - fields["shape"] = rlz.shape_like("args") |
532 | | - |
533 | | - node = type(routine_ref_to_string_for_query(routine_ref), (ops.ValueOp,), fields) # type: ignore |
534 | | - |
535 | | - @compiles(node) |
536 | | - def compile_node(t, op): |
537 | | - return "{}({})".format(node.__name__, ", ".join(map(t.translate, op.args))) |
538 | | - |
539 | | - def f(*args, **kwargs): |
540 | | - return node(*args, **kwargs).to_expr() |
541 | | - |
542 | | - f.bigframes_remote_function = str(routine_ref) # type: ignore |
543 | | - |
544 | | - return f |
class UnsupportedTypeError(ValueError):
    """ValueError that records a rejected type and the supported types.

    Args:
        type_: The type that was rejected. Stored as ``self.type``.
        supported_types: The types that would have been accepted. Stored
            unchanged as ``self.supported_types``.
    """

    def __init__(self, type_, supported_types):
        # Initialise the base class explicitly with the same positional
        # arguments so ``self.args`` stays ``(type_, supported_types)``; that
        # keeps copying/pickling (which re-invoke ``cls(*args)``) working.
        super().__init__(type_, supported_types)
        self.type = type_
        self.supported_types = supported_types

    def __str__(self):
        # Without this, str(exc) would be the raw repr of the args tuple,
        # which is hard to read in tracebacks and logs.
        return (
            f"'{self.type}' is not one of the supported types "
            f"{self.supported_types}"
        )
545 | 521 |
|
546 | 522 |
|
547 | 523 | def ibis_signature_from_python_signature( |
@@ -831,14 +807,16 @@ def wrapper(f): |
831 | 807 | packages, |
832 | 808 | ) |
833 | 809 |
|
834 | | - node = remote_function_node(dataset_ref.routine(rf_name), ibis_signature) |
835 | | - |
836 | | - node = functools.wraps(f)(node) |
837 | | - node.__signature__ = signature |
| 810 | + node = ibis.udf.scalar.builtin( |
| 811 | + f, |
| 812 | + name=rf_name, |
| 813 | + schema=f"{dataset_ref.project}.{dataset_ref.dataset_id}", |
| 814 | + signature=(ibis_signature.input_types, ibis_signature.output_type), |
| 815 | + ) |
838 | 816 | node.bigframes_cloud_function = ( |
839 | 817 | remote_function_client.get_cloud_function_fully_qualified_name(cf_name) |
840 | 818 | ) |
841 | | - |
| 819 | + node.bigframes_remote_function = str(dataset_ref.routine(rf_name)) # type: ignore |
842 | 820 | return node |
843 | 821 |
|
844 | 822 | return wrapper |
@@ -888,4 +866,17 @@ def read_gbq_function( |
888 | 866 | f"{constants.FEEDBACK_LINK}" |
889 | 867 | ) |
890 | 868 |
|
891 | | - return remote_function_node(routine_ref, ibis_signature) |
| 869 | + # The name "args" conflicts with the Ibis operator, so we use |
| 870 | + # non-standard names for the arguments here. |
| 871 | + def node(*ignored_args, **ignored_kwargs): |
| 872 | + f"""Remote function {str(routine_ref)}.""" |
| 873 | + |
| 874 | + node.__name__ = routine_ref.routine_id |
| 875 | + node = ibis.udf.scalar.builtin( |
| 876 | + node, |
| 877 | + name=routine_ref.routine_id, |
| 878 | + schema=f"{routine_ref.project}.{routine_ref.dataset_id}", |
| 879 | + signature=(ibis_signature.input_types, ibis_signature.output_type), |
| 880 | + ) |
| 881 | + node.bigframes_remote_function = str(routine_ref) # type: ignore |
| 882 | + return node |
0 commit comments