Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
- `cumsum`
- `cummin`
- `cummax`
- `to_snowpark`
- Make faster pandas disabled by default (opt-in instead of opt-out).
- Improve performance of `drop_duplicates` by avoiding joins when `keep!=False` in faster pandas.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,9 +833,8 @@ def _raise_not_implemented_error_for_timedelta(
method = inspect.currentframe().f_back.f_back.f_code.co_name # type: ignore[union-attr]
ErrorMessage.not_implemented_for_timedelta(method)

def _warn_lost_snowpark_pandas_type(self) -> None:
def _warn_lost_snowpark_pandas_type(self, method: str) -> None:
"""Warn Snowpark pandas type can be lost in current operation."""
method = inspect.currentframe().f_back.f_back.f_code.co_name # type: ignore[union-attr]
snowpark_pandas_types = [
type(t).__name__
for t in set(
Expand Down Expand Up @@ -2281,7 +2280,7 @@ def to_snowflake(
index_label: Optional[IndexLabel] = None,
table_type: Literal["", "temp", "temporary", "transient"] = "",
) -> None:
self._warn_lost_snowpark_pandas_type()
self._warn_lost_snowpark_pandas_type("to_snowflake")
handle_if_exists_for_to_snowflake(if_exists=if_exists, name=name)

if if_exists == "fail":
Expand All @@ -2302,6 +2301,22 @@ def to_snowflake(

def to_snowpark(
    self, index: bool = True, index_label: Optional[IndexLabel] = None
) -> SnowparkDataFrame:
    """
    Wrapper around _to_snowpark_internal to be supported in faster pandas.

    Dispatches to the relaxed query compiler when one is available and the
    index is not requested; otherwise falls back to this compiler.
    """
    relaxed = self._relaxed_query_compiler
    # The relaxed (faster-pandas) path is only valid when no index column
    # needs to be materialized in the resulting Snowpark DataFrame.
    compiler = relaxed if relaxed is not None and not index else self
    return compiler._to_snowpark_internal(index=index, index_label=index_label)

def _to_snowpark_internal(
self, index: bool = True, index_label: Optional[IndexLabel] = None
) -> SnowparkDataFrame:
"""
Convert the Snowpark pandas Dataframe to Snowpark Dataframe. The Snowpark Dataframe is created by selecting
Expand All @@ -2319,7 +2334,7 @@ def to_snowpark(

For details, please see comment in _to_snowpark_dataframe_of_pandas_dataframe.
"""
self._warn_lost_snowpark_pandas_type()
self._warn_lost_snowpark_pandas_type("to_snowpark")

return self._to_snowpark_dataframe_from_snowpark_pandas_dataframe(
index, index_label
Expand Down
34 changes: 34 additions & 0 deletions tests/integ/modin/test_faster_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -1673,6 +1673,40 @@ def test_series_to_datetime(session):
assert_series_equal(snow_result, native_result)


@sql_count_checker(query_count=3)
def test_to_snowpark(session):
    with session_parameter_override(
        session, "dummy_row_pos_optimization_enabled", True
    ):
        # Seed a temporary table with a small, known set of rows.
        table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
        seed_frame = native_pd.DataFrame(
            [[2, 12], [1, 11], [3, 13]], columns=["A", "B"]
        )
        session.create_dataframe(seed_frame).write.save_as_table(
            table_name, table_type="temp"
        )

        # Round-trip: Snowpark pandas -> Snowpark DataFrame -> Snowpark pandas.
        df = pd.read_snowflake(table_name)
        snow_result = df.to_snowpark(index=False).to_snowpark_pandas()

        # The input dataframe must carry a relaxed query compiler running in
        # dummy-row-position mode.
        relaxed_in = df._query_compiler._relaxed_query_compiler
        assert relaxed_in is not None
        assert relaxed_in._dummy_row_pos_mode is True
        # The round-tripped output must carry one as well.
        relaxed_out = snow_result._query_compiler._relaxed_query_compiler
        assert relaxed_out is not None
        assert relaxed_out._dummy_row_pos_mode is True

        # The round trip must preserve the data exactly.
        native_result = df.to_pandas()
        assert_frame_equal(snow_result, native_result)


@sql_count_checker(query_count=0)
def test_dummy_row_pos_optimization_enabled_on_session(db_parameters):
with Session.builder.configs(db_parameters).create() as new_session:
Expand Down