Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Improvements

- Added support for reading XML files with namespaces using `rowTag` and `stripNamespaces` options.
- Added a new argument to `DataFrame.describe` called `strings_include_math_stats` that triggers `stddev` and `mean` to be calculated for String columns.

### Snowpark Local Testing Updates

Expand All @@ -16,7 +17,7 @@

- Set the default value of the `index` parameter to `False` for `DataFrame.to_view`, `Series.to_view`, `DataFrame.to_dynamic_table`, and `Series.to_dynamic_table`.
- Added `iceberg_version` option to table creation functions.
- Added a new argument to `Dataframe.describe` called `strings_include_math_stats` that triggers `stddev` and `mean` to be calculated for String columns.
- Reduced query count for many operations, including `insert`, `repr`, and `groupby`, that previously issued a query to retrieve the input data's size.

## 1.32.0 (2025-05-15)

Expand Down
5 changes: 1 addition & 4 deletions src/snowflake/snowpark/modin/plugin/_internal/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,10 +738,7 @@ def num_rows(self) -> int:
Returns:
Number of rows in this frame.
"""
num_rows = count_rows(self.ordered_dataframe)
self.ordered_dataframe.row_count = num_rows
self.ordered_dataframe.row_count_upper_bound = num_rows
return num_rows
return count_rows(self.ordered_dataframe)

def has_unique_index(self, axis: Optional[int] = 0) -> bool:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ def select(
row_count_snowflake_quoted_identifier=self.row_count_snowflake_quoted_identifier,
)

new_df.row_count = self.row_count
# Update the row count upper bound
new_df.row_count_upper_bound = RowCountEstimator.upper_bound(
self, DataFrameOperation.SELECT, args={}
Expand Down Expand Up @@ -746,6 +747,8 @@ def union_all(self, other: "OrderedDataFrame") -> "OrderedDataFrame":
DataFrameReference(snowpark_dataframe, result_column_quoted_identifiers),
projected_column_snowflake_quoted_identifiers=result_column_quoted_identifiers,
)
if self.row_count is not None and other.row_count is not None:
new_df.row_count = self.row_count + other.row_count
# Update the row count upper bound
new_df.row_count_upper_bound = RowCountEstimator.upper_bound(
self, DataFrameOperation.UNION_ALL, args={"other": other}
Expand Down Expand Up @@ -849,6 +852,7 @@ def sort(
# No need to reset row count, since sorting should not add/drop rows.
row_count_snowflake_quoted_identifier=self.row_count_snowflake_quoted_identifier,
)
new_df.row_count = self.row_count
# Update the row count upper bound
new_df.row_count_upper_bound = RowCountEstimator.upper_bound(
self, DataFrameOperation.SORT, args={}
Expand Down
7 changes: 6 additions & 1 deletion src/snowflake/snowpark/modin/plugin/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1889,9 +1889,14 @@ def count_rows(df: OrderedDataFrame) -> int:
"""
Returns the number of rows of a Snowpark DataFrame.
"""
if df.row_count is not None:
return df.row_count
df = df.ensure_row_count_column()
rowset = df.select(df.row_count_snowflake_quoted_identifier).limit(1).collect()
return 0 if len(rowset) == 0 else rowset[0][0]
row_count = 0 if len(rowset) == 0 else rowset[0][0]
df.row_count = row_count
df.row_count_upper_bound = row_count
return row_count


def append_columns(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13216,62 +13216,68 @@ def build_repr_df(
# 2. retrieve all columns
# 3. filter on rows with recursive count

# Previously, 2 queries were issued, and a first version replaced them with a single query and a join
# the solution here uses a window function. This may lead to perf regressions, track these here SNOW-984177.
# Ensure that our reference to self._modin_frame is updated with cached row count and position.
self._modin_frame = (
self._modin_frame.ensure_row_position_column().ensure_row_count_column()
)
row_count_pandas_label = (
ROW_COUNT_COLUMN_LABEL
if len(self._modin_frame.data_column_pandas_index_names) == 1
else (ROW_COUNT_COLUMN_LABEL,)
* len(self._modin_frame.data_column_pandas_index_names)
)
frame_with_row_count_and_position = InternalFrame.create(
ordered_dataframe=self._modin_frame.ordered_dataframe,
data_column_pandas_labels=self._modin_frame.data_column_pandas_labels
+ [row_count_pandas_label],
data_column_snowflake_quoted_identifiers=self._modin_frame.data_column_snowflake_quoted_identifiers
+ [self._modin_frame.row_count_snowflake_quoted_identifier],
data_column_pandas_index_names=self._modin_frame.data_column_pandas_index_names,
index_column_pandas_labels=self._modin_frame.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=self._modin_frame.index_column_snowflake_quoted_identifiers,
data_column_types=self._modin_frame.cached_data_column_snowpark_pandas_types
+ [None],
index_column_types=self._modin_frame.cached_index_column_snowpark_pandas_types,
)
frame = self._modin_frame.ensure_row_position_column()
use_cached_row_count = frame.ordered_dataframe.row_count is not None

row_count_identifier = (
frame_with_row_count_and_position.row_count_snowflake_quoted_identifier
)
# If the row count is already cached, there's no need to include it in the query.
if use_cached_row_count:
row_count_expr = pandas_lit(frame.ordered_dataframe.row_count)
else:
# Previously, 2 queries were issued, and a first version replaced them with a single query and a join
# the solution here uses a window function. This may lead to perf regressions, track these here SNOW-984177.
# Ensure that our reference to self._modin_frame is updated with cached row count and position.
frame = frame.ensure_row_count_column()
row_count_pandas_label = (
ROW_COUNT_COLUMN_LABEL
if len(frame.data_column_pandas_index_names) == 1
else (ROW_COUNT_COLUMN_LABEL,)
* len(frame.data_column_pandas_index_names)
)
frame = InternalFrame.create(
ordered_dataframe=frame.ordered_dataframe,
data_column_pandas_labels=frame.data_column_pandas_labels
+ [row_count_pandas_label],
data_column_snowflake_quoted_identifiers=frame.data_column_snowflake_quoted_identifiers
+ [frame.row_count_snowflake_quoted_identifier],
data_column_pandas_index_names=frame.data_column_pandas_index_names,
index_column_pandas_labels=frame.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=frame.index_column_snowflake_quoted_identifiers,
data_column_types=frame.cached_data_column_snowpark_pandas_types
+ [None],
index_column_types=frame.cached_index_column_snowpark_pandas_types,
)

row_count_expr = col(frame.row_count_snowflake_quoted_identifier)
row_position_snowflake_quoted_identifier = (
frame_with_row_count_and_position.row_position_snowflake_quoted_identifier
frame.row_position_snowflake_quoted_identifier
)

# filter frame based on num_rows.
# always return all columns as this may also result in a query.
# in the future could analyze plan to see whether retrieving column count would trigger a query, if not
# simply filter out based on static schema
num_rows_for_head_and_tail = num_rows_to_display // 2 + 1
new_frame = frame_with_row_count_and_position.filter(
new_frame = frame.filter(
(
col(row_position_snowflake_quoted_identifier)
<= num_rows_for_head_and_tail
)
| (
col(row_position_snowflake_quoted_identifier)
>= col(row_count_identifier) - num_rows_for_head_and_tail
>= row_count_expr - num_rows_for_head_and_tail
)
)

# retrieve frame as pandas object
new_qc = SnowflakeQueryCompiler(new_frame)
pandas_frame = new_qc.to_pandas()

# remove last column after first retrieving row count
row_count = 0 if 0 == len(pandas_frame) else pandas_frame.iat[0, -1]
pandas_frame = pandas_frame.iloc[:, :-1]
if use_cached_row_count:
row_count = frame.ordered_dataframe.row_count
else:
# remove last column after first retrieving row count
row_count = 0 if len(pandas_frame) == 0 else pandas_frame.iat[0, -1]
pandas_frame = pandas_frame.iloc[:, :-1]
col_count = len(pandas_frame.columns)

return row_count, col_count, pandas_frame
Expand Down
8 changes: 4 additions & 4 deletions tests/integ/modin/crosstab/test_crosstab.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def eval_func(args_list):
def test_basic_crosstab_with_df_and_series_objs_pandas_errors_columns(
self, dropna, a, b, c
):
query_count = 4
query_count = 2
join_count = 1 if dropna else 2
a = native_pd.Series(
a,
Expand Down Expand Up @@ -269,7 +269,7 @@ def eval_func(args_list):
def test_basic_crosstab_with_df_and_series_objs_pandas_errors_index(
self, dropna, a, b, c
):
query_count = 6
query_count = 4
join_count = 5 if dropna else 11
a = native_pd.Series(
a,
Expand Down Expand Up @@ -556,7 +556,7 @@ def test_values(self, dropna, aggfunc, basic_crosstab_dfs):

@pytest.mark.parametrize("aggfunc", AGGFUNCS_THAT_CANNOT_PRODUCE_NAN)
def test_values_series_like(self, dropna, aggfunc, basic_crosstab_dfs):
query_count = 5
query_count = 3
join_count = 2 if dropna else 3
native_df, snow_df = basic_crosstab_dfs

Expand Down Expand Up @@ -646,7 +646,7 @@ def test_values_unsupported_aggfunc(basic_crosstab_dfs):
)


@sql_count_checker(query_count=4)
@sql_count_checker(query_count=2)
def test_values_series_like_unsupported_aggfunc(basic_crosstab_dfs):
# The query count above comes from building the DataFrame
# that we pass in to pivot table.
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/modin/frame/test_empty.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
({"A": [np.nan]}, "np nan column"),
],
)
@sql_count_checker(query_count=1)
@sql_count_checker(query_count=0)
def test_dataframe_empty_param(dataframe_input, test_case_name):
eval_snowpark_pandas_result(
pd.DataFrame(dataframe_input),
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/modin/frame/test_from_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def test_from_dict_orient_tight():
)


@sql_count_checker(query_count=7)
@sql_count_checker(query_count=5)
def test_from_dict_series_values():
# TODO(SNOW-1857349): Provide a lazy implementation for this case
data = {i: pd.Series(range(1)) for i in range(2)}
Expand Down
8 changes: 4 additions & 4 deletions tests/integ/modin/frame/test_getitem.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def get_helper(df):
else:
return df[key]

# 5 extra queries for iter
with SqlCounter(query_count=6 if isinstance(key, native_pd.Index) else 1):
# 4 extra queries for iter
with SqlCounter(query_count=5 if isinstance(key, native_pd.Index) else 1):
eval_snowpark_pandas_result(
default_index_snowpark_pandas_df,
default_index_native_df,
Expand Down Expand Up @@ -119,8 +119,8 @@ def get_helper(df):
native_df = native_pd.DataFrame(data)
snowpark_df = pd.DataFrame(native_df)

# 5 extra queries for iter
with SqlCounter(query_count=6 if isinstance(key, native_pd.Index) else 1):
# 4 extra queries for iter
with SqlCounter(query_count=5 if isinstance(key, native_pd.Index) else 1):
eval_snowpark_pandas_result(
snowpark_df,
native_df,
Expand Down
6 changes: 3 additions & 3 deletions tests/integ/modin/frame/test_iloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ def test_df_iloc_get_empty_key(
)


@sql_count_checker(query_count=2)
@sql_count_checker(query_count=1)
def test_df_iloc_get_empty(empty_snowpark_pandas_df):
_ = empty_snowpark_pandas_df.iloc[0]

Expand Down Expand Up @@ -1811,8 +1811,8 @@ def test_df_iloc_set_with_row_key_list(
else:
snow_row_pos = row_pos

# 2 extra queries for iter
expected_query_count = 3 if isinstance(snow_row_pos, pd.Index) else 1
# 1 extra query for iter
expected_query_count = 2 if isinstance(snow_row_pos, pd.Index) else 1
expected_join_count = 2 if isinstance(item_values, int) else 3

with SqlCounter(query_count=expected_query_count, join_count=expected_join_count):
Expand Down
Loading
Loading