Skip to content

Commit 9f4d83d

Browse files
authored
feat: Iceberg PARTITIONED BY and additional table properties support (#2322)
* feat: Iceberg `PARTITIONED BY` support * Additional table properties
1 parent cb2e4a3 commit 9f4d83d

File tree

2 files changed

+41
-4
lines changed

2 files changed

+41
-4
lines changed

awswrangler/athena/_write_iceberg.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import logging
44
import uuid
5-
from typing import Any, Dict, Optional
5+
from typing import Any, Dict, List, Optional
66

77
import boto3
88
import pandas as pd
@@ -25,6 +25,8 @@ def _create_iceberg_table(
2525
table: str,
2626
path: str,
2727
wg_config: _WorkGroupConfig,
28+
partition_cols: Optional[List[str]],
29+
additional_table_properties: Optional[Dict[str, Any]],
2830
index: bool = False,
2931
data_source: Optional[str] = None,
3032
workgroup: Optional[str] = None,
@@ -37,11 +39,18 @@ def _create_iceberg_table(
3739

3840
columns_types, _ = catalog.extract_athena_types(df=df, index=index)
3941
cols_str: str = ", ".join([f"{k} {v}" for k, v in columns_types.items()])
42+
partition_cols_str: str = f"PARTITIONED BY ({', '.join([col for col in partition_cols])})" if partition_cols else ""
43+
table_properties_str: str = (
44+
", " + ", ".join([f"'{key}'='{value}'" for key, value in additional_table_properties.items()])
45+
if additional_table_properties
46+
else ""
47+
)
4048

4149
create_sql: str = (
4250
f"CREATE TABLE IF NOT EXISTS {table} ({cols_str}) "
51+
f"{partition_cols_str} "
4352
f"LOCATION '{path}' "
44-
f"TBLPROPERTIES ( 'table_type' ='ICEBERG', 'format'='parquet' )"
53+
f"TBLPROPERTIES ('table_type' ='ICEBERG', 'format'='parquet'{table_properties_str})"
4554
)
4655

4756
query_id: str = _start_query_execution(
@@ -68,13 +77,15 @@ def to_iceberg(
6877
temp_path: Optional[str] = None,
6978
index: bool = False,
7079
table_location: Optional[str] = None,
80+
partition_cols: Optional[List[str]] = None,
7181
keep_files: bool = True,
7282
data_source: Optional[str] = None,
7383
workgroup: Optional[str] = None,
7484
encryption: Optional[str] = None,
7585
kms_key: Optional[str] = None,
7686
boto3_session: Optional[boto3.Session] = None,
7787
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
88+
additional_table_properties: Optional[Dict[str, Any]] = None,
7889
) -> None:
7990
"""
8091
Insert into Athena Iceberg table using INSERT INTO ... SELECT. Will create Iceberg table if it does not exist.
@@ -97,6 +108,11 @@ def to_iceberg(
97108
Should consider the DataFrame index as a column?.
98109
table_location : str, optional
99110
Amazon S3 location for the table. Will only be used to create a new table if it does not exist.
111+
partition_cols: List[str], optional
112+
List of column names that will be used to create partitions, including support for transform
113+
functions (e.g. "day(ts)").
114+
115+
https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html#querying-iceberg-partitioning
100116
keep_files : bool
101117
Whether staging files produced by Athena are retained. 'True' by default.
102118
data_source : str, optional
@@ -112,6 +128,11 @@ def to_iceberg(
112128
s3_additional_kwargs : Optional[Dict[str, Any]]
113129
Forwarded to botocore requests.
114130
e.g. s3_additional_kwargs={'RequestPayer': 'requester'}
131+
additional_table_properties : Optional[Dict[str, Any]]
132+
Additional table properties.
133+
e.g. additional_table_properties={'write_target_data_file_size_bytes': '536870912'}
134+
135+
https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html#querying-iceberg-table-properties
115136
116137
Returns
117138
-------
@@ -163,6 +184,8 @@ def to_iceberg(
163184
table=table,
164185
path=table_location, # type: ignore[arg-type]
165186
wg_config=wg_config,
187+
partition_cols=partition_cols,
188+
additional_table_properties=additional_table_properties,
166189
index=index,
167190
data_source=data_source,
168191
workgroup=workgroup,

tests/unit/test_athena.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
get_df_txt,
2424
get_time_str_with_random_suffix,
2525
pandas_equals,
26+
ts,
2627
)
2728

2829
logging.getLogger("awswrangler").setLevel(logging.DEBUG)
@@ -1403,8 +1404,19 @@ def test_athena_date_recovery(path, glue_database, glue_table):
14031404
assert pandas_equals(df, df2)
14041405

14051406

1406-
def test_athena_to_iceberg(path, path2, glue_database, glue_table):
1407-
df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
1407+
@pytest.mark.parametrize("partition_cols", [None, ["name"], ["name", "day(ts)"]])
1408+
@pytest.mark.parametrize(
1409+
"additional_table_properties",
1410+
[None, {"write_target_data_file_size_bytes": 536870912, "optimize_rewrite_delete_file_threshold": 10}],
1411+
)
1412+
def test_athena_to_iceberg(path, path2, glue_database, glue_table, partition_cols, additional_table_properties):
1413+
df = pd.DataFrame(
1414+
{
1415+
"id": [1, 2, 3],
1416+
"name": ["a", "b", "c"],
1417+
"ts": [ts("2020-01-01 00:00:00.0"), ts("2020-01-02 00:00:01.0"), ts("2020-01-03 00:00:00.0")],
1418+
}
1419+
)
14081420
df["id"] = df["id"].astype("Int64") # Cast as nullable int64 type
14091421
df["name"] = df["name"].astype("string")
14101422

@@ -1414,6 +1426,8 @@ def test_athena_to_iceberg(path, path2, glue_database, glue_table):
14141426
table=glue_table,
14151427
table_location=path,
14161428
temp_path=path2,
1429+
partition_cols=partition_cols,
1430+
additional_table_properties=additional_table_properties,
14171431
)
14181432

14191433
df_out = wr.athena.read_sql_query(

0 commit comments

Comments
 (0)