9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -112,6 +112,15 @@
- `loc` (setting columns)
- `to_datetime`
- `drop`
- `agg`
- `min`
- `max`
- `count`
- `sum`
- `mean`
- `median`
- `std`
- `var`
- Reuse row count from the relaxed query compiler in `get_axis_len`.
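
These reductions are exercised end to end by the new `test_agg` integration test further down in this PR; a minimal usage sketch of the user-facing API is shown here (the table and column names are hypothetical, and an active Snowpark session is assumed):

```python
# Minimal sketch: the newly supported reductions on a Snowpark pandas DataFrame.
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401  # registers the Snowpark pandas backend

df = pd.read_snowflake("MY_TABLE")  # hypothetical table name

# DataFrame-level reductions
col_counts = df.count()                    # Series: non-null count per column
summary = df.agg(["min", "max", "mean"])   # DataFrame: one row per aggregation

# Series-level reductions
total = df["AMOUNT"].sum()                 # scalar; "AMOUNT" is a hypothetical column
spread = df["AMOUNT"].agg(["std", "var"])  # Series
```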

#### Bug Fixes
@@ -2314,6 +2314,20 @@ def cache_result(self) -> "SnowflakeQueryCompiler":

    @snowpark_pandas_type_immutable_check
    def set_columns(self, new_pandas_labels: Axes) -> "SnowflakeQueryCompiler":
sfc-gh-helmeleegy (Contributor, Author) commented on Oct 15, 2025:
This was necessary for the aggregate functions to work in faster pandas.

"""
Wrapper around _set_columns_internal to be supported in faster pandas.
"""
relaxed_query_compiler = None
if self._relaxed_query_compiler is not None:
relaxed_query_compiler = self._relaxed_query_compiler._set_columns_internal(
new_pandas_labels=new_pandas_labels
)
qc = self._set_columns_internal(new_pandas_labels=new_pandas_labels)
return self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)

    def _set_columns_internal(
        self, new_pandas_labels: Axes
    ) -> "SnowflakeQueryCompiler":
        """
        Set pandas column labels with the new column labels

@@ -7103,6 +7117,33 @@ def agg(
        axis: int,
        args: Any,
        kwargs: dict[str, Any],
    ) -> "SnowflakeQueryCompiler":
"""
Wrapper around _agg_internal to be supported in faster pandas.
"""
relaxed_query_compiler = None
if self._relaxed_query_compiler is not None:
relaxed_query_compiler = self._relaxed_query_compiler._agg_internal(
func=func,
axis=axis,
args=args,
kwargs=kwargs,
)
qc = self._agg_internal(
func=func,
axis=axis,
args=args,
kwargs=kwargs,
)
qc = self._maybe_set_relaxed_qc(qc, relaxed_query_compiler)
return qc

    def _agg_internal(
        self,
        func: AggFuncType,
        axis: int,
        args: Any,
        kwargs: dict[str, Any],
    ) -> "SnowflakeQueryCompiler":
        """
        Aggregate using one or more operations over the specified axis.
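
Both `set_columns` and `agg` now follow the same shape: the public method is a thin wrapper that first runs the corresponding `_*_internal` method on `self._relaxed_query_compiler` (when one is present), then on `self`, and finally recombines the two results through `_maybe_set_relaxed_qc`. The toy sketch below mirrors that shape; the behavior of `_maybe_set_relaxed_qc` is an assumption here (modeled as attaching the relaxed result to the primary result when one was produced), not the actual implementation.

```python
from typing import Optional


class ToyQueryCompiler:
    """Toy stand-in for a query compiler that may carry a relaxed counterpart."""

    def __init__(self, labels, relaxed: Optional["ToyQueryCompiler"] = None):
        self.labels = list(labels)
        self._relaxed_query_compiler = relaxed

    def _set_columns_internal(self, new_pandas_labels) -> "ToyQueryCompiler":
        # The "real" work: produce a new compiler with the new labels.
        return ToyQueryCompiler(new_pandas_labels)

    def _maybe_set_relaxed_qc(self, qc, relaxed_qc) -> "ToyQueryCompiler":
        # Assumed semantics: carry the relaxed compiler forward when it exists.
        if relaxed_qc is not None:
            qc._relaxed_query_compiler = relaxed_qc
        return qc

    def set_columns(self, new_pandas_labels) -> "ToyQueryCompiler":
        # Same shape as the wrappers in the diff: run the internal method on the
        # relaxed compiler first (if any), then on self, then recombine.
        relaxed = None
        if self._relaxed_query_compiler is not None:
            relaxed = self._relaxed_query_compiler._set_columns_internal(new_pandas_labels)
        qc = self._set_columns_internal(new_pandas_labels)
        return self._maybe_set_relaxed_qc(qc, relaxed)


qc = ToyQueryCompiler(["A", "B"], relaxed=ToyQueryCompiler(["A", "B"]))
out = qc.set_columns(["X", "Y"])
assert out.labels == ["X", "Y"]
assert out._relaxed_query_compiler is not None  # relaxed path propagated
```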
86 changes: 86 additions & 0 deletions tests/integ/modin/test_faster_pandas.py
@@ -194,6 +194,61 @@ def test_read_filter_join_flag_disabled(session):
    assert_frame_equal(snow_result, native_result)


@pytest.mark.parametrize(
    "func",
    [
        "min",
        "max",
        "count",
        "sum",
        "mean",
        "median",
        "std",
        "var",
    ],
)
@sql_count_checker(query_count=6)
def test_agg(session, func):
    # create tables
    table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
    session.create_dataframe(
        native_pd.DataFrame([[2, 12], [1, 11], [3, 13]], columns=["A", "B"])
    ).write.save_as_table(table_name, table_type="temp")

    # create snow dataframes
    df = pd.read_snowflake(table_name)
    snow_result1 = getattr(df, func)()
    snow_result2 = df.agg([func])
    snow_result3 = getattr(df["B"], func)()
    snow_result4 = df["B"].agg([func])

    # verify that the input dataframe has a populated relaxed query compiler
    assert df._query_compiler._relaxed_query_compiler is not None
    assert df._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
    # verify that the output dataframe also has a populated relaxed query compiler
    assert snow_result1._query_compiler._relaxed_query_compiler is not None
    assert (
        snow_result1._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
    )
    assert snow_result2._query_compiler._relaxed_query_compiler is not None
    assert (
        snow_result2._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
    )

    # create pandas dataframes
    native_df = df.to_pandas()
    native_result1 = getattr(native_df, func)()
    native_result2 = native_df.agg([func])
    native_result3 = getattr(native_df["B"], func)()
    native_result4 = native_df["B"].agg([func])

    # compare results
    assert_series_equal(snow_result1, native_result1, check_dtype=False)
    assert_frame_equal(snow_result2, native_result2, check_dtype=False)
    assert snow_result3 == native_result3
    assert_series_equal(snow_result4, native_result4, check_dtype=False)


@sql_count_checker(query_count=3)
def test_drop(session):
    # create tables
@@ -604,6 +659,37 @@ def test_set_2d_labels_from_different_df(session, input_df2):
    assert_frame_equal(snow_result, native_result)


@sql_count_checker(query_count=3)
def test_set_columns(session):
    # create tables
    table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
    session.create_dataframe(
        native_pd.DataFrame([[2, 12], [1, 11], [3, 13]], columns=["A", "B"])
    ).write.save_as_table(table_name, table_type="temp")

    # create snow dataframes
    df = pd.read_snowflake(table_name)
    snow_result = df
    snow_result.columns = ["X", "Y"]

    # verify that the input dataframe has a populated relaxed query compiler
    assert df._query_compiler._relaxed_query_compiler is not None
    assert df._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
    # verify that the output dataframe also has a populated relaxed query compiler
    assert snow_result._query_compiler._relaxed_query_compiler is not None
    assert (
        snow_result._query_compiler._relaxed_query_compiler._dummy_row_pos_mode is True
    )

    # create pandas dataframes
    native_df = df.to_pandas()
    native_result = native_df
    native_result.columns = ["X", "Y"]

    # compare results
    assert_frame_equal(snow_result, native_result)


@sql_count_checker(query_count=3)
def test_dataframe_to_datetime(session):
    # create tables