4848 warning ,
4949)
5050from snowflake .snowpark .async_job import AsyncJob , _AsyncResultType
51- from snowflake .snowpark .column import Column , _to_col_if_str
51+ from snowflake .snowpark .column import Column , _to_col_if_str , _to_col_if_sql_expr
5252from snowflake .snowpark .exceptions import SnowparkClientException
5353from snowflake .snowpark .functions import sql_expr
5454from snowflake .snowpark .mock ._connection import MockServerConnection
@@ -256,6 +256,7 @@ def save_as_table(
256256 Dict [str , Union [str , Iterable [ColumnOrSqlExpr ]]]
257257 ] = None ,
258258 table_exists : Optional [bool ] = None ,
259+ overwrite_condition : Optional [ColumnOrSqlExpr ] = None ,
259260 _emit_ast : bool = True ,
260261 ** kwargs : Optional [Dict [str , Any ]],
261262 ) -> Optional [AsyncJob ]:
@@ -270,7 +271,9 @@ def save_as_table(
270271
271272 "append": Append data of this DataFrame to the existing table. Creates a table if it does not exist.
272273
273- "overwrite": Overwrite the existing table by dropping old table.
274+ "overwrite": Overwrite the existing table. By default, drops and recreates the table.
275+ When ``overwrite_condition`` is specified, performs selective overwrite: deletes only
276+ rows matching the condition, then inserts new data.
274277
275278 "truncate": Overwrite the existing table by truncating old table.
276279
@@ -330,7 +333,12 @@ def save_as_table(
330333 * iceberg_version: Overrides the version of iceberg to use. Defaults to 2 when unset.
331334 table_exists: Optional parameter to specify if the table is known to exist or not.
332335 Set to ``True`` if table exists, ``False`` if it doesn't, or ``None`` (default) for automatic detection.
333- Primarily useful for "append" and "truncate" modes to avoid running query for automatic detection.
336+ Primarily useful for "append", "truncate", and "overwrite" with overwrite_condition modes to avoid running query for automatic detection.
337+ overwrite_condition: Specifies the overwrite condition to perform atomic targeted delete-insert.
338+ Can only be used when ``mode`` is "overwrite". When provided and the table exists, rows matching
339+ the condition are atomically deleted and all rows from the DataFrame are inserted, preserving
340+ non-matching rows. When not provided, the default "overwrite" behavior applies (drop and recreate table).
341+ If the table does not exist, ``overwrite_condition`` is ignored and the table is created normally.
334342
335343
336344 Example 1::
@@ -364,6 +372,21 @@ def save_as_table(
364372 ... "partition_by": ["a", bucket(3, col("b"))],
365373 ... }
366374 >>> df.write.mode("overwrite").save_as_table("my_table", iceberg_config=iceberg_config) # doctest: +SKIP
375+
376+ Example 3::
377+
378+ Using overwrite_condition for targeted delete and insert:
379+
380+ >>> from snowflake.snowpark.functions import col
381+ >>> df = session.create_dataframe([[1, "a"], [2, "b"], [3, "c"]], schema=["id", "val"])
382+ >>> df.write.mode("overwrite").save_as_table("my_table", table_type="temporary")
383+ >>> session.table("my_table").order_by("id").collect()
384+ [Row(ID=1, VAL='a'), Row(ID=2, VAL='b'), Row(ID=3, VAL='c')]
385+
386+ >>> new_df = session.create_dataframe([[2, "updated2"], [5, "updated5"]], schema=["id", "val"])
387+ >>> new_df.write.mode("overwrite").save_as_table("my_table", overwrite_condition="id = 1 or val = 'b'")
388+ >>> session.table("my_table").order_by("id").collect()
389+ [Row(ID=2, VAL='updated2'), Row(ID=3, VAL='c'), Row(ID=5, VAL='updated5')]
367390 """
368391
369392 statement_params = track_data_source_statement_params (
@@ -392,6 +415,8 @@ def save_as_table(
392415 # change_tracking: Optional[bool] = None,
393416 # copy_grants: bool = False,
394417 # iceberg_config: Optional[dict] = None,
418+ # table_exists: Optional[bool] = None,
419+ # overwrite_condition: Optional[ColumnOrSqlExpr] = None,
395420
396421 build_table_name (expr .table_name , table_name )
397422
@@ -433,6 +458,12 @@ def save_as_table(
433458 t = expr .iceberg_config .add ()
434459 t ._1 = k
435460 build_expr_from_python_val (t ._2 , v )
461+ if table_exists is not None :
462+ expr .table_exists .value = table_exists
463+ if overwrite_condition is not None :
464+ build_expr_from_snowpark_column_or_sql_str (
465+ expr .overwrite_condition , overwrite_condition
466+ )
436467
437468 self ._dataframe ._session ._ast_batch .eval (stmt )
438469
@@ -486,18 +517,33 @@ def save_as_table(
486517 f"Unsupported table type. Expected table types: { SUPPORTED_TABLE_TYPES } "
487518 )
488519
520+ # overwrite_condition must be used with OVERWRITE mode only
521+ if overwrite_condition is not None and save_mode != SaveMode .OVERWRITE :
522+ raise ValueError (
523+ f"'overwrite_condition' is only supported with mode='overwrite'. "
524+ f"Got mode='{ save_mode .value } '."
525+ )
526+
527+ overwrite_condition_expr = (
528+ _to_col_if_sql_expr (
529+ overwrite_condition , "DataFrameWriter.save_as_table"
530+ )._expression
531+ if overwrite_condition is not None
532+ else None
533+ )
534+
489535 session = self ._dataframe ._session
536+ needs_table_exists_check = save_mode in [
537+ SaveMode .APPEND ,
538+ SaveMode .TRUNCATE ,
539+ ] or (save_mode == SaveMode .OVERWRITE and overwrite_condition is not None )
490540 if (
491541 table_exists is None
492542 and not isinstance (session ._conn , MockServerConnection )
493- and save_mode
494- in [
495- SaveMode .APPEND ,
496- SaveMode .TRUNCATE ,
497- ]
543+ and needs_table_exists_check
498544 ):
499545 # whether the table already exists in the database
500- # determines the compiled SQL for APPEND and TRUNCATE mode
546+ # determines the compiled SQL for APPEND, TRUNCATE, and OVERWRITE with overwrite_condition
501547 # if the table does not exist, we need to create it first;
502548 # if the table exists, we can skip the creation step and insert data directly
503549 table_exists = session ._table_exists (table_name )
@@ -518,6 +564,7 @@ def save_as_table(
518564 copy_grants ,
519565 iceberg_config ,
520566 table_exists ,
567+ overwrite_condition_expr ,
521568 )
522569 snowflake_plan = session ._analyzer .resolve (create_table_logic_plan )
523570 result = session ._conn .execute (
0 commit comments