Merged
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -122,6 +122,16 @@
- `drop`
- `invert`
- `duplicated`
- `groupby.agg`
- `groupby.min`
- `groupby.max`
- `groupby.count`
- `groupby.sum`
- `groupby.mean`
- `groupby.median`
- `groupby.std`
- `groupby.var`

- Reuse row count from the relaxed query compiler in `get_axis_len`.
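
As a brief illustration of the newly supported groupby aggregations listed above, a minimal usage sketch (the table name is a placeholder; any existing table works):

```python
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401 -- registers the Snowpark pandas backend

# Read a Snowflake table into a Snowpark pandas DataFrame.
df = pd.read_snowflake("MY_TABLE")  # "MY_TABLE" is a hypothetical table name

# These aggregations can now stay on the faster pandas path instead of
# falling back to the regular Snowpark pandas query compiler.
per_group_min = df.groupby("A").min()
summary = df.groupby("A").agg(["min", "max", "mean"])
```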

#### Bug Fixes
@@ -4501,6 +4501,50 @@ def groupby_agg(
numeric_only: bool = False,
is_series_groupby: bool = False,
drop: bool = False,
) -> "SnowflakeQueryCompiler":
"""
Wrapper around _groupby_agg_internal that propagates the relaxed query compiler so groupby aggregations are supported in faster pandas.
"""
relaxed_query_compiler = None
if self._relaxed_query_compiler is not None:
relaxed_query_compiler = self._relaxed_query_compiler._groupby_agg_internal(
by=by,
agg_func=agg_func,
axis=axis,
groupby_kwargs=groupby_kwargs,
agg_args=agg_args,
agg_kwargs=agg_kwargs,
how=how,
numeric_only=numeric_only,
is_series_groupby=is_series_groupby,
drop=drop,
)
qc = self._groupby_agg_internal(
by=by,
agg_func=agg_func,
axis=axis,
groupby_kwargs=groupby_kwargs,
agg_args=agg_args,
agg_kwargs=agg_kwargs,
how=how,
numeric_only=numeric_only,
is_series_groupby=is_series_groupby,
drop=drop,
)
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
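
The wrapper above runs the aggregation twice: once on the relaxed query compiler (the faster pandas path) when one exists, and once on the regular compiler, then attaches the relaxed result to the regular result. A minimal sketch of what `_maybe_set_relaxed_qc` plausibly does, inferred only from its call site here (its actual implementation is not shown in this diff):

```python
def _maybe_set_relaxed_qc(
    self,
    qc: "SnowflakeQueryCompiler",
    relaxed_query_compiler: "SnowflakeQueryCompiler | None",
) -> "SnowflakeQueryCompiler":
    # Hypothetical sketch: if the relaxed (faster pandas) path produced a
    # result, attach it to the regular result so downstream operations can
    # keep using it; otherwise leave it unset so downstream operations fall
    # back to the regular Snowpark pandas path.
    if relaxed_query_compiler is not None:
        qc._relaxed_query_compiler = relaxed_query_compiler
    return qc
```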

def _groupby_agg_internal(
self,
by: Any,
agg_func: AggFuncType,
axis: int,
groupby_kwargs: dict[str, Any],
agg_args: Any,
agg_kwargs: dict[str, Any],
how: str = "axis_wise",
numeric_only: bool = False,
is_series_groupby: bool = False,
drop: bool = False,
) -> "SnowflakeQueryCompiler":
"""
Compute groupby with aggregation functions.
@@ -2642,7 +2642,7 @@ def seconds():
0 1
1 2
2 3
dtype: int64
dtype: int8
Contributor:
Why did the data type change here?

Contributor Author (@sfc-gh-helmeleegy, Oct 17, 2025):
I'm not 100% sure, but the SQL queries look a bit different in faster pandas compared to the original Snowpark pandas, so the new order of SQL operations may have caused the server-side typing system to infer a slightly different final type.

Contributor:
Are you sure this is not related to the snowflake-connector upgrade? This feels like an Arrow change.

Contributor Author:
I see. In that case, let's fix this in a separate PR, since the root cause is not related to this PR.
Here is the separate fix PR: #3911
@sfc-gh-jkew @sfc-gh-joshi @sfc-gh-nkrishna
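
To help settle the int64-versus-int8 question, a hedged repro sketch that isolates the client-side type inference; if the dtype also changes with faster pandas disabled, the connector/Arrow upgrade is the likelier cause (the exact construction is an assumption; any timedelta Series should do):

```python
import modin.pandas as pd
import pandas as native_pd
import snowflake.snowpark.modin.plugin  # noqa: F401

# Build a small timedelta Series and inspect the dtype that the server/Arrow
# stack infers for its seconds component.
ser = pd.Series(native_pd.to_timedelta([1, 2, 3], unit="s"))
print(ser.dt.seconds.dtype)  # int64 before this change, int8 after, per the diff above
```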


For TimedeltaIndex:

@@ -2702,7 +2702,7 @@ def microseconds():
0 1
1 2
2 3
dtype: int64
dtype: int8

For TimedeltaIndex:

@@ -2734,7 +2734,7 @@ def nanoseconds():
0 1
1 2
2 3
dtype: int64
dtype: int8

For TimedeltaIndex:

67 changes: 61 additions & 6 deletions tests/integ/modin/test_faster_pandas.py
@@ -123,9 +123,9 @@ def test_read_filter_join_on_index(session):
)


@sql_count_checker(query_count=3)
def test_read_filter_groupby_agg(session):
# test a chain of operations that are not fully supported in faster pandas
@sql_count_checker(query_count=3, join_count=2)
def test_read_filter_iloc_index(session):
# test a chain of operations that are not yet fully supported in faster pandas

# create tables
table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
@@ -135,19 +135,19 @@ def test_read_filter_groupby_agg(session):

# create snow dataframes
df = pd.read_snowflake(table_name)
snow_result = df[df["B"] > 11].groupby("A").min()
snow_result = df.iloc[[0], :]

# verify that the input dataframe has a populated relaxed query compiler
assert df._query_compiler._relaxed_query_compiler is not None
assert df._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
# verify that the output dataframe has an empty relaxed query compiler
# because groupby() and min() are not supported in faster pandas yet
# because iloc for index is not supported in faster pandas yet
assert snow_result._query_compiler._relaxed_query_compiler is None
assert snow_result._query_compiler._dummy_row_pos_mode is False

# create pandas dataframes
native_df = df.to_pandas()
native_result = native_df[native_df["B"] > 11].groupby("A").min()
native_result = native_df.iloc[[0], :]

# compare results
assert_frame_equal(snow_result, native_result)
@@ -252,6 +252,61 @@ def test_duplicated(session):
assert_series_equal(snow_result, native_result)


@pytest.mark.parametrize(
"func",
[
"min",
"max",
"count",
"sum",
"mean",
"median",
"std",
"var",
],
)
@sql_count_checker(query_count=6)
def test_groupby_agg(session, func):
Contributor:
It seems like a lot of the faster pandas tests have pretty similar setup/assert/teardown code. Could you move some of that logic into a shared fixture or helper in the future? (A sketch of what that could look like follows this test.)

# create tables
table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
session.create_dataframe(
native_pd.DataFrame([[2, 12], [2, 11], [3, 13]], columns=["A", "B"])
).write.save_as_table(table_name, table_type="temp")

# create snow dataframes
df = pd.read_snowflake(table_name)
snow_result1 = getattr(df.groupby("A"), func)()
snow_result2 = df.groupby("A").agg([func])
snow_result3 = getattr(df.groupby("A")["B"], func)()
snow_result4 = df.groupby("A")["B"].agg([func])

# verify that the input dataframe has a populated relaxed query compiler
assert df._query_compiler._relaxed_query_compiler is not None
assert df._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
# verify that the output dataframe also has a populated relaxed query compiler
assert snow_result1._query_compiler._relaxed_query_compiler is not None
assert (
snow_result1._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
)
assert snow_result2._query_compiler._relaxed_query_compiler is not None
assert (
snow_result2._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
)

# create pandas dataframes
native_df = df.to_pandas()
native_result1 = getattr(native_df.groupby("A"), func)()
native_result2 = native_df.groupby("A").agg([func])
native_result3 = getattr(native_df.groupby("A")["B"], func)()
native_result4 = native_df.groupby("A")["B"].agg([func])

# compare results
assert_frame_equal(snow_result1, native_result1, check_dtype=False)
assert_frame_equal(snow_result2, native_result2, check_dtype=False)
assert_series_equal(snow_result3, native_result3, check_dtype=False)
assert_frame_equal(snow_result4, native_result4, check_dtype=False)
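
Responding to the review comment above, a hedged sketch of what a shared fixture and assertion helper could look like (the names `snow_df` and `assert_relaxed_qc_populated` are hypothetical and not part of this PR; `Utils`, `TempObjectType`, `pd`, and `native_pd` come from the test module's existing imports):

```python
@pytest.fixture
def snow_df(session):
    # Hypothetical shared fixture: create a temp table with the common test
    # data and return a Snowpark pandas DataFrame backed by it.
    table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
    session.create_dataframe(
        native_pd.DataFrame([[2, 12], [2, 11], [3, 13]], columns=["A", "B"])
    ).write.save_as_table(table_name, table_type="temp")
    return pd.read_snowflake(table_name)


def assert_relaxed_qc_populated(df):
    # Hypothetical shared helper for the repeated relaxed-query-compiler checks.
    assert df._query_compiler._relaxed_query_compiler is not None
    assert df._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
```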


@sql_count_checker(query_count=3)
def test_invert(session):
# create tables