
Commit 94215a4

SNOW-2217486: Fix some apply-related Jenkins failures (#3572)
1 parent 25ebb69 commit 94215a4

File tree

3 files changed: +76 -79 lines changed

src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py

Lines changed: 68 additions & 71 deletions
@@ -410,17 +410,19 @@ def end_partition(self, df): # type: ignore[no-untyped-def] # pragma: no cover
         APPLY_LABEL_COLUMN_QUOTED_IDENTIFIER,
         APPLY_VALUE_COLUMN_QUOTED_IDENTIFIER,
     ]
-    cache_key = UDTFCacheKey(
-        pickle_function(ApplyFunc.end_partition),
-        tuple(col_types),
-        tuple(col_identifiers),
-        tuple([LongType()] + input_types),
-        tuple(pkg.__name__ if isinstance(pkg, ModuleType) else pkg for pkg in packages),
-    )
-    cache = session_apply_axis_1_udtf_cache[session]
-    if cache_key not in cache:
-        try:
-            new_udtf = sp_func.udtf(
+    try:
+        cache_key = UDTFCacheKey(
+            pickle_function(ApplyFunc.end_partition),
+            tuple(col_types),
+            tuple(col_identifiers),
+            tuple([LongType()] + input_types),
+            tuple(
+                pkg.__name__ if isinstance(pkg, ModuleType) else pkg for pkg in packages
+            ),
+        )
+        cache = session_apply_axis_1_udtf_cache[session]
+        if cache_key not in cache:
+            cache[cache_key] = sp_func.udtf(
                 ApplyFunc,
                 output_schema=PandasDataFrameType(
                     col_types,
@@ -432,13 +434,12 @@ def end_partition(self, df): # type: ignore[no-untyped-def] # pragma: no cover
                 session=session,
                 statement_params=get_default_snowpark_pandas_statement_params(),
             )
-            cache[cache_key] = new_udtf
-        except NotImplementedError:  # pragma: no cover
-            # When a Snowpark object is passed to a UDF, a NotImplementedError with message
-            # 'Snowpark pandas does not yet support the method DataFrame.__reduce__' is raised. Instead,
-            # catch this exception and return a more user-friendly error message.
-            raise ValueError(APPLY_WITH_SNOWPARK_OBJECT_ERROR_MSG)
-    return cache[cache_key]
+        return cache[cache_key]
+    except NotImplementedError:  # pragma: no cover
+        # When a Snowpark object is passed to a UDF, a NotImplementedError with message
+        # 'Snowpark pandas does not yet support the method DataFrame.__reduce__' is raised. Instead,
+        # catch this exception and return a more user-friendly error message.
+        raise ValueError(APPLY_WITH_SNOWPARK_OBJECT_ERROR_MSG)


 def convert_groupby_apply_dataframe_result_to_standard_schema(
@@ -797,17 +798,17 @@ def end_partition(self, df: native_pd.DataFrame): # type: ignore[no-untyped-def]
             func_result.insert(0, "__min_row_position__", min_row_position)
             return func_result

-    cache_key = UDTFCacheKey(
-        pickle_function(ApplyFunc.end_partition),
-        tuple(output_schema.column_types),
-        tuple(output_schema.column_ids),
-        tuple(input_column_types),
-        tuple(session.get_packages().values()),
-    )
-    cache = session_groupby_apply_no_pivot_udtf_cache[session]
-    if cache_key not in cache:
-        try:
-            new_udtf = sp_func.udtf(
+    try:
+        cache_key = UDTFCacheKey(
+            pickle_function(ApplyFunc.end_partition),
+            tuple(output_schema.column_types),
+            tuple(output_schema.column_ids),
+            tuple(input_column_types),
+            tuple(session.get_packages().values()),
+        )
+        cache = session_groupby_apply_no_pivot_udtf_cache[session]
+        if cache_key not in cache:
+            cache[cache_key] = sp_func.udtf(
                 ApplyFunc,
                 output_schema=PandasDataFrameType(
                     output_schema.column_types, output_schema.column_ids
@@ -819,13 +820,12 @@ def end_partition(self, df: native_pd.DataFrame): # type: ignore[no-untyped-def]
                 session=session,
                 statement_params=get_default_snowpark_pandas_statement_params(),
             )
-            cache[cache_key] = new_udtf
-        except NotImplementedError:  # pragma: no cover
-            # When a Snowpark object is passed to a UDF, a NotImplementedError with message
-            # 'Snowpark pandas does not yet support the method DataFrame.__reduce__' is raised. Instead,
-            # catch this exception and return a more user-friendly error message.
-            raise ValueError(APPLY_WITH_SNOWPARK_OBJECT_ERROR_MSG)
-    return cache[cache_key]
+        return cache[cache_key]
+    except NotImplementedError:  # pragma: no cover
+        # When a Snowpark object is passed to a UDF, a NotImplementedError with message
+        # 'Snowpark pandas does not yet support the method DataFrame.__reduce__' is raised. Instead,
+        # catch this exception and return a more user-friendly error message.
+        raise ValueError(APPLY_WITH_SNOWPARK_OBJECT_ERROR_MSG)


 def infer_output_schema_for_apply(
@@ -1183,18 +1183,17 @@ def end_partition(self, df: native_pd.DataFrame): # type: ignore[no-untyped-def]
         IntegerType(),
         IntegerType(),
     ]
-    cache_key = UDTFCacheKey(
-        pickle_function(ApplyFunc.end_partition),
-        tuple(col_types),
-        tuple(col_names),
-        tuple(input_types),
-        tuple(session.get_packages().values()),
-    )
-    cache = session_groupby_apply_udtf_cache[session]
-
-    if cache_key not in cache:
-        try:
-            new_udtf = sp_func.udtf(
+    try:
+        cache_key = UDTFCacheKey(
+            pickle_function(ApplyFunc.end_partition),
+            tuple(col_types),
+            tuple(col_names),
+            tuple(input_types),
+            tuple(session.get_packages().values()),
+        )
+        cache = session_groupby_apply_udtf_cache[session]
+        if cache_key not in cache:
+            cache[cache_key] = sp_func.udtf(
                 ApplyFunc,
                 output_schema=PandasDataFrameType(
                     col_types,
@@ -1207,13 +1206,12 @@ def end_partition(self, df: native_pd.DataFrame): # type: ignore[no-untyped-def]
                 session=session,
                 statement_params=get_default_snowpark_pandas_statement_params(),
             )
-            cache[cache_key] = new_udtf
-        except NotImplementedError:  # pragma: no cover
-            # When a Snowpark object is passed to a UDF, a NotImplementedError with message
-            # 'Snowpark pandas does not yet support the method DataFrame.__reduce__' is raised. Instead,
-            # catch this exception and return a more user-friendly error message.
-            raise ValueError(APPLY_WITH_SNOWPARK_OBJECT_ERROR_MSG)
-    return None, cache[cache_key]
+        return None, cache[cache_key]
+    except NotImplementedError:  # pragma: no cover
+        # When a Snowpark object is passed to a UDF, a NotImplementedError with message
+        # 'Snowpark pandas does not yet support the method DataFrame.__reduce__' is raised. Instead,
+        # catch this exception and return a more user-friendly error message.
+        raise ValueError(APPLY_WITH_SNOWPARK_OBJECT_ERROR_MSG)


 def create_udf_for_series_apply(
@@ -1279,16 +1277,15 @@ def apply_func(x): # type: ignore[no-untyped-def] # pragma: no cover
        return x.apply(func, args=args, **kwargs)

     strict = na_action == "ignore"
-    cache_key = UDFCacheKey(
-        pickle_function(apply_func),
-        return_type,
-        input_type,
-        strict,
-    )
-    cache = session_udf_cache[session]
-
-    if cache_key not in cache:
-        try:
+    try:
+        cache_key = UDFCacheKey(
+            pickle_function(apply_func),
+            return_type,
+            input_type,
+            strict,
+        )
+        cache = session_udf_cache[session]
+        if cache_key not in cache:
             cache[cache_key] = sp_func.udf(
                 apply_func,
                 return_type=PandasSeriesType(return_type),
@@ -1298,12 +1295,12 @@ def apply_func(x): # type: ignore[no-untyped-def] # pragma: no cover
                 packages=packages,
                 statement_params=get_default_snowpark_pandas_statement_params(),
             )
-        except NotImplementedError:  # pragma: no cover
-            # When a Snowpark object is passed to a UDF, a NotImplementedError with message
-            # 'Snowpark pandas does not yet support the method DataFrame.__reduce__' is raised. Instead,
-            # catch this exception and return a more user-friendly error message.
-            raise ValueError(APPLY_WITH_SNOWPARK_OBJECT_ERROR_MSG)
-    return cache[cache_key]
+        return cache[cache_key]
+    except NotImplementedError:  # pragma: no cover
+        # When a Snowpark object is passed to a UDF, a NotImplementedError with message
+        # 'Snowpark pandas does not yet support the method DataFrame.__reduce__' is raised. Instead,
+        # catch this exception and return a more user-friendly error message.
+        raise ValueError(APPLY_WITH_SNOWPARK_OBJECT_ERROR_MSG)


 def handle_missing_value_in_variant(value: Any) -> Any:
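Each call site above gets the same treatment: the `try` now also covers cache-key construction, because `pickle_function` itself raises the `NotImplementedError` when the applied function captures a Snowpark object, and the success path returns from inside the `try`. A minimal sketch of the resulting pattern, with hypothetical `make_key` and `register` callables standing in for the real `UDTFCacheKey` construction and `sp_func.udtf(...)` registration:

from typing import Any, Callable, Dict

# Stand-in for the real constant defined in apply_utils.py.
APPLY_WITH_SNOWPARK_OBJECT_ERROR_MSG = "Snowpark objects cannot be used in apply"


def get_or_register(
    cache: Dict[Any, Any],
    make_key: Callable[[], Any],  # hypothetical: builds the cache key (pickles the function)
    register: Callable[[], Any],  # hypothetical: registers the UDF/UDTF with the session
) -> Any:
    try:
        # Key construction pickles the user function, so a captured Snowpark
        # object fails here, before any registration queries are issued.
        key = make_key()
        if key not in cache:
            cache[key] = register()
        return cache[key]
    except NotImplementedError:
        # Surface the pickling failure as a user-friendly error instead.
        raise ValueError(APPLY_WITH_SNOWPARK_OBJECT_ERROR_MSG)

Because the failure now happens before any registration queries run, the tests below expect correspondingly lower query counts.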

tests/integ/modin/frame/test_apply.py

Lines changed: 6 additions & 6 deletions
@@ -1007,34 +1007,34 @@ def test_udfs_and_udtfs_with_snowpark_object_error_msg():
     )
     snow_df = pd.DataFrame([7, 8, 9])
     with SqlCounter(
-        query_count=16,
+        query_count=14,
         high_count_expected=True,
         high_count_reason="Series.apply has high query count",
     ):
         with pytest.raises(ValueError, match=expected_error_msg):  # Series.apply
             snow_df[0].apply(lambda row: snow_df.iloc[0, 0])
-    with SqlCounter(query_count=2):
+    with SqlCounter(query_count=0):
         with pytest.raises(
             ValueError, match=expected_error_msg
         ):  # DataFrame.apply axis=0
             snow_df.apply(lambda row: snow_df.iloc[0, 0])
-    with SqlCounter(query_count=2):
+    with SqlCounter(query_count=0):
         with pytest.raises(
             ValueError, match=expected_error_msg
         ):  # DataFrame.apply axis=1
             snow_df.apply(lambda row: snow_df.iloc[0, 0], axis=1)
-    with SqlCounter(query_count=2):
+    with SqlCounter(query_count=0):
         with pytest.raises(ValueError, match=expected_error_msg):  # DataFrame.transform
             snow_df.transform(lambda row: snow_df.iloc[0, 0])
     with SqlCounter(
-        query_count=16,
+        query_count=14,
         high_count_expected=True,
         high_count_reason="DataFrame.map has high query count",
     ):
         with pytest.raises(ValueError, match=expected_error_msg):  # DataFrame.map
             snow_df.map(lambda row: snow_df.iloc[0, 0])
     with SqlCounter(
-        query_count=16,
+        query_count=14,
         high_count_expected=True,
         high_count_reason="Series.map has high query count",
    ):
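The reduced counts reflect that the ValueError now surfaces while pickling the function, before any UDF or UDTF registration queries are issued. A minimal sketch of the user-facing behavior these tests exercise, assuming a configured Snowpark pandas session; the lambda closes over the Snowpark DataFrame itself, which cannot be pickled:

import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401  # enables the Snowpark pandas backend

snow_df = pd.DataFrame([7, 8, 9])
try:
    # The applied function captures snow_df, a Snowpark pandas object.
    snow_df.apply(lambda row: snow_df.iloc[0, 0], axis=1)
except ValueError as err:
    print(err)  # the user-friendly message matched by expected_error_msg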

tests/integ/modin/frame/test_cache_result.py

Lines changed: 2 additions & 2 deletions
@@ -213,7 +213,7 @@ def test_cache_result_post_applymap(self, inplace, simple_test_data):
             native_pd.DataFrame(simple_test_data).applymap(lambda x: x + x), native_pd
         )
         with SqlCounter(
-            query_count=11,
+            query_count=8,
             union_count=9,
             udf_count=1,
             high_count_expected=True,
@@ -227,7 +227,7 @@ def test_cache_result_post_applymap(self, inplace, simple_test_data):
         )

         with SqlCounter(
-            query_count=10,
+            query_count=7,
             high_count_expected=True,
             high_count_reason="applymap requires additional queries to setup the UDF.",
         ):
