Commit 2c31fd6

SNOW-213702, SNOW-2014127: Add support for DataFrame/Series.create_or_replace_dynamic_table (#3209)
1 parent 87cce63 commit 2c31fd6

7 files changed: +431 additions, −0 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -53,6 +53,7 @@
 - Added support for `pd.Grouper` objects in group by operations. When `freq` is specified, the default values of the `sort`, `closed`, `label`, and `convention` arguments are supported; `origin` is supported when it is `start` or `start_day`.
 - Added support for relaxed consistency and ordering guarantees in `pd.read_snowflake` for both named data sources (e.g., tables and views) and query data sources by introducing the new parameter `relaxed_ordering`.
 - Added support for `DataFrame.create_or_replace_view` and `Series.create_or_replace_view`.
+- Added support for `DataFrame.create_or_replace_dynamic_table` and `Series.create_or_replace_dynamic_table`.

 #### Improvements

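To tie the two changelog entries together, here is a minimal usage sketch of the new API; the table, warehouse, and lag values are illustrative assumptions, not part of this commit.

import modin.pandas as pd

import snowflake.snowpark.modin.plugin  # noqa: F401  (registers the Snowpark pandas extensions)

# Read an existing table; relaxed_ordering keeps the read from materializing a
# temporary snapshot that a dynamic table could not depend on (see the tests below).
df = pd.read_snowflake("MY_DB.MY_SCHEMA.SALES", relaxed_ordering=True)

# Materialize the frame's computation as a dynamic table (hypothetical names).
df.create_or_replace_dynamic_table(
    name="MY_DB.MY_SCHEMA.SALES_DT",
    warehouse="MY_WH",
    lag="20 minutes",
)
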
docs/source/modin/dataframe.rst

Lines changed: 1 addition & 0 deletions

@@ -39,6 +39,7 @@ DataFrame
 DataFrame.to_snowpark
 DataFrame.cache_result
 DataFrame.create_or_replace_view
+DataFrame.create_or_replace_dynamic_table

 .. rubric:: Conversion

docs/source/modin/series.rst

Lines changed: 1 addition & 0 deletions

@@ -45,6 +45,7 @@ Series
 Series.to_snowpark
 Series.cache_result
 Series.create_or_replace_view
+Series.create_or_replace_dynamic_table

 .. rubric:: Conversion

src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py

Lines changed: 85 additions & 0 deletions

@@ -15,6 +15,7 @@
 from modin.pandas.api.extensions import register_dataframe_accessor
 from pandas._typing import IndexLabel

+from snowflake.snowpark._internal.type_utils import ColumnOrName
 from snowflake.snowpark.dataframe import DataFrame as SnowparkDataFrame
 from snowflake.snowpark.modin.plugin.extensions.utils import add_cache_result_docstring
 from snowflake.snowpark.modin.plugin.utils.warning_message import (
@@ -284,3 +285,87 @@ def create_or_replace_view(
         comment=comment,
         statement_params=statement_params,
     )
+
+
+@register_dataframe_accessor("create_or_replace_dynamic_table")
+def create_or_replace_dynamic_table(
+    self,
+    name: Union[str, Iterable[str]],
+    *,
+    warehouse: str,
+    lag: str,
+    comment: Optional[str] = None,
+    mode: str = "overwrite",
+    refresh_mode: Optional[str] = None,
+    initialize: Optional[str] = None,
+    clustering_keys: Optional[Iterable[ColumnOrName]] = None,
+    is_transient: bool = False,
+    data_retention_time: Optional[int] = None,
+    max_data_extension_time: Optional[int] = None,
+    statement_params: Optional[Dict[str, str]] = None,
+    iceberg_config: Optional[dict] = None,
+    _emit_ast: bool = True,
+) -> List[Row]:
+    """
+    Creates a dynamic table that captures the computation expressed by this DataFrame.
+
+    For ``name``, you can include the database and schema name (i.e. specify a
+    fully-qualified name). If no database name or schema name is specified, the
+    dynamic table will be created in the current database or schema.
+
+    ``name`` must be a valid `Snowflake identifier <https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html>`_.
+
+    Args:
+        name: The name of the dynamic table to create or replace. Can be a list of strings
+            that specifies the database name, schema name, and table name.
+        warehouse: The name of the warehouse used to refresh the dynamic table.
+        lag: Specifies the target data freshness.
+        comment: Adds a comment for the created table. See
+            `COMMENT <https://docs.snowflake.com/en/sql-reference/sql/comment>`_.
+        mode: Specifies the behavior of create dynamic table. Allowed values are:
+            - "overwrite" (default): Overwrite the table by dropping the old table.
+            - "errorifexists": Throw an exception if the table already exists.
+            - "ignore": Ignore the operation if the table already exists.
+        refresh_mode: Specifies the refresh mode of the dynamic table. The value can be "AUTO",
+            "FULL", or "INCREMENTAL".
+        initialize: Specifies the behavior of the initial refresh. The value can be "ON_CREATE" or
+            "ON_SCHEDULE".
+        clustering_keys: Specifies one or more columns or column expressions in the table as the clustering key.
+            See `Clustering Keys & Clustered Tables <https://docs.snowflake.com/en/user-guide/tables-clustering-keys>`_
+            for more details.
+        is_transient: A boolean value that specifies whether the dynamic table is transient.
+        data_retention_time: Specifies the retention period for the dynamic table in days so that
+            Time Travel actions can be performed on historical data in the dynamic table.
+        max_data_extension_time: Specifies the maximum number of days for which Snowflake can extend
+            the data retention period of the dynamic table to prevent streams on the dynamic table
+            from becoming stale.
+        statement_params: Dictionary of statement level parameters to be set while executing this action.
+        iceberg_config: A dictionary that can contain the following Iceberg configuration values:
+
+            - external_volume: specifies the identifier for the external volume where
+              the Iceberg table stores its metadata files and data in Parquet format.
+            - catalog: specifies either Snowflake or a catalog integration to use for this table.
+            - base_location: the base directory that Snowflake can write Iceberg metadata and files to.
+            - catalog_sync: optionally sets the catalog integration configured for Polaris Catalog.
+            - storage_serialization_policy: specifies the storage serialization policy for the table.
+
+    Note:
+        See `understanding dynamic table refresh <https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh>`_
+        for more details on refresh mode.
+    """
+    return self.to_snowpark().create_or_replace_dynamic_table(
+        name=name,
+        warehouse=warehouse,
+        lag=lag,
+        comment=comment,
+        mode=mode,
+        refresh_mode=refresh_mode,
+        initialize=initialize,
+        clustering_keys=clustering_keys,
+        is_transient=is_transient,
+        data_retention_time=data_retention_time,
+        max_data_extension_time=max_data_extension_time,
+        statement_params=statement_params,
+        iceberg_config=iceberg_config,
+    )
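
The accessor above simply converts the Snowpark pandas DataFrame to a Snowpark DataFrame (via ``to_snowpark``) and forwards every argument. A hedged sketch of calling it with a few of the optional knobs follows; all identifiers below are hypothetical.

import modin.pandas as pd

import snowflake.snowpark.modin.plugin  # noqa: F401

df = pd.read_snowflake("EVENTS", relaxed_ordering=True)

# Exercise some of the optional parameters documented above.
rows = df.create_or_replace_dynamic_table(
    name=["ANALYTICS", "PUBLIC", "EVENTS_DT"],  # database, schema, table
    warehouse="TRANSFORM_WH",
    lag="1 hour",
    mode="errorifexists",        # raise instead of replacing an existing table
    refresh_mode="INCREMENTAL",
    initialize="ON_CREATE",
    comment="Derived from EVENTS via Snowpark pandas",
)
print(rows[0]["status"])  # status message returned by the CREATE statement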

src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py

Lines changed: 85 additions & 0 deletions

@@ -15,6 +15,7 @@
 from modin.pandas.api.extensions import register_series_accessor
 from pandas._typing import Axis, IndexLabel

+from snowflake.snowpark._internal.type_utils import ColumnOrName
 from snowflake.snowpark.dataframe import DataFrame as SnowparkDataFrame
 from snowflake.snowpark.modin.plugin.extensions.utils import add_cache_result_docstring
 from snowflake.snowpark.modin.plugin.utils.warning_message import (
@@ -274,3 +275,87 @@ def create_or_replace_view(
         comment=comment,
         statement_params=statement_params,
     )
+
+
+@register_series_accessor("create_or_replace_dynamic_table")
+def create_or_replace_dynamic_table(
+    self,
+    name: Union[str, Iterable[str]],
+    *,
+    warehouse: str,
+    lag: str,
+    comment: Optional[str] = None,
+    mode: str = "overwrite",
+    refresh_mode: Optional[str] = None,
+    initialize: Optional[str] = None,
+    clustering_keys: Optional[Iterable[ColumnOrName]] = None,
+    is_transient: bool = False,
+    data_retention_time: Optional[int] = None,
+    max_data_extension_time: Optional[int] = None,
+    statement_params: Optional[Dict[str, str]] = None,
+    iceberg_config: Optional[dict] = None,
+    _emit_ast: bool = True,
+) -> List[Row]:
+    """
+    Creates a dynamic table that captures the computation expressed by this Series.
+
+    For ``name``, you can include the database and schema name (i.e. specify a
+    fully-qualified name). If no database name or schema name is specified, the
+    dynamic table will be created in the current database or schema.
+
+    ``name`` must be a valid `Snowflake identifier <https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html>`_.
+
+    Args:
+        name: The name of the dynamic table to create or replace. Can be a list of strings
+            that specifies the database name, schema name, and table name.
+        warehouse: The name of the warehouse used to refresh the dynamic table.
+        lag: Specifies the target data freshness.
+        comment: Adds a comment for the created table. See
+            `COMMENT <https://docs.snowflake.com/en/sql-reference/sql/comment>`_.
+        mode: Specifies the behavior of create dynamic table. Allowed values are:
+            - "overwrite" (default): Overwrite the table by dropping the old table.
+            - "errorifexists": Throw an exception if the table already exists.
+            - "ignore": Ignore the operation if the table already exists.
+        refresh_mode: Specifies the refresh mode of the dynamic table. The value can be "AUTO",
+            "FULL", or "INCREMENTAL".
+        initialize: Specifies the behavior of the initial refresh. The value can be "ON_CREATE" or
+            "ON_SCHEDULE".
+        clustering_keys: Specifies one or more columns or column expressions in the table as the clustering key.
+            See `Clustering Keys & Clustered Tables <https://docs.snowflake.com/en/user-guide/tables-clustering-keys>`_
+            for more details.
+        is_transient: A boolean value that specifies whether the dynamic table is transient.
+        data_retention_time: Specifies the retention period for the dynamic table in days so that
+            Time Travel actions can be performed on historical data in the dynamic table.
+        max_data_extension_time: Specifies the maximum number of days for which Snowflake can extend
+            the data retention period of the dynamic table to prevent streams on the dynamic table
+            from becoming stale.
+        statement_params: Dictionary of statement level parameters to be set while executing this action.
+        iceberg_config: A dictionary that can contain the following Iceberg configuration values:
+
+            - external_volume: specifies the identifier for the external volume where
+              the Iceberg table stores its metadata files and data in Parquet format.
+            - catalog: specifies either Snowflake or a catalog integration to use for this table.
+            - base_location: the base directory that Snowflake can write Iceberg metadata and files to.
+            - catalog_sync: optionally sets the catalog integration configured for Polaris Catalog.
+            - storage_serialization_policy: specifies the storage serialization policy for the table.
+
+    Note:
+        See `understanding dynamic table refresh <https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh>`_
+        for more details on refresh mode.
+    """
+    return self.to_snowpark().create_or_replace_dynamic_table(
+        name=name,
+        warehouse=warehouse,
+        lag=lag,
+        comment=comment,
+        mode=mode,
+        refresh_mode=refresh_mode,
+        initialize=initialize,
+        clustering_keys=clustering_keys,
+        is_transient=is_transient,
+        data_retention_time=data_retention_time,
+        max_data_extension_time=max_data_extension_time,
+        statement_params=statement_params,
+        iceberg_config=iceberg_config,
+    )
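
The Series accessor mirrors the DataFrame one, so a single column can be materialized the same way. A minimal sketch with hypothetical names:

import modin.pandas as pd

import snowflake.snowpark.modin.plugin  # noqa: F401

df = pd.read_snowflake("ORDERS", relaxed_ordering=True)

# Selecting one column yields a Series; its computation becomes a
# single-column dynamic table.
df["ORDER_TOTAL"].create_or_replace_dynamic_table(
    name="ORDER_TOTALS_DT",
    warehouse="TRANSFORM_WH",
    lag="30 minutes",
)
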
Lines changed: 129 additions & 0 deletions

@@ -0,0 +1,129 @@
+#
+# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
+#
+
+import modin.pandas as pd
+import pytest
+
+import snowflake.snowpark.modin.plugin  # noqa: F401
+from snowflake.snowpark._internal.utils import TempObjectType
+from snowflake.snowpark.exceptions import SnowparkSQLException
+from snowflake.snowpark.session import Session
+from tests.integ.modin.utils import BASIC_TYPE_DATA1, BASIC_TYPE_DATA2
+from tests.integ.utils.sql_counter import sql_count_checker
+from tests.utils import Utils
+
+
+@sql_count_checker(query_count=5)
+def test_create_or_replace_dynamic_table_no_relaxed_ordering_raises(session) -> None:
+    try:
+        # create table
+        table_name = Utils.random_table_name()
+        session.create_dataframe(
+            [BASIC_TYPE_DATA1, BASIC_TYPE_DATA2]
+        ).write.save_as_table(table_name)
+
+        # create dataframe with relaxed_ordering disabled
+        snow_dataframe = pd.read_snowflake(
+            f"(((SELECT * FROM {table_name})))", relaxed_ordering=False
+        )
+
+        # creating dynamic_table fails when relaxed_ordering is disabled
+        # because it cannot depend on a temp table
+        dynamic_table_name = Utils.random_name_for_temp_object(
+            TempObjectType.DYNAMIC_TABLE
+        )
+        with pytest.raises(
+            SnowparkSQLException,
+            match="Dynamic Tables cannot depend on a temporary object",
+        ):
+            snow_dataframe.create_or_replace_dynamic_table(
+                name=dynamic_table_name,
+                warehouse=session.get_current_warehouse(),
+                lag="1000 minutes",
+            )
+    finally:
+        # cleanup
+        Utils.drop_dynamic_table(session, dynamic_table_name)
+        Utils.drop_table(session, table_name)
+
+
+@sql_count_checker(query_count=5)
+def test_create_or_replace_dynamic_table_relaxed_ordering(session) -> None:
+    try:
+        # create table
+        table_name = Utils.random_name_for_temp_object(TempObjectType.DYNAMIC_TABLE)
+        session.create_dataframe(
+            [BASIC_TYPE_DATA1, BASIC_TYPE_DATA2]
+        ).write.save_as_table(table_name)
+
+        # create dataframe with relaxed_ordering enabled
+        snow_dataframe = pd.read_snowflake(
+            f"(((SELECT * FROM {table_name})))", relaxed_ordering=True
+        )
+
+        # creating dynamic_table succeeds when relaxed_ordering is enabled
+        dynamic_table_name = Utils.random_name_for_temp_object(
+            TempObjectType.DYNAMIC_TABLE
+        )
+        assert (
+            "successfully created"
+            in snow_dataframe.create_or_replace_dynamic_table(
+                name=dynamic_table_name,
+                warehouse=session.get_current_warehouse(),
+                lag="1000 minutes",
+            )[0]["status"]
+        )
+
+        # accessing the created dynamic_table in the same session also succeeds
+        res = session.sql(f"select * from {dynamic_table_name}").collect()
+        assert len(res) == 2
+    finally:
+        # cleanup
+        Utils.drop_dynamic_table(session, dynamic_table_name)
+        Utils.drop_table(session, table_name)
+
+
+@sql_count_checker(query_count=4)
+def test_create_or_replace_dynamic_table_multiple_sessions_relaxed_ordering(
+    session,
+    db_parameters,
+) -> None:
+    try:
+        # create table
+        table_name = Utils.random_name_for_temp_object(TempObjectType.DYNAMIC_TABLE)
+        session.create_dataframe(
+            [BASIC_TYPE_DATA1, BASIC_TYPE_DATA2]
+        ).write.save_as_table(table_name)
+
+        # create dataframe with relaxed_ordering enabled
+        snow_dataframe = pd.read_snowflake(
+            f"(((SELECT * FROM {table_name})))", relaxed_ordering=True
+        )
+
+        # creating dynamic_table succeeds when relaxed_ordering is enabled
+        dynamic_table_name = Utils.random_name_for_temp_object(
+            TempObjectType.DYNAMIC_TABLE
+        )
+        assert (
+            "successfully created"
+            in snow_dataframe.create_or_replace_dynamic_table(
+                name=dynamic_table_name,
+                warehouse=session.get_current_warehouse(),
+                lag="1000 minutes",
+            )[0]["status"]
+        )
+
+        # another session
+        new_session = Session.builder.configs(db_parameters).create()
+        pd.session = new_session
+
+        # accessing the created dynamic_table in another session also succeeds
+        res = new_session.sql(f"select * from {dynamic_table_name}").collect()
+        assert len(res) == 2
+        new_session.close()
+    finally:
+        # cleanup
+        Utils.drop_dynamic_table(session, dynamic_table_name)
+        Utils.drop_table(session, table_name)
+        pd.session = session
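
Beyond the assertions above, a dynamic table created by the accessor can also be refreshed and inspected with plain SQL on the same session. This is a hedged sketch, not part of the commit; ALTER DYNAMIC TABLE ... REFRESH and SHOW DYNAMIC TABLES are standard Snowflake statements, and the helper name is made up.

def refresh_and_count(session, dynamic_table_name: str) -> int:
    # Trigger an on-demand refresh instead of waiting for the target lag.
    session.sql(f"ALTER DYNAMIC TABLE {dynamic_table_name} REFRESH").collect()
    # SHOW DYNAMIC TABLES reports scheduling state, target lag, warehouse, etc.
    session.sql(f"SHOW DYNAMIC TABLES LIKE '{dynamic_table_name}'").collect()
    # Return the current row count of the materialized result.
    return session.sql(
        f"SELECT COUNT(*) AS N FROM {dynamic_table_name}"
    ).collect()[0]["N"]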
