Skip to content

Commit 579d428

Browse files
authored
Merge branch 'main' into feature/iceberg-merge-into-full-functionality
2 parents 5c65f85 + c81581c commit 579d428

File tree

15 files changed

+203
-19
lines changed

15 files changed

+203
-19
lines changed

.github/workflows/cfn-nag.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
steps:
2727
- uses: actions/checkout@v5
2828
- name: Use Node.js
29-
uses: actions/setup-node@v5
29+
uses: actions/setup-node@v6
3030
with:
3131
node-version: 18
3232
- name: Cache Node.js modules
@@ -49,7 +49,7 @@ jobs:
4949
- name: Rust latest
5050
run: rustup update
5151
- name: Install uv
52-
uses: astral-sh/setup-uv@v6
52+
uses: astral-sh/setup-uv@v7
5353
with:
5454
enable-cache: true
5555
- name: Set up cdk.json

.github/workflows/minimal-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
- name: Rust latest
3838
run: rustup update
3939
- name: Install uv
40-
uses: astral-sh/setup-uv@v6
40+
uses: astral-sh/setup-uv@v7
4141
with:
4242
enable-cache: true
4343
- name: Install Requirements

.github/workflows/snyk.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ jobs:
2323
with:
2424
args: --severity-threshold=high --sarif-file-output=snyk.sarif
2525
- name: Upload result to GitHub Code Scanning
26-
uses: github/codeql-action/upload-sarif@v3
26+
uses: github/codeql-action/upload-sarif@v4
2727
with:
2828
sarif_file: snyk.sarif

.github/workflows/static-checking.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929
- name: Rust latest
3030
run: rustup update
3131
- name: Install uv
32-
uses: astral-sh/setup-uv@v6
32+
uses: astral-sh/setup-uv@v7
3333
with:
3434
enable-cache: true
3535
- name: Install Requirements

