Skip to content

Commit b1880cb

Browse files
feat: add columns comments to iceberg (#2482)
1 parent 83cc0b4 commit b1880cb

File tree

2 files changed

+46
-2
lines changed

2 files changed

+46
-2
lines changed

awswrangler/athena/_write_iceberg.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
import typing
55
import uuid
6-
from typing import Any, Dict, List, Optional, Set, TypedDict
6+
from typing import Any, Dict, List, Optional, Set, TypedDict, cast
77

88
import boto3
99
import pandas as pd
@@ -16,6 +16,7 @@
1616
_start_query_execution,
1717
_WorkGroupConfig,
1818
)
19+
from awswrangler.typing import GlueTableSettings
1920

2021
_logger: logging.Logger = logging.getLogger(__name__)
2122

@@ -35,12 +36,20 @@ def _create_iceberg_table(
3536
kms_key: Optional[str] = None,
3637
boto3_session: Optional[boto3.Session] = None,
3738
dtype: Optional[Dict[str, str]] = None,
39+
columns_comments: Optional[Dict[str, Any]] = None,
3840
) -> None:
3941
if not path:
4042
raise exceptions.InvalidArgumentValue("Must specify table location to create the table.")
4143

4244
columns_types, _ = catalog.extract_athena_types(df=df, index=index, dtype=dtype)
43-
cols_str: str = ", ".join([f"{k} {v}" for k, v in columns_types.items()])
45+
cols_str: str = ", ".join(
46+
[
47+
f"{k} {v}"
48+
if (columns_comments is None or columns_comments.get(k) is None)
49+
else f"{k} {v} COMMENT '{columns_comments[k]}'"
50+
for k, v in columns_types.items()
51+
]
52+
)
4453
partition_cols_str: str = f"PARTITIONED BY ({', '.join([col for col in partition_cols])})" if partition_cols else ""
4554
table_properties_str: str = (
4655
", " + ", ".join([f"'{key}'='{value}'" for key, value in additional_table_properties.items()])
@@ -196,6 +205,7 @@ def to_iceberg(
196205
dtype: Optional[Dict[str, str]] = None,
197206
catalog_id: Optional[str] = None,
198207
schema_evolution: bool = False,
208+
glue_table_settings: Optional[GlueTableSettings] = None,
199209
) -> None:
200210
"""
201211
Insert into Athena Iceberg table using INSERT INTO ... SELECT. Will create Iceberg table if it does not exist.
@@ -252,6 +262,10 @@ def to_iceberg(
252262
If none is provided, the AWS account ID is used by default
253263
schema_evolution: bool
254264
If True allows schema evolution for new columns or changes in column types.
265+
glue_table_settings: Optional[GlueTableSettings]
266+
Glue/Athena catalog: Settings for writing to the Glue table.
267+
Currently only the 'columns_comments' attribute is supported for this function.
268+
Columns comments can only be added with this function when creating a new table.
255269
256270
Returns
257271
-------
@@ -294,6 +308,11 @@ def to_iceberg(
294308
"Either path or workgroup path must be specified to store the temporary results."
295309
)
296310

311+
glue_table_settings = cast(
312+
GlueTableSettings,
313+
glue_table_settings if glue_table_settings else {},
314+
)
315+
297316
try:
298317
# Create Iceberg table if it doesn't exist
299318
if not catalog.does_table_exist(
@@ -314,6 +333,7 @@ def to_iceberg(
314333
kms_key=kms_key,
315334
boto3_session=boto3_session,
316335
dtype=dtype,
336+
columns_comments=glue_table_settings.get("columns_comments"),
317337
)
318338
else:
319339
schema_differences = _determine_differences(
@@ -352,6 +372,7 @@ def to_iceberg(
352372
s3_additional_kwargs=s3_additional_kwargs,
353373
dtype=dtype,
354374
catalog_id=catalog_id,
375+
glue_table_settings=glue_table_settings,
355376
)
356377

357378
# Insert into iceberg table

tests/unit/test_athena.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1726,3 +1726,26 @@ def test_athena_to_iceberg_with_hyphenated_table_name(
17261726

17271727
assert len(df) == len(df_out)
17281728
assert len(df.columns) == len(df_out.columns)
1729+
1730+
1731+
def test_athena_to_iceberg_column_comments(path: str, path2: str, glue_database: str, glue_table: str) -> None:
    """Column comments supplied via ``glue_table_settings`` must be readable from the catalog.

    Writes a small two-column frame with ``wr.athena.to_iceberg`` while passing one
    comment per column under the ``columns_comments`` key, then fetches the comments
    with ``wr.catalog.get_columns_comments`` and compares them to what was supplied.

    NOTE(review): ``path``, ``path2``, ``glue_database`` and ``glue_table`` are
    presumably pytest fixtures pointing at a fresh location/table -- confirm in conftest.
    """
    expected_comments = {
        "c0": "comment 0",
        "c1": "comment 1",
    }
    frame = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]})

    wr.athena.to_iceberg(
        df=frame,
        database=glue_database,
        table=glue_table,
        table_location=path,
        temp_path=path2,
        keep_files=False,
        glue_table_settings={
            "columns_comments": expected_comments,
        },
    )

    # Read the comments back from the catalog and compare with what was written.
    assert wr.catalog.get_columns_comments(glue_database, glue_table) == expected_comments

0 commit comments

Comments
 (0)