Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@

- Added support for `restricted caller` permission of `execute_as` argument in `StoredProcedure.register()`

#### Improvements

- Renamed the `relaxed_ordering` param to `enforce_ordering` for `DataFrame.read_snowflake`. Also, the new default value is `enforce_ordering=False`, which has the opposite effect of the previous default value, `relaxed_ordering=False`.


#### Bug Fixes

- Fixed a bug in `DataFrame.group_by().pivot().agg` when the pivot column and aggregate column are the same.
Expand Down Expand Up @@ -40,10 +45,11 @@
#### Improvements

- Improve performance of `DataFrame.groupby.apply` and `Series.groupby.apply` by avoiding expensive pivot step.
- Renamed the `relaxed_ordering` param to `enforce_ordering` for `pd.read_snowflake`. Also, the new default value is `enforce_ordering=False`, which has the opposite effect of the previous default value, `relaxed_ordering=False`.

#### Bug Fixes

- Fixed a bug for `pd.read_snowflake` when reading iceberg tables and `relaxed_ordering=False`.
- Fixed a bug for `pd.read_snowflake` when reading iceberg tables and `enforce_ordering=True`.

## 1.30.0 (2025-03-27)

Expand Down
17 changes: 10 additions & 7 deletions src/snowflake/snowpark/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1283,7 +1283,7 @@ def to_snowpark_pandas(
self,
index_col: Optional[Union[str, List[str]]] = None,
columns: Optional[List[str]] = None,
relaxed_ordering: bool = False,
enforce_ordering: bool = False,
_emit_ast: bool = True,
) -> "modin.pandas.DataFrame":
"""
Expand All @@ -1293,7 +1293,7 @@ def to_snowpark_pandas(
index_col: A column name or a list of column names to use as index.
columns: A list of column names for the columns to select from the Snowpark DataFrame. If not specified, select
all columns except ones configured in index_col.
relaxed_ordering: If True, Snowpark pandas will provide relaxed consistency and ordering guarantees for the returned
enforce_ordering: If False, Snowpark pandas will provide relaxed consistency and ordering guarantees for the returned
DataFrame object. Otherwise, strict consistency and ordering guarantees are provided. Please refer to the
documentation of :func:`~modin.pandas.read_snowflake` for more details.

Expand Down Expand Up @@ -1378,7 +1378,7 @@ def to_snowpark_pandas(
if columns is not None:
ast.columns.extend(columns if isinstance(columns, list) else [columns])

if not relaxed_ordering:
if enforce_ordering:
# create a temporary table out of the current snowpark dataframe
temporary_table_name = random_name_for_temp_object(
TempObjectType.TABLE
Expand All @@ -1394,19 +1394,22 @@ def to_snowpark_pandas(
self._ast_id = ast_id # reset the AST ID.

snowpandas_df = pd.read_snowflake(
name_or_query=temporary_table_name, index_col=index_col, columns=columns
name_or_query=temporary_table_name,
index_col=index_col,
columns=columns,
enforce_ordering=True,
) # pragma: no cover
else:
if len(self.queries["queries"]) > 1:
raise NotImplementedError(
"Setting 'relaxed_ordering=True' in 'to_snowpark_pandas' is not supported when the input "
"dataframe includes DDL or DML operations. Please use 'relaxed_ordering=False' instead."
"Setting 'enforce_ordering=False' in 'to_snowpark_pandas' is not supported when the input "
"dataframe includes DDL or DML operations. Please use 'enforce_ordering=True' instead."
)
snowpandas_df = pd.read_snowflake(
name_or_query=self.queries["queries"][0],
index_col=index_col,
columns=columns,
relaxed_ordering=True,
enforce_ordering=False,
) # pragma: no cover

if _emit_ast:
Expand Down
20 changes: 11 additions & 9 deletions src/snowflake/snowpark/modin/plugin/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def _create_read_only_table(

def create_initial_ordered_dataframe(
table_name_or_query: Union[str, Iterable[str]],
relaxed_ordering: bool,
enforce_ordering: bool,
) -> tuple[OrderedDataFrame, str]:
"""
create read only temp table on top of the existing table or Snowflake query if required, and create a OrderedDataFrame
Expand All @@ -327,7 +327,7 @@ def create_initial_ordered_dataframe(
Args:
table_name_or_query: A string or list of strings that specify the table name or
fully-qualified object identifier (database name, schema name, and table name) or SQL query.
relaxed_ordering: If False, create a read only temp table on top of the existing table or Snowflake query,
enforce_ordering: If True, create a read only temp table on top of the existing table or Snowflake query,
and create the OrderedDataFrame using the read only temp table created.
Otherwise, directly using the existing table.

Expand All @@ -350,8 +350,8 @@ def create_initial_ordered_dataframe(
table_name_or_query
)
is_query = not _is_table_name(table_name_or_query)
if not is_query or relaxed_ordering:
if not relaxed_ordering:
if not is_query or not enforce_ordering:
if enforce_ordering:
try:
readonly_table_name = _create_read_only_table(
table_name=table_name_or_query,
Expand Down Expand Up @@ -407,7 +407,7 @@ def create_initial_ordered_dataframe(

initial_ordered_dataframe = OrderedDataFrame(
DataFrameReference(session.table(readonly_table_name, _emit_ast=False))
if not relaxed_ordering
if enforce_ordering
else DataFrameReference(session.sql(table_name_or_query, _emit_ast=False))
if is_query
else DataFrameReference(session.table(table_name_or_query, _emit_ast=False))
Expand All @@ -425,7 +425,7 @@ def create_initial_ordered_dataframe(

# create snowpark dataframe with columns: row_position_snowflake_quoted_identifier + snowflake_quoted_identifiers
# if no snowflake_quoted_identifiers is specified, all columns will be selected
if not relaxed_ordering:
if enforce_ordering:
row_position_column_str = f"{METADATA_ROW_POSITION_COLUMN} as {row_position_snowflake_quoted_identifier}"
else:
row_position_column_str = f"ROW_NUMBER() OVER (ORDER BY 1) - 1 as {row_position_snowflake_quoted_identifier}"
Expand All @@ -438,7 +438,7 @@ def create_initial_ordered_dataframe(
# which creates a view without metadata column, we won't be able to access the metadata columns
# with the created snowpark dataframe. In order to get the metadata column access in the created
# dataframe, we create dataframe through sql which access the corresponding metadata column.
if not relaxed_ordering:
if enforce_ordering:
dataframe_sql = f"SELECT {columns_to_select} FROM {readonly_table_name}"
else:
dataframe_sql = f"SELECT {columns_to_select} FROM ({table_name_or_query})"
Expand All @@ -455,7 +455,7 @@ def create_initial_ordered_dataframe(
row_position_snowflake_quoted_identifier=row_position_snowflake_quoted_identifier,
)
else:
assert is_query and not relaxed_ordering
assert is_query and enforce_ordering

# If the string passed in to `pd.read_snowflake` is a SQL query, we can simply create
# a Snowpark DataFrame, and convert that to a Snowpark pandas DataFrame, and extract
Expand All @@ -477,7 +477,9 @@ def create_initial_ordered_dataframe(
# so we lose the data isolation quality of pandas that we are attempting to replicate. By
# creating a read only clone, we ensure that the underlying data cannot be modified by anyone
# else.
snowpark_pandas_df = session.sql(table_name_or_query).to_snowpark_pandas()
snowpark_pandas_df = session.sql(table_name_or_query).to_snowpark_pandas(
enforce_ordering=enforce_ordering
)
except SnowparkSQLException as ex:
raise SnowparkPandasException(
f"Failed to create Snowpark pandas DataFrame out of query {table_name_or_query} with error {ex}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ def from_snowflake(
name_or_query: Union[str, Iterable[str]],
index_col: Optional[Union[str, list[str]]] = None,
columns: Optional[list[str]] = None,
relaxed_ordering: bool = False,
enforce_ordering: bool = False,
) -> "SnowflakeQueryCompiler":
"""
See detailed docstring and examples in ``read_snowflake`` in frontend layer:
Expand All @@ -1044,7 +1044,7 @@ def from_snowflake(
row_position_snowflake_quoted_identifier,
) = create_initial_ordered_dataframe(
table_name_or_query=name_or_query,
relaxed_ordering=relaxed_ordering,
enforce_ordering=enforce_ordering,
)
pandas_labels_to_snowflake_quoted_identifiers_map = {
# pandas labels of resulting Snowpark pandas dataframe will be snowflake identifier
Expand Down Expand Up @@ -1228,7 +1228,7 @@ def is_names_set(kwargs: Any) -> bool:
table_type="temporary",
use_logical_type=True,
)
qc = cls.from_snowflake(temporary_table_name, relaxed_ordering=False)
qc = cls.from_snowflake(temporary_table_name, enforce_ordering=True)
return cls._post_process_file(qc, filetype="csv", **kwargs)

@classmethod
Expand Down Expand Up @@ -1292,7 +1292,7 @@ def from_file_with_snowflake(
)

qc = cls.from_snowflake(
name_or_query=temporary_table_name, relaxed_ordering=False
name_or_query=temporary_table_name, enforce_ordering=True
)

return cls._post_process_file(qc=qc, filetype=filetype, **kwargs)
Expand Down
25 changes: 13 additions & 12 deletions src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def read_snowflake(
name_or_query: Union[str, Iterable[str]],
index_col: Union[str, list[str], None] = None,
columns: Optional[list[str]] = None,
relaxed_ordering: bool = False,
enforce_ordering: bool = False,
) -> DataFrame:
"""
Read a Snowflake table or SQL Query to a Snowpark pandas DataFrame.
Expand All @@ -52,8 +52,8 @@ def read_snowflake(
A column name or a list of column names to use as index.
columns:
A list of column names to select from the table. If not specified, select all columns.
relaxed_ordering:
If True, Snowpark pandas will provide relaxed consistency and ordering guarantees for the returned
enforce_ordering:
If False, Snowpark pandas will provide relaxed consistency and ordering guarantees for the returned
DataFrame object. Otherwise, strict consistency and ordering guarantees are provided. See the Notes
section for more details.

Expand All @@ -73,7 +73,7 @@ def read_snowflake(

Snowpark pandas provides two modes of consistency and ordering semantics.

* When `relaxed_ordering` is set to True, Snowpark pandas provides relaxed consistency and ordering guarantees. In particular, the returned DataFrame object will be
* When `enforce_ordering` is set to False, Snowpark pandas provides relaxed consistency and ordering guarantees. In particular, the returned DataFrame object will be
directly based on the source given by `name_or_query`. Consistency and isolation guarantees are relaxed in this case because any changes that happen to the source will be reflected in the
DataFrame object returned by `pd.read_snowflake`.

Expand All @@ -87,7 +87,7 @@ def read_snowflake(
Note that when `name_or_query` is a query with an ORDER BY clause, this will only guarantee that the immediate results of the input query are sorted. But it still gives no guarantees
on the order of the final results (after applying a sequence of pandas operations to those initial results).

* When `relaxed_ordering` is set to False, Snowpark pandas provides the same consistency and ordering guarantees for `read_snowflake` as if local files were read.
* When `enforce_ordering` is set to True, Snowpark pandas provides the same consistency and ordering guarantees for `read_snowflake` as if local files were read.
For example, calling `df.head(5)` two consecutive times is guaranteed to result in the exact same set of 5 rows each time and with the same ordering.
Depending on the type of source, `pd.read_snowflake` will do one of the following
at the time of calling `pd.read_snowflake`:
Expand Down Expand Up @@ -251,8 +251,9 @@ def read_snowflake(

>>> pd.read_snowflake(f'''-- SQL Comment 1
... -- SQL Comment 2
... SELECT * FROM {table_name} WHERE A > 0
... -- SQL Comment 3''')
... SELECT * FROM {table_name}
... -- SQL Comment 3
... WHERE A > 0''')
A B C
0 1 2 3

Expand Down Expand Up @@ -294,7 +295,7 @@ def read_snowflake(
- Anonymous Stored Procedures (using CTEs) may also be used (although special care must be taken with respect to indentation of the code block,
since the entire string encapsulated by the `$$` will be passed directly to a Python interpreter. In the example below, the lines within
the function are indented, but not the import statement or function definition). The output schema must be specified when defining
an anonymous stored procedure.
an anonymous stored procedure. Currently CALL statements are only supported when `enforce_ordering=True`.

>>> pd.read_snowflake('''WITH filter_rows AS PROCEDURE (table_name VARCHAR, column_to_filter VARCHAR, value NUMBER)
... RETURNS TABLE(A NUMBER, B NUMBER, C NUMBER)
Expand All @@ -306,7 +307,7 @@ def read_snowflake(
... def filter_rows(session, table_name, column_to_filter, value):
... df = session.table(table_name)
... return df.filter(col(column_to_filter) == value)$$
... ''' + f"CALL filter_rows('{table_name}', 'A', 1)")
... ''' + f"CALL filter_rows('{table_name}', 'A', 1)", enforce_ordering=True)
A B C
0 1 2 3

Expand All @@ -331,7 +332,7 @@ def read_snowflake(
... }
... }
... $$
... ''' + f"CALL filter_rows('{table_name}', 'A', -1)")
... ''' + f"CALL filter_rows('{table_name}', 'A', -1)", enforce_ordering=True)
A B C
0 -1 -2 -3

Expand All @@ -347,7 +348,7 @@ def read_snowflake(
... df = session_.table(table_name)
... return df.select('*', (col(col_to_multiply)*value).as_("D"))

>>> pd.read_snowflake(f"CALL multiply_col_by_value('{table_name}', 'A', 2)")
>>> pd.read_snowflake(f"CALL multiply_col_by_value('{table_name}', 'A', 2)", enforce_ordering=True)
A B C D
0 1 2 3 2
1 -1 -2 -3 -2
Expand Down Expand Up @@ -391,7 +392,7 @@ def read_snowflake(
name_or_query,
index_col=index_col,
columns=columns,
relaxed_ordering=relaxed_ordering,
enforce_ordering=enforce_ordering,
)
)

Expand Down
4 changes: 2 additions & 2 deletions src/snowflake/snowpark/modin/plugin/io/snow_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ def read_snowflake(
name_or_query: Union[str, Iterable[str]],
index_col: Optional[Union[str, list[str]]] = None,
columns: Optional[list[str]] = None,
relaxed_ordering: bool = False,
enforce_ordering: bool = False,
):
"""
See detailed docstring and examples in ``read_snowflake`` in frontend layer:
src/snowflake/snowpark/modin/plugin/pd_extensions.py
"""
return cls.query_compiler_cls.from_snowflake(
name_or_query, index_col, columns, relaxed_ordering=relaxed_ordering
name_or_query, index_col, columns, enforce_ordering=enforce_ordering
)

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/modin/frame/test_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ def test_sum_min_count(min_count, axis):
)


@sql_count_checker(query_count=3, union_count=4)
@sql_count_checker(query_count=2, union_count=4)
def test_agg_valid_variant_col(session, test_table_name):
pandas_df = native_pd.DataFrame(
{
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/modin/frame/test_align.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def test_align_basic_axis0_on_row_position_columns(join):
native_df2 = df2.to_pandas()

# verify that no window function is generated
with SqlCounter(query_count=2, join_count=2, window_count=0):
with SqlCounter(query_count=2, join_count=2, window_count=2):
native_left, native_right = native_df1.align(
native_df2,
join=join,
Expand Down
Loading
Loading