Skip to content

Commit d3b1413

Browse files
Merge branch 'main' into feature/aherrera/SNOW-2455523
2 parents bb088ae + db1b20f commit d3b1413

File tree

4 files changed

+132
-5
lines changed

4 files changed

+132
-5
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,11 @@
127127
- `expanding.std`
128128
- `expanding.var`
129129
- `expanding.sem`
130+
- `cumsum`
131+
- `cummin`
132+
- `cummax`
130133
- Make faster pandas disabled by default (opt-in instead of opt-out).
134+
- Improve performance of `drop_duplicates` by avoiding joins when `keep!=False` in faster pandas.
131135

132136
## 1.42.0 (2025-10-28)
133137

src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9105,6 +9105,26 @@ def _concat_internal(
91059105
)
91069106
def cumsum(
91079107
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
9108+
) -> "SnowflakeQueryCompiler":
9109+
"""
9110+
Wrapper around _cumsum_internal to be supported in faster pandas.
9111+
"""
9112+
relaxed_query_compiler = None
9113+
if self._relaxed_query_compiler is not None:
9114+
relaxed_query_compiler = self._relaxed_query_compiler._cumsum_internal(
9115+
axis=axis,
9116+
skipna=skipna,
9117+
**kwargs,
9118+
)
9119+
qc = self._cumsum_internal(
9120+
axis=axis,
9121+
skipna=skipna,
9122+
**kwargs,
9123+
)
9124+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
9125+
9126+
def _cumsum_internal(
9127+
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
91089128
) -> "SnowflakeQueryCompiler":
91099129
"""
91109130
Return cumulative sum over a DataFrame or Series axis.
@@ -9143,6 +9163,26 @@ def cumsum(
91439163
)
91449164
def cummin(
91459165
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
9166+
) -> "SnowflakeQueryCompiler":
9167+
"""
9168+
Wrapper around _cummin_internal to be supported in faster pandas.
9169+
"""
9170+
relaxed_query_compiler = None
9171+
if self._relaxed_query_compiler is not None:
9172+
relaxed_query_compiler = self._relaxed_query_compiler._cummin_internal(
9173+
axis=axis,
9174+
skipna=skipna,
9175+
**kwargs,
9176+
)
9177+
qc = self._cummin_internal(
9178+
axis=axis,
9179+
skipna=skipna,
9180+
**kwargs,
9181+
)
9182+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
9183+
9184+
def _cummin_internal(
9185+
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
91469186
) -> "SnowflakeQueryCompiler":
91479187
"""
91489188
Return cumulative min over a DataFrame or Series axis.
@@ -9181,6 +9221,26 @@ def cummin(
91819221
)
91829222
def cummax(
91839223
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
9224+
) -> "SnowflakeQueryCompiler":
9225+
"""
9226+
Wrapper around _cummax_internal to be supported in faster pandas.
9227+
"""
9228+
relaxed_query_compiler = None
9229+
if self._relaxed_query_compiler is not None:
9230+
relaxed_query_compiler = self._relaxed_query_compiler._cummax_internal(
9231+
axis=axis,
9232+
skipna=skipna,
9233+
**kwargs,
9234+
)
9235+
qc = self._cummax_internal(
9236+
axis=axis,
9237+
skipna=skipna,
9238+
**kwargs,
9239+
)
9240+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
9241+
9242+
def _cummax_internal(
9243+
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
91849244
) -> "SnowflakeQueryCompiler":
91859245
"""
91869246
Return cumulative max over a DataFrame or Series axis.
@@ -18622,6 +18682,31 @@ def add_substring(
1862218682
# Returning the query compiler with updated columns and index.
1862318683
return SnowflakeQueryCompiler(result_frame)
1862418684

18685+
def drop_duplicates(self) -> "SnowflakeQueryCompiler":
18686+
"""
18687+
Wrapper around _drop_duplicates_internal to be supported in faster pandas.
18688+
"""
18689+
relaxed_query_compiler = None
18690+
if self._relaxed_query_compiler is not None:
18691+
relaxed_query_compiler = (
18692+
self._relaxed_query_compiler._drop_duplicates_internal()
18693+
)
18694+
qc = self._drop_duplicates_internal()
18695+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
18696+
18697+
def _drop_duplicates_internal(self) -> "SnowflakeQueryCompiler":
18698+
"""
18699+
Return a DataFrame or Series after dropping the duplicate rows.
18700+
"""
18701+
return self.groupby_agg(
18702+
by=self._modin_frame.data_column_pandas_labels,
18703+
agg_func={},
18704+
axis=0,
18705+
groupby_kwargs={"sort": False, "as_index": False, "dropna": False},
18706+
agg_args=[],
18707+
agg_kwargs={},
18708+
)
18709+
1862518710
def duplicated(
1862618711
self,
1862718712
subset: Union[Hashable, Sequence[Hashable]] = None,

src/snowflake/snowpark/modin/plugin/extensions/base_overrides.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,8 +1529,12 @@ def drop_duplicates(
15291529
df = self[subset]
15301530
else:
15311531
df = self
1532-
duplicated = df.duplicated(keep=keep)
1533-
result = self[~duplicated]
1532+
if pd.session.dummy_row_pos_optimization_enabled and keep in ["first", "last"]:
1533+
result_qc = df._query_compiler.drop_duplicates()
1534+
result = self.__constructor__(query_compiler=result_qc)
1535+
else:
1536+
duplicated = df.duplicated(keep=keep)
1537+
result = self[~duplicated]
15341538
if ignore_index:
15351539
result.index = pandas.RangeIndex(stop=len(result))
15361540
if inplace:

tests/integ/modin/test_faster_pandas.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,40 @@ def test_copy(session):
340340
assert_frame_equal(snow_result, native_result)
341341

342342

343+
@pytest.mark.parametrize("func", ["cumsum", "cummin", "cummax"])
344+
@sql_count_checker(query_count=3)
345+
def test_cumulative_functions(session, func):
346+
with session_parameter_override(
347+
session, "dummy_row_pos_optimization_enabled", True
348+
):
349+
# create tables
350+
table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
351+
session.create_dataframe(
352+
native_pd.DataFrame([[1, 11], [2, 12], [3, 13]], columns=["A", "B"])
353+
).write.save_as_table(table_name, table_type="temp")
354+
355+
# create snow dataframes
356+
df = pd.read_snowflake(table_name)
357+
snow_result = getattr(df["B"], func)()
358+
359+
# verify that the input dataframe has a populated relaxed query compiler
360+
assert df._query_compiler._relaxed_query_compiler is not None
361+
assert df._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
362+
# verify that the output dataframe also has a populated relaxed query compiler
363+
assert snow_result._query_compiler._relaxed_query_compiler is not None
364+
assert (
365+
snow_result._query_compiler._relaxed_query_compiler._dummy_row_pos_mode
366+
is True
367+
)
368+
369+
# create pandas dataframes
370+
native_df = df.to_pandas()
371+
native_result = getattr(native_df["B"], func)()
372+
373+
# compare results
374+
assert_series_equal(snow_result, native_result)
375+
376+
343377
@sql_count_checker(query_count=3)
344378
def test_drop(session):
345379
with session_parameter_override(
@@ -373,15 +407,15 @@ def test_drop(session):
373407
assert_frame_equal(snow_result, native_result)
374408

375409

376-
@sql_count_checker(query_count=3, join_count=2)
410+
@sql_count_checker(query_count=3)
377411
def test_drop_duplicates(session):
378412
with session_parameter_override(
379413
session, "dummy_row_pos_optimization_enabled", True
380414
):
381415
# create tables
382416
table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
383417
session.create_dataframe(
384-
native_pd.DataFrame([[2, 12], [2, 12], [3, 13]], columns=["A", "B"])
418+
native_pd.DataFrame([[2, 12], [3, 13], [2, 12]], columns=["A", "B"])
385419
).write.save_as_table(table_name, table_type="temp")
386420

387421
# create snow dataframes
@@ -403,7 +437,7 @@ def test_drop_duplicates(session):
403437
native_result = native_df.drop_duplicates()
404438

405439
# compare results
406-
assert_frame_equal(snow_result, native_result)
440+
assert_frame_equal(snow_result, native_result, check_index_type=False)
407441

408442

409443
@sql_count_checker(query_count=3, join_count=1)

0 commit comments

Comments
 (0)