Skip to content

Commit 225c55e

Browse files
authored
Merge branch 'main' into add-s3_output-parameter-to-start_query_execution_call
2 parents 4361e00 + 005955a commit 225c55e

File tree

6 files changed

+111
-5
lines changed

6 files changed

+111
-5
lines changed

awswrangler/athena/_executions.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def start_query_execution(
4040
kms_key: str | None = None,
4141
params: dict[str, Any] | list[str] | None = None,
4242
paramstyle: Literal["qmark", "named"] = "named",
43+
result_reuse_configuration: dict[str, Any] | None = None,
4344
boto3_session: boto3.Session | None = None,
4445
client_request_token: str | None = None,
4546
athena_cache_settings: typing.AthenaCacheSettings | None = None,
@@ -87,6 +88,9 @@ def start_query_execution(
8788
8889
- ``named``
8990
- ``qmark``
91+
result_reuse_configuration
92+
A structure that contains the configuration settings for reusing query results.
93+
See also: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html
9094
boto3_session
9195
The default boto3 session will be used if **boto3_session** receive ``None``.
9296
client_request_token
@@ -156,6 +160,7 @@ def start_query_execution(
156160
encryption=encryption,
157161
kms_key=kms_key,
158162
execution_params=execution_params,
163+
result_reuse_configuration=result_reuse_configuration,
159164
client_request_token=client_request_token,
160165
boto3_session=boto3_session,
161166
)

awswrangler/athena/_executions.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def start_query_execution(
1818
kms_key: str | None = ...,
1919
params: dict[str, Any] | list[str] | None = ...,
2020
paramstyle: Literal["qmark", "named"] = ...,
21+
result_reuse_configuration: dict[str, Any] | None = ...,
2122
boto3_session: boto3.Session | None = ...,
2223
athena_cache_settings: typing.AthenaCacheSettings | None = ...,
2324
athena_query_wait_polling_delay: float = ...,
@@ -35,6 +36,7 @@ def start_query_execution(
3536
kms_key: str | None = ...,
3637
params: dict[str, Any] | list[str] | None = ...,
3738
paramstyle: Literal["qmark", "named"] = ...,
39+
result_reuse_configuration: dict[str, Any] | None = ...,
3840
boto3_session: boto3.Session | None = ...,
3941
athena_cache_settings: typing.AthenaCacheSettings | None = ...,
4042
athena_query_wait_polling_delay: float = ...,
@@ -52,6 +54,7 @@ def start_query_execution(
5254
kms_key: str | None = ...,
5355
params: dict[str, Any] | list[str] | None = ...,
5456
paramstyle: Literal["qmark", "named"] = ...,
57+
result_reuse_configuration: dict[str, Any] | None = ...,
5558
boto3_session: boto3.Session | None = ...,
5659
athena_cache_settings: typing.AthenaCacheSettings | None = ...,
5760
athena_query_wait_polling_delay: float = ...,

awswrangler/athena/_read.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ def _resolve_query_without_cache_regular(
427427
s3_additional_kwargs: dict[str, Any] | None,
428428
boto3_session: boto3.Session | None,
429429
execution_params: list[str] | None = None,
430+
result_reuse_configuration: dict[str, Any] | None = None,
430431
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
431432
client_request_token: str | None = None,
432433
) -> pd.DataFrame | Iterator[pd.DataFrame]:
@@ -444,6 +445,7 @@ def _resolve_query_without_cache_regular(
444445
encryption=encryption,
445446
kms_key=kms_key,
446447
execution_params=execution_params,
448+
result_reuse_configuration=result_reuse_configuration,
447449
client_request_token=client_request_token,
448450
boto3_session=boto3_session,
449451
)
@@ -467,7 +469,7 @@ def _resolve_query_without_cache_regular(
467469
)
468470

469471

470-
def _resolve_query_without_cache(
472+
def _resolve_query_without_cache( # noqa: PLR0913
471473
sql: str,
472474
database: str,
473475
data_source: str | None,
@@ -491,6 +493,7 @@ def _resolve_query_without_cache(
491493
boto3_session: boto3.Session | None,
492494
pyarrow_additional_kwargs: dict[str, Any] | None = None,
493495
execution_params: list[str] | None = None,
496+
result_reuse_configuration: dict[str, Any] | None = None,
494497
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
495498
client_request_token: str | None = None,
496499
) -> pd.DataFrame | Iterator[pd.DataFrame]:
@@ -572,6 +575,7 @@ def _resolve_query_without_cache(
572575
s3_additional_kwargs=s3_additional_kwargs,
573576
boto3_session=boto3_session,
574577
execution_params=execution_params,
578+
result_reuse_configuration=result_reuse_configuration,
575579
dtype_backend=dtype_backend,
576580
client_request_token=client_request_token,
577581
)
@@ -785,6 +789,7 @@ def read_sql_query(
785789
athena_query_wait_polling_delay: float = _QUERY_WAIT_POLLING_DELAY,
786790
params: dict[str, Any] | list[str] | None = None,
787791
paramstyle: Literal["qmark", "named"] = "named",
792+
result_reuse_configuration: dict[str, Any] | None = None,
788793
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
789794
s3_additional_kwargs: dict[str, Any] | None = None,
790795
pyarrow_additional_kwargs: dict[str, Any] | None = None,
@@ -980,6 +985,10 @@ def read_sql_query(
980985
981986
- ``named``
982987
- ``qmark``
988+
result_reuse_configuration
989+
A structure that contains the configuration settings for reusing query results.
990+
This parameter is only valid when both `ctas_approach` and `unload_approach` are set to `False`.
991+
See also: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html
983992
dtype_backend
984993
Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
985994
nullable dtypes are used for all dtypes that have a nullable implementation when
@@ -1040,6 +1049,10 @@ def read_sql_query(
10401049
raise exceptions.InvalidArgumentCombination(
10411050
"Using `client_request_token` is only allowed when `ctas_approach=False` and `unload_approach=False`."
10421051
)
1052+
if result_reuse_configuration and (ctas_approach or unload_approach):
1053+
raise exceptions.InvalidArgumentCombination(
1054+
"Using `result_reuse_configuration` is only allowed when `ctas_approach=False` and `unload_approach=False`."
1055+
)
10431056
chunksize = sys.maxsize if ctas_approach is False and chunksize is True else chunksize
10441057

10451058
# Substitute query parameters if applicable
@@ -1104,6 +1117,7 @@ def read_sql_query(
11041117
boto3_session=boto3_session,
11051118
pyarrow_additional_kwargs=pyarrow_additional_kwargs,
11061119
execution_params=execution_params,
1120+
result_reuse_configuration=result_reuse_configuration,
11071121
dtype_backend=dtype_backend,
11081122
client_request_token=client_request_token,
11091123
)

awswrangler/athena/_utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def _start_query_execution(
8686
encryption: str | None = None,
8787
kms_key: str | None = None,
8888
execution_params: list[str] | None = None,
89+
result_reuse_configuration: dict[str, Any] | None = None,
8990
client_request_token: str | None = None,
9091
boto3_session: boto3.Session | None = None,
9192
) -> str:
@@ -123,6 +124,9 @@ def _start_query_execution(
123124
if execution_params:
124125
args["ExecutionParameters"] = execution_params
125126

127+
if result_reuse_configuration:
128+
args["ResultReuseConfiguration"] = result_reuse_configuration
129+
126130
client_athena = _utils.client(service_name="athena", session=boto3_session)
127131
_logger.debug("Starting query execution with args: \n%s", pprint.pformat(args))
128132
response = _utils.try_it(

pyproject.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ opensearch = [
5353
]
5454
openpyxl = ["openpyxl>=3.0.0,<4"]
5555
progressbar = ["progressbar2>=4.0.0,<5"]
56-
deltalake = ["deltalake>=0.18.0,<1.2.0"]
56+
deltalake = ["deltalake>=0.18.0,<1.3.0"]
5757
geopandas = ["geopandas>=1.0.0,<2"]
58-
modin = ["modin>=0.31,<0.36"]
58+
modin = ["modin>=0.31,<0.38"]
5959
ray = ["ray[default, data]>=2.49.0,<3"]
6060

6161
[project.urls]
@@ -72,7 +72,7 @@ dev = [
7272
"boto3-stubs[athena, cleanrooms, chime, cloudwatch, dynamodb, ec2, emr, emr-serverless, glue, kms, logs, neptune, opensearch, opensearchserverless, quicksight, rds, rds-data, redshift, redshift-data, s3, secretsmanager, ssm, sts, timestream-query, timestream-write]>=1.36.2,<2",
7373
"doc8~=1.1",
7474
"mypy~=1.14",
75-
"ruff>=0.9.2,<0.13.0",
75+
"ruff>=0.9.2,<0.15.0",
7676
"moto~=5.0",
7777
"openpyxl~=3.1",
7878
"pyparsing>=3.2.1,<4",
@@ -83,7 +83,7 @@ dev = [
8383
"pytest-xdist>=3.6.1,<4",
8484
"s3fs==0.4.2",
8585
"tox>=4.23.2,<5",
86-
"tox-uv==1.28.0",
86+
"tox-uv==1.28.1",
8787
"bump-my-version>=0.29,<1.3",
8888
"IPython>=8.18.1,<9",
8989
"jupyterlab~=4.3",

tests/unit/test_athena.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,86 @@
3232
pytestmark = pytest.mark.distributed
3333

3434

35+
def test_start_query_execution_with_result_reuse_configuration(path, glue_database, glue_table):
36+
df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]})
37+
wr.s3.to_parquet(
38+
df=df,
39+
path=path,
40+
dataset=True,
41+
database=glue_database,
42+
table=glue_table,
43+
mode="overwrite",
44+
)
45+
46+
sql = f"select * from {glue_table}"
47+
result_reuse_configuration = {"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}}
48+
query_execution_result1 = wr.athena.start_query_execution(
49+
sql=sql, database=glue_database, result_reuse_configuration=result_reuse_configuration, wait=True
50+
)
51+
assert query_execution_result1["Query"] == sql
52+
assert query_execution_result1["ResultReuseConfiguration"] == result_reuse_configuration
53+
assert not query_execution_result1["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"]
54+
55+
query_execution_result2 = wr.athena.start_query_execution(
56+
sql=sql, database=glue_database, result_reuse_configuration=result_reuse_configuration, wait=True
57+
)
58+
assert query_execution_result2["Query"] == sql
59+
assert query_execution_result2["ResultReuseConfiguration"] == result_reuse_configuration
60+
assert query_execution_result2["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"]
61+
62+
63+
def test_read_sql_query_with_result_reuse_configuration(path, glue_database, glue_table):
64+
df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]})
65+
wr.s3.to_parquet(
66+
df=df,
67+
path=path,
68+
dataset=True,
69+
database=glue_database,
70+
table=glue_table,
71+
mode="overwrite",
72+
)
73+
74+
sql = f"select * from {glue_table}"
75+
result_reuse_configuration = {"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}}
76+
df1 = wr.athena.read_sql_query(
77+
sql=sql,
78+
database=glue_database,
79+
ctas_approach=False,
80+
unload_approach=False,
81+
result_reuse_configuration=result_reuse_configuration,
82+
)
83+
df2 = wr.athena.read_sql_query(
84+
sql=sql,
85+
database=glue_database,
86+
ctas_approach=False,
87+
unload_approach=False,
88+
result_reuse_configuration=result_reuse_configuration,
89+
)
90+
assert pandas_equals(df1, df2)
91+
assert not df1.query_metadata["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"]
92+
assert df2.query_metadata["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"]
93+
94+
95+
def test_read_sql_query_with_result_reuse_configuration_error(glue_database):
96+
# default behavior: ctas_approach is True and unload_approach is False
97+
with pytest.raises(wr.exceptions.InvalidArgumentCombination):
98+
wr.athena.read_sql_query(
99+
sql="select 1",
100+
database=glue_database,
101+
result_reuse_configuration={"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}},
102+
)
103+
104+
# ctas_approach is False and default unload_approach is False
105+
with pytest.raises(wr.exceptions.InvalidArgumentCombination):
106+
wr.athena.read_sql_query(
107+
sql="select 1",
108+
database=glue_database,
109+
ctas_approach=False,
110+
unload_approach=True,
111+
result_reuse_configuration={"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}},
112+
)
113+
114+
35115
def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database, glue_ctas_database, kms_key):
36116
df = get_df_list()
37117
columns_types, partitions_types = wr.catalog.extract_athena_types(df=df, partition_cols=["par0", "par1"])

0 commit comments

Comments
 (0)