
Commit 266334b

SNOW-2872192: Support targeted delete-insert in save_as_table (snowflakedb#4031)
1 parent b0b209f commit 266334b

8 files changed (+1022, −445 lines)


CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -6,6 +6,8 @@

 #### New Features

+- Added support for targeted delete-insert via the `overwrite_condition` parameter in `DataFrameWriter.save_as_table`
+
 #### Bug Fixes

 #### Improvements
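
For context, the feature recorded in this changelog entry is exercised through the public writer API. A minimal usage sketch based on the docstring example added in `dataframe_writer.py` below (an active Snowpark `session` is assumed; the table and DataFrame names are illustrative):

    # Seed a table (default "overwrite" still drops and recreates the table).
    df = session.create_dataframe([[1, "a"], [2, "b"], [3, "c"]], schema=["id", "val"])
    df.write.mode("overwrite").save_as_table("my_table", table_type="temporary")

    # Targeted delete-insert: rows matching the condition are deleted, then all
    # rows of new_df are inserted; non-matching rows (here id = 3) are preserved.
    new_df = session.create_dataframe([[2, "updated2"], [5, "updated5"]], schema=["id", "val"])
    new_df.write.mode("overwrite").save_as_table(
        "my_table", overwrite_condition="id = 1 or val = 'b'"
    )
    # Expected contents: (2, 'updated2'), (3, 'c'), (5, 'updated5')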

src/snowflake/snowpark/_internal/analyzer/analyzer.py

Lines changed: 6 additions & 0 deletions
@@ -1228,6 +1228,12 @@ def do_resolve_with_resolved_children(
                 child_attributes=resolved_child.attributes,
                 iceberg_config=iceberg_config,
                 table_exists=logical_plan.table_exists,
+                overwrite_condition=self.analyze(
+                    logical_plan.overwrite_condition,
+                    df_aliased_col_name_to_real_col_name,
+                )
+                if logical_plan.overwrite_condition
+                else None,
             )

         if isinstance(logical_plan, Limit):

src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py

Lines changed: 48 additions & 1 deletion
@@ -1267,6 +1267,7 @@ def save_as_table(
         child_attributes: Optional[List[Attribute]],
         iceberg_config: Optional[dict] = None,
         table_exists: Optional[bool] = None,
+        overwrite_condition: Optional[str] = None,
     ) -> SnowflakePlan:
         """Returns a SnowflakePlan to materialize the child plan into a table.

@@ -1417,6 +1418,47 @@ def get_create_and_insert_plan(child: SnowflakePlan, replace, error):
                 referenced_ctes=child.referenced_ctes,
             )

+        def get_overwrite_delete_insert_plan(child: SnowflakePlan):
+            """Build a plan for targeted delete + insert with transaction.
+
+            Deletes rows matching the overwrite_condition condition, then inserts
+            all rows from the source DataFrame. Wrapped in a transaction for atomicity.
+            """
+            child = self.add_result_scan_if_not_select(child)
+
+            return SnowflakePlan(
+                [
+                    *child.queries[:-1],
+                    Query("BEGIN TRANSACTION"),
+                    Query(
+                        delete_statement(
+                            table_name=full_table_name,
+                            condition=overwrite_condition,
+                            source_data=None,
+                        ),
+                        params=child.queries[-1].params,
+                        is_ddl_on_temp_object=is_temp_table_type,
+                    ),
+                    Query(
+                        insert_into_statement(
+                            table_name=full_table_name,
+                            child=child.queries[-1].sql,
+                            column_names=column_names,
+                        ),
+                        params=child.queries[-1].params,
+                        is_ddl_on_temp_object=is_temp_table_type,
+                    ),
+                    Query("COMMIT"),
+                ],
+                schema_query=None,
+                post_actions=child.post_actions,
+                expr_to_alias={},
+                source_plan=source_plan,
+                api_calls=child.api_calls,
+                session=self.session,
+                referenced_ctes=child.referenced_ctes,
+            )
+
         if mode == SaveMode.APPEND:
             assert table_exists is not None
             if table_exists:
@@ -1446,7 +1488,12 @@ def get_create_and_insert_plan(child: SnowflakePlan, replace, error):
             return get_create_table_as_select_plan(child, replace=True, error=True)

         elif mode == SaveMode.OVERWRITE:
-            return get_create_table_as_select_plan(child, replace=True, error=True)
+            if overwrite_condition is not None and table_exists:
+                # Selective overwrite: delete matching rows, then insert
+                return get_overwrite_delete_insert_plan(child)
+            else:
+                # Default overwrite: drop and recreate table
+                return get_create_table_as_select_plan(child, replace=True, error=True)

         elif mode == SaveMode.IGNORE:
             return get_create_table_as_select_plan(child, replace=False, error=False)
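
The new `get_overwrite_delete_insert_plan` queues a DELETE scoped to the compiled condition followed by an INSERT of the source query, bracketed by BEGIN TRANSACTION / COMMIT. A rough standalone sketch of that statement sequence (illustrative only; the actual `delete_statement` / `insert_into_statement` helpers may emit different SQL text):

    from typing import List, Optional

    def sketch_overwrite_delete_insert(
        full_table_name: str,
        source_select_sql: str,        # SQL of the source DataFrame (child.queries[-1].sql)
        overwrite_condition_sql: str,  # condition already compiled to SQL text
        column_names: Optional[List[str]] = None,
    ) -> List[str]:
        # Approximate the four queries the plan issues, in order.
        cols = f" ({', '.join(column_names)})" if column_names else ""
        return [
            "BEGIN TRANSACTION",
            f"DELETE FROM {full_table_name} WHERE {overwrite_condition_sql}",
            f"INSERT INTO {full_table_name}{cols} {source_select_sql}",
            "COMMIT",
        ]

    # e.g. sketch_overwrite_delete_insert("my_table", "SELECT * FROM new_data", "id = 1 OR val = 'b'")

Because the delete and insert commit together, readers never observe an intermediate state in which the matching rows are gone but the new rows are not yet visible.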

src/snowflake/snowpark/_internal/analyzer/snowflake_plan_node.py

Lines changed: 2 additions & 0 deletions
@@ -243,6 +243,7 @@ def __init__(
         copy_grants: bool = False,
         iceberg_config: Optional[dict] = None,
         table_exists: Optional[bool] = None,
+        overwrite_condition: Optional[Expression] = None,
     ) -> None:
         super().__init__()

@@ -267,6 +268,7 @@ def __init__(
         # whether the table already exists in the database
         # determines the compiled SQL for APPEND and TRUNCATE mode
         self.table_exists = table_exists
+        self.overwrite_condition = overwrite_condition

     @property
     def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:

src/snowflake/snowpark/_internal/proto/ast.proto

Lines changed: 17 additions & 15 deletions
@@ -1,5 +1,5 @@
 // N.B. This file is generated by `//ir-dsl-c`. DO NOT EDIT!
-// Generated from `{[email protected]:snowflakedb/snowflake.git}/Snowpark/ast`.
+// Generated from `{[email protected]:snowflake-eng/snowflake.git}/Snowpark/ast`.

 syntax = "proto3";

@@ -987,7 +987,7 @@ message DataframeCollect {
   repeated Tuple_String_String statement_params = 7;
 }

-// dataframe-io.ir:165
+// dataframe-io.ir:167
 message DataframeCopyIntoTable {
   repeated Tuple_String_Expr copy_options = 1;
   Expr df = 2;
@@ -1011,7 +1011,7 @@ message DataframeCount {
   repeated Tuple_String_String statement_params = 4;
 }

-// dataframe-io.ir:148
+// dataframe-io.ir:150
 message DataframeCreateOrReplaceDynamicTable {
   repeated Expr clustering_keys = 1;
   google.protobuf.StringValue comment = 2;
@@ -1030,7 +1030,7 @@ message DataframeCreateOrReplaceDynamicTable {
   string warehouse = 15;
 }

-// dataframe-io.ir:139
+// dataframe-io.ir:141
 message DataframeCreateOrReplaceView {
   google.protobuf.StringValue comment = 1;
   bool copy_grants = 2;
@@ -2685,7 +2685,7 @@ message WindowSpecRowsBetween {
   WindowSpecExpr wnd = 4;
 }

-// dataframe-io.ir:116
+// dataframe-io.ir:118
 message WriteCopyIntoLocation {
   bool block = 1;
   repeated Tuple_String_Expr copy_options = 2;
@@ -2704,7 +2704,7 @@ message WriteCopyIntoLocation {
   Expr writer = 15;
 }

-// dataframe-io.ir:123
+// dataframe-io.ir:125
 message WriteCsv {
   bool block = 1;
   repeated Tuple_String_Expr copy_options = 2;
@@ -2731,15 +2731,15 @@ message WriteFile {
   }
 }

-// dataframe-io.ir:129
+// dataframe-io.ir:131
 message WriteInsertInto {
   bool overwrite = 1;
   SrcPosition src = 2;
   NameRef table_name = 3;
   Expr writer = 4;
 }

-// dataframe-io.ir:125
+// dataframe-io.ir:127
 message WriteJson {
   bool block = 1;
   repeated Tuple_String_Expr copy_options = 2;
@@ -2773,7 +2773,7 @@ message WritePandas {
   string table_type = 13;
 }

-// dataframe-io.ir:127
+// dataframe-io.ir:129
 message WriteParquet {
   bool block = 1;
   repeated Tuple_String_Expr copy_options = 2;
@@ -2790,7 +2790,7 @@ message WriteParquet {
   Expr writer = 13;
 }

-// dataframe-io.ir:121
+// dataframe-io.ir:123
 message WriteSave {
   bool block = 1;
   repeated Tuple_String_Expr copy_options = 2;
@@ -2821,9 +2821,11 @@ message WriteTable {
   repeated Tuple_String_Expr iceberg_config = 10;
   google.protobuf.Int64Value max_data_extension_time = 11;
   SaveMode mode = 12;
-  SrcPosition src = 13;
-  repeated Tuple_String_String statement_params = 14;
-  NameRef table_name = 15;
-  string table_type = 16;
-  Expr writer = 17;
+  Expr overwrite_condition = 13;
+  SrcPosition src = 14;
+  repeated Tuple_String_String statement_params = 15;
+  google.protobuf.BoolValue table_exists = 16;
+  NameRef table_name = 17;
+  string table_type = 18;
+  Expr writer = 19;
 }

src/snowflake/snowpark/dataframe_writer.py

Lines changed: 56 additions & 9 deletions
@@ -48,7 +48,7 @@
     warning,
 )
 from snowflake.snowpark.async_job import AsyncJob, _AsyncResultType
-from snowflake.snowpark.column import Column, _to_col_if_str
+from snowflake.snowpark.column import Column, _to_col_if_str, _to_col_if_sql_expr
 from snowflake.snowpark.exceptions import SnowparkClientException
 from snowflake.snowpark.functions import sql_expr
 from snowflake.snowpark.mock._connection import MockServerConnection
@@ -256,6 +256,7 @@ def save_as_table(
             Dict[str, Union[str, Iterable[ColumnOrSqlExpr]]]
         ] = None,
         table_exists: Optional[bool] = None,
+        overwrite_condition: Optional[ColumnOrSqlExpr] = None,
         _emit_ast: bool = True,
         **kwargs: Optional[Dict[str, Any]],
     ) -> Optional[AsyncJob]:
@@ -270,7 +271,9 @@ def save_as_table(

                 "append": Append data of this DataFrame to the existing table. Creates a table if it does not exist.

-                "overwrite": Overwrite the existing table by dropping old table.
+                "overwrite": Overwrite the existing table. By default, drops and recreates the table.
+                When ``overwrite_condition`` is specified, performs selective overwrite: deletes only
+                rows matching the condition, then inserts new data.

                 "truncate": Overwrite the existing table by truncating old table.

@@ -330,7 +333,12 @@ def save_as_table(
                 * iceberg_version: Overrides the version of iceberg to use. Defaults to 2 when unset.
             table_exists: Optional parameter to specify if the table is known to exist or not.
                 Set to ``True`` if table exists, ``False`` if it doesn't, or ``None`` (default) for automatic detection.
-                Primarily useful for "append" and "truncate" modes to avoid running query for automatic detection.
+                Primarily useful for "append", "truncate", and "overwrite" with overwrite_condition modes to avoid running query for automatic detection.
+            overwrite_condition: Specifies the overwrite condition to perform atomic targeted delete-insert.
+                Can only be used when ``mode`` is "overwrite". When provided and the table exists, rows matching
+                the condition are atomically deleted and all rows from the DataFrame are inserted, preserving
+                non-matching rows. When not provided, the default "overwrite" behavior applies (drop and recreate table).
+                If the table does not exist, ``overwrite_condition`` is ignored and the table is created normally.


         Example 1::
@@ -364,6 +372,21 @@ def save_as_table(
            ...     "partition_by": ["a", bucket(3, col("b"))],
            ... }
            >>> df.write.mode("overwrite").save_as_table("my_table", iceberg_config=iceberg_config) # doctest: +SKIP
+
+        Example 3::
+
+            Using overwrite_condition for targeted delete and insert:
+
+            >>> from snowflake.snowpark.functions import col
+            >>> df = session.create_dataframe([[1, "a"], [2, "b"], [3, "c"]], schema=["id", "val"])
+            >>> df.write.mode("overwrite").save_as_table("my_table", table_type="temporary")
+            >>> session.table("my_table").order_by("id").collect()
+            [Row(ID=1, VAL='a'), Row(ID=2, VAL='b'), Row(ID=3, VAL='c')]
+
+            >>> new_df = session.create_dataframe([[2, "updated2"], [5, "updated5"]], schema=["id", "val"])
+            >>> new_df.write.mode("overwrite").save_as_table("my_table", overwrite_condition="id = 1 or val = 'b'")
+            >>> session.table("my_table").order_by("id").collect()
+            [Row(ID=2, VAL='updated2'), Row(ID=3, VAL='c'), Row(ID=5, VAL='updated5')]
        """

        statement_params = track_data_source_statement_params(
@@ -392,6 +415,8 @@ def save_as_table(
            # change_tracking: Optional[bool] = None,
            # copy_grants: bool = False,
            # iceberg_config: Optional[dict] = None,
+            # table_exists: Optional[bool] = None,
+            # overwrite_condition: Optional[ColumnOrSqlExpr] = None,

            build_table_name(expr.table_name, table_name)

@@ -433,6 +458,12 @@ def save_as_table(
                    t = expr.iceberg_config.add()
                    t._1 = k
                    build_expr_from_python_val(t._2, v)
+            if table_exists is not None:
+                expr.table_exists.value = table_exists
+            if overwrite_condition is not None:
+                build_expr_from_snowpark_column_or_sql_str(
+                    expr.overwrite_condition, overwrite_condition
+                )

            self._dataframe._session._ast_batch.eval(stmt)

@@ -486,18 +517,33 @@ def save_as_table(
                f"Unsupported table type. Expected table types: {SUPPORTED_TABLE_TYPES}"
            )

+        # overwrite_condition must be used with OVERWRITE mode only
+        if overwrite_condition is not None and save_mode != SaveMode.OVERWRITE:
+            raise ValueError(
+                f"'overwrite_condition' is only supported with mode='overwrite'. "
+                f"Got mode='{save_mode.value}'."
+            )
+
+        overwrite_condition_expr = (
+            _to_col_if_sql_expr(
+                overwrite_condition, "DataFrameWriter.save_as_table"
+            )._expression
+            if overwrite_condition is not None
+            else None
+        )
+
        session = self._dataframe._session
+        needs_table_exists_check = save_mode in [
+            SaveMode.APPEND,
+            SaveMode.TRUNCATE,
+        ] or (save_mode == SaveMode.OVERWRITE and overwrite_condition is not None)
        if (
            table_exists is None
            and not isinstance(session._conn, MockServerConnection)
-            and save_mode
-            in [
-                SaveMode.APPEND,
-                SaveMode.TRUNCATE,
-            ]
+            and needs_table_exists_check
        ):
            # whether the table already exists in the database
-            # determines the compiled SQL for APPEND and TRUNCATE mode
+            # determines the compiled SQL for APPEND, TRUNCATE, and OVERWRITE with overwrite_condition
            # if the table does not exist, we need to create it first;
            # if the table exists, we can skip the creation step and insert data directly
            table_exists = session._table_exists(table_name)
@@ -518,6 +564,7 @@ def save_as_table(
            copy_grants,
            iceberg_config,
            table_exists,
+            overwrite_condition_expr,
        )
        snowflake_plan = session._analyzer.resolve(create_table_logic_plan)
        result = session._conn.execute(
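
As a usage note for the writer-side changes above: `overwrite_condition` is typed `ColumnOrSqlExpr`, so both a SQL string and a `Column` expression are accepted (converted via `_to_col_if_sql_expr`), and combining it with any mode other than "overwrite" raises `ValueError`. A hedged sketch (an active `session` and an existing `my_table` are assumed; names are illustrative):

    from snowflake.snowpark.functions import col

    new_df = session.create_dataframe([[2, "updated2"], [5, "updated5"]], schema=["id", "val"])

    # SQL-string condition, as in the docstring example.
    new_df.write.mode("overwrite").save_as_table("my_table", overwrite_condition="id = 1 or val = 'b'")

    # Equivalent Column condition.
    new_df.write.mode("overwrite").save_as_table(
        "my_table", overwrite_condition=(col("id") == 1) | (col("val") == "b")
    )

    # Any other mode rejects the parameter with the error message added in this commit.
    try:
        new_df.write.mode("append").save_as_table("my_table", overwrite_condition="id = 1")
    except ValueError as err:
        print(err)  # 'overwrite_condition' is only supported with mode='overwrite'. Got mode='append'.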
