From 19635ce7f80035db0ce71086a0f7c03ae1a3dc76 Mon Sep 17 00:00:00 2001 From: lautarortega <70765773+lautarortega@users.noreply.github.com> Date: Thu, 9 Jan 2025 11:11:06 +0100 Subject: [PATCH 1/4] bugfix/writing-complete-df Using the catalog columns only meant the new columns were being dropped. This caused the insert into statements to fail to write data for new columns. --- awswrangler/athena/_write_iceberg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index b85e68887..7108ac46f 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -546,7 +546,7 @@ def to_iceberg( # Ensure that the ordering of the DF is the same as in the catalog. # This is required for the INSERT command to work. - df = df[catalog_cols] + df = df[catalog_cols + list(schema_differences["new_columns"].keys())] if schema_evolution is False and any([schema_differences[x] for x in schema_differences]): # type: ignore[literal-required] raise exceptions.InvalidArgumentValue(f"Schema change detected: {schema_differences}") From 3b847ce7b699cb85806f70938aa04d93794df5c8 Mon Sep 17 00:00:00 2001 From: Lautaro Ortega Date: Tue, 4 Feb 2025 14:03:33 +0100 Subject: [PATCH 2/4] updated implementation --- awswrangler/athena/_write_iceberg.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 30967a925..6075bf1b7 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -544,10 +544,6 @@ def to_iceberg( schema_differences["missing_columns"] = {} - # Ensure that the ordering of the DF is the same as in the catalog. - # This is required for the INSERT command to work. - df = df[catalog_cols + list(schema_differences["new_columns"].keys())] - if schema_evolution is False and any([schema_differences[x] for x in schema_differences]): # type: ignore[literal-required] raise exceptions.InvalidArgumentValue(f"Schema change detected: {schema_differences}") @@ -565,6 +561,22 @@ def to_iceberg( boto3_session=boto3_session, ) + # Ensure that the ordering of the DF is the same as in the catalog. + # This is required for the INSERT command to work. + # update catalog_cols after altering table + catalog_column_types = typing.cast( + Dict[str, str], + catalog.get_table_types( + database=database, + table=table, + catalog_id=catalog_id, + filter_iceberg_current=True, + boto3_session=boto3_session, + ), + ) + catalog_cols = [key for key in catalog_column_types] + df = df[catalog_cols] + # if mode == "overwrite_partitions", drop matched partitions if mode == "overwrite_partitions": delete_from_iceberg_table( From 954e35c8d17bc3a2b0df7901d8c5044bc02e269f Mon Sep 17 00:00:00 2001 From: Lautaro Ortega Date: Tue, 4 Feb 2025 14:41:18 +0100 Subject: [PATCH 3/4] test case added --- tests/unit/test_athena_iceberg.py | 55 +++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py index 034e83d68..8ed799e91 100644 --- a/tests/unit/test_athena_iceberg.py +++ b/tests/unit/test_athena_iceberg.py @@ -1211,3 +1211,58 @@ def test_athena_to_iceberg_alter_schema( ) assert_pandas_equals(df, df_actual) + + +@pytest.mark.parametrize("partition_cols", [None, ["name"]]) +def test_athena_to_iceberg_append_schema_evolution( + path: str, + path2: str, + path3: str, + glue_database: str, + glue_table: str, + partition_cols: list[str] | None, +) -> None: + df = pd.DataFrame( + { + "id": [1, 2, 3, 4, 5], + "name": ["a", "b", "c", "a", "c"], + "age": [None, None, None, None, 50], + } + ) + df["id"] = df["id"].astype("Int64") # Cast as nullable int64 type + df["name"] = df["name"].astype("string") + df["age"] = df["age"].astype("Int64") # Cast as nullable int64 type + split_index_rows = 4 + split_index_columns = 2 + + wr.athena.to_iceberg( + df=df.iloc[:split_index_rows, :split_index_columns], + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + partition_cols=partition_cols, + keep_files=False, + ) + + wr.athena.to_iceberg( + df=df.iloc[split_index_rows:, :], + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + partition_cols=partition_cols, + schema_evolution=True, + keep_files=False, + mode="append", + s3_output=path3, + ) + + df_actual = wr.athena.read_sql_query( + sql=f'SELECT * FROM "{glue_table}" ORDER BY id', + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + + assert_pandas_equals(df, df_actual) From d5dba3a0bd8408b7be6f73ee02bd9a38ecb92a7a Mon Sep 17 00:00:00 2001 From: lautarortega <70765773+lautarortega@users.noreply.github.com> Date: Tue, 18 Feb 2025 22:32:47 +0100 Subject: [PATCH 4/4] fix: failing test fix --- awswrangler/athena/_write_iceberg.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 0996b7d2c..fc6cd8274 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -569,17 +569,16 @@ def to_iceberg( # noqa: PLR0913 # Ensure that the ordering of the DF is the same as in the catalog. # This is required for the INSERT command to work. # update catalog_cols after altering table - catalog_column_types = typing.cast( - Dict[str, str], - catalog.get_table_types( - database=database, - table=table, - catalog_id=catalog_id, - filter_iceberg_current=True, - boto3_session=boto3_session, - ), + _, catalog_cols = _determine_differences( + df=df, + database=database, + table=table, + index=index, + partition_cols=partition_cols, + boto3_session=boto3_session, + dtype=dtype, + catalog_id=catalog_id, ) - catalog_cols = [key for key in catalog_column_types] df = df[catalog_cols] # if mode == "overwrite_partitions", drop matched partitions