Skip to content

Commit 516fa67

Browse files
Merge branch 'main' into helmeleegy-SNOW-2504821
2 parents: c6e88b4 + e03cf4a; commit: 516fa67

File tree

3 files changed

+299
-0
lines changed

3 files changed

+299
-0
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -93,6 +93,14 @@
9393
- `rolling.var`
9494
- `rolling.sem`
9595
- `rolling.corr`
96+
- `expanding.min`
97+
- `expanding.max`
98+
- `expanding.count`
99+
- `expanding.sum`
100+
- `expanding.mean`
101+
- `expanding.std`
102+
- `expanding.var`
103+
- `expanding.sem`
96104
- `cumsum`
97105
- `cummin`
98106
- `cummax`

src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py

Lines changed: 245 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17833,6 +17833,31 @@ def expanding_count(
1783317833
fold_axis: Union[int, str],
1783417834
expanding_kwargs: dict,
1783517835
numeric_only: bool = False,
17836+
) -> "SnowflakeQueryCompiler":
17837+
"""
17838+
Wrapper around _expanding_count_internal to be supported in faster pandas.
17839+
"""
17840+
relaxed_query_compiler = None
17841+
if self._relaxed_query_compiler is not None:
17842+
relaxed_query_compiler = (
17843+
self._relaxed_query_compiler._expanding_count_internal(
17844+
fold_axis=fold_axis,
17845+
expanding_kwargs=expanding_kwargs,
17846+
numeric_only=numeric_only,
17847+
)
17848+
)
17849+
qc = self._expanding_count_internal(
17850+
fold_axis=fold_axis,
17851+
expanding_kwargs=expanding_kwargs,
17852+
numeric_only=numeric_only,
17853+
)
17854+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
17855+
17856+
def _expanding_count_internal(
17857+
self,
17858+
fold_axis: Union[int, str],
17859+
expanding_kwargs: dict,
17860+
numeric_only: bool = False,
1783617861
) -> "SnowflakeQueryCompiler":
1783717862
return self._window_agg(
1783817863
window_func=WindowFunction.EXPANDING,
@@ -17848,6 +17873,37 @@ def expanding_sum(
1784817873
numeric_only: bool = False,
1784917874
engine: Optional[Literal["cython", "numba"]] = None,
1785017875
engine_kwargs: Optional[dict[str, bool]] = None,
17876+
) -> "SnowflakeQueryCompiler":
17877+
"""
17878+
Wrapper around _expanding_sum_internal to be supported in faster pandas.
17879+
"""
17880+
relaxed_query_compiler = None
17881+
if self._relaxed_query_compiler is not None:
17882+
relaxed_query_compiler = (
17883+
self._relaxed_query_compiler._expanding_sum_internal(
17884+
fold_axis=fold_axis,
17885+
expanding_kwargs=expanding_kwargs,
17886+
numeric_only=numeric_only,
17887+
engine=engine,
17888+
engine_kwargs=engine_kwargs,
17889+
)
17890+
)
17891+
qc = self._expanding_sum_internal(
17892+
fold_axis=fold_axis,
17893+
expanding_kwargs=expanding_kwargs,
17894+
numeric_only=numeric_only,
17895+
engine=engine,
17896+
engine_kwargs=engine_kwargs,
17897+
)
17898+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
17899+
17900+
def _expanding_sum_internal(
17901+
self,
17902+
fold_axis: Union[int, str],
17903+
expanding_kwargs: dict,
17904+
numeric_only: bool = False,
17905+
engine: Optional[Literal["cython", "numba"]] = None,
17906+
engine_kwargs: Optional[dict[str, bool]] = None,
1785117907
) -> "SnowflakeQueryCompiler":
1785217908
WarningMessage.warning_if_engine_args_is_set(
1785317909
"expanding_sum", engine, engine_kwargs
@@ -17866,6 +17922,37 @@ def expanding_mean(
1786617922
numeric_only: bool = False,
1786717923
engine: Optional[Literal["cython", "numba"]] = None,
1786817924
engine_kwargs: Optional[dict[str, bool]] = None,
17925+
) -> "SnowflakeQueryCompiler":
17926+
"""
17927+
Wrapper around _expanding_mean_internal to be supported in faster pandas.
17928+
"""
17929+
relaxed_query_compiler = None
17930+
if self._relaxed_query_compiler is not None:
17931+
relaxed_query_compiler = (
17932+
self._relaxed_query_compiler._expanding_mean_internal(
17933+
fold_axis=fold_axis,
17934+
expanding_kwargs=expanding_kwargs,
17935+
numeric_only=numeric_only,
17936+
engine=engine,
17937+
engine_kwargs=engine_kwargs,
17938+
)
17939+
)
17940+
qc = self._expanding_mean_internal(
17941+
fold_axis=fold_axis,
17942+
expanding_kwargs=expanding_kwargs,
17943+
numeric_only=numeric_only,
17944+
engine=engine,
17945+
engine_kwargs=engine_kwargs,
17946+
)
17947+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
17948+
17949+
def _expanding_mean_internal(
17950+
self,
17951+
fold_axis: Union[int, str],
17952+
expanding_kwargs: dict,
17953+
numeric_only: bool = False,
17954+
engine: Optional[Literal["cython", "numba"]] = None,
17955+
engine_kwargs: Optional[dict[str, bool]] = None,
1786917956
) -> "SnowflakeQueryCompiler":
1787017957
WarningMessage.warning_if_engine_args_is_set(
1787117958
"expanding_mean", engine, engine_kwargs
@@ -17895,6 +17982,40 @@ def expanding_var(
1789517982
numeric_only: bool = False,
1789617983
engine: Optional[Literal["cython", "numba"]] = None,
1789717984
engine_kwargs: Optional[dict[str, bool]] = None,
17985+
) -> "SnowflakeQueryCompiler":
17986+
"""
17987+
Wrapper around _expanding_var_internal to be supported in faster pandas.
17988+
"""
17989+
relaxed_query_compiler = None
17990+
if self._relaxed_query_compiler is not None:
17991+
relaxed_query_compiler = (
17992+
self._relaxed_query_compiler._expanding_var_internal(
17993+
fold_axis=fold_axis,
17994+
expanding_kwargs=expanding_kwargs,
17995+
ddof=ddof,
17996+
numeric_only=numeric_only,
17997+
engine=engine,
17998+
engine_kwargs=engine_kwargs,
17999+
)
18000+
)
18001+
qc = self._expanding_var_internal(
18002+
fold_axis=fold_axis,
18003+
expanding_kwargs=expanding_kwargs,
18004+
ddof=ddof,
18005+
numeric_only=numeric_only,
18006+
engine=engine,
18007+
engine_kwargs=engine_kwargs,
18008+
)
18009+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
18010+
18011+
def _expanding_var_internal(
18012+
self,
18013+
fold_axis: Union[int, str],
18014+
expanding_kwargs: dict,
18015+
ddof: int = 1,
18016+
numeric_only: bool = False,
18017+
engine: Optional[Literal["cython", "numba"]] = None,
18018+
engine_kwargs: Optional[dict[str, bool]] = None,
1789818019
) -> "SnowflakeQueryCompiler":
1789918020
WarningMessage.warning_if_engine_args_is_set(
1790018021
"rolling_var", engine, engine_kwargs
@@ -17914,6 +18035,40 @@ def expanding_std(
1791418035
numeric_only: bool = False,
1791518036
engine: Optional[Literal["cython", "numba"]] = None,
1791618037
engine_kwargs: Optional[dict[str, bool]] = None,
18038+
) -> "SnowflakeQueryCompiler":
18039+
"""
18040+
Wrapper around _expanding_std_internal to be supported in faster pandas.
18041+
"""
18042+
relaxed_query_compiler = None
18043+
if self._relaxed_query_compiler is not None:
18044+
relaxed_query_compiler = (
18045+
self._relaxed_query_compiler._expanding_std_internal(
18046+
fold_axis=fold_axis,
18047+
expanding_kwargs=expanding_kwargs,
18048+
ddof=ddof,
18049+
numeric_only=numeric_only,
18050+
engine=engine,
18051+
engine_kwargs=engine_kwargs,
18052+
)
18053+
)
18054+
qc = self._expanding_std_internal(
18055+
fold_axis=fold_axis,
18056+
expanding_kwargs=expanding_kwargs,
18057+
ddof=ddof,
18058+
numeric_only=numeric_only,
18059+
engine=engine,
18060+
engine_kwargs=engine_kwargs,
18061+
)
18062+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
18063+
18064+
def _expanding_std_internal(
18065+
self,
18066+
fold_axis: Union[int, str],
18067+
expanding_kwargs: dict,
18068+
ddof: int = 1,
18069+
numeric_only: bool = False,
18070+
engine: Optional[Literal["cython", "numba"]] = None,
18071+
engine_kwargs: Optional[dict[str, bool]] = None,
1791718072
) -> "SnowflakeQueryCompiler":
1791818073
WarningMessage.warning_if_engine_args_is_set(
1791918074
"rolling_std", engine, engine_kwargs
@@ -17932,6 +18087,37 @@ def expanding_min(
1793218087
numeric_only: bool = False,
1793318088
engine: Optional[Literal["cython", "numba"]] = None,
1793418089
engine_kwargs: Optional[dict[str, bool]] = None,
18090+
) -> "SnowflakeQueryCompiler":
18091+
"""
18092+
Wrapper around _expanding_min_internal to be supported in faster pandas.
18093+
"""
18094+
relaxed_query_compiler = None
18095+
if self._relaxed_query_compiler is not None:
18096+
relaxed_query_compiler = (
18097+
self._relaxed_query_compiler._expanding_min_internal(
18098+
fold_axis=fold_axis,
18099+
expanding_kwargs=expanding_kwargs,
18100+
numeric_only=numeric_only,
18101+
engine=engine,
18102+
engine_kwargs=engine_kwargs,
18103+
)
18104+
)
18105+
qc = self._expanding_min_internal(
18106+
fold_axis=fold_axis,
18107+
expanding_kwargs=expanding_kwargs,
18108+
numeric_only=numeric_only,
18109+
engine=engine,
18110+
engine_kwargs=engine_kwargs,
18111+
)
18112+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
18113+
18114+
def _expanding_min_internal(
18115+
self,
18116+
fold_axis: Union[int, str],
18117+
expanding_kwargs: dict,
18118+
numeric_only: bool = False,
18119+
engine: Optional[Literal["cython", "numba"]] = None,
18120+
engine_kwargs: Optional[dict[str, bool]] = None,
1793518121
) -> "SnowflakeQueryCompiler":
1793618122
WarningMessage.warning_if_engine_args_is_set(
1793718123
"expanding_min", engine, engine_kwargs
@@ -17950,6 +18136,37 @@ def expanding_max(
1795018136
numeric_only: bool = False,
1795118137
engine: Optional[Literal["cython", "numba"]] = None,
1795218138
engine_kwargs: Optional[dict[str, bool]] = None,
18139+
) -> "SnowflakeQueryCompiler":
18140+
"""
18141+
Wrapper around _expanding_max_internal to be supported in faster pandas.
18142+
"""
18143+
relaxed_query_compiler = None
18144+
if self._relaxed_query_compiler is not None:
18145+
relaxed_query_compiler = (
18146+
self._relaxed_query_compiler._expanding_max_internal(
18147+
fold_axis=fold_axis,
18148+
expanding_kwargs=expanding_kwargs,
18149+
numeric_only=numeric_only,
18150+
engine=engine,
18151+
engine_kwargs=engine_kwargs,
18152+
)
18153+
)
18154+
qc = self._expanding_max_internal(
18155+
fold_axis=fold_axis,
18156+
expanding_kwargs=expanding_kwargs,
18157+
numeric_only=numeric_only,
18158+
engine=engine,
18159+
engine_kwargs=engine_kwargs,
18160+
)
18161+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
18162+
18163+
def _expanding_max_internal(
18164+
self,
18165+
fold_axis: Union[int, str],
18166+
expanding_kwargs: dict,
18167+
numeric_only: bool = False,
18168+
engine: Optional[Literal["cython", "numba"]] = None,
18169+
engine_kwargs: Optional[dict[str, bool]] = None,
1795318170
) -> "SnowflakeQueryCompiler":
1795418171
WarningMessage.warning_if_engine_args_is_set(
1795518172
"expanding_max", engine, engine_kwargs
@@ -18038,6 +18255,34 @@ def expanding_sem(
1803818255
expanding_kwargs: dict,
1803918256
ddof: int = 1,
1804018257
numeric_only: bool = False,
18258+
) -> "SnowflakeQueryCompiler":
18259+
"""
18260+
Wrapper around _expanding_sem_internal to be supported in faster pandas.
18261+
"""
18262+
relaxed_query_compiler = None
18263+
if self._relaxed_query_compiler is not None:
18264+
relaxed_query_compiler = (
18265+
self._relaxed_query_compiler._expanding_sem_internal(
18266+
fold_axis=fold_axis,
18267+
expanding_kwargs=expanding_kwargs,
18268+
ddof=ddof,
18269+
numeric_only=numeric_only,
18270+
)
18271+
)
18272+
qc = self._expanding_sem_internal(
18273+
fold_axis=fold_axis,
18274+
expanding_kwargs=expanding_kwargs,
18275+
ddof=ddof,
18276+
numeric_only=numeric_only,
18277+
)
18278+
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
18279+
18280+
def _expanding_sem_internal(
18281+
self,
18282+
fold_axis: Union[int, str],
18283+
expanding_kwargs: dict,
18284+
ddof: int = 1,
18285+
numeric_only: bool = False,
1804118286
) -> "SnowflakeQueryCompiler":
1804218287
return self._window_agg(
1804318288
window_func=WindowFunction.EXPANDING,

tests/integ/modin/test_faster_pandas.py

Lines changed: 46 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -473,6 +473,52 @@ def test_duplicated(session):
473473
assert_series_equal(snow_result, native_result)
474474

475475

476+
@pytest.mark.parametrize(
477+
"func",
478+
[
479+
"min",
480+
"max",
481+
"count",
482+
"sum",
483+
"mean",
484+
"std",
485+
"var",
486+
"sem",
487+
],
488+
)
489+
@sql_count_checker(query_count=3)
490+
def test_expanding(session, func):
491+
with session_parameter_override(
492+
session, "dummy_row_pos_optimization_enabled", True
493+
):
494+
# create tables
495+
table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
496+
session.create_dataframe(
497+
native_pd.DataFrame([[1, 11], [2, 12], [3, 13]], columns=["A", "B"])
498+
).write.save_as_table(table_name, table_type="temp")
499+
500+
# create snow dataframes
501+
df = pd.read_snowflake(table_name)
502+
snow_result = getattr(df.expanding(), func)()
503+
504+
# verify that the input dataframe has a populated relaxed query compiler
505+
assert df._query_compiler._relaxed_query_compiler is not None
506+
assert df._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
507+
# verify that the output dataframe also has a populated relaxed query compiler
508+
assert snow_result._query_compiler._relaxed_query_compiler is not None
509+
assert (
510+
snow_result._query_compiler._relaxed_query_compiler._dummy_row_pos_mode
511+
is True
512+
)
513+
514+
# create pandas dataframes
515+
native_df = df.to_pandas()
516+
native_result = getattr(native_df.expanding(), func)()
517+
518+
# compare results
519+
assert_frame_equal(snow_result, native_result, check_dtype=False)
520+
521+
476522
@pytest.mark.parametrize(
477523
"func",
478524
[

0 commit comments

Comments
 (0)