Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import bigframes.exceptions as bfe
import bigframes.formatting_helpers as formatter
import bigframes.functions
from bigframes.functions import function_typing
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.operations.ai
Expand Down Expand Up @@ -4815,11 +4816,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
else:
# This is a special case where we are providing a non-pandas-like
# extension. If the bigquery function can take one or more
# params (exclude the args) then we assume that here the user
# params (excluding the args) then we assume that here the user
# intention is to use the column values of the dataframe as
# arguments to the function. For this to work the following
# conditions must be true:
# 1. The number of input params (exclude the args) in the
# 1. The number of input params (excluding the args) in the
# function must be same as the number of columns in the
# dataframe.
# 2. The dtypes of the columns in the dataframe must be
Expand All @@ -4829,23 +4830,35 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
udf_input_dtypes = func.udf_def.signature.bf_input_types
if len(udf_input_dtypes) != len(self.columns) + len(args):
raise ValueError(
f"Column count mismatch: BigFrames BigQuery function"
f" expected {len(udf_input_dtypes) - len(args)} columns"
f" from DataFrame but received {len(self.columns)}."
f"Parameter count mismatch: BigFrames BigQuery function"
f" (including the args) expected {len(udf_input_dtypes)}"
f" but received {len(self.columns) + len(args)}."
)
end_slice = -len(args) if args else None
if udf_input_dtypes[:end_slice] != tuple(self.dtypes.to_list()):
raise ValueError(
f"Data type mismatch: BigFrames BigQuery function takes"
f" arguments of types {udf_input_dtypes} but DataFrame"
f" dtypes are {tuple(self.dtypes)}."
f"Data type mismatch for DataFrame columns:"
f" Expected {udf_input_dtypes[:end_slice]}"
f" Received {tuple(self.dtypes)}."
)
if args:
bq_types = (
function_typing.sdk_type_from_python_type(type(arg))
for arg in args
)
args_dtype = tuple(
function_typing.sdk_type_to_bf_type(bq_type)
for bq_type in bq_types
)
if udf_input_dtypes[end_slice:] != args_dtype:
raise ValueError(
f"Data type mismatch for 'args' parameter:"
f" Expected {udf_input_dtypes[end_slice:]}"
f" Received {args_dtype}."
)

series_list = [self[col] for col in self.columns]
if args:
op_list = series_list[1:] + list(args)
else:
op_list = series_list[1:]
op_list = series_list[1:] + list(args)
result_series = series_list[0]._apply_nary_op(
ops.NaryRemoteFunctionOp(function_def=func.udf_def), op_list
)
Expand Down
23 changes: 12 additions & 11 deletions bigframes/functions/_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,15 +959,16 @@ def _convert_row_processor_sig(
) -> Optional[inspect.Signature]:
import bigframes.series as bf_series

first_param = next(iter(signature.parameters.values()))
param_type = first_param.annotation
if (param_type == bf_series.Series) or (param_type == pandas.Series):
msg = bfe.format_message("input_types=Series is in preview.")
warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)
return signature.replace(
parameters=[
p.replace(annotation=str) if i == 0 else p
for i, p in enumerate(signature.parameters.values())
]
)
if len(signature.parameters) >= 1:
first_param = next(iter(signature.parameters.values()))
param_type = first_param.annotation
if (param_type == bf_series.Series) or (param_type == pandas.Series):
msg = bfe.format_message("input_types=Series is in preview.")
warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)
return signature.replace(
parameters=[
p.replace(annotation=str) if i == 0 else p
for i, p in enumerate(signature.parameters.values())
]
)
return None
20 changes: 12 additions & 8 deletions tests/system/large/functions/test_managed_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,21 +467,19 @@ def foo(x, y, z):

# Fails to apply on dataframe with incompatible number of columns.
with pytest.raises(
ValueError,
match="^Column count mismatch: BigFrames BigQuery function expected 3 columns from DataFrame but received 2\\.$",
ValueError, match="^Parameter count mismatch:.* expected 3 but received 2."
):
bf_df[["Id", "Age"]].apply(foo, axis=1)

with pytest.raises(
ValueError,
match="^Column count mismatch: BigFrames BigQuery function expected 3 columns from DataFrame but received 4\\.$",
ValueError, match="^Parameter count mismatch:.* expected 3 but received 4."
):
bf_df.assign(Country="lalaland").apply(foo, axis=1)

# Fails to apply on dataframe with incompatible column datatypes.
with pytest.raises(
ValueError,
match="^Data type mismatch: BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
match="^Data type mismatch for DataFrame columns: Expected .* Received .*",
):
bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)

Expand Down Expand Up @@ -985,20 +983,26 @@ def the_sum(s1, s2, x):

# Fails to apply on dataframe with incompatible number of columns.
with pytest.raises(
ValueError,
match="^Column count mismatch: BigFrames BigQuery function expected 2 columns from DataFrame but received 3\\.$",
ValueError, match="^Parameter count mismatch:.* expected 3 but received 4."
):
scalars_df[columns + ["float64_col"]].apply(the_sum_mf, axis=1, args=args1)

