Skip to content

Commit 9d0f435

Browse files
SNOW-2022593, SNOW-2022613: Add support for DataFrame/Series.to_view (#3238)
1 parent c6ea589 commit 9d0f435

File tree

7 files changed

+466
-4
lines changed

7 files changed

+466
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
- Added support for `DataFrame.create_or_replace_view` and `Series.create_or_replace_view`.
3131
- Added support for `DataFrame.create_or_replace_dynamic_table` and `Series.create_or_replace_dynamic_table`.
32+
- Added support for `DataFrame.to_view` and `Series.to_view`.
3233
- Added support for `DataFrame.to_dynamic_table` and `Series.to_dynamic_table`.
3334

3435
#### Improvements

docs/source/modin/dataframe.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ DataFrame
4040
DataFrame.cache_result
4141
DataFrame.create_or_replace_view
4242
DataFrame.create_or_replace_dynamic_table
43+
DataFrame.to_view
4344
DataFrame.to_dynamic_table
4445

4546
.. rubric:: Conversion

docs/source/modin/series.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ Series
4646
Series.cache_result
4747
Series.create_or_replace_view
4848
Series.create_or_replace_dynamic_table
49+
Series.to_view
4950
Series.to_dynamic_table
5051

5152
.. rubric:: Conversion

src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,6 @@ def create_or_replace_dynamic_table(
345345
max_data_extension_time: Specifies the maximum number of days for which Snowflake can extend
346346
the data retention period of the dynamic table to prevent streams on the dynamic table
347347
from becoming stale.
348-
statement_params: Dictionary of statement level parameters to be set while executing this action.
349348
iceberg_config: A dictionary that can contain the following iceberg configuration values:
350349
351350
- external_volume: specifies the identifier for the external volume where
@@ -383,6 +382,43 @@ def create_or_replace_dynamic_table(
383382
)
384383

385384

385+
@register_dataframe_accessor("to_view")
386+
def to_view(
387+
self,
388+
name: Union[str, Iterable[str]],
389+
*,
390+
comment: Optional[str] = None,
391+
index: bool = True,
392+
index_label: Optional[IndexLabel] = None,
393+
) -> List[Row]:
394+
"""
395+
Creates a view that captures the computation expressed by this DataFrame.
396+
397+
For ``name``, you can include the database and schema name (i.e. specify a
398+
fully-qualified name). If no database name or schema name are specified, the
399+
view will be created in the current database or schema.
400+
401+
``name`` must be a valid `Snowflake identifier <https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html>`_.
402+
403+
Args:
404+
name: The name of the view to create or replace. Can be a list of strings
405+
that specifies the database name, schema name, and view name.
406+
comment: Adds a comment for the created view. See
407+
`COMMENT <https://docs.snowflake.com/en/sql-reference/sql/comment>`_.
408+
index: default True
409+
If true, save DataFrame index columns in view columns.
410+
index_label:
411+
Column label for index column(s). If None is given (default) and index is True,
412+
then the index names are used. A sequence should be given if the DataFrame uses MultiIndex.
413+
"""
414+
return self.create_or_replace_view(
415+
name=name,
416+
comment=comment,
417+
index=index,
418+
index_label=index_label,
419+
)
420+
421+
386422
@register_dataframe_accessor("to_dynamic_table")
387423
def to_dynamic_table(
388424
self,
@@ -435,7 +471,6 @@ def to_dynamic_table(
435471
max_data_extension_time: Specifies the maximum number of days for which Snowflake can extend
436472
the data retention period of the dynamic table to prevent streams on the dynamic table
437473
from becoming stale.
438-
statement_params: Dictionary of statement level parameters to be set while executing this action.
439474
iceberg_config: A dictionary that can contain the following iceberg configuration values:
440475
441476
- external_volume: specifies the identifier for the external volume where

src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,6 @@ def create_or_replace_dynamic_table(
335335
max_data_extension_time: Specifies the maximum number of days for which Snowflake can extend
336336
the data retention period of the dynamic table to prevent streams on the dynamic table
337337
from becoming stale.
338-
statement_params: Dictionary of statement level parameters to be set while executing this action.
339338
iceberg_config: A dictionary that can contain the following iceberg configuration values:
340339
341340
- external_volume: specifies the identifier for the external volume where
@@ -373,6 +372,43 @@ def create_or_replace_dynamic_table(
373372
)
374373

375374

375+
@register_series_accessor("to_view")
376+
def to_view(
377+
self,
378+
name: Union[str, Iterable[str]],
379+
*,
380+
comment: Optional[str] = None,
381+
index: bool = True,
382+
index_label: Optional[IndexLabel] = None,
383+
) -> List[Row]:
384+
"""
385+
Creates a view that captures the computation expressed by this Series.
386+
387+
For ``name``, you can include the database and schema name (i.e. specify a
388+
fully-qualified name). If no database name or schema name are specified, the
389+
view will be created in the current database or schema.
390+
391+
``name`` must be a valid `Snowflake identifier <https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html>`_.
392+
393+
Args:
394+
name: The name of the view to create or replace. Can be a list of strings
395+
that specifies the database name, schema name, and view name.
396+
comment: Adds a comment for the created view. See
397+
`COMMENT <https://docs.snowflake.com/en/sql-reference/sql/comment>`_.
398+
index: default True
399+
If true, save DataFrame index columns in view columns.
400+
index_label:
401+
Column label for index column(s). If None is given (default) and index is True,
402+
then the index names are used. A sequence should be given if the DataFrame uses MultiIndex.
403+
"""
404+
return self.create_or_replace_view(
405+
name=name,
406+
comment=comment,
407+
index=index,
408+
index_label=index_label,
409+
)
410+
411+
376412
@register_series_accessor("to_dynamic_table")
377413
def to_dynamic_table(
378414
self,
@@ -425,7 +461,6 @@ def to_dynamic_table(
425461
max_data_extension_time: Specifies the maximum number of days for which Snowflake can extend
426462
the data retention period of the dynamic table to prevent streams on the dynamic table
427463
from becoming stale.
428-
statement_params: Dictionary of statement level parameters to be set while executing this action.
429464
iceberg_config: A dictionary that can contain the following iceberg configuration values:
430465
431466
- external_volume: specifies the identifier for the external volume where
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
#
2+
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
3+
#
4+
5+
import modin.pandas as pd
6+
import pandas as native_pd
7+
import pytest
8+
9+
import snowflake.snowpark.modin.plugin # noqa: F401
10+
from snowflake.snowpark.exceptions import SnowparkSQLException
11+
from snowflake.snowpark.session import Session
12+
from tests.integ.modin.utils import BASIC_TYPE_DATA1, BASIC_TYPE_DATA2
13+
from tests.integ.utils.sql_counter import sql_count_checker
14+
from tests.utils import Utils
15+
16+
17+
@pytest.fixture(scope="function")
18+
def native_pandas_df_basic():
19+
native_df = native_pd.DataFrame(
20+
{
21+
"ID": [1, 2, 3],
22+
"FOOT_SIZE": [32.0, 27.0, 40.0],
23+
"SHOE_MODEL": ["medium", "small", "large"],
24+
}
25+
)
26+
native_df = native_df.set_index("ID")
27+
return native_df
28+
29+
30+
@sql_count_checker(query_count=2)
31+
def test_to_view_basic(session, native_pandas_df_basic) -> None:
32+
view_name = Utils.random_view_name()
33+
try:
34+
snow_dataframe = pd.DataFrame(native_pandas_df_basic)
35+
36+
assert (
37+
"successfully created"
38+
in snow_dataframe.to_view(name=view_name)[0]["status"]
39+
)
40+
41+
finally:
42+
Utils.drop_view(session, view_name)
43+
44+
45+
@sql_count_checker(query_count=6)
46+
def test_to_view_multiple_sessions_no_relaxed_ordering_raises(
47+
session,
48+
db_parameters,
49+
) -> None:
50+
try:
51+
# create table
52+
table_name = Utils.random_table_name()
53+
session.create_dataframe(
54+
[BASIC_TYPE_DATA1, BASIC_TYPE_DATA2]
55+
).write.save_as_table(table_name)
56+
57+
# create dataframe with relaxed_ordering disabled
58+
snow_dataframe = pd.read_snowflake(
59+
f"(((SELECT * FROM {table_name})))", relaxed_ordering=False
60+
)
61+
62+
# create view
63+
view_name = Utils.random_view_name()
64+
assert (
65+
"successfully created"
66+
in snow_dataframe.to_view(name=view_name)[0]["status"]
67+
)
68+
69+
# another session
70+
new_session = Session.builder.configs(db_parameters).create()
71+
pd.session = session
72+
73+
# accessing the created view in another session fails when relaxed_ordering is disabled
74+
with pytest.raises(
75+
SnowparkSQLException,
76+
match="Object 'VIEW_NAME' does not exist or not authorized",
77+
):
78+
new_session.sql("select * from view_name").collect()
79+
new_session.close()
80+
81+
finally:
82+
# cleanup
83+
Utils.drop_view(session, view_name)
84+
Utils.drop_table(session, table_name)
85+
pd.session = session
86+
87+
88+
@sql_count_checker(query_count=4)
89+
def test_to_view_multiple_sessions_relaxed_ordering(
90+
session,
91+
db_parameters,
92+
) -> None:
93+
try:
94+
# create table
95+
table_name = Utils.random_table_name()
96+
session.create_dataframe(
97+
[BASIC_TYPE_DATA1, BASIC_TYPE_DATA2]
98+
).write.save_as_table(table_name)
99+
100+
# create dataframe with relaxed_ordering enabled
101+
snow_dataframe = pd.read_snowflake(
102+
f"(((SELECT * FROM {table_name})))", relaxed_ordering=True
103+
)
104+
105+
# create view
106+
view_name = Utils.random_view_name()
107+
assert (
108+
"successfully created"
109+
in snow_dataframe.to_view(name=view_name)[0]["status"]
110+
)
111+
112+
# another session
113+
new_session = Session.builder.configs(db_parameters).create()
114+
pd.session = new_session
115+
116+
# accessing the created view in another session succeeds when relaxed_ordering is enabled
117+
res = new_session.sql(f"select * from {view_name}").collect()
118+
assert len(res) == 2
119+
new_session.close()
120+
121+
finally:
122+
# cleanup
123+
Utils.drop_view(session, view_name)
124+
Utils.drop_table(session, table_name)
125+
pd.session = session
126+
127+
128+
@pytest.mark.parametrize("index", [True, False])
129+
@pytest.mark.parametrize("index_labels", [None, ["my_index"]])
130+
@sql_count_checker(query_count=6)
131+
def test_to_view_index(session, index, index_labels):
132+
try:
133+
# create table
134+
table_name = Utils.random_table_name()
135+
session.create_dataframe(
136+
[BASIC_TYPE_DATA1, BASIC_TYPE_DATA2]
137+
).write.save_as_table(table_name)
138+
139+
# create dataframe with relaxed_ordering enabled
140+
snow_dataframe = pd.read_snowflake(
141+
f"(((SELECT * FROM {table_name})))", relaxed_ordering=True
142+
)
143+
144+
view_name = Utils.random_view_name()
145+
snow_dataframe.to_view(name=view_name, index=index, index_label=index_labels)
146+
expected_columns = []
147+
if index:
148+
# if index is retained in the result, add it as the first expected column
149+
expected_index = ["index"]
150+
if index_labels:
151+
expected_index = index_labels
152+
expected_columns = expected_columns + expected_index
153+
# add the expected data columns
154+
expected_columns = expected_columns + ["_1", "_2", "_3", "_4", "_5", "_6", "_7"]
155+
156+
# verify columns
157+
actual = pd.read_snowflake(view_name).columns
158+
assert actual.tolist() == expected_columns
159+
finally:
160+
# cleanup
161+
Utils.drop_view(session, view_name)
162+
Utils.drop_table(session, table_name)
163+
164+
165+
@sql_count_checker(query_count=6)
166+
def test_to_view_multiindex(session):
167+
try:
168+
# create table
169+
table_name = Utils.random_table_name()
170+
session.create_dataframe(
171+
[BASIC_TYPE_DATA1, BASIC_TYPE_DATA2]
172+
).write.save_as_table(table_name)
173+
174+
# create dataframe with relaxed_ordering enabled
175+
snow_dataframe = pd.read_snowflake(
176+
f"(((SELECT * FROM {table_name})))", relaxed_ordering=True
177+
)
178+
179+
# make sure dataframe has a multi-index
180+
snow_dataframe = snow_dataframe.set_index(["_1", "_2"])
181+
182+
view_name = Utils.random_view_name()
183+
snow_dataframe.to_view(
184+
name=view_name,
185+
index=True,
186+
)
187+
188+
# verify columns
189+
actual = pd.read_snowflake(view_name).columns
190+
assert actual.tolist() == ["_1", "_2", "_3", "_4", "_5", "_6", "_7"]
191+
192+
with pytest.raises(
193+
ValueError, match="Length of 'index_label' should match number of levels"
194+
):
195+
snow_dataframe.to_view(name=view_name, index=True, index_label=["a"])
196+
finally:
197+
# cleanup
198+
Utils.drop_view(session, view_name)
199+
Utils.drop_table(session, table_name)

0 commit comments

Comments
 (0)