Commit 6b34244

test: ensure all remote_function APIs work in partial ordering mode (#1000)
* test: ensure all `remote_function` APIs work in partial ordering mode
* remove force_reproject from more APIs
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* temporarily skip multiindex test for axis=1

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent f89785f commit 6b34244
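The new tests below exercise the unary, binary, and n-ary remote function paths both after filtering and under partial ordering mode. As a rough, hedged sketch of the scenario under test (illustrative only, not code from this commit; the dataset id is a placeholder, and whatever BigQuery connection or IAM setup your project normally needs for remote functions still applies):

import bigframes

# Partial ordering mode, as used by the partial-ordering tests in this commit.
session = bigframes.Session(bigframes.BigQueryOptions(ordering_mode="partial"))

# "your-project.your_dataset" is a placeholder, not a value from this commit.
@session.remote_function(dataset="your-project.your_dataset")
def is_long_duration(minutes: int) -> bool:
    return minutes >= 120

df = session.read_gbq("bigquery-public-data.baseball.schedules")[["duration_minutes"]]
# With the force_reproject workaround removed, apply/map/mask on a filtered or
# unordered frame still route the data through the remote function correctly.
df1 = df.assign(duration_meta=df["duration_minutes"].apply(is_long_duration))
repr(df1)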

3 files changed (+224, -42 lines)

bigframes/dataframe.py

Lines changed: 2 additions & 12 deletions
@@ -3473,11 +3473,7 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame:
             raise ValueError(f"na_action={na_action} not supported")
 
         # TODO(shobs): Support **kwargs
-        # Reproject as workaround to applying filter too late. This forces the
-        # filter to be applied before passing data to remote function,
-        # protecting from bad inputs causing errors.
-        reprojected_df = DataFrame(self._block._force_reproject())
-        return reprojected_df._apply_unary_op(
+        return self._apply_unary_op(
             ops.RemoteFunctionOp(func=func, apply_on_null=(na_action is None))
         )
 
@@ -3572,13 +3568,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
             )
 
         series_list = [self[col] for col in self.columns]
-        # Reproject as workaround to applying filter too late. This forces the
-        # filter to be applied before passing data to remote function,
-        # protecting from bad inputs causing errors.
-        reprojected_series = bigframes.series.Series(
-            series_list[0]._block._force_reproject()
-        )
-        result_series = reprojected_series._apply_nary_op(
+        result_series = series_list[0]._apply_nary_op(
             ops.NaryRemoteFunctionOp(func=func), series_list[1:]
         )
         result_series.name = None

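With the reprojection workaround gone, DataFrame.map/applymap applies the unary remote function op on self directly, and DataFrame.apply(axis=1) routes the column series through the n-ary op. A minimal usage sketch of these entry points (illustrative only, not part of this diff; `session` and `bf_df` stand for an existing bigframes session and DataFrame with the scalar test columns, and the dataset id is a placeholder):

# Unary remote function, applied elementwise via DataFrame.applymap/map.
@session.remote_function(dataset="your-project.your_dataset")
def add_one(x: int) -> int:
    return x + 1

# N-ary remote function, applied row-wise via DataFrame.apply(axis=1);
# the selected columns must match the function parameters in order.
@session.remote_function(dataset="your-project.your_dataset")
def row_total(x: int, y: int, z: float) -> float:
    return x + y + z

# Filtering out NAs first still protects the remote function from bad inputs,
# which is what the new *_applied_after_filter tests verify.
filtered = bf_df[bf_df["int64_col"].notnull() & bf_df["float64_col"].notnull()]
elementwise = filtered[["int64_col", "int64_too"]].applymap(add_one).to_pandas()
rowwise = (
    filtered[["int64_col", "int64_too", "float64_col"]]
    .apply(row_total, axis=1)
    .to_pandas()
)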
bigframes/series.py

Lines changed: 1 addition & 5 deletions
@@ -1480,11 +1480,7 @@ def combine(
                 ex.message += f"\n{_remote_function_recommendation_message}"
             raise
 
-        # Reproject as workaround to applying filter too late. This forces the
-        # filter to be applied before passing data to remote function,
-        # protecting from bad inputs causing errors.
-        reprojected_series = Series(self._block._force_reproject())
-        result_series = reprojected_series._apply_binary_op(
+        result_series = self._apply_binary_op(
             other, ops.BinaryRemoteFunctionOp(func=func)
         )
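Series.combine is the binary counterpart: it now applies BinaryRemoteFunctionOp on self without reprojecting first. A hedged usage sketch mirroring the new binary test (illustrative only; `session` and `scalars_df` are assumed to exist, and the dataset id is a placeholder):

# Binary remote function combined across two Series after filtering out the
# NA rows that the deliberately fragile function cannot handle.
@session.remote_function(dataset="your-project.your_dataset")
def add(x: int, y: int) -> int:
    return x + y

keep = scalars_df["int64_col"].notnull()
combined = (
    scalars_df[keep]["int64_col"]
    .combine(scalars_df[keep]["int64_too"], add)
    .to_pandas()
)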
tests/system/small/test_remote_function.py

Lines changed: 221 additions & 25 deletions
@@ -498,6 +498,37 @@ def add_one(x):
     assert_pandas_df_equal(bf_result, pd_result)
 
 
+@pytest.mark.flaky(retries=2, delay=120)
+def test_dataframe_applymap_explicit_filter(
+    session_with_bq_connection, scalars_dfs, dataset_id_permanent
+):
+    def add_one(x):
+        return x + 1
+
+    remote_add_one = session_with_bq_connection.remote_function(
+        [int], int, dataset_id_permanent, name=get_rf_name(add_one)
+    )(add_one)
+
+    scalars_df, scalars_pandas_df = scalars_dfs
+    int64_cols = ["int64_col", "int64_too"]
+
+    bf_int64_df = scalars_df[int64_cols]
+    bf_int64_df_filtered = bf_int64_df[bf_int64_df["int64_col"].notnull()]
+    bf_result = bf_int64_df_filtered.applymap(remote_add_one).to_pandas()
+
+    pd_int64_df = scalars_pandas_df[int64_cols]
+    pd_int64_df_filtered = pd_int64_df[pd_int64_df["int64_col"].notnull()]
+    pd_result = pd_int64_df_filtered.applymap(add_one)
+    # TODO(shobs): Figure why pandas .applymap() changes the dtype, i.e.
+    # pd_int64_df_filtered.dtype is Int64Dtype()
+    # pd_int64_df_filtered.applymap(lambda x: x).dtype is int64.
+    # For this test let's force the pandas dtype to be same as input.
+    for col in pd_result:
+        pd_result[col] = pd_result[col].astype(pd_int64_df_filtered[col].dtype)
+
+    assert_pandas_df_equal(bf_result, pd_result)
+
+
 @pytest.mark.flaky(retries=2, delay=120)
 def test_dataframe_applymap_na_ignore(
     session_with_bq_connection, scalars_dfs, dataset_id_permanent
@@ -1024,12 +1055,21 @@ def test_read_gbq_function_application_repr(session, dataset_id, scalars_df_index
     repr(s.mask(should_mask, "REDACTED"))
 
 
+@pytest.mark.parametrize(
+    ("method",),
+    [
+        pytest.param("apply"),
+        pytest.param("map"),
+        pytest.param("mask"),
+    ],
+)
 @pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_apply_after_filter(session, dataset_id_permanent, scalars_dfs):
-
+def test_remote_function_unary_applied_after_filter(
+    session, dataset_id_permanent, scalars_dfs, method
+):
     # This function is deliberately written to not work with NA input
-    def plus_one(x: int) -> int:
-        return x + 1
+    def is_odd(x: int) -> bool:
+        return x % 2 == 1
 
     scalars_df, scalars_pandas_df = scalars_dfs
     int_col_name_with_nulls = "int64_col"
@@ -1038,47 +1078,203 @@ def plus_one(x: int) -> int:
     assert any([pd.isna(val) for val in scalars_df[int_col_name_with_nulls]])
 
     # create a remote function
-    plus_one_remote = session.remote_function(
-        dataset=dataset_id_permanent, name=get_rf_name(plus_one)
-    )(plus_one)
+    is_odd_remote = session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(is_odd)
+    )(is_odd)
 
     # with nulls in the series the remote function application would fail
     with pytest.raises(
         google.api_core.exceptions.BadRequest, match="unsupported operand"
     ):
-        scalars_df[int_col_name_with_nulls].apply(plus_one_remote).to_pandas()
+        bf_method = getattr(scalars_df[int_col_name_with_nulls], method)
+        bf_method(is_odd_remote).to_pandas()
 
-    # after filtering out nulls the remote function application should works
+    # after filtering out nulls the remote function application should work
     # similar to pandas
-    pd_result = scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][
-        int_col_name_with_nulls
-    ].apply(plus_one)
-    bf_result = (
+    pd_method = getattr(
+        scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][
+            int_col_name_with_nulls
+        ],
+        method,
+    )
+    pd_result = pd_method(is_odd)
+    bf_method = getattr(
         scalars_df[scalars_df[int_col_name_with_nulls].notnull()][
             int_col_name_with_nulls
-        ]
-        .apply(plus_one_remote)
+        ],
+        method,
+    )
+    bf_result = bf_method(is_odd_remote).to_pandas()
+
+    # ignore any dtype difference
+    pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
+
+
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_binary_applied_after_filter(
+    session, dataset_id_permanent, scalars_dfs
+):
+    # This function is deliberately written to not work with NA input
+    def add(x: int, y: int) -> int:
+        return x + y
+
+    scalars_df, scalars_pandas_df = scalars_dfs
+    int_col_name_with_nulls = "int64_col"
+    int_col_name_no_nulls = "int64_too"
+    bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]]
+    pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]]
+
+    # make sure there are NA values in the test column
+    assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]])
+
+    # create a remote function
+    add_remote = session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(add)
+    )(add)
+
+    # with nulls in the series the remote function application would fail
+    with pytest.raises(
+        google.api_core.exceptions.BadRequest, match="unsupported operand"
+    ):
+        bf_df[int_col_name_with_nulls].combine(
+            bf_df[int_col_name_no_nulls], add_remote
+        ).to_pandas()
+
+    # after filtering out nulls the remote function application should work
+    # similar to pandas
+    pd_filter = pd_df[int_col_name_with_nulls].notnull()
+    pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine(
+        pd_df[pd_filter][int_col_name_no_nulls], add
+    )
+    bf_filter = bf_df[int_col_name_with_nulls].notnull()
+    bf_result = (
+        bf_df[bf_filter][int_col_name_with_nulls]
+        .combine(bf_df[bf_filter][int_col_name_no_nulls], add_remote)
         .to_pandas()
     )
 
-    # ignore pandas "int64" vs bigframes "Int64" dtype difference
+    # ignore any dtype difference
    pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_apply_assign_partial_ordering_mode(dataset_id_permanent):
-    session = bigframes.Session(bigframes.BigQueryOptions(ordering_mode="partial"))
+def test_remote_function_nary_applied_after_filter(
+    session, dataset_id_permanent, scalars_dfs
+):
+    # This function is deliberately written to not work with NA input
+    def add(x: int, y: int, z: float) -> float:
+        return x + y + z
 
-    df = session.read_gbq("bigquery-public-data.baseball.schedules")[
+    scalars_df, scalars_pandas_df = scalars_dfs
+    int_col_name_with_nulls = "int64_col"
+    int_col_name_no_nulls = "int64_too"
+    float_col_name_with_nulls = "float64_col"
+    bf_df = scalars_df[
+        [int_col_name_with_nulls, int_col_name_no_nulls, float_col_name_with_nulls]
+    ]
+    pd_df = scalars_pandas_df[
+        [int_col_name_with_nulls, int_col_name_no_nulls, float_col_name_with_nulls]
+    ]
+
+    # make sure there are NA values in the test columns
+    assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]])
+    assert any([pd.isna(val) for val in bf_df[float_col_name_with_nulls]])
+
+    # create a remote function
+    add_remote = session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(add)
+    )(add)
+
+    # pandas does not support nary functions, so let's create a proxy function
+    # for testing purpose that takes a series and in turn calls the naray function
+    def add_pandas(s: pd.Series) -> float:
+        return add(
+            s[int_col_name_with_nulls],
+            s[int_col_name_no_nulls],
+            s[float_col_name_with_nulls],
+        )
+
+    # with nulls in the series the remote function application would fail
+    with pytest.raises(
+        google.api_core.exceptions.BadRequest, match="unsupported operand"
+    ):
+        bf_df.apply(add_remote, axis=1).to_pandas()
+
+    # after filtering out nulls the remote function application should work
+    # similar to pandas
+    pd_filter = (
+        pd_df[int_col_name_with_nulls].notnull()
+        & pd_df[float_col_name_with_nulls].notnull()
+    )
+    pd_result = pd_df[pd_filter].apply(add_pandas, axis=1)
+    bf_filter = (
+        bf_df[int_col_name_with_nulls].notnull()
+        & bf_df[float_col_name_with_nulls].notnull()
+    )
+    bf_result = bf_df[bf_filter].apply(add_remote, axis=1).to_pandas()
+
+    # ignore any dtype difference
+    pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
+
+
+@pytest.mark.parametrize(
+    ("method",),
+    [
+        pytest.param("apply"),
+        pytest.param("map"),
+        pytest.param("mask"),
+    ],
+)
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_unary_partial_ordering_mode_assign(
+    unordered_session, dataset_id_permanent, method
+):
+    df = unordered_session.read_gbq("bigquery-public-data.baseball.schedules")[
         ["duration_minutes"]
     ]
 
-    def plus_one(x: int) -> int:
-        return x + 1
+    def is_long_duration(minutes: int) -> bool:
+        return minutes >= 120
+
+    is_long_duration = unordered_session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(is_long_duration)
+    )(is_long_duration)
+
+    method = getattr(df["duration_minutes"], method)
+
+    df1 = df.assign(duration_meta=method(is_long_duration))
+    repr(df1)
+
+
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_binary_partial_ordering_mode_assign(
+    unordered_session, dataset_id_permanent, scalars_df_index
+):
+    def combiner(x: int, y: int) -> int:
+        if x is None:
+            return y
+        return x
+
+    combiner = unordered_session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(combiner)
+    )(combiner)
+
+    df = scalars_df_index[["int64_col", "int64_too", "float64_col", "string_col"]]
+    df1 = df.assign(int64_combined=df["int64_col"].combine(df["int64_too"], combiner))
+    repr(df1)
+
+
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_nary_partial_ordering_mode_assign(
+    unordered_session, dataset_id_permanent, scalars_df_index
+):
+    def processor(x: int, y: int, z: float, w: str) -> str:
+        return f"I got x={x}, y={y}, z={z} and w={w}"
 
-    plus_one = session.remote_function(
-        dataset=dataset_id_permanent, name=get_rf_name(plus_one)
-    )(plus_one)
+    processor = unordered_session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(processor)
+    )(processor)
 
-    df1 = df.assign(duration_cat=df["duration_minutes"].apply(plus_one))
+    df = scalars_df_index[["int64_col", "int64_too", "float64_col", "string_col"]]
+    df1 = df.assign(combined=df.apply(processor, axis=1))
     repr(df1)