# Fails to apply on dataframe with incompatible column datatypes.
with pytest.raises(
ValueError,
match="^Data type mismatch: BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
match="^Data type mismatch for DataFrame columns: Expected .* Received .*",
):
scalars_df[columns].assign(
int64_col=lambda df: df["int64_col"].astype("Float64")
).apply(the_sum_mf, axis=1, args=args1)

# Fails to apply on dataframe with incompatible args datatypes.
with pytest.raises(
ValueError,
match="^Data type mismatch for 'args' parameter: Expected .* Received .*",
):
scalars_df[columns].apply(the_sum_mf, axis=1, args=(1.3,))

bf_result = (
scalars_df[columns]
.dropna()
Expand Down
45 changes: 26 additions & 19 deletions tests/system/large/functions/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -1958,20 +1958,33 @@ def the_sum(s1, s2, x):

# Fails to apply on dataframe with incompatible number of columns.
with pytest.raises(
ValueError,
match="^Column count mismatch: BigFrames BigQuery function expected 2 columns from DataFrame but received 3\\.$",
ValueError, match="^Parameter count mismatch:.* expected 3 but received 4."
):
scalars_df[columns + ["float64_col"]].apply(the_sum_mf, axis=1, args=args1)
scalars_df[columns].apply(
the_sum_mf,
axis=1,
args=(
1,
1,
),
)

# Fails to apply on dataframe with incompatible column datatypes.
with pytest.raises(
ValueError,
match="^Data type mismatch: BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
match="^Data type mismatch for DataFrame columns: Expected .* Received .*",
):
scalars_df[columns].assign(
int64_col=lambda df: df["int64_col"].astype("Float64")
).apply(the_sum_mf, axis=1, args=args1)

# Fails to apply on dataframe with incompatible args datatypes.
with pytest.raises(
ValueError,
match="^Data type mismatch for 'args' parameter: Expected .* Received .*",
):
scalars_df[columns].apply(the_sum_mf, axis=1, args=("hello world",))

bf_result = (
scalars_df[columns]
.dropna()
Expand Down Expand Up @@ -2293,20 +2306,18 @@ def foo(x, y, z):

# Fails to apply on dataframe with incompatible number of columns
with pytest.raises(
ValueError,
match="^Column count mismatch: BigFrames BigQuery function expected 3 columns from DataFrame but received 2\\.$",
ValueError, match="^Parameter count mismatch:.* expected 3 but received 2."
):
bf_df[["Id", "Age"]].apply(foo, axis=1)
with pytest.raises(
ValueError,
match="^Column count mismatch: BigFrames BigQuery function expected 3 columns from DataFrame but received 4\\.$",
ValueError, match="^Parameter count mismatch:.* expected 3 but received 4."
):
bf_df.assign(Country="lalaland").apply(foo, axis=1)

# Fails to apply on dataframe with incompatible column datatypes
with pytest.raises(
ValueError,
match="^Data type mismatch: BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
match="^Data type mismatch for DataFrame columns: Expected .* Received .*",
):
bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)

Expand Down Expand Up @@ -2377,20 +2388,18 @@ def foo(x, y, z):

# Fails to apply on dataframe with incompatible number of columns
with pytest.raises(
ValueError,
match="^Column count mismatch: BigFrames BigQuery function expected 3 columns from DataFrame but received 2\\.$",
ValueError, match="^Parameter count mismatch:.* expected 3 but received 2."
):
bf_df[["Id", "Age"]].apply(foo, axis=1)
with pytest.raises(
ValueError,
match="^Column count mismatch: BigFrames BigQuery function expected 3 columns from DataFrame but received 4\\.$",
ValueError, match="^Parameter count mismatch:.* expected 3 but received 4."
):
bf_df.assign(Country="lalaland").apply(foo, axis=1)

# Fails to apply on dataframe with incompatible column datatypes
with pytest.raises(
ValueError,
match="^Data type mismatch: BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
match="^Data type mismatch for DataFrame columns: Expected .* Received .*",
):
bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)

Expand Down Expand Up @@ -2451,20 +2460,18 @@ def foo(x):

# Fails to apply on dataframe with incompatible number of columns
with pytest.raises(
ValueError,
match="^Column count mismatch: BigFrames BigQuery function expected 1 columns from DataFrame but received 0\\.$",
ValueError, match="^Parameter count mismatch:.* expected 1 but received 0."
):
bf_df[[]].apply(foo, axis=1)
with pytest.raises(
ValueError,
match="^Column count mismatch: BigFrames BigQuery function expected 1 columns from DataFrame but received 2\\.$",
ValueError, match="^Parameter count mismatch:.* expected 1 but received 2."
):
bf_df.assign(Country="lalaland").apply(foo, axis=1)

# Fails to apply on dataframe with incompatible column datatypes
with pytest.raises(
ValueError,
match="^Data type mismatch: BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
match="^Data type mismatch for DataFrame columns: Expected .* Received .*",
):
bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1)

Expand Down