Skip to content

Commit 1e5b025

Browse files
SNOW-2296598: Estimate row count only when Hybrid is enabled (#3708)
1 parent 8c54efe commit 1e5b025

39 files changed

+221
-230
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
- Set the default transfer limit in hybrid execution for data leaving Snowflake to 100k, which can be overridden with the SnowflakePandasTransferThreshold environment variable. This configuration is appropriate for scenarios with two available engines, "Pandas" and "Snowflake" on relational workloads.
5252
- Improve import error message by adding '--upgrade' to 'pip install "snowflake-snowpark-python[modin]"' in the error message.
5353
- Reduce the telemetry messages from the modin client by pre-aggregating into 5 second windows and only keeping a narrow band of metrics which are useful for tracking hybrid execution and native pandas performance.
54+
- Set the initial row count only when hybrid execution is enabled. This reduces the number of queries issued for many workloads.
55+
5456

5557
#### Dependency Updates
5658

src/snowflake/snowpark/modin/plugin/_internal/row_count_estimation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def upper_bound(
7676
elif operation == DataFrameOperation.UNION_ALL:
7777
other: OrderedDataFrame = args["other"]
7878
other_bound = other.row_count_upper_bound or other.row_count
79-
if other_bound is None:
79+
if other_bound is None or current is None:
8080
# Cannot estimate row count: other DataFrame has no row count information
8181
return None
8282
return current + other_bound

src/snowflake/snowpark/modin/plugin/_internal/utils.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -498,12 +498,20 @@ def create_initial_ordered_dataframe(
498498
row_position_snowflake_quoted_identifier = (
499499
ordered_dataframe.row_position_snowflake_quoted_identifier
500500
)
501-
# Set the materialized row count
502-
materialized_row_count = ordered_dataframe._dataframe_ref.snowpark_dataframe.count(
503-
statement_params=get_default_snowpark_pandas_statement_params(), _emit_ast=False
504-
)
505-
ordered_dataframe.row_count = materialized_row_count
506-
ordered_dataframe.row_count_upper_bound = materialized_row_count
501+
502+
from modin.config import AutoSwitchBackend
503+
504+
if AutoSwitchBackend.get():
505+
# Set the materialized row count
506+
materialized_row_count = (
507+
initial_ordered_dataframe._dataframe_ref.snowpark_dataframe.count(
508+
statement_params=get_default_snowpark_pandas_statement_params(),
509+
_emit_ast=False,
510+
)
511+
)
512+
ordered_dataframe.row_count = materialized_row_count
513+
ordered_dataframe.row_count_upper_bound = materialized_row_count
514+
507515
return ordered_dataframe, row_position_snowflake_quoted_identifier
508516

509517

tests/integ/modin/frame/test_aggregate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ def test_sum_min_count(min_count, axis):
812812
)
813813

814814

815-
@sql_count_checker(query_count=3, union_count=4)
815+
@sql_count_checker(query_count=2, union_count=4)
816816
def test_agg_valid_variant_col(session, test_table_name):
817817
pandas_df = native_pd.DataFrame(
818818
{

tests/integ/modin/frame/test_create_or_replace_dynamic_table.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from tests.utils import Utils
1515

1616

17-
@sql_count_checker(query_count=7)
17+
@sql_count_checker(query_count=5)
1818
def test_create_or_replace_dynamic_table_enforce_ordering_raises(session) -> None:
1919
try:
2020
# create table
@@ -48,7 +48,7 @@ def test_create_or_replace_dynamic_table_enforce_ordering_raises(session) -> Non
4848
Utils.drop_table(session, table_name)
4949

5050

51-
@sql_count_checker(query_count=6)
51+
@sql_count_checker(query_count=5)
5252
def test_create_or_replace_dynamic_table_no_enforce_ordering(session) -> None:
5353
try:
5454
# create table
@@ -84,7 +84,7 @@ def test_create_or_replace_dynamic_table_no_enforce_ordering(session) -> None:
8484
Utils.drop_table(session, table_name)
8585

8686

87-
@sql_count_checker(query_count=5)
87+
@sql_count_checker(query_count=4)
8888
def test_create_or_replace_dynamic_table_multiple_sessions_no_enforce_ordering(
8989
session,
9090
db_parameters,
@@ -131,7 +131,7 @@ def test_create_or_replace_dynamic_table_multiple_sessions_no_enforce_ordering(
131131

132132
@pytest.mark.parametrize("index", [True, False])
133133
@pytest.mark.parametrize("index_labels", [None, ["my_index"]])
134-
@sql_count_checker(query_count=6)
134+
@sql_count_checker(query_count=4)
135135
def test_create_or_replace_dynamic_table_index(session, index, index_labels):
136136
try:
137137
# create table
@@ -177,7 +177,7 @@ def test_create_or_replace_dynamic_table_index(session, index, index_labels):
177177
Utils.drop_table(session, table_name)
178178

179179

180-
@sql_count_checker(query_count=6)
180+
@sql_count_checker(query_count=4)
181181
def test_create_or_replace_dynamic_table_multiindex(session):
182182
try:
183183
# create table

tests/integ/modin/frame/test_create_or_replace_view.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def test_create_or_replace_view_basic(session, native_pandas_df_basic) -> None:
4242
Utils.drop_view(session, view_name)
4343

4444

45-
@sql_count_checker(query_count=8)
45+
@sql_count_checker(query_count=6)
4646
def test_create_or_replace_view_multiple_sessions_enforce_ordering_raises(
4747
session,
4848
db_parameters,
@@ -85,7 +85,7 @@ def test_create_or_replace_view_multiple_sessions_enforce_ordering_raises(
8585
pd.session = session
8686

8787

88-
@sql_count_checker(query_count=5)
88+
@sql_count_checker(query_count=4)
8989
def test_create_or_replace_view_multiple_sessions_no_enforce_ordering(
9090
session,
9191
db_parameters,
@@ -127,7 +127,7 @@ def test_create_or_replace_view_multiple_sessions_no_enforce_ordering(
127127

128128
@pytest.mark.parametrize("index", [True, False])
129129
@pytest.mark.parametrize("index_labels", [None, ["my_index"]])
130-
@sql_count_checker(query_count=6)
130+
@sql_count_checker(query_count=4)
131131
def test_create_or_replace_view_index(session, index, index_labels):
132132
try:
133133
# create table
@@ -167,7 +167,7 @@ def test_create_or_replace_view_index(session, index, index_labels):
167167
Utils.drop_table(session, table_name)
168168

169169

170-
@sql_count_checker(query_count=6)
170+
@sql_count_checker(query_count=4)
171171
def test_create_or_replace_view_multiindex(session):
172172
try:
173173
# create table

tests/integ/modin/frame/test_describe.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ def helper(df):
349349

350350

351351
@sql_count_checker(
352-
query_count=4,
352+
query_count=3,
353353
union_count=8,
354354
)
355355
# SNOW-1320296 - pd.concat SQL Compilation ambigious __row_position__ issue

tests/integ/modin/frame/test_to_dynamic_table.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def to_dynamic_table(request):
2727
return request.param
2828

2929

30-
@sql_count_checker(query_count=5)
30+
@sql_count_checker(query_count=4)
3131
def test_to_dynamic_table_enforce_ordering_raises(session, to_dynamic_table) -> None:
3232
try:
3333
# create table
@@ -62,7 +62,7 @@ def test_to_dynamic_table_enforce_ordering_raises(session, to_dynamic_table) ->
6262
Utils.drop_table(session, table_name)
6363

6464

65-
@sql_count_checker(query_count=6)
65+
@sql_count_checker(query_count=5)
6666
def test_to_dynamic_table_no_enforce_ordering(session, to_dynamic_table) -> None:
6767
try:
6868
# create table
@@ -98,7 +98,7 @@ def test_to_dynamic_table_no_enforce_ordering(session, to_dynamic_table) -> None
9898
Utils.drop_table(session, table_name)
9999

100100

101-
@sql_count_checker(query_count=5)
101+
@sql_count_checker(query_count=4)
102102
def test_to_dynamic_table_multiple_sessions_no_enforce_ordering(
103103
session,
104104
db_parameters,
@@ -153,7 +153,7 @@ def test_to_dynamic_table_multiple_sessions_no_enforce_ordering(
153153
(False, ["my_index"], []),
154154
],
155155
)
156-
@sql_count_checker(query_count=6)
156+
@sql_count_checker(query_count=4)
157157
def test_to_dynamic_table_index(
158158
session, index, index_labels, expected_index_columns, to_dynamic_table
159159
):
@@ -204,7 +204,7 @@ def test_to_dynamic_table_index(
204204
Utils.drop_table(session, table_name)
205205

206206

207-
@sql_count_checker(query_count=6)
207+
@sql_count_checker(query_count=4)
208208
def test_to_dynamic_table_multiindex(session, to_dynamic_table):
209209
try:
210210
# create table

tests/integ/modin/frame/test_to_iceberg.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def to_iceberg(request):
4141
return request.param
4242

4343

44-
@sql_count_checker(query_count=6)
44+
@sql_count_checker(query_count=5)
4545
def test_to_iceberg(session, native_pandas_df_basic, to_iceberg):
4646
if not iceberg_supported(session, local_testing_mode=False):
4747
pytest.skip("Test requires iceberg support.")

tests/integ/modin/frame/test_to_snowflake.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
@pytest.mark.parametrize("index", [True, False])
1818
@pytest.mark.parametrize("index_labels", [None, ["my_index"]])
1919
# one extra query to convert index to native pandas when creating the snowpark pandas dataframe
20-
@sql_count_checker(query_count=3)
20+
@sql_count_checker(query_count=2)
2121
def test_to_snowflake_index(test_table_name, index, index_labels):
2222
df = pd.DataFrame(
2323
{"a": [1, 2, 3], "b": [4, 5, 6]}, index=pd.Index([2, 3, 4], name="index")
@@ -38,7 +38,7 @@ def test_to_snowflake_index(test_table_name, index, index_labels):
3838
verify_columns(test_table_name, expected_columns)
3939

4040

41-
@sql_count_checker(query_count=2)
41+
@sql_count_checker(query_count=1)
4242
def test_to_snowflake_multiindex(test_table_name):
4343
index = native_pd.MultiIndex.from_arrays(
4444
[[1, 1, 2, 2], ["red", "blue", "red", "blue"]], names=("number", "color")
@@ -94,7 +94,7 @@ def test_to_snowflake_if_exists(session, test_table_name):
9494
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
9595

9696
# Verify new table is created
97-
with SqlCounter(query_count=3):
97+
with SqlCounter(query_count=2):
9898
df.to_snowflake(test_table_name, if_exists="fail", index=False)
9999
verify_columns(test_table_name, ["a", "b"])
100100

@@ -110,19 +110,19 @@ def test_to_snowflake_if_exists(session, test_table_name):
110110

111111
# Verify existing table is replaced with new data
112112
df = pd.DataFrame({"a": [1, 2, 3], "c": [4, 5, 6]})
113-
with SqlCounter(query_count=3):
113+
with SqlCounter(query_count=2):
114114
df.to_snowflake(test_table_name, if_exists="replace", index=False)
115115
verify_columns(test_table_name, ["a", "c"])
116116
verify_num_rows(session, test_table_name, 3)
117117

118118
# Verify data is appended to existing table
119-
with SqlCounter(query_count=4):
119+
with SqlCounter(query_count=3):
120120
df.to_snowflake(test_table_name, if_exists="append", index=False)
121121
verify_columns(test_table_name, ["a", "c"])
122122
verify_num_rows(session, test_table_name, 6)
123123

124124
# Verify pd.to_snowflake operates the same
125-
with SqlCounter(query_count=4):
125+
with SqlCounter(query_count=3):
126126
pd.to_snowflake(df, test_table_name, if_exists="append", index=False)
127127
verify_columns(test_table_name, ["a", "c"])
128128
verify_num_rows(session, test_table_name, 9)
@@ -134,7 +134,7 @@ def test_to_snowflake_if_exists(session, test_table_name):
134134

135135

136136
@pytest.mark.parametrize("index_label", VALID_PANDAS_LABELS)
137-
@sql_count_checker(query_count=2)
137+
@sql_count_checker(query_count=1)
138138
def test_to_snowflake_index_labels(index_label, test_table_name):
139139
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
140140
df.to_snowflake(
@@ -144,7 +144,7 @@ def test_to_snowflake_index_labels(index_label, test_table_name):
144144

145145

146146
@pytest.mark.parametrize("col_name", VALID_PANDAS_LABELS)
147-
@sql_count_checker(query_count=2)
147+
@sql_count_checker(query_count=1)
148148
def test_to_snowflake_column_names_from_panadas(col_name, test_table_name):
149149
df = pd.DataFrame({col_name: [1, 2, 3], "b": [4, 5, 6]})
150150
df.to_snowflake(test_table_name, if_exists="replace", index=False)
@@ -156,7 +156,7 @@ def test_to_snowflake_column_names_from_panadas(col_name, test_table_name):
156156
def test_column_names_with_read_snowflake_and_to_snowflake(
157157
col_name, if_exists, session
158158
):
159-
with SqlCounter(query_count=7 if if_exists == "append" else 6):
159+
with SqlCounter(query_count=6 if if_exists == "append" else 5):
160160
# Create a table
161161
session.sql(f"create or replace table t1 ({col_name} int)").collect()
162162
session.sql("insert into t1 values (1), (2), (3)").collect()
@@ -173,7 +173,7 @@ def test_column_names_with_read_snowflake_and_to_snowflake(
173173
assert len(data) == (6 if if_exists == "append" else 3)
174174

175175

176-
@sql_count_checker(query_count=2)
176+
@sql_count_checker(query_count=1)
177177
def test_to_snowflake_column_with_quotes(session, test_table_name):
178178
df = pd.DataFrame({'a"b': [1, 2, 3], 'a""b': [4, 5, 6]})
179179
df.to_snowflake(test_table_name, if_exists="replace", index=False)
@@ -183,28 +183,28 @@ def test_to_snowflake_column_with_quotes(session, test_table_name):
183183
# one extra query to convert index to native pandas when creating the snowpark pandas dataframe
184184
def test_to_snowflake_index_label_none(test_table_name):
185185
# no index
186-
with SqlCounter(query_count=2):
186+
with SqlCounter(query_count=1):
187187
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
188188
df.to_snowflake(test_table_name, if_exists="replace")
189189
verify_columns(test_table_name, ["index", "a", "b"])
190190

191191
# named index
192-
with SqlCounter(query_count=3):
192+
with SqlCounter(query_count=2):
193193
df = pd.DataFrame(
194194
{"a": [1, 2, 3], "b": [4, 5, 6]}, index=pd.Index([2, 3, 4], name="index")
195195
)
196196
df.to_snowflake(test_table_name, if_exists="replace", index_label=[None])
197197
verify_columns(test_table_name, ["index", "a", "b"])
198198

199199
# nameless index
200-
with SqlCounter(query_count=3):
200+
with SqlCounter(query_count=2):
201201
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=pd.Index([2, 3, 4]))
202202
df.to_snowflake(test_table_name, if_exists="replace", index_label=[None])
203203
verify_columns(test_table_name, ["index", "a", "b"])
204204

205205

206206
# one extra query to convert index to native pandas when creating the snowpark pandas dataframe
207-
@sql_count_checker(query_count=6)
207+
@sql_count_checker(query_count=4)
208208
def test_to_snowflake_index_label_none_data_column_conflict(test_table_name):
209209
df = pd.DataFrame({"index": [1, 2, 3], "a": [4, 5, 6]})
210210
df.to_snowflake(test_table_name, if_exists="replace")
@@ -260,7 +260,7 @@ def verify_num_rows(session, table_name: str, expected: int) -> None:
260260
assert actual == expected
261261

262262

263-
@sql_count_checker(query_count=2)
263+
@sql_count_checker(query_count=1)
264264
def test_timedelta_to_snowflake_with_read_snowflake(test_table_name, caplog):
265265
with caplog.at_level(logging.WARNING):
266266
df = pd.DataFrame(

0 commit comments

Comments
 (0)