diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index ee3791bf2..1c10289ef 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -549,10 +549,6 @@ def to_iceberg( # noqa: PLR0913 schema_differences["missing_columns"] = {} - # Ensure that the ordering of the DF is the same as in the catalog. - # This is required for the INSERT command to work. - df = df[catalog_cols] - if schema_evolution is False and any([schema_differences[x] for x in schema_differences]): # type: ignore[literal-required] raise exceptions.InvalidArgumentValue(f"Schema change detected: {schema_differences}") @@ -570,6 +566,21 @@ def to_iceberg( # noqa: PLR0913 boto3_session=boto3_session, ) + # Ensure that the ordering of the DF is the same as in the catalog. + # This is required for the INSERT command to work. + # update catalog_cols after altering table + _, catalog_cols = _determine_differences( + df=df, + database=database, + table=table, + index=index, + partition_cols=partition_cols, + boto3_session=boto3_session, + dtype=dtype, + catalog_id=catalog_id, + ) + df = df[catalog_cols] + # if mode == "overwrite_partitions", drop matched partitions if mode == "overwrite_partitions": delete_from_iceberg_table( diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py index 6233e8425..68c44fe47 100644 --- a/tests/unit/test_athena_iceberg.py +++ b/tests/unit/test_athena_iceberg.py @@ -1217,3 +1217,58 @@ def test_athena_to_iceberg_alter_schema( ) assert_pandas_equals(df, df_actual) + + +@pytest.mark.parametrize("partition_cols", [None, ["name"]]) +def test_athena_to_iceberg_append_schema_evolution( + path: str, + path2: str, + path3: str, + glue_database: str, + glue_table: str, + partition_cols: list[str] | None, +) -> None: + df = pd.DataFrame( + { + "id": [1, 2, 3, 4, 5], + "name": ["a", "b", "c", "a", "c"], + "age": [None, None, None, None, 50], + } + ) + df["id"] = df["id"].astype("Int64") # Cast as nullable int64 type + df["name"] = df["name"].astype("string") + df["age"] = df["age"].astype("Int64") # Cast as nullable int64 type + split_index_rows = 4 + split_index_columns = 2 + + wr.athena.to_iceberg( + df=df.iloc[:split_index_rows, :split_index_columns], + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + partition_cols=partition_cols, + keep_files=False, + ) + + wr.athena.to_iceberg( + df=df.iloc[split_index_rows:, :], + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + partition_cols=partition_cols, + schema_evolution=True, + keep_files=False, + mode="append", + s3_output=path3, + ) + + df_actual = wr.athena.read_sql_query( + sql=f'SELECT * FROM "{glue_table}" ORDER BY id', + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + + assert_pandas_equals(df, df_actual)