From 8e2938b22ee67fb5595ac857e6cd03ba17c27efc Mon Sep 17 00:00:00 2001 From: Brayan Alfaro Date: Mon, 4 Aug 2025 18:35:20 -0600 Subject: [PATCH 1/3] fix: do not remove new columns values --- awswrangler/athena/_write_iceberg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index fabc4d032..b5f05eb57 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -548,7 +548,7 @@ def to_iceberg( # noqa: PLR0913 # Ensure that the ordering of the DF is the same as in the catalog. # This is required for the INSERT command to work. - df = df[catalog_cols] + df = df[catalog_cols + [col_name for col_name, _ in schema_differences["new_columns"].items()]] if schema_evolution is False and any([schema_differences[x] for x in schema_differences]): # type: ignore[literal-required] raise exceptions.InvalidArgumentValue(f"Schema change detected: {schema_differences}") From 5f561c535e83411a1989b456d428e6e64afba60a Mon Sep 17 00:00:00 2001 From: Brayan Alfaro Date: Thu, 4 Sep 2025 17:59:00 -0600 Subject: [PATCH 2/3] Expand athena iceberg add columns test to check values --- tests/unit/test_athena_iceberg.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py index 6233e8425..efb19ddbe 100644 --- a/tests/unit/test_athena_iceberg.py +++ b/tests/unit/test_athena_iceberg.py @@ -321,11 +321,10 @@ def test_athena_to_iceberg_overwrite_partitions_merge_cols_error( mode="overwrite_partitions", ) - def test_athena_to_iceberg_schema_evolution_add_columns( path: str, path2: str, glue_database: str, glue_table: str ) -> None: - df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]}) + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [6, 7, 8]}) wr.athena.to_iceberg( df=df, database=glue_database, @@ -336,9 +335,9 @@ def test_athena_to_iceberg_schema_evolution_add_columns( schema_evolution=True, ) - df["c2"] = [6, 7, 8] + df2 = pd.DataFrame({"c0": [3, 4, 5], "c2": [9, 10, 11]}) wr.athena.to_iceberg( - df=df, + df=df2, database=glue_database, table=glue_table, table_location=path, @@ -348,7 +347,7 @@ def test_athena_to_iceberg_schema_evolution_add_columns( ) column_types = wr.catalog.get_table_types(glue_database, glue_table) - assert len(column_types) == len(df.columns) + assert len(column_types) == 3 df_out = wr.athena.read_sql_table( table=glue_table, @@ -356,12 +355,17 @@ def test_athena_to_iceberg_schema_evolution_add_columns( ctas_approach=False, unload_approach=False, ) - assert len(df_out) == len(df) * 2 - df["c3"] = [9, 10, 11] + df_expected = pd.DataFrame( + {"c0": [0, 1, 2, 3, 4, 5], "c1": [6, 7, 8, np.nan, np.nan, np.nan], "c2": [np.nan, np.nan, np.nan, 9, 10, 11]}, + dtype="Int64", + ) + assert_pandas_equals(df_out.sort_values(by=["c0"]).reset_index(drop=True), df_expected) + + df3 = pd.DataFrame({"c0": [12], "c1": [13], "c2": [14], "c3": [15]}) with pytest.raises(wr.exceptions.InvalidArgumentValue): wr.athena.to_iceberg( - df=df, + df=df3, database=glue_database, table=glue_table, table_location=path, From 9b7e61462d0068cf9a143a93266f73c8a5ff6316 Mon Sep 17 00:00:00 2001 From: Brayan Alfaro Date: Thu, 4 Sep 2025 18:02:41 -0600 Subject: [PATCH 3/3] Add missing blank line --- tests/unit/test_athena_iceberg.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py index efb19ddbe..9954375b0 100644 --- a/tests/unit/test_athena_iceberg.py +++ b/tests/unit/test_athena_iceberg.py @@ -321,6 +321,7 @@ def test_athena_to_iceberg_overwrite_partitions_merge_cols_error( mode="overwrite_partitions", ) + def test_athena_to_iceberg_schema_evolution_add_columns( path: str, path2: str, glue_database: str, glue_table: str ) -> None: