@@ -220,7 +220,9 @@ def _validate_args(
220220 mode : Literal ["append" , "overwrite" , "overwrite_partitions" ],
221221 partition_cols : list [str ] | None ,
222222 merge_cols : list [str ] | None ,
223- merge_condition : Literal ["update" , "ignore" ],
223+ merge_on_condition : str | None ,
224+ merge_condition : Literal ["update" , "ignore" , "conditional_merge" ],
225+ merge_conditional_clauses : list [_MergeClause ] | None ,
224226) -> None :
225227 if df .empty is True :
226228 raise exceptions .EmptyDataFrame ("DataFrame cannot be empty." )
@@ -229,6 +231,45 @@ def _validate_args(
229231 raise exceptions .InvalidArgumentCombination (
230232 "Either path or workgroup path must be specified to store the temporary results."
231233 )
234+
235+ if merge_cols and merge_on_condition :
236+ raise exceptions .InvalidArgumentCombination (
237+ "Cannot specify both merge_cols and merge_on_condition. Use either merge_cols for simple equality matching or merge_on_condition for custom logic."
238+ )
239+
240+ if merge_conditional_clauses and merge_condition != "conditional_merge" :
241+ raise exceptions .InvalidArgumentCombination (
242+ "merge_conditional_clauses can only be used when merge_condition is 'conditional_merge'."
243+ )
244+
245+ if (merge_cols or merge_on_condition ) and merge_condition not in ["update" , "ignore" , "conditional_merge" ]:
246+ raise exceptions .InvalidArgumentValue (
247+ f"Invalid merge_condition: { merge_condition } . Valid values: ['update', 'ignore', 'conditional_merge']"
248+ )
249+
250+ if merge_condition == "conditional_merge" :
251+ if not merge_conditional_clauses :
252+ raise exceptions .InvalidArgumentCombination (
253+ "merge_conditional_clauses must be provided when merge_condition is 'conditional_merge'."
254+ )
255+
256+ for i , clause in enumerate (merge_conditional_clauses ):
257+ if "when" not in clause :
258+ raise exceptions .InvalidArgumentValue (
259+ f"merge_conditional_clauses[{ i } ] must contain 'when' field."
260+ )
261+ if "action" not in clause :
262+ raise exceptions .InvalidArgumentValue (
263+ f"merge_conditional_clauses[{ i } ] must contain 'action' field."
264+ )
265+ if clause ["when" ] not in ['MATCHED' , 'NOT_MATCHED' , 'NOT_MATCHED_BY_SOURCE' ]:
266+ raise exceptions .InvalidArgumentValue (
267+ f"merge_conditional_clauses[{ i } ]['when'] must be one of ['MATCHED', 'NOT_MATCHED', 'NOT_MATCHED_BY_SOURCE']."
268+ )
269+ if clause ["action" ] not in ["UPDATE" , "DELETE" , "INSERT" , "IGNORE" ]:
270+ raise exceptions .InvalidArgumentValue (
271+ f"merge_conditional_clauses[{ i } ]['action'] must be one of ['UPDATE', 'DELETE', 'INSERT', 'IGNORE']."
272+ )
232273
233274 if mode == "overwrite_partitions" :
234275 if not partition_cols :
@@ -240,12 +281,6 @@ def _validate_args(
240281 "When mode is 'overwrite_partitions' merge_cols must not be specified."
241282 )
242283
243- if merge_cols and merge_condition not in ["update" , "ignore" ]:
244- raise exceptions .InvalidArgumentValue (
245- f"Invalid merge_condition: { merge_condition } . Valid values: ['update', 'ignore']"
246- )
247-
248-
249284def _merge_iceberg (
250285 df : pd .DataFrame ,
251286 database : str ,
@@ -501,7 +536,9 @@ def to_iceberg( # noqa: PLR0913
501536 mode = mode ,
502537 partition_cols = partition_cols ,
503538 merge_cols = merge_cols ,
539+ merge_on_condition = merge_on_condition ,
504540 merge_condition = merge_condition ,
541+ merge_conditional_clauses = merge_conditional_clauses ,
505542 )
506543
507544 glue_table_settings = glue_table_settings if glue_table_settings else {}
0 commit comments