Commit 6e43648

feat: Enable Iceberg row deletion & add mode parameter to to_iceberg (#2632)

1 parent b527aba commit 6e43648

7 files changed: +659 −13 lines changed

.github/workflows/static-checking.yml
Lines changed: 1 addition & 1 deletion

```diff
@@ -41,4 +41,4 @@ jobs:
       - name: Documentation check
         run: doc8 --max-line-length 120 docs/source
       - name: Check poetry.lock consistency with pyproject.toml
-        run: poetry lock --check
+        run: poetry check --lock
```
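This tracks Poetry's rename of the lock-file consistency check: ``poetry lock --check`` is deprecated in newer Poetry releases in favor of ``poetry check --lock``.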

awswrangler/athena/__init__.py
Lines changed: 2 additions & 1 deletion

```diff
@@ -31,7 +31,7 @@
     repair_table,
     show_create_table,
 )
-from awswrangler.athena._write_iceberg import to_iceberg
+from awswrangler.athena._write_iceberg import to_iceberg, delete_from_iceberg_table


 __all__ = [
@@ -60,4 +60,5 @@
     "list_prepared_statements",
     "delete_prepared_statement",
     "to_iceberg",
+    "delete_from_iceberg_table",
 ]
```

awswrangler/athena/_write_iceberg.py
Lines changed: 216 additions & 8 deletions
```diff
@@ -5,7 +5,7 @@
 import logging
 import typing
 import uuid
-from typing import Any, Dict, TypedDict, cast
+from typing import Any, Dict, Literal, TypedDict, cast

 import boto3
 import pandas as pd
@@ -191,6 +191,33 @@ def _alter_iceberg_table_change_columns_sql(
     return sql_statements


+def _validate_args(
+    df: pd.DataFrame,
+    temp_path: str | None,
+    wg_config: _WorkGroupConfig,
+    mode: Literal["append", "overwrite", "overwrite_partitions"],
+    partition_cols: list[str] | None,
+    merge_cols: list[str] | None,
+) -> None:
+    if df.empty is True:
+        raise exceptions.EmptyDataFrame("DataFrame cannot be empty.")
+
+    if not temp_path and not wg_config.s3_output:
+        raise exceptions.InvalidArgumentCombination(
+            "Either path or workgroup path must be specified to store the temporary results."
+        )
+
+    if mode == "overwrite_partitions":
+        if not partition_cols:
+            raise exceptions.InvalidArgumentCombination(
+                "When mode is 'overwrite_partitions' partition_cols must be specified."
+            )
+        if merge_cols:
+            raise exceptions.InvalidArgumentCombination(
+                "When mode is 'overwrite_partitions' merge_cols must not be specified."
+            )
+
+
 @apply_configs
 @_utils.validate_distributed_kwargs(
     unsupported_kwargs=["boto3_session", "s3_additional_kwargs"],
```
```diff
@@ -207,6 +234,7 @@ def to_iceberg(
     keep_files: bool = True,
     data_source: str | None = None,
     workgroup: str = "primary",
+    mode: Literal["append", "overwrite", "overwrite_partitions"] = "append",
     encryption: str | None = None,
     kms_key: str | None = None,
     boto3_session: boto3.Session | None = None,
@@ -254,6 +282,8 @@
         Data Source / Catalog name. If None, 'AwsDataCatalog' will be used by default.
     workgroup : str
         Athena workgroup. Primary by default.
+    mode: str
+        ``append`` (default), ``overwrite``, ``overwrite_partitions``.
     encryption : str, optional
         Valid values: [None, 'SSE_S3', 'SSE_KMS']. Notice: 'CSE_KMS' is not supported.
     kms_key : str, optional
@@ -318,22 +348,28 @@
     ... )

     """
-    if df.empty is True:
-        raise exceptions.EmptyDataFrame("DataFrame cannot be empty.")
-
     wg_config: _WorkGroupConfig = _get_workgroup_config(session=boto3_session, workgroup=workgroup)
     temp_table: str = f"temp_table_{uuid.uuid4().hex}"

-    if not temp_path and not wg_config.s3_output:
-        raise exceptions.InvalidArgumentCombination(
-            "Either path or workgroup path must be specified to store the temporary results."
-        )
+    _validate_args(
+        df=df,
+        temp_path=temp_path,
+        wg_config=wg_config,
+        mode=mode,
+        partition_cols=partition_cols,
+        merge_cols=merge_cols,
+    )

     glue_table_settings = cast(
         GlueTableSettings,
         glue_table_settings if glue_table_settings else {},
     )

+    if mode == "overwrite":
+        catalog.delete_table_if_exists(
+            database=database, table=table, catalog_id=catalog_id, boto3_session=boto3_session
+        )
+
     try:
         # Create Iceberg table if it doesn't exist
         if not catalog.does_table_exist(
@@ -396,6 +432,24 @@
             boto3_session=boto3_session,
         )

+        # if mode == "overwrite_partitions", drop matched partitions
+        if mode == "overwrite_partitions":
+            delete_from_iceberg_table(
+                df=df,
+                database=database,
+                table=table,
+                merge_cols=partition_cols,  # type: ignore[arg-type]
+                temp_path=temp_path,
+                keep_files=False,
+                data_source=data_source,
+                workgroup=workgroup,
+                encryption=encryption,
+                kms_key=kms_key,
+                boto3_session=boto3_session,
+                s3_additional_kwargs=s3_additional_kwargs,
+                catalog_id=catalog_id,
+            )
+
         # Create temporary external table, write the results
         s3.to_parquet(
             df=df,
```
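Combined with the partition-drop step above, ``mode="overwrite_partitions"`` replaces only the partitions present in the incoming frame and leaves all other partitions untouched. A usage sketch, assuming a table partitioned by ``region`` (all names hypothetical):

```python
import awswrangler as wr
import pandas as pd

# Only the partitions present in df (region="eu" and region="us")
# are rewritten; rows in other partitions are preserved.
df = pd.DataFrame({"region": ["eu", "us"], "value": [1, 2]})
wr.athena.to_iceberg(
    df=df,
    database="my_database",         # hypothetical
    table="my_table",               # hypothetical
    temp_path="s3://bucket/temp/",  # hypothetical
    partition_cols=["region"],
    mode="overwrite_partitions",
)
```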
```diff
@@ -453,3 +507,157 @@
         boto3_session=boto3_session,
         s3_additional_kwargs=s3_additional_kwargs,
     )
+
+
+@apply_configs
+@_utils.validate_distributed_kwargs(
+    unsupported_kwargs=["boto3_session", "s3_additional_kwargs"],
+)
+def delete_from_iceberg_table(
+    df: pd.DataFrame,
+    database: str,
+    table: str,
+    merge_cols: list[str],
+    temp_path: str | None = None,
+    keep_files: bool = True,
+    data_source: str | None = None,
+    workgroup: str = "primary",
+    encryption: str | None = None,
+    kms_key: str | None = None,
+    boto3_session: boto3.Session | None = None,
+    s3_additional_kwargs: dict[str, Any] | None = None,
+    catalog_id: str | None = None,
+) -> None:
+    """
+    Delete rows from an Iceberg table.
+
+    Creates a temporary external table, writes staged files, and then deletes
+    any rows which match the contents of the temporary table.
+
+    Parameters
+    ----------
+    df: pandas.DataFrame
+        Pandas DataFrame containing the IDs of rows that are to be deleted from the Iceberg table.
+    database: str
+        Database name.
+    table: str
+        Table name.
+    merge_cols: list[str]
+        List of columns to be used to determine which rows of the Iceberg table should be deleted.
+
+        `MERGE INTO <https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html>`_
+    temp_path: str, optional
+        S3 path to temporarily store the DataFrame.
+    keep_files: bool
+        Whether staging files produced by Athena are retained. ``True`` by default.
+    data_source: str, optional
+        Data Source / Catalog name. If None, 'AwsDataCatalog' will be used by default.
+    workgroup: str, optional
+        Athena workgroup name.
+    encryption: str, optional
+        Valid values: [``None``, ``"SSE_S3"``, ``"SSE_KMS"``]. Notice: ``"CSE_KMS"`` is not supported.
+    kms_key: str, optional
+        For SSE-KMS, this is the KMS key ARN or ID.
+    boto3_session: boto3.Session(), optional
+        Boto3 Session. The default boto3 session is used if ``boto3_session`` receives ``None``.
+    s3_additional_kwargs: Optional[Dict[str, Any]]
+        Forwarded to botocore requests.
+        e.g. ``s3_additional_kwargs={"RequestPayer": "requester"}``
+    catalog_id: str, optional
+        The ID of the Data Catalog which contains the database and table.
+        If none is provided, the AWS account ID is used by default.
+
+    Returns
+    -------
+    None
+
+    Examples
+    --------
+    >>> import awswrangler as wr
+    >>> import pandas as pd
+    >>> df = pd.DataFrame({"id": [1, 2, 3], "col": ["foo", "bar", "baz"]})
+    >>> wr.athena.to_iceberg(
+    ...     df=df,
+    ...     database="my_database",
+    ...     table="my_table",
+    ...     temp_path="s3://bucket/temp/",
+    ... )
+    >>> df_delete = pd.DataFrame({"id": [1, 3]})
+    >>> wr.athena.delete_from_iceberg_table(
+    ...     df=df_delete,
+    ...     database="my_database",
+    ...     table="my_table",
+    ...     merge_cols=["id"],
+    ... )
+    >>> wr.athena.read_sql_table(table="my_table", database="my_database")
+       id  col
+    0   2  bar
+    """
+    if df.empty is True:
+        raise exceptions.EmptyDataFrame("DataFrame cannot be empty.")
+
+    if not merge_cols:
+        raise exceptions.InvalidArgumentValue("Merge columns must be specified.")
+
+    wg_config: _WorkGroupConfig = _get_workgroup_config(session=boto3_session, workgroup=workgroup)
+    temp_table: str = f"temp_table_{uuid.uuid4().hex}"
+
+    if not temp_path and not wg_config.s3_output:
+        raise exceptions.InvalidArgumentCombination(
+            "Either path or workgroup path must be specified to store the temporary results."
+        )
+
+    if not catalog.does_table_exist(database=database, table=table, boto3_session=boto3_session, catalog_id=catalog_id):
+        raise exceptions.InvalidTable(f"Table {table} does not exist in database {database}.")
+
+    df = df[merge_cols].drop_duplicates(ignore_index=True)
+
+    try:
+        # Create temporary external table, write the results
+        s3.to_parquet(
+            df=df,
+            path=temp_path or wg_config.s3_output,
+            dataset=True,
+            database=database,
+            table=temp_table,
+            boto3_session=boto3_session,
+            s3_additional_kwargs=s3_additional_kwargs,
+            catalog_id=catalog_id,
+            index=False,
+        )
+
+        sql_statement = f"""
+            MERGE INTO "{database}"."{table}" target
+            USING "{database}"."{temp_table}" source
+            ON {' AND '.join([f'target."{x}" = source."{x}"' for x in merge_cols])}
+            WHEN MATCHED THEN
+                DELETE
+        """
+
+        query_execution_id: str = _start_query_execution(
+            sql=sql_statement,
+            workgroup=workgroup,
+            wg_config=wg_config,
+            database=database,
+            data_source=data_source,
+            encryption=encryption,
+            kms_key=kms_key,
+            boto3_session=boto3_session,
+        )
+        wait_query(query_execution_id=query_execution_id, boto3_session=boto3_session)
+
+    except Exception as ex:
+        _logger.error(ex)
+
+        raise
+
+    finally:
+        catalog.delete_table_if_exists(
+            database=database, table=temp_table, boto3_session=boto3_session, catalog_id=catalog_id
+        )
+
+        if keep_files is False:
+            s3.delete_objects(
+                path=temp_path or wg_config.s3_output,  # type: ignore[arg-type]
+                boto3_session=boto3_session,
+                s3_additional_kwargs=s3_additional_kwargs,
+            )
```
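The ``ON`` clause of the generated ``MERGE INTO`` statement is a plain string join over ``merge_cols``. A standalone sketch of how it renders for a two-column key (runnable without AWS access):

```python
# Pure string assembly, mirroring the f-string expression above.
merge_cols = ["id", "region"]
on_clause = " AND ".join([f'target."{x}" = source."{x}"' for x in merge_cols])
print(on_clause)
# target."id" = source."id" AND target."region" = source."region"
```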

awswrangler/distributed/ray/modin/_utils.py
Lines changed: 12 additions & 2 deletions

```diff
@@ -2,7 +2,7 @@
 from __future__ import annotations

 from dataclasses import dataclass
-from typing import Any, Callable
+from typing import Any, Callable, cast

 import modin.pandas as modin_pd
 import pandas as pd
@@ -62,11 +62,21 @@ def _split_modin_frame(df: modin_pd.DataFrame, splits: int) -> list[ObjectRef[Any]]:
     return object_refs


-def _arrow_refs_to_df(arrow_refs: list[Callable[..., Any]], kwargs: dict[str, Any] | None) -> modin_pd.DataFrame:
+def _arrow_refs_to_df(
+    arrow_refs: list[Callable[..., Any] | pa.Table], kwargs: dict[str, Any] | None
+) -> modin_pd.DataFrame:
     @ray_remote()
     def _is_not_empty(table: pa.Table) -> Any:
         return table.num_rows > 0 or table.num_columns > 0

+    if isinstance(arrow_refs[0], pa.Table):
+        tables = cast(list[pa.Table], arrow_refs)
+        tables = [table for table in tables if table.num_rows > 0 or table.num_columns > 0]
+        return _to_modin(
+            dataset=ray.data.from_arrow(tables) if len(tables) > 0 else ray.data.from_arrow([pa.Table.from_arrays([])]),
+            to_pandas_kwargs=kwargs,
+        )
+
     ref_rows: list[bool] = ray_get([_is_not_empty(arrow_ref) for arrow_ref in arrow_refs])
     refs: list[Callable[..., Any]] = [ref for ref_rows, ref in zip(ref_rows, arrow_refs) if ref_rows]
     return _to_modin(
```
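The new branch in ``_arrow_refs_to_df`` handles inputs that are already materialized ``pa.Table`` objects rather than Ray object refs, filtering empty tables locally instead of dispatching ``_is_not_empty`` to remote tasks. A Ray-free sketch of just that filter predicate:

```python
import pyarrow as pa

# A table is kept when it has rows or at least a schema with columns;
# a fully empty table (no rows, no columns) is dropped.
tables = [pa.table({"a": [1, 2]}), pa.Table.from_pydict({})]
non_empty = [t for t in tables if t.num_rows > 0 or t.num_columns > 0]
print(len(non_empty))  # 1
```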

docs/source/api.rst
Lines changed: 1 addition & 0 deletions

```diff
@@ -144,6 +144,7 @@ Amazon Athena
     start_query_execution
     stop_query_execution
     to_iceberg
+    delete_from_iceberg_table
     unload
     wait_query
     create_prepared_statement
```

docs/source/scale.rst
Lines changed: 2 additions & 0 deletions

```diff
@@ -116,6 +116,8 @@ This table lists the ``awswrangler`` APIs available in distributed mode (i.e. th
 +-------------------+------------------------------+------------------+
 |                   | ``to_iceberg``               | ✅               |
 +-------------------+------------------------------+------------------+
+|                   | ``delete_from_iceberg_table``| ✅               |
++-------------------+------------------------------+------------------+
 | ``DynamoDB``      | ``read_items``               | ✅               |
 +-------------------+------------------------------+------------------+
 |                   | ``put_df``                   | ✅               |
```
