@@ -999,6 +999,119 @@ def test_athena_delete_from_iceberg_empty_df_error(
999999 keep_files = False ,
10001000 )
10011001
1002+ def test_to_iceberg_merge_cols_and_merge_on_clause_error (
1003+ path : str , path2 : str , glue_database : str , glue_table : str
1004+ ) -> None :
1005+ df = pd .DataFrame ({"id" : [1 ], "val" : ["a" ]})
1006+ with pytest .raises (wr .exceptions .InvalidArgumentCombination ):
1007+ wr .athena .to_iceberg (
1008+ df = df ,
1009+ database = glue_database ,
1010+ table = glue_table ,
1011+ table_location = path ,
1012+ temp_path = path2 ,
1013+ merge_cols = ["id" ],
1014+ merge_on_clause = "id = source.id" ,
1015+ )
1016+
1017+ def test_to_iceberg_merge_match_nulls_with_merge_on_clause_error (
1018+ path : str , path2 : str , glue_database : str , glue_table : str
1019+ ) -> None :
1020+ df = pd .DataFrame ({"id" : [1 ], "val" : ["a" ]})
1021+ with pytest .raises (wr .exceptions .InvalidArgumentCombination ):
1022+ wr .athena .to_iceberg (
1023+ df = df ,
1024+ database = glue_database ,
1025+ table = glue_table ,
1026+ table_location = path ,
1027+ temp_path = path2 ,
1028+ merge_on_clause = "id = source.id" ,
1029+ merge_match_nulls = True ,
1030+ )
1031+
1032+ def test_to_iceberg_merge_conditional_clauses_without_conditional_merge_error (
1033+ path : str , path2 : str , glue_database : str , glue_table : str
1034+ ) -> None :
1035+ df = pd .DataFrame ({"id" : [1 ], "val" : ["a" ]})
1036+ with pytest .raises (wr .exceptions .InvalidArgumentCombination ):
1037+ wr .athena .to_iceberg (
1038+ df = df ,
1039+ database = glue_database ,
1040+ table = glue_table ,
1041+ table_location = path ,
1042+ temp_path = path2 ,
1043+ merge_cols = ["id" ],
1044+ merge_conditional_clauses = [{"when" : "MATCHED" , "action" : "UPDATE" }],
1045+ merge_condition = "update" ,
1046+ )
1047+
1048+ def test_to_iceberg_conditional_merge_without_clauses_error (
1049+ path : str , path2 : str , glue_database : str , glue_table : str
1050+ ) -> None :
1051+ df = pd .DataFrame ({"id" : [1 ], "val" : ["a" ]})
1052+ with pytest .raises (wr .exceptions .InvalidArgumentCombination ):
1053+ wr .athena .to_iceberg (
1054+ df = df ,
1055+ database = glue_database ,
1056+ table = glue_table ,
1057+ table_location = path ,
1058+ temp_path = path2 ,
1059+ merge_cols = ["id" ],
1060+ merge_condition = "conditional_merge" ,
1061+ )
1062+
1063+ def test_to_iceberg_invalid_merge_condition_error (
1064+ path : str , path2 : str , glue_database : str , glue_table : str
1065+ ) -> None :
1066+ df = pd .DataFrame ({"id" : [1 ], "val" : ["a" ]})
1067+ with pytest .raises (wr .exceptions .InvalidArgumentValue ):
1068+ wr .athena .to_iceberg (
1069+ df = df ,
1070+ database = glue_database ,
1071+ table = glue_table ,
1072+ table_location = path ,
1073+ temp_path = path2 ,
1074+ merge_cols = ["id" ],
1075+ merge_condition = "not_a_valid_condition" ,
1076+ )
1077+
1078+ def test_to_iceberg_conditional_merge_happy_path (
1079+ path : str , path2 : str , glue_database : str , glue_table : str
1080+ ) -> None :
1081+ df = pd .DataFrame ({"id" : [1 , 2 ], "val" : ["a" , "b" ]})
1082+ wr .athena .to_iceberg (
1083+ df = df ,
1084+ database = glue_database ,
1085+ table = glue_table ,
1086+ table_location = path ,
1087+ temp_path = path2 ,
1088+ keep_files = False ,
1089+ )
1090+ df2 = pd .DataFrame ({"id" : [1 , 3 ], "val" : ["c" , "d" ]})
1091+ clauses = [
1092+ {"when" : "MATCHED" , "action" : "UPDATE" , "columns" : ["val" ]},
1093+ {"when" : "NOT MATCHED" , "action" : "INSERT" },
1094+ ]
1095+ wr .athena .to_iceberg (
1096+ df = df2 ,
1097+ database = glue_database ,
1098+ table = glue_table ,
1099+ table_location = path ,
1100+ temp_path = path2 ,
1101+ merge_cols = ["id" ],
1102+ merge_condition = "conditional_merge" ,
1103+ merge_conditional_clauses = clauses ,
1104+ keep_files = False ,
1105+ )
1106+ df_out = wr .athena .read_sql_query (
1107+ sql = f'SELECT * FROM "{ glue_table } " ORDER BY id' ,
1108+ database = glue_database ,
1109+ ctas_approach = False ,
1110+ unload_approach = False ,
1111+ )
1112+ # id=1 should be updated, id=2 should remain, id=3 should be inserted
1113+ expected = pd .DataFrame ({"id" : [1 , 2 , 3 ], "val" : ["c" , "b" , "d" ]})
1114+ assert_pandas_equals (expected , df_out .reset_index (drop = True ))
10021115
10031116def test_athena_iceberg_use_partition_function (
10041117 path : str ,
0 commit comments