Skip to content

Commit 9da6c5e

Browse files
committed
reuse query results is not suported for CTAS and UNLOAD
1 parent 1f49b03 commit 9da6c5e

File tree

4 files changed

+64
-49
lines changed

4 files changed

+64
-49
lines changed

awswrangler/athena/_executions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ def start_query_execution(
9090
- ``qmark``
9191
result_reuse_configuration
9292
A structure that contains the configuration settings for reusing query results.
93+
See also: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html
9394
boto3_session
9495
The default boto3 session will be used if **boto3_session** receive ``None``.
9596
client_request_token

awswrangler/athena/_read.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,6 @@ def _resolve_query_without_cache_ctas(
320320
boto3_session: boto3.Session | None,
321321
pyarrow_additional_kwargs: dict[str, Any] | None = None,
322322
execution_params: list[str] | None = None,
323-
result_reuse_configuration: dict[str, Any] | None = None,
324323
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
325324
) -> pd.DataFrame | Iterator[pd.DataFrame]:
326325
ctas_query_info: dict[str, str | _QueryMetadata] = create_ctas_table(
@@ -340,7 +339,6 @@ def _resolve_query_without_cache_ctas(
340339
boto3_session=boto3_session,
341340
params=execution_params,
342341
paramstyle="qmark",
343-
result_reuse_configuration=result_reuse_configuration,
344342
)
345343
fully_qualified_name: str = f'"{ctas_query_info["ctas_database"]}"."{ctas_query_info["ctas_table"]}"'
346344
ctas_query_metadata = cast(_QueryMetadata, ctas_query_info["ctas_query_metadata"])
@@ -380,7 +378,6 @@ def _resolve_query_without_cache_unload(
380378
boto3_session: boto3.Session | None,
381379
pyarrow_additional_kwargs: dict[str, Any] | None = None,
382380
execution_params: list[str] | None = None,
383-
result_reuse_configuration: dict[str, Any] | None = None,
384381
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
385382
) -> pd.DataFrame | Iterator[pd.DataFrame]:
386383
query_metadata = _unload(
@@ -398,7 +395,6 @@ def _resolve_query_without_cache_unload(
398395
data_source=data_source,
399396
athena_query_wait_polling_delay=athena_query_wait_polling_delay,
400397
execution_params=execution_params,
401-
result_reuse_configuration=result_reuse_configuration,
402398
)
403399
if file_format == "PARQUET":
404400
return _fetch_parquet_result(
@@ -533,7 +529,6 @@ def _resolve_query_without_cache( # noqa: PLR0913
533529
boto3_session=boto3_session,
534530
pyarrow_additional_kwargs=pyarrow_additional_kwargs,
535531
execution_params=execution_params,
536-
result_reuse_configuration=result_reuse_configuration,
537532
dtype_backend=dtype_backend,
538533
)
539534
finally:
@@ -562,7 +557,6 @@ def _resolve_query_without_cache( # noqa: PLR0913
562557
boto3_session=boto3_session,
563558
pyarrow_additional_kwargs=pyarrow_additional_kwargs,
564559
execution_params=execution_params,
565-
result_reuse_configuration=result_reuse_configuration,
566560
dtype_backend=dtype_backend,
567561
)
568562
return _resolve_query_without_cache_regular(
@@ -602,7 +596,6 @@ def _unload(
602596
data_source: str | None,
603597
athena_query_wait_polling_delay: float,
604598
execution_params: list[str] | None,
605-
result_reuse_configuration: dict[str, Any] | None = None,
606599
) -> _QueryMetadata:
607600
wg_config: _WorkGroupConfig = _get_workgroup_config(session=boto3_session, workgroup=workgroup)
608601
s3_output: str = _get_s3_output(s3_output=path, wg_config=wg_config, boto3_session=boto3_session)
@@ -635,7 +628,6 @@ def _unload(
635628
kms_key=kms_key,
636629
boto3_session=boto3_session,
637630
execution_params=execution_params,
638-
result_reuse_configuration=result_reuse_configuration,
639631
)
640632
except botocore.exceptions.ClientError as ex:
641633
msg: str = str(ex)
@@ -797,6 +789,7 @@ def read_sql_query(
797789
athena_query_wait_polling_delay: float = _QUERY_WAIT_POLLING_DELAY,
798790
params: dict[str, Any] | list[str] | None = None,
799791
paramstyle: Literal["qmark", "named"] = "named",
792+
result_reuse_configuration: dict[str, Any] | None = None,
800793
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
801794
s3_additional_kwargs: dict[str, Any] | None = None,
802795
pyarrow_additional_kwargs: dict[str, Any] | None = None,
@@ -992,6 +985,10 @@ def read_sql_query(
992985
993986
- ``named``
994987
- ``qmark``
988+
result_reuse_configuration
989+
A structure that contains the configuration settings for reusing query results.
990+
This parameter is only valid when both `ctas_approach` and `unload_approach` are set to `False`.
991+
See also: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html
995992
dtype_backend
996993
Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
997994
nullable dtypes are used for all dtypes that have a nullable implementation when
@@ -1052,6 +1049,10 @@ def read_sql_query(
10521049
raise exceptions.InvalidArgumentCombination(
10531050
"Using `client_request_token` is only allowed when `ctas_approach=False` and `unload_approach=False`."
10541051
)
1052+
if result_reuse_configuration and (ctas_approach or unload_approach):
1053+
raise exceptions.InvalidArgumentCombination(
1054+
"Using `result_reuse_configuration` is only allowed when `ctas_approach=False` and `unload_approach=False`."
1055+
)
10551056
chunksize = sys.maxsize if ctas_approach is False and chunksize is True else chunksize
10561057

10571058
# Substitute query parameters if applicable
@@ -1116,7 +1117,7 @@ def read_sql_query(
11161117
boto3_session=boto3_session,
11171118
pyarrow_additional_kwargs=pyarrow_additional_kwargs,
11181119
execution_params=execution_params,
1119-
result_reuse_configuration=cache_info.result_reuse_configuration,
1120+
result_reuse_configuration=result_reuse_configuration,
11201121
dtype_backend=dtype_backend,
11211122
client_request_token=client_request_token,
11221123
)
@@ -1384,7 +1385,6 @@ def unload(
13841385
data_source: str | None = None,
13851386
params: dict[str, Any] | list[str] | None = None,
13861387
paramstyle: Literal["qmark", "named"] = "named",
1387-
result_reuse_configuration: dict[str, Any] | None = None,
13881388
athena_query_wait_polling_delay: float = _QUERY_WAIT_POLLING_DELAY,
13891389
) -> _QueryMetadata:
13901390
"""Write query results from a SELECT statement to the specified data format using UNLOAD.
@@ -1473,5 +1473,4 @@ def unload(
14731473
boto3_session=boto3_session,
14741474
data_source=data_source,
14751475
execution_params=execution_params,
1476-
result_reuse_configuration=result_reuse_configuration,
14771476
)

awswrangler/athena/_utils.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,6 @@ def create_ctas_table(
653653
execution_params: list[str] | None = None,
654654
params: dict[str, Any] | list[str] | None = None,
655655
paramstyle: Literal["qmark", "named"] = "named",
656-
result_reuse_configuration: dict[str, Any] | None = None,
657656
boto3_session: boto3.Session | None = None,
658657
) -> dict[str, str | _QueryMetadata]:
659658
"""Create a new table populated with the results of a SELECT query.
@@ -718,8 +717,6 @@ def create_ctas_table(
718717
The syntax style to use for the parameters.
719718
Supported values are ``named`` and ``qmark``.
720719
The default is ``named``.
721-
result_reuse_configuration
722-
A structure that contains the configuration settings for reusing query results.
723720
boto3_session
724721
The default boto3 session will be used if **boto3_session** receive ``None``.
725722
@@ -835,7 +832,6 @@ def create_ctas_table(
835832
kms_key=kms_key,
836833
boto3_session=boto3_session,
837834
execution_params=execution_params,
838-
result_reuse_configuration=result_reuse_configuration,
839835
)
840836
except botocore.exceptions.ClientError as ex:
841837
error = ex.response["Error"]

tests/unit/test_athena.py

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,47 +32,66 @@
3232
pytestmark = pytest.mark.distributed
3333

3434

35-
def test_start_query_execution_with_result_reuse_configuration(glue_database):
36-
sql = "SELECT 1"
37-
result_reuse_configuration = {"ReuseEnabled": True, "MaxAgeInMinutes": 10}
38-
query_execution_id = wr.athena.start_query_execution(
39-
sql=sql,
35+
def test_start_query_execution_with_result_reuse_configuration(path, glue_database, glue_table):
36+
df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]})
37+
wr.s3.to_parquet(
38+
df=df,
39+
path=path,
40+
dataset=True,
4041
database=glue_database,
41-
result_reuse_configuration=result_reuse_configuration,
42-
wait=False,
42+
table=glue_table,
43+
mode="overwrite",
4344
)
44-
assert isinstance(query_execution_id, str)
45-
4645

47-
def test_read_sql_query_with_result_reuse_configuration(glue_database):
48-
sql = "SELECT 1"
49-
result_reuse_configuration = {"ReuseEnabled": True, "MaxAgeInMinutes": 10}
50-
df = wr.athena.read_sql_query(
51-
sql=sql,
46+
sql = f'select * from {glue_table}'
47+
result_reuse_configuration = {
48+
"ResultReuseByAgeConfiguration": {
49+
"Enabled": True,
50+
"MaxAgeInMinutes": 1
51+
}
52+
}
53+
query_execution_result1 = wr.athena.start_query_execution(sql=sql, database=glue_database, result_reuse_configuration=result_reuse_configuration, wait=True)
54+
assert query_execution_result1["Query"] == sql
55+
assert query_execution_result1["ResultReuseConfiguration"] == result_reuse_configuration
56+
assert not query_execution_result1["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"]
57+
58+
query_execution_result2 = wr.athena.start_query_execution(sql=sql, database=glue_database, result_reuse_configuration=result_reuse_configuration, wait=True)
59+
assert query_execution_result2["Query"] == sql
60+
assert query_execution_result2["ResultReuseConfiguration"] == result_reuse_configuration
61+
assert query_execution_result2["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"]
62+
63+
def test_read_sql_query_with_result_reuse_configuration(path, glue_database, glue_table):
64+
df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]})
65+
wr.s3.to_parquet(
66+
df=df,
67+
path=path,
68+
dataset=True,
5269
database=glue_database,
53-
result_reuse_configuration=result_reuse_configuration,
70+
table=glue_table,
71+
mode="overwrite",
5472
)
55-
assert hasattr(df, "query_metadata")
5673

74+
sql = f'select * from {glue_table}'
75+
result_reuse_configuration = {
76+
"ResultReuseByAgeConfiguration": {
77+
"Enabled": True,
78+
"MaxAgeInMinutes": 1
79+
}
80+
}
81+
df1 = wr.athena.read_sql_query(sql=sql, database=glue_database, ctas_approach=False, unload_approach=False, result_reuse_configuration=result_reuse_configuration)
82+
df2 = wr.athena.read_sql_query(sql=sql, database=glue_database, ctas_approach=False, unload_approach=False, result_reuse_configuration=result_reuse_configuration)
83+
assert pandas_equals(df1, df2)
84+
assert not df1.query_metadata["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"]
85+
assert df2.query_metadata["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"]
86+
87+
def test_read_sql_query_with_result_reuse_configuration_error(glue_database):
88+
# default behavior: ctas_approach is True and unload_approach is False
89+
with pytest.raises(wr.exceptions.InvalidArgumentCombination):
90+
wr.athena.read_sql_query(sql='select 1', database=glue_database, result_reuse_configuration={"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}})
5791

58-
def test_read_sql_query_with_result_reuse_configuration_returns_cached_result(glue_database):
59-
sql = "SELECT 1"
60-
result_reuse_configuration = {"ReuseEnabled": True, "MaxAgeInMinutes": 10}
61-
# First query: should run and cache
62-
df1 = wr.athena.read_sql_query(
63-
sql=sql,
64-
database=glue_database,
65-
result_reuse_configuration=result_reuse_configuration,
66-
)
67-
query_id_1 = getattr(df1, "query_metadata")["QueryExecutionId"]
68-
# Second query: should hit cache and return same query_execution_id
69-
df2 = wr.athena.read_sql_query(
70-
sql=sql,
71-
database=glue_database,
72-
result_reuse_configuration=result_reuse_configuration,
73-
)
74-
query_id_2 = getattr(df2, "query_metadata")["QueryExecutionId"]
75-
assert query_id_1 == query_id_2, "Expected cached result to return same QueryExecutionId"
92+
# ctas_approach is False and default unload_approach is False
93+
with pytest.raises(wr.exceptions.InvalidArgumentCombination):
94+
wr.athena.read_sql_query(sql='select 1', database=glue_database, ctas_approach=False, unload_approach=True, result_reuse_configuration={"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}})
7695

7796

7897
def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database, glue_ctas_database, kms_key):

0 commit comments

Comments
 (0)