Skip to content

Commit 47696e4

Browse files
SNOW-1830529 Add decoder logic for to_local_iterator, to_pandas, to_snowpark_pandas, and to_pandas_batch (#2866)
1. Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR. Fixes SNOW-1830529 2. Fill out the following pre-review checklist: - [ ] I am adding a new automated test(s) to verify correctness of my new code - [ ] If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing - [ ] I am adding new logging messages - [ ] I am adding a new telemetry message - [ ] I am adding new credentials - [ ] I am adding a new dependency - [ ] If this is a new feature/behavior, I'm adding the Local Testing parity changes. - [x] I acknowledge that I have ensured my changes to be thread-safe. Follow the link for more information: [Thread-safe Developer Guidelines](https://github.com/snowflakedb/snowpark-python/blob/main/CONTRIBUTING.md#thread-safe-development) 3. Please describe how your code solves the related issue. Added decoder logic for `to_local_iterator`, `to_pandas`, `to_snowpark_pandas`, and `to_pandas_batch`.
1 parent 7a22e75 commit 47696e4

File tree

4 files changed

+87
-3
lines changed

4 files changed

+87
-3
lines changed

src/snowflake/snowpark/mock/_nop_connection.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
from snowflake.connector.cursor import ResultMetadata
2222
from snowflake.snowpark._internal.analyzer.analyzer_utils import unquote_if_quoted
2323
from snowflake.snowpark._internal.analyzer.expression import Attribute
24-
from snowflake.snowpark._internal.analyzer.snowflake_plan import SnowflakePlan
24+
from snowflake.snowpark._internal.analyzer.snowflake_plan import (
25+
SnowflakePlan,
26+
PlanQueryType,
27+
)
2528
from snowflake.snowpark._internal.analyzer.snowflake_plan_node import (
2629
LogicalPlan,
2730
SaveMode,
@@ -120,6 +123,28 @@ def execute(
120123
]:
121124
source_plan = plan.source_plan
122125

126+
if hasattr(source_plan, "execution_queries"):
127+
# If temp read-only table, explicitly create it.
128+
# This occurs when code such as to_snowpark_pandas is run where the Snowpark version of the table is
129+
# cloned and then read.
130+
from snowflake.snowpark.mock import TableEmulator
131+
132+
for plan_query_type, query in source_plan.execution_queries.items():
133+
if query:
134+
query_sql = query[0].sql
135+
if (
136+
plan_query_type == PlanQueryType.QUERIES
137+
and "TEMPORARY READ ONLY TABLE" in query_sql
138+
):
139+
temp_table_name = query_sql.split("TEMPORARY READ ONLY TABLE ")[
140+
1
141+
].split(" ")[0]
142+
self.entity_registry.write_table(
143+
temp_table_name,
144+
TableEmulator({"A": [1], "B": [1], "C": [1]}),
145+
SaveMode.IGNORE,
146+
)
147+
123148
if isinstance(source_plan, SnowflakeCreateTable):
124149
result = self.entity_registry.write_table(
125150
source_plan.table_name,

src/snowflake/snowpark/mock/_nop_plan.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import Any, Dict, List, Optional
77

88
import snowflake.snowpark
9+
from snowflake.snowpark.mock import TableEmulator
910
from snowflake.snowpark._internal.analyzer.analyzer_utils import unquote_if_quoted
1011
from snowflake.snowpark._internal.analyzer.binary_plan_node import Join
1112
from snowflake.snowpark._internal.analyzer.expression import (
@@ -117,6 +118,9 @@ def resolve_attributes(
117118
pivot_attrs.extend(pivot_result_cols)
118119
attributes = pivot_attrs
119120

121+
elif isinstance(plan, TableEmulator):
122+
attributes = [Attribute(name, _NumericType(), False) for name in plan.columns]
123+
120124
elif isinstance(plan, TableUpdate):
121125
attributes = [
122126
Attribute(name, _NumericType(), False)

tests/ast/data/Dataframe.to_snowpark_pandas.test

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ body {
3030
expr {
3131
sp_table {
3232
name {
33-
sp_table_name_flat {
34-
name: "table1"
33+
name {
34+
sp_name_flat {
35+
name: "table1"
36+
}
3537
}
3638
}
3739
src {

tests/ast/decoder.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1437,6 +1437,41 @@ def decode_expr(self, expr: proto.Expr) -> Any:
14371437
else:
14381438
return df.to_df(col_names)
14391439

1440+
case "sp_dataframe_to_local_iterator":
1441+
df = self.symbol_table[
1442+
expr.sp_dataframe_to_local_iterator.id.bitfield1
1443+
][1]
1444+
statement_params = self.get_statement_params(
1445+
MessageToDict(expr.sp_dataframe_to_local_iterator)
1446+
)
1447+
block = expr.sp_dataframe_to_local_iterator.block
1448+
case_sensitive = expr.sp_dataframe_to_local_iterator.case_sensitive
1449+
return df.to_local_iterator(
1450+
statement_params=statement_params,
1451+
block=block,
1452+
case_sensitive=case_sensitive,
1453+
)
1454+
1455+
case "sp_dataframe_to_pandas":
1456+
df = self.symbol_table[expr.sp_dataframe_to_pandas.id.bitfield1][1]
1457+
statement_params = self.get_statement_params(
1458+
MessageToDict(expr.sp_dataframe_to_pandas)
1459+
)
1460+
block = expr.sp_dataframe_to_pandas.block
1461+
return df.to_pandas(statement_params=statement_params, block=block)
1462+
1463+
case "sp_dataframe_to_pandas_batches":
1464+
df = self.symbol_table[
1465+
expr.sp_dataframe_to_pandas_batches.id.bitfield1
1466+
][1]
1467+
statement_params = self.get_statement_params(
1468+
MessageToDict(expr.sp_dataframe_to_pandas_batches)
1469+
)
1470+
block = expr.sp_dataframe_to_pandas_batches.block
1471+
return df.to_pandas_batches(
1472+
statement_params=statement_params, block=block
1473+
)
1474+
14401475
case "sp_dataframe_unpivot":
14411476
df = self.decode_expr(expr.sp_dataframe_unpivot.df)
14421477
column_list = [
@@ -1520,6 +1555,24 @@ def decode_expr(self, expr: proto.Expr) -> Any:
15201555
table_name = self.decode_name_expr(expr.sp_table.name)
15211556
return self.session.table(table_name)
15221557

1558+
case "sp_to_snowpark_pandas":
1559+
df = self.decode_expr(expr.sp_to_snowpark_pandas.df)
1560+
d = MessageToDict(expr.sp_to_snowpark_pandas)
1561+
index_col, columns = None, None
1562+
if "indexCol" in d:
1563+
index_col = [
1564+
col for col in expr.sp_to_snowpark_pandas.index_col.list
1565+
]
1566+
if "columns" in d:
1567+
columns = [col for col in expr.sp_to_snowpark_pandas.columns.list]
1568+
# Returning the result of to_snowpark_pandas causes recursion issues when local_testing_mode is enabled.
1569+
# When disabled, to_snowpark_pandas will raise an error since df will be an empty Dataframe
1570+
# (passing non-None values of index_col or columns will make the snowpark_to_pandas_helper complain
1571+
# about columns that do not exist).
1572+
# Therefore, silently execute to_snowpark_pandas to record the AST and return None.
1573+
df.to_snowpark_pandas(index_col, columns)
1574+
return None
1575+
15231576
case "udf":
15241577
return_type = self.decode_data_type_expr(expr.udf.return_type)
15251578
input_types = [

0 commit comments

Comments
 (0)