awswrangler/athena/_executions.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def start_query_execution(
4040
kms_key: str | None = None,
4141
params: dict[str, Any] | list[str] | None = None,
4242
paramstyle: Literal["qmark", "named"] = "named",
43+
result_reuse_configuration: dict[str, Any] | None = None,
4344
boto3_session: boto3.Session | None = None,
4445
client_request_token: str | None = None,
4546
athena_cache_settings: typing.AthenaCacheSettings | None = None,
@@ -87,6 +88,9 @@ def start_query_execution(
8788
8889
- ``named``
8990
- ``qmark``
91+
result_reuse_configuration
92+
A structure that contains the configuration settings for reusing query results.
93+
See also: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html
9094
boto3_session
9195
The default boto3 session will be used if **boto3_session** receive ``None``.
9296
client_request_token
@@ -156,6 +160,7 @@ def start_query_execution(
156160
encryption=encryption,
157161
kms_key=kms_key,
158162
execution_params=execution_params,
163+
result_reuse_configuration=result_reuse_configuration,
159164
client_request_token=client_request_token,
160165
boto3_session=boto3_session,
161166
)

awswrangler/athena/_executions.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def start_query_execution(
1818
kms_key: str | None = ...,
1919
params: dict[str, Any] | list[str] | None = ...,
2020
paramstyle: Literal["qmark", "named"] = ...,
21+
result_reuse_configuration: dict[str, Any] | None = ...,
2122
boto3_session: boto3.Session | None = ...,
2223
athena_cache_settings: typing.AthenaCacheSettings | None = ...,
2324
athena_query_wait_polling_delay: float = ...,
@@ -35,6 +36,7 @@ def start_query_execution(
3536
kms_key: str | None = ...,
3637
params: dict[str, Any] | list[str] | None = ...,
3738
paramstyle: Literal["qmark", "named"] = ...,
39+
result_reuse_configuration: dict[str, Any] | None = ...,
3840
boto3_session: boto3.Session | None = ...,
3941
athena_cache_settings: typing.AthenaCacheSettings | None = ...,
4042
athena_query_wait_polling_delay: float = ...,
@@ -52,6 +54,7 @@ def start_query_execution(
5254
kms_key: str | None = ...,
5355
params: dict[str, Any] | list[str] | None = ...,
5456
paramstyle: Literal["qmark", "named"] = ...,
57+
result_reuse_configuration: dict[str, Any] | None = ...,
5558
boto3_session: boto3.Session | None = ...,
5659
athena_cache_settings: typing.AthenaCacheSettings | None = ...,
5760
athena_query_wait_polling_delay: float = ...,

awswrangler/athena/_read.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ def _resolve_query_without_cache_regular(
427427
s3_additional_kwargs: dict[str, Any] | None,
428428
boto3_session: boto3.Session | None,
429429
execution_params: list[str] | None = None,
430+
result_reuse_configuration: dict[str, Any] | None = None,
430431
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
431432
client_request_token: str | None = None,
432433
) -> pd.DataFrame | Iterator[pd.DataFrame]:
@@ -444,6 +445,7 @@ def _resolve_query_without_cache_regular(
444445
encryption=encryption,
445446
kms_key=kms_key,
446447
execution_params=execution_params,
448+
result_reuse_configuration=result_reuse_configuration,
447449
client_request_token=client_request_token,
448450
boto3_session=boto3_session,
449451
)
@@ -467,7 +469,7 @@ def _resolve_query_without_cache_regular(
467469
)
468470

469471

470-
def _resolve_query_without_cache(
472+
def _resolve_query_without_cache( # noqa: PLR0913
471473
sql: str,
472474
database: str,
473475
data_source: str | None,
@@ -491,6 +493,7 @@ def _resolve_query_without_cache(
491493
boto3_session: boto3.Session | None,
492494
pyarrow_additional_kwargs: dict[str, Any] | None = None,
493495
execution_params: list[str] | None = None,
496+
result_reuse_configuration: dict[str, Any] | None = None,
494497
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
495498
client_request_token: str | None = None,
496499
) -> pd.DataFrame | Iterator[pd.DataFrame]:
@@ -572,6 +575,7 @@ def _resolve_query_without_cache(
572575
s3_additional_kwargs=s3_additional_kwargs,
573576
boto3_session=boto3_session,
574577
execution_params=execution_params,
578+
result_reuse_configuration=result_reuse_configuration,
575579
dtype_backend=dtype_backend,
576580
client_request_token=client_request_token,
577581
)
@@ -785,6 +789,7 @@ def read_sql_query(
785789
athena_query_wait_polling_delay: float = _QUERY_WAIT_POLLING_DELAY,
786790
params: dict[str, Any] | list[str] | None = None,
787791
paramstyle: Literal["qmark", "named"] = "named",
792+
result_reuse_configuration: dict[str, Any] | None = None,
788793
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
789794
s3_additional_kwargs: dict[str, Any] | None = None,
790795
pyarrow_additional_kwargs: dict[str, Any] | None = None,
@@ -980,6 +985,10 @@ def read_sql_query(
980985
981986
- ``named``
982987
- ``qmark``
988+
result_reuse_configuration
989+
A structure that contains the configuration settings for reusing query results.
990+
This parameter is only valid when both `ctas_approach` and `unload_approach` are set to `False`.
991+
See also: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html
983992
dtype_backend
984993
Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
985994
nullable dtypes are used for all dtypes that have a nullable implementation when
@@ -1040,6 +1049,10 @@ def read_sql_query(
10401049
raise exceptions.InvalidArgumentCombination(
10411050
"Using `client_request_token` is only allowed when `ctas_approach=False` and `unload_approach=False`."
10421051
)
1052+
if result_reuse_configuration and (ctas_approach or unload_approach):
1053+
raise exceptions.InvalidArgumentCombination(
1054+
"Using `result_reuse_configuration` is only allowed when `ctas_approach=False` and `unload_approach=False`."
1055+
)
10431056
chunksize = sys.maxsize if ctas_approach is False and chunksize is True else chunksize
10441057

10451058
# Substitute query parameters if applicable
@@ -1104,6 +1117,7 @@ def read_sql_query(
11041117
boto3_session=boto3_session,
11051118
pyarrow_additional_kwargs=pyarrow_additional_kwargs,
11061119
execution_params=execution_params,
1120+
result_reuse_configuration=result_reuse_configuration,
11071121
dtype_backend=dtype_backend,
11081122
client_request_token=client_request_token,
11091123
)

awswrangler/athena/_utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def _start_query_execution(
9393
encryption: str | None = None,
9494
kms_key: str | None = None,
9595
execution_params: list[str] | None = None,
96+
result_reuse_configuration: dict[str, Any] | None = None,
9697
client_request_token: str | None = None,
9798
boto3_session: boto3.Session | None = None,
9899
) -> str:
@@ -130,6 +131,9 @@ def _start_query_execution(
130131
if execution_params:
131132
args["ExecutionParameters"] = execution_params
132133

134+
if result_reuse_configuration:
135+
args["ResultReuseConfiguration"] = result_reuse_configuration
136+
133137
client_athena = _utils.client(service_name="athena", session=boto3_session)
134138
_logger.debug("Starting query execution with args: \n%s", pprint.pformat(args))
135139
response = _utils.try_it(

awswrangler/mysql.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def connect(
9393
write_timeout: int | None = None,
9494
connect_timeout: int = 10,
9595
cursorclass: type["Cursor"] | None = None,
96-
) -> "pymysql.connections.Connection": # type: ignore[type-arg]
96+
) -> "pymysql.connections.Connection":
9797
"""Return a pymysql connection from a Glue Catalog Connection or Secrets Manager.
9898
9999
https://pymysql.readthedocs.io
@@ -231,7 +231,7 @@ def read_sql_query(
231231
@_utils.check_optional_dependency(pymysql, "pymysql")
232232
def read_sql_query(
233233
sql: str,
234-
con: "pymysql.connections.Connection", # type: ignore[type-arg]
234+
con: "pymysql.connections.Connection",
235235
index_col: str | list[str] | None = None,
236236
params: list[Any] | tuple[Any, ...] | dict[Any, Any] | None = None,
237237
chunksize: int | None = None,
@@ -351,7 +351,7 @@ def read_sql_table(
351351
@_utils.check_optional_dependency(pymysql, "pymysql")
352352
def read_sql_table(
353353
table: str,
354-
con: "pymysql.connections.Connection", # type: ignore[type-arg]
354+
con: "pymysql.connections.Connection",
355355
schema: str | None = None,
356356
index_col: str | list[str] | None = None,
357357
params: list[Any] | tuple[Any, ...] | dict[Any, Any] | None = None,
@@ -439,7 +439,7 @@ def read_sql_table(
439439
@apply_configs
440440
def to_sql(
441441
df: pd.DataFrame,
442-
con: "pymysql.connections.Connection", # type: ignore[type-arg]
442+
con: "pymysql.connections.Connection",
443443
table: str,
444444
schema: str,
445445
mode: _ToSqlModeLiteral = "append",

awswrangler/redshift/_read.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ def unload_to_files(
241241
kms_key_id: str | None = None,
242242
manifest: bool = False,
243243
partition_cols: list[str] | None = None,
244+
cleanpath: bool = False,
244245
boto3_session: boto3.Session | None = None,
245246
) -> None:
246247
"""Unload Parquet files on s3 from a Redshift query result (Through the UNLOAD command).
@@ -294,6 +295,21 @@ def unload_to_files(
294295
Unload a manifest file on S3.
295296
partition_cols
296297
Specifies the partition keys for the unload operation.
298+
cleanpath
299+
Use CLEANPATH instead of ALLOWOVERWRITE. When True, uses CLEANPATH to remove existing files
300+
located in the Amazon S3 path before unloading files. When False (default), uses ALLOWOVERWRITE
301+
to overwrite existing files, including the manifest file. These options are mutually exclusive.
302+
303+
ALLOWOVERWRITE: By default, UNLOAD fails if it finds files that it would possibly overwrite.
304+
If ALLOWOVERWRITE is specified, UNLOAD overwrites existing files, including the manifest file.
305+
306+
CLEANPATH: Removes existing files located in the Amazon S3 path specified in the TO clause
307+
before unloading files to the specified location. If you include the PARTITION BY clause,
308+
existing files are removed only from the partition folders to receive new files generated
309+
by the UNLOAD operation. You must have the s3:DeleteObject permission on the Amazon S3 bucket.
310+
Files removed using CLEANPATH are permanently deleted and can't be recovered.
311+
312+
For more information, see: https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html
297313
boto3_session
298314
The default boto3 session will be used if **boto3_session** is ``None``.
299315
@@ -307,6 +323,15 @@ def unload_to_files(
307323
... con=con,
308324
... iam_role="arn:aws:iam::XXX:role/XXX"
309325
... )
326+
>>> # Using CLEANPATH instead of ALLOWOVERWRITE
327+
>>> with wr.redshift.connect("MY_GLUE_CONNECTION") as con:
328+
... wr.redshift.unload_to_files(
329+
... sql="SELECT * FROM public.mytable",
330+
... path="s3://bucket/extracted_parquet_files/",
331+
... con=con,
332+
... iam_role="arn:aws:iam::XXX:role/XXX",
333+
... cleanpath=True
334+
... )
310335
311336
312337
"""
@@ -339,11 +364,13 @@ def unload_to_files(
339364
# Escape quotation marks in SQL
340365
sql = sql.replace("'", "''")
341366

367+
overwrite_str: str = "CLEANPATH" if cleanpath else "ALLOWOVERWRITE"
368+
342369
unload_sql = (
343370
f"UNLOAD ('{sql}')\n"
344371
f"TO '{path}'\n"
345372
f"{auth_str}"
346-
"ALLOWOVERWRITE\n"
373+
f"{overwrite_str}\n"
347374
f"{parallel_str}\n"
348375
f"FORMAT {format_str}\n"
349376
"ENCRYPTED"
@@ -376,6 +403,7 @@ def unload(
376403
chunked: bool | int = False,
377404
keep_files: bool = False,
378405
parallel: bool = True,
406+
cleanpath: bool = False,
379407
use_threads: bool | int = True,
380408
boto3_session: boto3.Session | None = None,
381409
s3_additional_kwargs: dict[str, str] | None = None,
@@ -452,6 +480,21 @@ def unload(
452480
By default, UNLOAD writes data in parallel to multiple files, according to the number of
453481
slices in the cluster. If parallel is False, UNLOAD writes to one or more data files serially,
454482
sorted absolutely according to the ORDER BY clause, if one is used.
483+
cleanpath
484+
Use CLEANPATH instead of ALLOWOVERWRITE. When True, uses CLEANPATH to remove existing files
485+
located in the Amazon S3 path before unloading files. When False (default), uses ALLOWOVERWRITE
486+
to overwrite existing files, including the manifest file. These options are mutually exclusive.
487+
488+
ALLOWOVERWRITE: By default, UNLOAD fails if it finds files that it would possibly overwrite.
489+
If ALLOWOVERWRITE is specified, UNLOAD overwrites existing files, including the manifest file.
490+
491+
CLEANPATH: Removes existing files located in the Amazon S3 path specified in the TO clause
492+
before unloading files to the specified location. If you include the PARTITION BY clause,
493+
existing files are removed only from the partition folders to receive new files generated
494+
by the UNLOAD operation. You must have the s3:DeleteObject permission on the Amazon S3 bucket.
495+
Files removed using CLEANPATH are permanently deleted and can't be recovered.
496+
497+
For more information, see: https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html
455498
dtype_backend
456499
Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
457500
nullable dtypes are used for all dtypes that have a nullable implementation when
@@ -489,6 +532,15 @@ def unload(
489532
... con=con,
490533
... iam_role="arn:aws:iam::XXX:role/XXX"
491534
... )
535+
>>> # Using CLEANPATH instead of ALLOWOVERWRITE
536+
>>> with wr.redshift.connect("MY_GLUE_CONNECTION") as con:
537+
... df = wr.redshift.unload(
538+
... sql="SELECT * FROM public.mytable",
539+
... path="s3://bucket/extracted_parquet_files/",
540+
... con=con,
541+
... iam_role="arn:aws:iam::XXX:role/XXX",
542+
... cleanpath=True
543+
... )
492544
493545
"""
494546
path = path if path.endswith("/") else f"{path}/"
@@ -505,6 +557,7 @@ def unload(
505557
kms_key_id=kms_key_id,
506558
manifest=False,
507559
parallel=parallel,
560+
cleanpath=cleanpath,
508561
boto3_session=boto3_session,
509562
)
510563
if chunked is False:

0 commit comments

Comments (0)