From 29ec58d9c94c4ab8382329de969b0f598c52e583 Mon Sep 17 00:00:00 2001 From: Anton Kukushkin Date: Tue, 1 Oct 2024 19:25:04 +0100 Subject: [PATCH 1/6] fix: do not return hidden iceberg columns --- awswrangler/catalog/_utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/awswrangler/catalog/_utils.py b/awswrangler/catalog/_utils.py index 2958d2bad..e19c52a24 100644 --- a/awswrangler/catalog/_utils.py +++ b/awswrangler/catalog/_utils.py @@ -34,7 +34,9 @@ def _sanitize_name(name: str) -> str: def _extract_dtypes_from_table_details(response: "GetTableResponseTypeDef") -> dict[str, str]: dtypes: dict[str, str] = {} for col in response["Table"]["StorageDescriptor"]["Columns"]: - dtypes[col["Name"]] = col["Type"] + # Do not return "hidden" iceberg columns + if col.get("Parameters", {}).get("iceberg.field.current") != "false": + dtypes[col["Name"]] = col["Type"] if "PartitionKeys" in response["Table"]: for par in response["Table"]["PartitionKeys"]: dtypes[par["Name"]] = par["Type"] From 1ac85a9738d1059ec90422b7429eaa92453f634d Mon Sep 17 00:00:00 2001 From: Anton Kukushkin Date: Wed, 2 Oct 2024 00:39:54 +0100 Subject: [PATCH 2/6] add parameter --- awswrangler/athena/_write_iceberg.py | 8 +++++++- awswrangler/catalog/_get.py | 9 ++++++++- awswrangler/catalog/_utils.py | 10 +++++++--- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 5e5481f13..38ca2de65 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -115,7 +115,13 @@ def _determine_differences( catalog_column_types = typing.cast( Dict[str, str], - catalog.get_table_types(database=database, table=table, catalog_id=catalog_id, boto3_session=boto3_session), + catalog.get_table_types( + database=database, + table=table, + catalog_id=catalog_id, + return_iceberg_current=True, + boto3_session=boto3_session, + ), ) original_column_names = set(catalog_column_types) diff --git a/awswrangler/catalog/_get.py b/awswrangler/catalog/_get.py index 8b3d93385..083782e02 100644 --- a/awswrangler/catalog/_get.py +++ b/awswrangler/catalog/_get.py @@ -107,6 +107,7 @@ def get_table_types( database: str, table: str, catalog_id: str | None = None, + return_iceberg_current: bool = False, boto3_session: boto3.Session | None = None, ) -> dict[str, str] | None: """Get all columns and types from a table. @@ -120,6 +121,9 @@ def get_table_types( catalog_id The ID of the Data Catalog from which to retrieve Databases. If ``None`` is provided, the AWS account ID is used by default. + return_iceberg_current + If True, returns only current iceberg fields (fields marked with iceberg.field.current: true). + Otherwise, returns the all fields. False by default (return all fields). boto3_session The default boto3 session will be used if **boto3_session** receive ``None``. @@ -139,7 +143,10 @@ def get_table_types( response = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table)) except client_glue.exceptions.EntityNotFoundException: return None - return _extract_dtypes_from_table_details(response=response) + return _extract_dtypes_from_table_details( + response=response, + return_iceberg_current=return_iceberg_current, + ) def get_databases( diff --git a/awswrangler/catalog/_utils.py b/awswrangler/catalog/_utils.py index e19c52a24..82487d368 100644 --- a/awswrangler/catalog/_utils.py +++ b/awswrangler/catalog/_utils.py @@ -31,12 +31,16 @@ def _sanitize_name(name: str) -> str: return re.sub("[^A-Za-z0-9_]+", "_", name).lower() # Replacing non alphanumeric characters by underscore -def _extract_dtypes_from_table_details(response: "GetTableResponseTypeDef") -> dict[str, str]: +def _extract_dtypes_from_table_details( + response: "GetTableResponseTypeDef", + return_iceberg_current: bool = False, +) -> dict[str, str]: dtypes: dict[str, str] = {} for col in response["Table"]["StorageDescriptor"]["Columns"]: - # Do not return "hidden" iceberg columns - if col.get("Parameters", {}).get("iceberg.field.current") != "false": + # Only return current fields if flag is enabled + if not return_iceberg_current or col.get("Parameters", {}).get("iceberg.field.current") == "true": dtypes[col["Name"]] = col["Type"] + # Add partition keys as columns if "PartitionKeys" in response["Table"]: for par in response["Table"]["PartitionKeys"]: dtypes[par["Name"]] = par["Type"] From 22333cf5c1810548caa0201fe8cf31ecf0a44fcf Mon Sep 17 00:00:00 2001 From: Anton Kukushkin Date: Wed, 2 Oct 2024 00:40:09 +0100 Subject: [PATCH 3/6] add test case --- tests/unit/test_athena_iceberg.py | 52 +++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py index 71198e2ca..252d4b1b7 100644 --- a/tests/unit/test_athena_iceberg.py +++ b/tests/unit/test_athena_iceberg.py @@ -1159,3 +1159,55 @@ def test_to_iceberg_fill_missing_columns_with_complex_types( schema_evolution=True, fill_missing_columns_in_df=True, ) + + +def test_athena_to_iceberg_alter_schema( + path: str, + path2: str, + glue_database: str, + glue_table: str, +) -> None: + df = pd.DataFrame( + { + "id": pd.Series([1, 2, 3, 4, 5], dtype="Int64"), + "name": pd.Series(["a", "b", "c", "d", "e"], dtype="string"), + }, + ).reset_index(drop=True) + + split_index = 3 + + wr.athena.to_iceberg( + df=df[:split_index], + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + schema_evolution=True, + keep_files=False, + ) + + wr.athena.start_query_execution( + sql=f"ALTER TABLE {glue_table} CHANGE COLUMN id new_id bigint", + database=glue_database, + wait=True, + ) + + df = df.rename(columns={"id": "new_id"}) + + wr.athena.to_iceberg( + df=df[split_index:], + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + schema_evolution=True, + keep_files=False, + ) + + df_actual = wr.athena.read_sql_query( + sql=f"SELECT new_id, name FROM '{glue_table}' ORDER BY new_id", + database=glue_database, + ctas_approach=False, + ) + + assert_pandas_equals(df, df_actual) From f82fed5f065a4dc0f0153fe7a91044800ed7901c Mon Sep 17 00:00:00 2001 From: Anton Kukushkin Date: Wed, 2 Oct 2024 00:43:42 +0100 Subject: [PATCH 4/6] fix parentheses --- tests/unit/test_athena_iceberg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py index 252d4b1b7..034e83d68 100644 --- a/tests/unit/test_athena_iceberg.py +++ b/tests/unit/test_athena_iceberg.py @@ -1205,7 +1205,7 @@ def test_athena_to_iceberg_alter_schema( ) df_actual = wr.athena.read_sql_query( - sql=f"SELECT new_id, name FROM '{glue_table}' ORDER BY new_id", + sql=f"SELECT new_id, name FROM {glue_table} ORDER BY new_id", database=glue_database, ctas_approach=False, ) From de830997229da1b812d20d414bc3ba93ec6ff9f6 Mon Sep 17 00:00:00 2001 From: Anton Kukushkin Date: Wed, 2 Oct 2024 16:03:21 +0100 Subject: [PATCH 5/6] pr feedback --- awswrangler/catalog/_get.py | 10 +++++----- awswrangler/catalog/_utils.py | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/awswrangler/catalog/_get.py b/awswrangler/catalog/_get.py index 083782e02..b5fc5a443 100644 --- a/awswrangler/catalog/_get.py +++ b/awswrangler/catalog/_get.py @@ -107,7 +107,7 @@ def get_table_types( database: str, table: str, catalog_id: str | None = None, - return_iceberg_current: bool = False, + filter_iceberg_current: bool = False, boto3_session: boto3.Session | None = None, ) -> dict[str, str] | None: """Get all columns and types from a table. @@ -121,9 +121,9 @@ def get_table_types( catalog_id The ID of the Data Catalog from which to retrieve Databases. If ``None`` is provided, the AWS account ID is used by default. - return_iceberg_current - If True, returns only current iceberg fields (fields marked with iceberg.field.current: true). - Otherwise, returns the all fields. False by default (return all fields). + filter_iceberg_current + If True, returns only current iceberg fields (fields marked with iceberg.field.current: true). + Otherwise, returns the all fields. False by default (return all fields). boto3_session The default boto3 session will be used if **boto3_session** receive ``None``. @@ -145,7 +145,7 @@ def get_table_types( return None return _extract_dtypes_from_table_details( response=response, - return_iceberg_current=return_iceberg_current, + filter_iceberg_current=filter_iceberg_current, ) diff --git a/awswrangler/catalog/_utils.py b/awswrangler/catalog/_utils.py index 82487d368..10e142bd2 100644 --- a/awswrangler/catalog/_utils.py +++ b/awswrangler/catalog/_utils.py @@ -33,12 +33,12 @@ def _sanitize_name(name: str) -> str: def _extract_dtypes_from_table_details( response: "GetTableResponseTypeDef", - return_iceberg_current: bool = False, + filter_iceberg_current: bool = False, ) -> dict[str, str]: dtypes: dict[str, str] = {} for col in response["Table"]["StorageDescriptor"]["Columns"]: # Only return current fields if flag is enabled - if not return_iceberg_current or col.get("Parameters", {}).get("iceberg.field.current") == "true": + if not filter_iceberg_current or col.get("Parameters", {}).get("iceberg.field.current") == "true": dtypes[col["Name"]] = col["Type"] # Add partition keys as columns if "PartitionKeys" in response["Table"]: From 58c17ae00fb99473d7f7e985be3d1ecf2a3d3df9 Mon Sep 17 00:00:00 2001 From: Anton Kukushkin Date: Wed, 2 Oct 2024 16:09:52 +0100 Subject: [PATCH 6/6] rename param --- awswrangler/athena/_write_iceberg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 38ca2de65..b85e68887 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -119,7 +119,7 @@ def _determine_differences( database=database, table=table, catalog_id=catalog_id, - return_iceberg_current=True, + filter_iceberg_current=True, boto3_session=boto3_session, ), )