Skip to content

Commit e03cf4a

Browse files
SNOW-2476998: Add support for expanding.min/max/count/sum/mean/std/var/sem in faster pandas (#3974)
1 parent 367fc26 commit e03cf4a

File tree

3 files changed

+299
-0
lines changed

3 files changed

+299
-0
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@
9393
- `rolling.var`
9494
- `rolling.sem`
9595
- `rolling.corr`
96+
- `expanding.min`
97+
- `expanding.max`
98+
- `expanding.count`
99+
- `expanding.sum`
100+
- `expanding.mean`
101+
- `expanding.std`
102+
- `expanding.var`
103+
- `expanding.sem`
96104
- Make faster pandas disabled by default (opt-in instead of opt-out).
97105

98106
## 1.42.0 (2025-10-28)

src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17767,6 +17767,31 @@ def expanding_count(
1776717767
fold_axis: Union[int, str],
1776817768
expanding_kwargs: dict,
1776917769
numeric_only: bool = False,
17770+
) -> "SnowflakeQueryCompiler":
17771+
"""
17772+
Wrapper around _expanding_count_internal to be supported in faster pandas.
17773+
"""
17774+
relaxed_query_compiler = None
17775+
if self._relaxed_query_compiler is not None:
17776+
relaxed_query_compiler = (
17777+
self._relaxed_query_compiler._expanding_count_internal(
17778+
fold_axis=fold_axis,
17779+
expanding_kwargs=expanding_kwargs,
17780+
numeric_only=numeric_only,
17781+
)
17782+
)
17783+
qc = self._expanding_count_internal(
17784+
fold_axis=fold_axis,
17785+
expanding_kwargs=expanding_kwargs,
17786+
numeric_only=numeric_only,
17787+
)
17788+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
17789+
17790+
def _expanding_count_internal(
17791+
self,
17792+
fold_axis: Union[int, str],
17793+
expanding_kwargs: dict,
17794+
numeric_only: bool = False,
1777017795
) -> "SnowflakeQueryCompiler":
1777117796
return self._window_agg(
1777217797
window_func=WindowFunction.EXPANDING,
@@ -17782,6 +17807,37 @@ def expanding_sum(
1778217807
numeric_only: bool = False,
1778317808
engine: Optional[Literal["cython", "numba"]] = None,
1778417809
engine_kwargs: Optional[dict[str, bool]] = None,
17810+
) -> "SnowflakeQueryCompiler":
17811+
"""
17812+
Wrapper around _expanding_sum_internal to be supported in faster pandas.
17813+
"""
17814+
relaxed_query_compiler = None
17815+
if self._relaxed_query_compiler is not None:
17816+
relaxed_query_compiler = (
17817+
self._relaxed_query_compiler._expanding_sum_internal(
17818+
fold_axis=fold_axis,
17819+
expanding_kwargs=expanding_kwargs,
17820+
numeric_only=numeric_only,
17821+
engine=engine,
17822+
engine_kwargs=engine_kwargs,
17823+
)
17824+
)
17825+
qc = self._expanding_sum_internal(
17826+
fold_axis=fold_axis,
17827+
expanding_kwargs=expanding_kwargs,
17828+
numeric_only=numeric_only,
17829+
engine=engine,
17830+
engine_kwargs=engine_kwargs,
17831+
)
17832+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
17833+
17834+
def _expanding_sum_internal(
17835+
self,
17836+
fold_axis: Union[int, str],
17837+
expanding_kwargs: dict,
17838+
numeric_only: bool = False,
17839+
engine: Optional[Literal["cython", "numba"]] = None,
17840+
engine_kwargs: Optional[dict[str, bool]] = None,
1778517841
) -> "SnowflakeQueryCompiler":
1778617842
WarningMessage.warning_if_engine_args_is_set(
1778717843
"expanding_sum", engine, engine_kwargs
@@ -17800,6 +17856,37 @@ def expanding_mean(
1780017856
numeric_only: bool = False,
1780117857
engine: Optional[Literal["cython", "numba"]] = None,
1780217858
engine_kwargs: Optional[dict[str, bool]] = None,
17859+
) -> "SnowflakeQueryCompiler":
17860+
"""
17861+
Wrapper around _expanding_mean_internal to be supported in faster pandas.
17862+
"""
17863+
relaxed_query_compiler = None
17864+
if self._relaxed_query_compiler is not None:
17865+
relaxed_query_compiler = (
17866+
self._relaxed_query_compiler._expanding_mean_internal(
17867+
fold_axis=fold_axis,
17868+
expanding_kwargs=expanding_kwargs,
17869+
numeric_only=numeric_only,
17870+
engine=engine,
17871+
engine_kwargs=engine_kwargs,
17872+
)
17873+
)
17874+
qc = self._expanding_mean_internal(
17875+
fold_axis=fold_axis,
17876+
expanding_kwargs=expanding_kwargs,
17877+
numeric_only=numeric_only,
17878+
engine=engine,
17879+
engine_kwargs=engine_kwargs,
17880+
)
17881+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
17882+
17883+
def _expanding_mean_internal(
17884+
self,
17885+
fold_axis: Union[int, str],
17886+
expanding_kwargs: dict,
17887+
numeric_only: bool = False,
17888+
engine: Optional[Literal["cython", "numba"]] = None,
17889+
engine_kwargs: Optional[dict[str, bool]] = None,
1780317890
) -> "SnowflakeQueryCompiler":
1780417891
WarningMessage.warning_if_engine_args_is_set(
1780517892
"expanding_mean", engine, engine_kwargs
@@ -17829,6 +17916,40 @@ def expanding_var(
1782917916
numeric_only: bool = False,
1783017917
engine: Optional[Literal["cython", "numba"]] = None,
1783117918
engine_kwargs: Optional[dict[str, bool]] = None,
17919+
) -> "SnowflakeQueryCompiler":
17920+
"""
17921+
Wrapper around _expanding_var_internal to be supported in faster pandas.
17922+
"""
17923+
relaxed_query_compiler = None
17924+
if self._relaxed_query_compiler is not None:
17925+
relaxed_query_compiler = (
17926+
self._relaxed_query_compiler._expanding_var_internal(
17927+
fold_axis=fold_axis,
17928+
expanding_kwargs=expanding_kwargs,
17929+
ddof=ddof,
17930+
numeric_only=numeric_only,
17931+
engine=engine,
17932+
engine_kwargs=engine_kwargs,
17933+
)
17934+
)
17935+
qc = self._expanding_var_internal(
17936+
fold_axis=fold_axis,
17937+
expanding_kwargs=expanding_kwargs,
17938+
ddof=ddof,
17939+
numeric_only=numeric_only,
17940+
engine=engine,
17941+
engine_kwargs=engine_kwargs,
17942+
)
17943+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
17944+
17945+
def _expanding_var_internal(
17946+
self,
17947+
fold_axis: Union[int, str],
17948+
expanding_kwargs: dict,
17949+
ddof: int = 1,
17950+
numeric_only: bool = False,
17951+
engine: Optional[Literal["cython", "numba"]] = None,
17952+
engine_kwargs: Optional[dict[str, bool]] = None,
1783217953
) -> "SnowflakeQueryCompiler":
1783317954
WarningMessage.warning_if_engine_args_is_set(
1783417955
"rolling_var", engine, engine_kwargs
@@ -17848,6 +17969,40 @@ def expanding_std(
1784817969
numeric_only: bool = False,
1784917970
engine: Optional[Literal["cython", "numba"]] = None,
1785017971
engine_kwargs: Optional[dict[str, bool]] = None,
17972+
) -> "SnowflakeQueryCompiler":
17973+
"""
17974+
Wrapper around _expanding_std_internal to be supported in faster pandas.
17975+
"""
17976+
relaxed_query_compiler = None
17977+
if self._relaxed_query_compiler is not None:
17978+
relaxed_query_compiler = (
17979+
self._relaxed_query_compiler._expanding_std_internal(
17980+
fold_axis=fold_axis,
17981+
expanding_kwargs=expanding_kwargs,
17982+
ddof=ddof,
17983+
numeric_only=numeric_only,
17984+
engine=engine,
17985+
engine_kwargs=engine_kwargs,
17986+
)
17987+
)
17988+
qc = self._expanding_std_internal(
17989+
fold_axis=fold_axis,
17990+
expanding_kwargs=expanding_kwargs,
17991+
ddof=ddof,
17992+
numeric_only=numeric_only,
17993+
engine=engine,
17994+
engine_kwargs=engine_kwargs,
17995+
)
17996+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
17997+
17998+
def _expanding_std_internal(
17999+
self,
18000+
fold_axis: Union[int, str],
18001+
expanding_kwargs: dict,
18002+
ddof: int = 1,
18003+
numeric_only: bool = False,
18004+
engine: Optional[Literal["cython", "numba"]] = None,
18005+
engine_kwargs: Optional[dict[str, bool]] = None,
1785118006
) -> "SnowflakeQueryCompiler":
1785218007
WarningMessage.warning_if_engine_args_is_set(
1785318008
"rolling_std", engine, engine_kwargs
@@ -17866,6 +18021,37 @@ def expanding_min(
1786618021
numeric_only: bool = False,
1786718022
engine: Optional[Literal["cython", "numba"]] = None,
1786818023
engine_kwargs: Optional[dict[str, bool]] = None,
18024+
) -> "SnowflakeQueryCompiler":
18025+
"""
18026+
Wrapper around _expanding_min_internal to be supported in faster pandas.
18027+
"""
18028+
relaxed_query_compiler = None
18029+
if self._relaxed_query_compiler is not None:
18030+
relaxed_query_compiler = (
18031+
self._relaxed_query_compiler._expanding_min_internal(
18032+
fold_axis=fold_axis,
18033+
expanding_kwargs=expanding_kwargs,
18034+
numeric_only=numeric_only,
18035+
engine=engine,
18036+
engine_kwargs=engine_kwargs,
18037+
)
18038+
)
18039+
qc = self._expanding_min_internal(
18040+
fold_axis=fold_axis,
18041+
expanding_kwargs=expanding_kwargs,
18042+
numeric_only=numeric_only,
18043+
engine=engine,
18044+
engine_kwargs=engine_kwargs,
18045+
)
18046+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
18047+
18048+
def _expanding_min_internal(
18049+
self,
18050+
fold_axis: Union[int, str],
18051+
expanding_kwargs: dict,
18052+
numeric_only: bool = False,
18053+
engine: Optional[Literal["cython", "numba"]] = None,
18054+
engine_kwargs: Optional[dict[str, bool]] = None,
1786918055
) -> "SnowflakeQueryCompiler":
1787018056
WarningMessage.warning_if_engine_args_is_set(
1787118057
"expanding_min", engine, engine_kwargs
@@ -17884,6 +18070,37 @@ def expanding_max(
1788418070
numeric_only: bool = False,
1788518071
engine: Optional[Literal["cython", "numba"]] = None,
1788618072
engine_kwargs: Optional[dict[str, bool]] = None,
18073+
) -> "SnowflakeQueryCompiler":
18074+
"""
18075+
Wrapper around _expanding_max_internal to be supported in faster pandas.
18076+
"""
18077+
relaxed_query_compiler = None
18078+
if self._relaxed_query_compiler is not None:
18079+
relaxed_query_compiler = (
18080+
self._relaxed_query_compiler._expanding_max_internal(
18081+
fold_axis=fold_axis,
18082+
expanding_kwargs=expanding_kwargs,
18083+
numeric_only=numeric_only,
18084+
engine=engine,
18085+
engine_kwargs=engine_kwargs,
18086+
)
18087+
)
18088+
qc = self._expanding_max_internal(
18089+
fold_axis=fold_axis,
18090+
expanding_kwargs=expanding_kwargs,
18091+
numeric_only=numeric_only,
18092+
engine=engine,
18093+
engine_kwargs=engine_kwargs,
18094+
)
18095+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
18096+
18097+
def _expanding_max_internal(
18098+
self,
18099+
fold_axis: Union[int, str],
18100+
expanding_kwargs: dict,
18101+
numeric_only: bool = False,
18102+
engine: Optional[Literal["cython", "numba"]] = None,
18103+
engine_kwargs: Optional[dict[str, bool]] = None,
1788718104
) -> "SnowflakeQueryCompiler":
1788818105
WarningMessage.warning_if_engine_args_is_set(
1788918106
"expanding_max", engine, engine_kwargs
@@ -17972,6 +18189,34 @@ def expanding_sem(
1797218189
expanding_kwargs: dict,
1797318190
ddof: int = 1,
1797418191
numeric_only: bool = False,
18192+
) -> "SnowflakeQueryCompiler":
18193+
"""
18194+
Wrapper around _expanding_sem_internal to be supported in faster pandas.
18195+
"""
18196+
relaxed_query_compiler = None
18197+
if self._relaxed_query_compiler is not None:
18198+
relaxed_query_compiler = (
18199+
self._relaxed_query_compiler._expanding_sem_internal(
18200+
fold_axis=fold_axis,
18201+
expanding_kwargs=expanding_kwargs,
18202+
ddof=ddof,
18203+
numeric_only=numeric_only,
18204+
)
18205+
)
18206+
qc = self._expanding_sem_internal(
18207+
fold_axis=fold_axis,
18208+
expanding_kwargs=expanding_kwargs,
18209+
ddof=ddof,
18210+
numeric_only=numeric_only,
18211+
)
18212+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
18213+
18214+
def _expanding_sem_internal(
18215+
self,
18216+
fold_axis: Union[int, str],
18217+
expanding_kwargs: dict,
18218+
ddof: int = 1,
18219+
numeric_only: bool = False,
1797518220
) -> "SnowflakeQueryCompiler":
1797618221
return self._window_agg(
1797718222
window_func=WindowFunction.EXPANDING,

tests/integ/modin/test_faster_pandas.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,52 @@ def test_duplicated(session):
439439
assert_series_equal(snow_result, native_result)
440440

441441

442+
@pytest.mark.parametrize(
    "func",
    ["min", "max", "count", "sum", "mean", "std", "var", "sem"],
)
@sql_count_checker(query_count=3)
def test_expanding(session, func):
    """Check that ``expanding().<func>()`` keeps the relaxed query compiler.

    A small temp table is written and read back with ``pd.read_snowflake``
    while the ``dummy_row_pos_optimization_enabled`` session parameter is
    overridden to True. The test asserts that both the input frame and the
    aggregated result carry a relaxed query compiler in dummy-row-position
    mode, then compares the result with the same aggregation on the frame
    returned by ``to_pandas()``.
    """
    with session_parameter_override(
        session, "dummy_row_pos_optimization_enabled", True
    ):
        # Materialize the fixture data as a temporary table.
        source_frame = native_pd.DataFrame(
            [[1, 11], [2, 12], [3, 13]], columns=["A", "B"]
        )
        temp_table = Utils.random_name_for_temp_object(TempObjectType.TABLE)
        session.create_dataframe(source_frame).write.save_as_table(
            temp_table, table_type="temp"
        )

        # Build the Snowpark pandas frame and run the expanding aggregation.
        df = pd.read_snowflake(temp_table)
        expanding_window = df.expanding()
        snow_result = getattr(expanding_window, func)()

        # The input frame must carry a relaxed query compiler ...
        input_relaxed_qc = df._query_compiler._relaxed_query_compiler
        assert input_relaxed_qc is not None
        assert input_relaxed_qc._dummy_row_pos_mode is True
        # ... and so must the frame produced by the aggregation.
        output_relaxed_qc = snow_result._query_compiler._relaxed_query_compiler
        assert output_relaxed_qc is not None
        assert output_relaxed_qc._dummy_row_pos_mode is True

        # Compute the reference result on the frame pulled back via to_pandas().
        native_df = df.to_pandas()
        native_window = native_df.expanding()
        native_result = getattr(native_window, func)()

        # Compare values only; dtypes are not checked.
        assert_frame_equal(snow_result, native_result, check_dtype=False)
486+
487+
442488
@pytest.mark.parametrize(
443489
"func",
444490
[

0 commit comments

Comments
 (0)