Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Improvements

- Added support for reading XML files with namespaces using `rowTag` and `stripNamespaces` options.
- Added a new argument to `DataFrame.describe` called `strings_include_math_stats` that triggers `stddev` and `mean` to be calculated for String columns.

### Snowpark Local Testing Updates

Expand All @@ -16,7 +17,7 @@

- Set the default value of the `index` parameter to `False` for `DataFrame.to_view`, `Series.to_view`, `DataFrame.to_dynamic_table`, and `Series.to_dynamic_table`.
- Added `iceberg_version` option to table creation functions.
- Added a new argument to `DataFrame.describe` called `strings_include_math_stats` that triggers `stddev` and `mean` to be calculated for String columns.
- Reduced query count for many operations, including `insert`, `repr`, and `groupby` that previously issued a query to retrieve the input data's size.

## 1.32.0 (2025-05-15)

Expand Down
5 changes: 1 addition & 4 deletions src/snowflake/snowpark/modin/plugin/_internal/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,10 +738,7 @@ def num_rows(self) -> int:
Returns:
Number of rows in this frame.
"""
num_rows = count_rows(self.ordered_dataframe)
self.ordered_dataframe.row_count = num_rows
self.ordered_dataframe.row_count_upper_bound = num_rows
return num_rows
return count_rows(self.ordered_dataframe)

def has_unique_index(self, axis: Optional[int] = 0) -> bool:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ def select(
row_count_snowflake_quoted_identifier=self.row_count_snowflake_quoted_identifier,
)

new_df.row_count = self.row_count
# Update the row count upper bound
new_df.row_count_upper_bound = RowCountEstimator.upper_bound(
self, DataFrameOperation.SELECT, args={}
Expand Down Expand Up @@ -746,6 +747,8 @@ def union_all(self, other: "OrderedDataFrame") -> "OrderedDataFrame":
DataFrameReference(snowpark_dataframe, result_column_quoted_identifiers),
projected_column_snowflake_quoted_identifiers=result_column_quoted_identifiers,
)
if self.row_count is not None and other.row_count is not None:
new_df.row_count = self.row_count + other.row_count
# Update the row count upper bound
new_df.row_count_upper_bound = RowCountEstimator.upper_bound(
self, DataFrameOperation.UNION_ALL, args={"other": other}
Expand Down Expand Up @@ -849,6 +852,7 @@ def sort(
# No need to reset row count, since sorting should not add/drop rows.
row_count_snowflake_quoted_identifier=self.row_count_snowflake_quoted_identifier,
)
new_df.row_count = self.row_count
# Update the row count upper bound
new_df.row_count_upper_bound = RowCountEstimator.upper_bound(
self, DataFrameOperation.SORT, args={}
Expand Down
7 changes: 6 additions & 1 deletion src/snowflake/snowpark/modin/plugin/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1889,9 +1889,14 @@ def count_rows(df: OrderedDataFrame) -> int:
"""
Returns the number of rows of a Snowpark DataFrame.
"""
if df.row_count is not None:
return df.row_count
df = df.ensure_row_count_column()
rowset = df.select(df.row_count_snowflake_quoted_identifier).limit(1).collect()
return 0 if len(rowset) == 0 else rowset[0][0]
row_count = 0 if len(rowset) == 0 else rowset[0][0]
df.row_count = row_count
df.row_count_upper_bound = row_count
return row_count


def append_columns(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13216,62 +13216,68 @@ def build_repr_df(
# 2. retrieve all columns
# 3. filter on rows with recursive count

# Previously, 2 queries were issued, and a first version replaced them with a single query and a join
# the solution here uses a window function. This may lead to perf regressions, track these here SNOW-984177.
# Ensure that our reference to self._modin_frame is updated with cached row count and position.
self._modin_frame = (
self._modin_frame.ensure_row_position_column().ensure_row_count_column()
)
row_count_pandas_label = (
ROW_COUNT_COLUMN_LABEL
if len(self._modin_frame.data_column_pandas_index_names) == 1
else (ROW_COUNT_COLUMN_LABEL,)
* len(self._modin_frame.data_column_pandas_index_names)
)
frame_with_row_count_and_position = InternalFrame.create(
ordered_dataframe=self._modin_frame.ordered_dataframe,
data_column_pandas_labels=self._modin_frame.data_column_pandas_labels
+ [row_count_pandas_label],
data_column_snowflake_quoted_identifiers=self._modin_frame.data_column_snowflake_quoted_identifiers
+ [self._modin_frame.row_count_snowflake_quoted_identifier],
data_column_pandas_index_names=self._modin_frame.data_column_pandas_index_names,
index_column_pandas_labels=self._modin_frame.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=self._modin_frame.index_column_snowflake_quoted_identifiers,
data_column_types=self._modin_frame.cached_data_column_snowpark_pandas_types
+ [None],
index_column_types=self._modin_frame.cached_index_column_snowpark_pandas_types,
)
frame = self._modin_frame.ensure_row_position_column()
use_cached_row_count = frame.ordered_dataframe.row_count is not None

row_count_identifier = (
frame_with_row_count_and_position.row_count_snowflake_quoted_identifier
)
# If the row count is already cached, there's no need to include it in the query.
if use_cached_row_count:
row_count_expr = pandas_lit(frame.ordered_dataframe.row_count)
else:
# Previously, 2 queries were issued, and a first version replaced them with a single query and a join
# the solution here uses a window function. This may lead to perf regressions, track these here SNOW-984177.
# Ensure that our reference to self._modin_frame is updated with cached row count and position.
frame = frame.ensure_row_count_column()
row_count_pandas_label = (
ROW_COUNT_COLUMN_LABEL
if len(frame.data_column_pandas_index_names) == 1
else (ROW_COUNT_COLUMN_LABEL,)
* len(frame.data_column_pandas_index_names)
)
frame = InternalFrame.create(
ordered_dataframe=frame.ordered_dataframe,
data_column_pandas_labels=frame.data_column_pandas_labels
+ [row_count_pandas_label],
data_column_snowflake_quoted_identifiers=frame.data_column_snowflake_quoted_identifiers
+ [frame.row_count_snowflake_quoted_identifier],
data_column_pandas_index_names=frame.data_column_pandas_index_names,
index_column_pandas_labels=frame.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=frame.index_column_snowflake_quoted_identifiers,
data_column_types=frame.cached_data_column_snowpark_pandas_types
+ [None],
index_column_types=frame.cached_index_column_snowpark_pandas_types,
)

row_count_expr = col(frame.row_count_snowflake_quoted_identifier)
row_position_snowflake_quoted_identifier = (
frame_with_row_count_and_position.row_position_snowflake_quoted_identifier
frame.row_position_snowflake_quoted_identifier
)

# filter frame based on num_rows.
# always return all columns as this may also result in a query.
# in the future could analyze plan to see whether retrieving column count would trigger a query, if not
# simply filter out based on static schema
num_rows_for_head_and_tail = num_rows_to_display // 2 + 1
new_frame = frame_with_row_count_and_position.filter(
new_frame = frame.filter(
(
col(row_position_snowflake_quoted_identifier)
<= num_rows_for_head_and_tail
)
| (
col(row_position_snowflake_quoted_identifier)
>= col(row_count_identifier) - num_rows_for_head_and_tail
>= row_count_expr - num_rows_for_head_and_tail
)
)

# retrieve frame as pandas object
new_qc = SnowflakeQueryCompiler(new_frame)
pandas_frame = new_qc.to_pandas()

# remove last column after first retrieving row count
row_count = 0 if 0 == len(pandas_frame) else pandas_frame.iat[0, -1]
pandas_frame = pandas_frame.iloc[:, :-1]
if use_cached_row_count:
row_count = frame.ordered_dataframe.row_count
else:
# remove last column after first retrieving row count
row_count = 0 if len(pandas_frame) == 0 else pandas_frame.iat[0, -1]
pandas_frame = pandas_frame.iloc[:, :-1]
col_count = len(pandas_frame.columns)

return row_count, col_count, pandas_frame
Expand Down
8 changes: 4 additions & 4 deletions tests/integ/modin/crosstab/test_crosstab.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def eval_func(args_list):
def test_basic_crosstab_with_df_and_series_objs_pandas_errors_columns(
self, dropna, a, b, c
):
query_count = 4
query_count = 2
join_count = 1 if dropna else 2
a = native_pd.Series(
a,
Expand Down Expand Up @@ -269,7 +269,7 @@ def eval_func(args_list):
def test_basic_crosstab_with_df_and_series_objs_pandas_errors_index(
self, dropna, a, b, c
):
query_count = 6
query_count = 4
join_count = 5 if dropna else 11
a = native_pd.Series(
a,
Expand Down Expand Up @@ -556,7 +556,7 @@ def test_values(self, dropna, aggfunc, basic_crosstab_dfs):

@pytest.mark.parametrize("aggfunc", AGGFUNCS_THAT_CANNOT_PRODUCE_NAN)
def test_values_series_like(self, dropna, aggfunc, basic_crosstab_dfs):
query_count = 5
query_count = 3
join_count = 2 if dropna else 3
native_df, snow_df = basic_crosstab_dfs

Expand Down Expand Up @@ -646,7 +646,7 @@ def test_values_unsupported_aggfunc(basic_crosstab_dfs):
)


@sql_count_checker(query_count=4)
@sql_count_checker(query_count=2)
def test_values_series_like_unsupported_aggfunc(basic_crosstab_dfs):
# The query count above comes from building the DataFrame
# that we pass in to pivot table.
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/modin/frame/test_empty.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
({"A": [np.nan]}, "np nan column"),
],
)
@sql_count_checker(query_count=1)
@sql_count_checker(query_count=0)
def test_dataframe_empty_param(dataframe_input, test_case_name):
eval_snowpark_pandas_result(
pd.DataFrame(dataframe_input),
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/modin/frame/test_from_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def test_from_dict_orient_tight():
)


@sql_count_checker(query_count=7)
@sql_count_checker(query_count=5)
def test_from_dict_series_values():
    # TODO(SNOW-1857349): Provide a lazy implementation for this case
data = {i: pd.Series(range(1)) for i in range(2)}
Expand Down
8 changes: 4 additions & 4 deletions tests/integ/modin/frame/test_getitem.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def get_helper(df):
else:
return df[key]

# 5 extra queries for iter
with SqlCounter(query_count=6 if isinstance(key, native_pd.Index) else 1):
# 4 extra queries for iter
with SqlCounter(query_count=5 if isinstance(key, native_pd.Index) else 1):
eval_snowpark_pandas_result(
default_index_snowpark_pandas_df,
default_index_native_df,
Expand Down Expand Up @@ -119,8 +119,8 @@ def get_helper(df):
native_df = native_pd.DataFrame(data)
snowpark_df = pd.DataFrame(native_df)

# 5 extra queries for iter
with SqlCounter(query_count=6 if isinstance(key, native_pd.Index) else 1):
# 4 extra queries for iter
with SqlCounter(query_count=5 if isinstance(key, native_pd.Index) else 1):
eval_snowpark_pandas_result(
snowpark_df,
native_df,
Expand Down
6 changes: 3 additions & 3 deletions tests/integ/modin/frame/test_iloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ def test_df_iloc_get_empty_key(
)


@sql_count_checker(query_count=2)
@sql_count_checker(query_count=1)
def test_df_iloc_get_empty(empty_snowpark_pandas_df):
_ = empty_snowpark_pandas_df.iloc[0]

Expand Down Expand Up @@ -1811,8 +1811,8 @@ def test_df_iloc_set_with_row_key_list(
else:
snow_row_pos = row_pos

# 2 extra queries for iter
expected_query_count = 3 if isinstance(snow_row_pos, pd.Index) else 1
# 1 extra query for iter
expected_query_count = 2 if isinstance(snow_row_pos, pd.Index) else 1
expected_join_count = 2 if isinstance(item_values, int) else 3

with SqlCounter(query_count=expected_query_count, join_count=expected_join_count):
Expand Down
Loading
Loading