diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py
index 5e5481f13..b85e68887 100644
--- a/awswrangler/athena/_write_iceberg.py
+++ b/awswrangler/athena/_write_iceberg.py
@@ -115,7 +115,13 @@ def _determine_differences(
 
     catalog_column_types = typing.cast(
         Dict[str, str],
-        catalog.get_table_types(database=database, table=table, catalog_id=catalog_id, boto3_session=boto3_session),
+        catalog.get_table_types(
+            database=database,
+            table=table,
+            catalog_id=catalog_id,
+            filter_iceberg_current=True,
+            boto3_session=boto3_session,
+        ),
     )
 
     original_column_names = set(catalog_column_types)
diff --git a/awswrangler/catalog/_get.py b/awswrangler/catalog/_get.py
index 8b3d93385..b5fc5a443 100644
--- a/awswrangler/catalog/_get.py
+++ b/awswrangler/catalog/_get.py
@@ -107,6 +107,7 @@ def get_table_types(
     database: str,
     table: str,
     catalog_id: str | None = None,
+    filter_iceberg_current: bool = False,
     boto3_session: boto3.Session | None = None,
 ) -> dict[str, str] | None:
     """Get all columns and types from a table.
@@ -120,6 +121,9 @@
     catalog_id
         The ID of the Data Catalog from which to retrieve Databases.
         If ``None`` is provided, the AWS account ID is used by default.
+    filter_iceberg_current
+        If True, returns only the current Iceberg fields (columns marked with ``iceberg.field.current: true``).
+        Otherwise, returns all fields. ``False`` by default.
     boto3_session
         The default boto3 session will be used if **boto3_session** receive ``None``.
 
@@ -139,7 +143,10 @@
         response = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table))
     except client_glue.exceptions.EntityNotFoundException:
         return None
-    return _extract_dtypes_from_table_details(response=response)
+    return _extract_dtypes_from_table_details(
+        response=response,
+        filter_iceberg_current=filter_iceberg_current,
+    )
 
 
 def get_databases(
diff --git a/awswrangler/catalog/_utils.py b/awswrangler/catalog/_utils.py
index 2958d2bad..10e142bd2 100644
--- a/awswrangler/catalog/_utils.py
+++ b/awswrangler/catalog/_utils.py
@@ -31,10 +31,17 @@ def _sanitize_name(name: str) -> str:
     return re.sub("[^A-Za-z0-9_]+", "_", name).lower()  # Replacing non alphanumeric characters by underscore
 
 
-def _extract_dtypes_from_table_details(response: "GetTableResponseTypeDef") -> dict[str, str]:
+def _extract_dtypes_from_table_details(
+    response: "GetTableResponseTypeDef",
+    filter_iceberg_current: bool = False,
+) -> dict[str, str]:
     dtypes: dict[str, str] = {}
     for col in response["Table"]["StorageDescriptor"]["Columns"]:
-        dtypes[col["Name"]] = col["Type"]
+        # Only return current fields if flag is enabled
+        if not filter_iceberg_current or col.get("Parameters", {}).get("iceberg.field.current") == "true":
+            dtypes[col["Name"]] = col["Type"]
+
+    # Add partition keys as columns
     if "PartitionKeys" in response["Table"]:
         for par in response["Table"]["PartitionKeys"]:
             dtypes[par["Name"]] = par["Type"]
diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py
index 71198e2ca..034e83d68 100644
--- a/tests/unit/test_athena_iceberg.py
+++ b/tests/unit/test_athena_iceberg.py
@@ -1159,3 +1159,55 @@ def test_to_iceberg_fill_missing_columns_with_complex_types(
         schema_evolution=True,
         fill_missing_columns_in_df=True,
     )
+
+
+def test_athena_to_iceberg_alter_schema(
+    path: str,
+    path2: str,
+    glue_database: str,
+    glue_table: str,
+) -> None:
+    df = pd.DataFrame(
+        {
+            "id": pd.Series([1, 2, 3, 4, 5], dtype="Int64"),
+            "name": pd.Series(["a", "b", "c", "d", "e"], dtype="string"),
+        },
+    ).reset_index(drop=True)
+
+    split_index = 3
+
+    wr.athena.to_iceberg(
+        df=df[:split_index],
+        database=glue_database,
+        table=glue_table,
+        table_location=path,
+        temp_path=path2,
+        schema_evolution=True,
+        keep_files=False,
+    )
+
+    wr.athena.start_query_execution(
+        sql=f"ALTER TABLE {glue_table} CHANGE COLUMN id new_id bigint",
+        database=glue_database,
+        wait=True,
+    )
+
+    df = df.rename(columns={"id": "new_id"})
+
+    wr.athena.to_iceberg(
+        df=df[split_index:],
+        database=glue_database,
+        table=glue_table,
+        table_location=path,
+        temp_path=path2,
+        schema_evolution=True,
+        keep_files=False,
+    )
+
+    df_actual = wr.athena.read_sql_query(
+        sql=f"SELECT new_id, name FROM {glue_table} ORDER BY new_id",
+        database=glue_database,
+        ctas_approach=False,
+    )
+
+    assert_pandas_equals(df, df_actual)