@@ -213,6 +213,7 @@ def _validate_args(
213213 mode : Literal ["append" , "overwrite" , "overwrite_partitions" ],
214214 partition_cols : list [str ] | None ,
215215 merge_cols : list [str ] | None ,
216+ merge_condition : Literal ["update" , "ignore" ],
216217) -> None :
217218 if df .empty is True :
218219 raise exceptions .EmptyDataFrame ("DataFrame cannot be empty." )
@@ -232,6 +233,11 @@ def _validate_args(
232233 "When mode is 'overwrite_partitions' merge_cols must not be specified."
233234 )
234235
236+ if merge_cols and merge_condition not in ["update" , "ignore" ]:
237+ raise exceptions .InvalidArgumentValue (
238+ f"Invalid merge_condition: { merge_condition } . Valid values: ['update', 'ignore']"
239+ )
240+
235241
236242@apply_configs
237243@_utils .validate_distributed_kwargs (
@@ -246,6 +252,7 @@ def to_iceberg(
246252 table_location : str | None = None ,
247253 partition_cols : list [str ] | None = None ,
248254 merge_cols : list [str ] | None = None ,
255+ merge_condition : Literal ["update" , "ignore" ] = "update" ,
249256 keep_files : bool = True ,
250257 data_source : str | None = None ,
251258 s3_output : str | None = None ,
@@ -292,6 +299,8 @@ def to_iceberg(
292299 List of column names that will be used for conditional inserts and updates.
293300
294301 https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html
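302+ merge_condition : str, optional
303+ Action applied to rows matched on ``merge_cols`` in the MERGE INTO statement: 'update' overwrites the matched target rows with the source values, 'ignore' leaves matched rows unchanged and only inserts the unmatched ones. Valid values: ['update', 'ignore']. 'update' by default.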
295304 keep_files : bool
296305 Whether staging files produced by Athena are retained. 'True' by default.
297306 data_source : str, optional
@@ -376,6 +385,7 @@ def to_iceberg(
376385 mode = mode ,
377386 partition_cols = partition_cols ,
378387 merge_cols = merge_cols ,
388+ merge_condition = merge_condition ,
379389 )
380390
381391 glue_table_settings = cast (
@@ -497,12 +507,16 @@ def to_iceberg(
497507 # Insert or merge into Iceberg table
498508 sql_statement : str
499509 if merge_cols :
510+ if merge_condition == "update" :
511+ match_condition = f"""WHEN MATCHED THEN
512+ UPDATE SET { ', ' .join ([f'"{ x } " = source."{ x } "' for x in df .columns ])} """
513+ else :
514+ match_condition = ""
500515 sql_statement = f"""
501516 MERGE INTO "{ database } "."{ table } " target
502517 USING "{ database } "."{ temp_table } " source
503518 ON { ' AND ' .join ([f'target."{ x } " = source."{ x } "' for x in merge_cols ])}
504- WHEN MATCHED THEN
505- UPDATE SET { ', ' .join ([f'"{ x } " = source."{ x } "' for x in df .columns ])}
519+ { match_condition }
506520 WHEN NOT MATCHED THEN
507521 INSERT ({ ', ' .join ([f'"{ x } "' for x in df .columns ])} )
508522 VALUES ({ ', ' .join ([f'source."{ x } "' for x in df .columns ])} )
0 commit comments