Skip to content

Commit c50cdeb

Browse files
SNOW-2189056: Support some snowflake extensions on other backends. (#3683)
Make sure that we support the function and method variants of `to_snowflake()`, `to_snowpark()`, `to_iceberg()`, `read_snowflake()`, and `to_pandas()` on the pandas and Ray backends. Prior to this commit, we only supported some of these functions and methods on the Pandas backend. Fixes SNOW-2189056 Signed-off-by: sfc-gh-mvashishtha <mahesh.vashishtha@snowflake.com> Co-authored-by: Hazem Elmeleegy <hazem.elmeleegy@snowflake.com>
1 parent 6acfee9 commit c50cdeb

File tree

7 files changed

+468
-109
lines changed

7 files changed

+468
-109
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,26 @@
2727
### Snowpark pandas API Updates
2828

2929
#### New Features
30+
- Completed support for `pd.read_snowflake()`, `pd.to_iceberg()`,
31+
`pd.to_pandas()`, `pd.to_snowpark()`, `pd.to_snowflake()`,
32+
`DataFrame.to_iceberg()`, `DataFrame.to_pandas()`, `DataFrame.to_snowpark()`,
33+
`DataFrame.to_snowflake()`, `Series.to_iceberg()`, `Series.to_pandas()`,
34+
`Series.to_snowpark()`, and `Series.to_snowflake()` on the "Pandas" and "Ray"
35+
backends. Previously, only some of these functions and methods were supported
36+
on the Pandas backend.
3037

3138
#### Improvements
3239
- Set the default transfer limit in hybrid execution for data leaving Snowflake to 100k, which can be overridden with the `SnowflakePandasTransferThreshold` environment variable. This configuration is appropriate for scenarios with two available engines, "Pandas" and "Snowflake", on relational workloads.
3340
- Improve import error message by adding '--upgrade' to 'pip install "snowflake-snowpark-python[modin]"' in the error message.
3441

3542
#### Dependency Updates
43+
3644
#### Bug Fixes
45+
- Raised `NotImplementedError` instead of `AttributeError` on attempting to call
46+
Snowflake extension functions/methods `to_dynamic_table()`, `cache_result()`,
47+
`to_view()`, `create_or_replace_dynamic_table()`, and
48+
`create_or_replace_view()` on DataFrames or Series using the "Pandas" or "Ray"
49+
backends.
3750

3851
## 1.37.0 (2025-08-18)
3952

src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
from snowflake.snowpark._internal.type_utils import ColumnOrName
2222
from snowflake.snowpark.async_job import AsyncJob
2323
from snowflake.snowpark.dataframe import DataFrame as SnowparkDataFrame
24-
from snowflake.snowpark.modin.plugin.extensions.utils import add_cache_result_docstring
24+
from snowflake.snowpark.modin.plugin.extensions.utils import (
25+
add_cache_result_docstring,
26+
register_non_snowflake_accessors,
27+
)
2528
from snowflake.snowpark.modin.plugin.utils.warning_message import (
2629
materialization_warning,
2730
)
@@ -30,17 +33,8 @@
3033
register_dataframe_accessor = functools.partial(
3134
_register_dataframe_accessor, backend="Snowflake"
3235
)
33-
_register_dataframe_accessor(name="to_pandas", backend="Pandas")(
34-
pd.DataFrame._to_pandas
35-
)
36-
_register_dataframe_accessor(name="to_snowflake", backend="Pandas")(
37-
lambda self, *args, **kwargs: self.move_to("Snowflake").to_snowflake(
38-
*args, **kwargs
39-
)
40-
)
41-
_register_dataframe_accessor(name="to_snowpark", backend="Pandas")(
42-
lambda self, *args, **kwargs: self.move_to("Snowflake").to_snowpark(*args, **kwargs)
43-
)
36+
37+
register_non_snowflake_accessors(_register_dataframe_accessor, "DataFrame")
4438

4539

4640
# Snowflake specific dataframe methods

src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py

Lines changed: 80 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
File containing top-level APIs defined in Snowpark pandas but not the Modin API layer
77
under the `pd` namespace, such as `pd.read_snowflake`.
88
"""
9+
from functools import wraps
910
from typing import Any, Iterable, List, Literal, Optional, Union
1011

1112
from modin.pandas import DataFrame, Series
@@ -16,7 +17,7 @@
1617
from snowflake.snowpark._internal.type_utils import ColumnOrName
1718
from snowflake.snowpark.async_job import AsyncJob
1819
from snowflake.snowpark.row import Row
19-
from .general_overrides import register_pd_accessor
20+
from .general_overrides import register_pd_accessor as register_snowflake_accessor
2021
from pandas._typing import IndexLabel
2122
import pandas as native_pd
2223
from snowflake.snowpark import DataFrame as SnowparkDataFrame
@@ -36,16 +37,32 @@
3637
)
3738

3839

39-
register_pd_accessor("Index")(Index)
40-
register_pd_accessor("DatetimeIndex")(DatetimeIndex)
41-
register_pd_accessor("TimedeltaIndex")(TimedeltaIndex)
40+
register_snowflake_accessor("Index")(Index)
41+
register_snowflake_accessor("DatetimeIndex")(DatetimeIndex)
42+
register_snowflake_accessor("TimedeltaIndex")(TimedeltaIndex)
4243

4344

4445
def _snowpark_pandas_obj_check(obj: Union[DataFrame, Series]):
    """
    Reject anything that is not a modin ``DataFrame`` or ``Series``.

    Args:
        obj: The object to validate.

    Raises:
        TypeError: If ``obj`` is neither a ``DataFrame`` nor a ``Series``.
    """
    # Guard clause: valid objects fall straight through with no result.
    if isinstance(obj, (DataFrame, Series)):
        return
    raise TypeError("obj must be a Snowpark pandas DataFrame or Series")
4748

4849

50+
def _check_obj_and_set_backend_to_snowflake(
    obj: Any,
) -> Union[Series, DataFrame]:
    """
    Validate ``obj`` and return it on the Snowflake backend.

    Args:
        obj: Candidate object; it is validated with
            ``_snowpark_pandas_obj_check`` before anything else happens.

    Returns:
        ``obj`` itself when ``obj.get_backend()`` already reports
        ``"Snowflake"``; otherwise the result of
        ``obj.set_backend("Snowflake")``.
    """
    _snowpark_pandas_obj_check(obj)
    # Skip the backend switch entirely when the object is already there.
    if obj.get_backend() == "Snowflake":
        return obj
    return obj.set_backend("Snowflake")
64+
65+
4966
# Use a template string so that we can share it between the read_snowflake
5067
# functions on the Snowflake and Pandas backends. We can't use the exact same
5168
# docstring because each doctest creates and inserts to a temp table, and also
@@ -397,7 +414,7 @@ def _snowpark_pandas_obj_check(obj: Union[DataFrame, Series]):
397414
"""
398415

399416

400-
@register_pd_accessor("read_snowflake")
417+
@register_snowflake_accessor("read_snowflake")
401418
@doc(
402419
_READ_SNOWFLAKE_DOC,
403420
table_name="RESULT_0",
@@ -441,7 +458,26 @@ def _read_snowflake_pandas_backend(
441458
return df.set_backend("Pandas")
442459

443460

444-
@register_pd_accessor("to_snowflake")
461+
@_register_pd_accessor("read_snowflake", backend="Ray")
@doc(
    _READ_SNOWFLAKE_DOC,
    table_name="RESULT_2",
    stored_procedure_name="MULTIPLY_COL_BY_VALUE_2",
)
def _read_snowflake_ray_backend(
    name_or_query, index_col=None, columns=None, enforce_ordering=False
) -> pd.DataFrame:
    # No inline docstring here: the ``@doc`` decorator above supplies it from
    # the shared ``_READ_SNOWFLAKE_DOC`` template.
    read_options = {
        "index_col": index_col,
        "columns": columns,
        "enforce_ordering": enforce_ordering,
    }
    # Perform the read with the Snowflake backend active, so that
    # ``pd.read_snowflake`` resolves to the Snowflake implementation rather
    # than recursing into this Ray-registered function.
    with config_context(Backend="Snowflake"):
        snowflake_frame = pd.read_snowflake(name_or_query, **read_options)
    # Hand the caller a frame that lives on the backend they asked for.
    return snowflake_frame.set_backend("Ray")
478+
479+
480+
@_register_pd_accessor("to_snowflake")
445481
def to_snowflake(
446482
obj: Union[DataFrame, Series],
447483
name: Union[str, Iterable[str]],
@@ -477,14 +513,12 @@ def to_snowflake(
477513
- :func:`Series.to_snowflake <modin.pandas.Series.to_snowflake>`
478514
- :func:`read_snowflake <modin.pandas.read_snowflake>`
479515
"""
480-
_snowpark_pandas_obj_check(obj)
481-
482-
return obj._query_compiler.to_snowflake(
516+
return _check_obj_and_set_backend_to_snowflake(obj)._query_compiler.to_snowflake(
483517
name, if_exists, index, index_label, table_type
484518
)
485519

486520

487-
@register_pd_accessor("to_snowpark")
521+
@_register_pd_accessor("to_snowpark")
488522
def to_snowpark(
489523
obj: Union[DataFrame, Series],
490524
index: bool = True,
@@ -628,12 +662,12 @@ def to_snowpark(
628662
------------
629663
<BLANKLINE>
630664
"""
631-
_snowpark_pandas_obj_check(obj)
632-
633-
return obj._query_compiler.to_snowpark(index, index_label)
665+
return _check_obj_and_set_backend_to_snowflake(obj)._query_compiler.to_snowpark(
666+
index, index_label
667+
)
634668

635669

636-
@register_pd_accessor("to_pandas")
670+
@register_snowflake_accessor("to_pandas")
637671
@materialization_warning
638672
def to_pandas(
639673
obj: Union[DataFrame, Series],
@@ -678,7 +712,7 @@ def to_pandas(
678712
return obj.to_pandas(statement_params=statement_params, *kwargs)
679713

680714

681-
@register_pd_accessor("to_view")
715+
@register_snowflake_accessor("to_view")
682716
def to_view(
683717
obj: Union[DataFrame, Series],
684718
name: Union[str, Iterable[str]],
@@ -709,7 +743,6 @@ def to_view(
709743
then the index names are used. A sequence should be given if the DataFrame uses MultiIndex.
710744
"""
711745
_snowpark_pandas_obj_check(obj)
712-
713746
return obj.to_view(
714747
name=name,
715748
comment=comment,
@@ -718,7 +751,7 @@ def to_view(
718751
)
719752

720753

721-
@register_pd_accessor("to_dynamic_table")
754+
@register_snowflake_accessor("to_dynamic_table")
722755
def to_dynamic_table(
723756
obj: Union[DataFrame, Series],
724757
name: Union[str, Iterable[str]],
@@ -791,7 +824,6 @@ def to_dynamic_table(
791824
for more details on refresh mode.
792825
"""
793826
_snowpark_pandas_obj_check(obj)
794-
795827
return obj.to_dynamic_table(
796828
name=name,
797829
warehouse=warehouse,
@@ -810,7 +842,7 @@ def to_dynamic_table(
810842
)
811843

812844

813-
@register_pd_accessor("to_iceberg")
845+
@_register_pd_accessor("to_iceberg")
814846
def to_iceberg(
815847
obj: Union[DataFrame, Series],
816848
table_name: Union[str, Iterable[str]],
@@ -905,8 +937,7 @@ def to_iceberg(
905937
... }
906938
>>> pd.to_iceberg(df.to_snowpark_pandas(), "my_table", iceberg_config=iceberg_config, mode="overwrite") # doctest: +SKIP
907939
"""
908-
_snowpark_pandas_obj_check(obj)
909-
return obj._query_compiler.to_iceberg(
940+
return _check_obj_and_set_backend_to_snowflake(obj)._query_compiler.to_iceberg(
910941
table_name=table_name,
911942
iceberg_config=iceberg_config,
912943
mode=mode,
@@ -924,7 +955,34 @@ def to_iceberg(
924955
)
925956

926957

927-
@register_pd_accessor("explain_switch")
958+
def _make_unimplemented_extension(name: str, to_wrap: callable):
    """
    Build a top-level function that forwards to the same-named method of its first argument.

    Args:
        name: Name of the method looked up on the object passed to the
            returned function.
        to_wrap: Function whose metadata (name, docstring, ...) is copied
            onto the returned function via ``functools.wraps``.

    Returns:
        A function ``f(obj, *args, **kwargs)`` that validates ``obj`` with
        ``_snowpark_pandas_obj_check`` and then returns
        ``getattr(obj, name)(*args, **kwargs)``.
    """

    def _forward_to_method(obj, *args, **kwargs):
        _snowpark_pandas_obj_check(obj)
        # This wrapper does not raise NotImplementedError itself; the method
        # found on the object is left to take care of raising it.
        bound_method = getattr(obj, name)
        return bound_method(*args, **kwargs)

    return wraps(to_wrap)(_forward_to_method)
977+
978+
979+
for function in (to_dynamic_table, to_view):
980+
_register_pd_accessor(name=function.__name__, backend=None)(
981+
_make_unimplemented_extension(name=function.__name__, to_wrap=function)
982+
)
983+
984+
985+
@register_snowflake_accessor("explain_switch")
928986
def explain_switch(simple=True) -> Union[native_pd.DataFrame, None]:
929987
"""
930988
Shows a log of all backend switching decisions made by Snowpark pandas.

src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
from snowflake.snowpark._internal.type_utils import ColumnOrName
2222
from snowflake.snowpark.async_job import AsyncJob
2323
from snowflake.snowpark.dataframe import DataFrame as SnowparkDataFrame
24-
from snowflake.snowpark.modin.plugin.extensions.utils import add_cache_result_docstring
24+
from snowflake.snowpark.modin.plugin.extensions.utils import (
25+
add_cache_result_docstring,
26+
register_non_snowflake_accessors,
27+
)
2528
from snowflake.snowpark.modin.plugin.utils.warning_message import (
2629
materialization_warning,
2730
)
@@ -31,15 +34,8 @@
3134
register_series_accessor = functools.partial(
3235
_register_series_accessor, backend="Snowflake"
3336
)
34-
_register_series_accessor(name="to_pandas", backend="Pandas")(pd.Series._to_pandas)
35-
_register_series_accessor("to_snowflake", backend="Pandas")(
36-
lambda self, *args, **kwargs: self.move_to("Snowflake").to_snowflake(
37-
*args, **kwargs
38-
)
39-
)
40-
_register_series_accessor("to_snowpark", backend="Pandas")(
41-
lambda self, *args, **kwargs: self.move_to("Snowflake").to_snowpark(*args, **kwargs)
42-
)
37+
38+
register_non_snowflake_accessors(_register_series_accessor, "Series")
4339

4440

4541
@register_series_accessor("_set_axis_name")

src/snowflake/snowpark/modin/plugin/extensions/utils.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from collections.abc import Hashable, Sequence
1111
from types import BuiltinFunctionType
12-
from typing import Any, Callable
12+
from typing import Any, Callable, NoReturn
1313

1414
import modin.pandas as pd
1515
import numpy as np
@@ -580,3 +580,82 @@ def try_convert_index_to_native(index_like: Any) -> Any:
580580
if isinstance(index_like, Index):
581581
index_like = index_like.to_pandas()
582582
return index_like
583+
584+
585+
def _make_non_snowflake_accessor(method: str) -> Callable:
586+
"""
587+
Create a method that moves the object to Snowflake backend and calls the specified method.
588+
589+
Args:
590+
method: The method name to call on the Snowflake-backend object.
591+
592+
Returns:
593+
A method that moves the object to Snowflake and calls the method.
594+
"""
595+
return lambda self, *args, **kwargs: getattr(self.move_to("Snowflake"), method)(
596+
*args, **kwargs
597+
)
598+
599+
600+
def _make_non_snowflake_not_implemented_accessor(
601+
backend: str, method: str, object_type: str
602+
) -> Callable[..., NoReturn]:
603+
"""
604+
Create a function that raises NotImplementedError for unsupported backend/method combinations.
605+
606+
Args:
607+
backend: The backend name (e.g., "Pandas", "Ray").
608+
method: The method name that is not supported.
609+
object_type: The object type ("DataFrame" or "Series").
610+
611+
Returns:
612+
A function that raises NotImplementedError when called.
613+
"""
614+
615+
def raise_error(self, *args, **kwargs) -> NoReturn:
616+
raise NotImplementedError(
617+
f"Modin supports the method {object_type}.{method} on the Snowflake backend, but not on the backend {backend}."
618+
)
619+
620+
return raise_error
621+
622+
623+
def register_non_snowflake_accessors(
    register_accessor_func: Callable, object_type: str
) -> None:
    """
    Register the Snowflake extension methods on the "Pandas" and "Ray" backends.

    For each of the two backends this registers, in order:

    * ``to_pandas``, bound to ``pd.DataFrame._to_pandas`` when ``object_type``
      is "DataFrame" and to ``pd.Series._to_pandas`` for any other value;
    * ``to_snowflake``, ``to_snowpark`` and ``to_iceberg`` as accessors built
      by ``_make_non_snowflake_accessor``;
    * ``create_or_replace_view``, ``create_or_replace_dynamic_table``,
      ``to_view``, ``to_dynamic_table`` and ``cache_result`` as accessors built
      by ``_make_non_snowflake_not_implemented_accessor``.

    Args:
        register_accessor_func: Called as
            ``register_accessor_func(name=..., backend=...)``; the value it
            returns is then called with the accessor to register.
        object_type: The name of the object type ("DataFrame" or "Series").
    """
    # Neither the conversion method nor the method-name lists vary by backend.
    to_pandas_impl = (
        pd.DataFrame._to_pandas if object_type == "DataFrame" else pd.Series._to_pandas
    )
    forwarded_methods = ("to_snowflake", "to_snowpark", "to_iceberg")
    unsupported_methods = (
        "create_or_replace_view",
        "create_or_replace_dynamic_table",
        "to_view",
        "to_dynamic_table",
        "cache_result",
    )

    for backend in ("Pandas", "Ray"):
        register_accessor_func(name="to_pandas", backend=backend)(to_pandas_impl)

        # Methods that move the object to the Snowflake backend first.
        # TODO(SNOW-2288765): Improve performance of to_snowflake() by writing
        # to a Snowflake table directly instead of going through Snowpark
        # pandas.
        for method in forwarded_methods:
            register = register_accessor_func(name=method, backend=backend)
            register(_make_non_snowflake_accessor(method=method))

        # Methods that are not implemented on non-Snowflake backends.
        for method in unsupported_methods:
            register = register_accessor_func(name=method, backend=backend)
            register(
                _make_non_snowflake_not_implemented_accessor(
                    backend=backend, method=method, object_type=object_type
                )
            )

0 commit comments

Comments
 (0)