Skip to content

Commit a6d1b88

Browse files
SNOW-2504821: Add support for cumsum/cummin/cummax in faster pandas (#3978)
1 parent e64dbc9 commit a6d1b88

File tree

3 files changed

+97
-0
lines changed

3 files changed

+97
-0
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@
120120
- `expanding.std`
121121
- `expanding.var`
122122
- `expanding.sem`
123+
- `cumsum`
124+
- `cummin`
125+
- `cummax`
123126
- Make faster pandas disabled by default (opt-in instead of opt-out).
124127

125128
## 1.42.0 (2025-10-28)

src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9105,6 +9105,26 @@ def _concat_internal(
91059105
)
91069106
def cumsum(
91079107
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
9108+
) -> "SnowflakeQueryCompiler":
9109+
"""
9110+
Wrapper around _cumsum_internal to be supported in faster pandas.
9111+
"""
9112+
relaxed_query_compiler = None
9113+
if self._relaxed_query_compiler is not None:
9114+
relaxed_query_compiler = self._relaxed_query_compiler._cumsum_internal(
9115+
axis=axis,
9116+
skipna=skipna,
9117+
**kwargs,
9118+
)
9119+
qc = self._cumsum_internal(
9120+
axis=axis,
9121+
skipna=skipna,
9122+
**kwargs,
9123+
)
9124+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
9125+
9126+
def _cumsum_internal(
9127+
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
91089128
) -> "SnowflakeQueryCompiler":
91099129
"""
91109130
Return cumulative sum over a DataFrame or Series axis.
@@ -9143,6 +9163,26 @@ def cumsum(
91439163
)
91449164
def cummin(
91459165
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
9166+
) -> "SnowflakeQueryCompiler":
9167+
"""
9168+
Wrapper around _cummin_internal to be supported in faster pandas.
9169+
"""
9170+
relaxed_query_compiler = None
9171+
if self._relaxed_query_compiler is not None:
9172+
relaxed_query_compiler = self._relaxed_query_compiler._cummin_internal(
9173+
axis=axis,
9174+
skipna=skipna,
9175+
**kwargs,
9176+
)
9177+
qc = self._cummin_internal(
9178+
axis=axis,
9179+
skipna=skipna,
9180+
**kwargs,
9181+
)
9182+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
9183+
9184+
def _cummin_internal(
9185+
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
91469186
) -> "SnowflakeQueryCompiler":
91479187
"""
91489188
Return cumulative min over a DataFrame or Series axis.
@@ -9181,6 +9221,26 @@ def cummin(
91819221
)
91829222
def cummax(
91839223
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
9224+
) -> "SnowflakeQueryCompiler":
9225+
"""
9226+
Wrapper around _cummax_internal to be supported in faster pandas.
9227+
"""
9228+
relaxed_query_compiler = None
9229+
if self._relaxed_query_compiler is not None:
9230+
relaxed_query_compiler = self._relaxed_query_compiler._cummax_internal(
9231+
axis=axis,
9232+
skipna=skipna,
9233+
**kwargs,
9234+
)
9235+
qc = self._cummax_internal(
9236+
axis=axis,
9237+
skipna=skipna,
9238+
**kwargs,
9239+
)
9240+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
9241+
9242+
def _cummax_internal(
9243+
self, axis: int = 0, skipna: bool = True, *args: Any, **kwargs: Any
91849244
) -> "SnowflakeQueryCompiler":
91859245
"""
91869246
Return cumulative max over a DataFrame or Series axis.

tests/integ/modin/test_faster_pandas.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,40 @@ def test_copy(session):
340340
assert_frame_equal(snow_result, native_result)
341341

342342

343+
@pytest.mark.parametrize("func", ["cumsum", "cummin", "cummax"])
344+
@sql_count_checker(query_count=3)
345+
def test_cumulative_functions(session, func):
346+
with session_parameter_override(
347+
session, "dummy_row_pos_optimization_enabled", True
348+
):
349+
# create tables
350+
table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
351+
session.create_dataframe(
352+
native_pd.DataFrame([[1, 11], [2, 12], [3, 13]], columns=["A", "B"])
353+
).write.save_as_table(table_name, table_type="temp")
354+
355+
# create snow dataframes
356+
df = pd.read_snowflake(table_name)
357+
snow_result = getattr(df["B"], func)()
358+
359+
# verify that the input dataframe has a populated relaxed query compiler
360+
assert df._query_compiler._relaxed_query_compiler is not None
361+
assert df._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
362+
# verify that the output dataframe also has a populated relaxed query compiler
363+
assert snow_result._query_compiler._relaxed_query_compiler is not None
364+
assert (
365+
snow_result._query_compiler._relaxed_query_compiler._dummy_row_pos_mode
366+
is True
367+
)
368+
369+
# create pandas dataframes
370+
native_df = df.to_pandas()
371+
native_result = getattr(native_df["B"], func)()
372+
373+
# compare results
374+
assert_series_equal(snow_result, native_result)
375+
376+
343377
@sql_count_checker(query_count=3)
344378
def test_drop(session):
345379
with session_parameter_override(

0 commit comments

Comments
 (0)