Skip to content

Commit 620cd41

Browse files
authored
fix: add catalog id in Athena to_iceberg (#2446)
1 parent 2ab00a5 commit 620cd41

File tree

1 file changed

+13
-4
lines changed

1 file changed

+13
-4
lines changed

awswrangler/athena/_write_iceberg.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def to_iceberg(
8888
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
8989
additional_table_properties: Optional[Dict[str, Any]] = None,
9090
dtype: Optional[Dict[str, str]] = None,
91+
catalog_id: Optional[str] = None,
9192
) -> None:
9293
"""
9394
Insert into Athena Iceberg table using INSERT INTO ... SELECT. Will create Iceberg table if it does not exist.
@@ -139,6 +140,9 @@ def to_iceberg(
139140
Dictionary of columns names and Athena/Glue types to be casted.
140141
Useful when you have columns with undetermined or mixed data types.
141142
e.g. {'col name': 'bigint', 'col2 name': 'int'}
143+
catalog_id : str, optional
144+
The ID of the Data Catalog from which to retrieve Databases.
145+
If none is provided, the AWS account ID is used by default
142146
143147
Returns
144148
-------
@@ -183,7 +187,9 @@ def to_iceberg(
183187

184188
try:
185189
# Create Iceberg table if it doesn't exist
186-
if not catalog.does_table_exist(database=database, table=table, boto3_session=boto3_session):
190+
if not catalog.does_table_exist(
191+
database=database, table=table, boto3_session=boto3_session, catalog_id=catalog_id
192+
):
187193
_create_iceberg_table(
188194
df=df,
189195
database=database,
@@ -211,10 +217,11 @@ def to_iceberg(
211217
boto3_session=boto3_session,
212218
s3_additional_kwargs=s3_additional_kwargs,
213219
dtype=dtype,
220+
catalog_id=catalog_id,
214221
)
215222

216223
# Insert into iceberg table
217-
query_id: str = _start_query_execution(
224+
query_execution_id: str = _start_query_execution(
218225
sql=f'INSERT INTO "{database}"."{table}" SELECT * FROM "{database}"."{temp_table}"',
219226
workgroup=workgroup,
220227
wg_config=wg_config,
@@ -224,14 +231,16 @@ def to_iceberg(
224231
kms_key=kms_key,
225232
boto3_session=boto3_session,
226233
)
227-
wait_query(query_id)
234+
wait_query(query_execution_id=query_execution_id, boto3_session=boto3_session)
228235

229236
except Exception as ex:
230237
_logger.error(ex)
231238

232239
raise
233240
finally:
234-
catalog.delete_table_if_exists(database=database, table=temp_table, boto3_session=boto3_session)
241+
catalog.delete_table_if_exists(
242+
database=database, table=temp_table, boto3_session=boto3_session, catalog_id=catalog_id
243+
)
235244

236245
if keep_files is False:
237246
s3.delete_objects(

0 commit comments

Comments
 (0)