From 1f49b03d12c3b1120205910714368c7ccefa5bf7 Mon Sep 17 00:00:00 2001 From: David Katz <41651296+DavidKatz-il@users.noreply.github.com> Date: Thu, 16 Oct 2025 09:42:57 +0000 Subject: [PATCH 1/3] feat: add result reuse configuration to query execution functions --- awswrangler/athena/_executions.py | 4 +++ awswrangler/athena/_executions.pyi | 3 +++ awswrangler/athena/_read.py | 17 +++++++++++- awswrangler/athena/_utils.py | 8 ++++++ tests/unit/test_athena.py | 43 ++++++++++++++++++++++++++++++ 5 files changed, 74 insertions(+), 1 deletion(-) diff --git a/awswrangler/athena/_executions.py b/awswrangler/athena/_executions.py index b2d3f518a..135f826c5 100644 --- a/awswrangler/athena/_executions.py +++ b/awswrangler/athena/_executions.py @@ -40,6 +40,7 @@ def start_query_execution( kms_key: str | None = None, params: dict[str, Any] | list[str] | None = None, paramstyle: Literal["qmark", "named"] = "named", + result_reuse_configuration: dict[str, Any] | None = None, boto3_session: boto3.Session | None = None, client_request_token: str | None = None, athena_cache_settings: typing.AthenaCacheSettings | None = None, @@ -87,6 +88,8 @@ def start_query_execution( - ``named`` - ``qmark`` + result_reuse_configuration + A structure that contains the configuration settings for reusing query results. boto3_session The default boto3 session will be used if **boto3_session** receive ``None``. 
client_request_token @@ -156,6 +159,7 @@ def start_query_execution( encryption=encryption, kms_key=kms_key, execution_params=execution_params, + result_reuse_configuration=result_reuse_configuration, client_request_token=client_request_token, boto3_session=boto3_session, ) diff --git a/awswrangler/athena/_executions.pyi b/awswrangler/athena/_executions.pyi index 5a394d916..073e63c8d 100644 --- a/awswrangler/athena/_executions.pyi +++ b/awswrangler/athena/_executions.pyi @@ -18,6 +18,7 @@ def start_query_execution( kms_key: str | None = ..., params: dict[str, Any] | list[str] | None = ..., paramstyle: Literal["qmark", "named"] = ..., + result_reuse_configuration: dict[str, Any] | None = ..., boto3_session: boto3.Session | None = ..., athena_cache_settings: typing.AthenaCacheSettings | None = ..., athena_query_wait_polling_delay: float = ..., @@ -35,6 +36,7 @@ def start_query_execution( kms_key: str | None = ..., params: dict[str, Any] | list[str] | None = ..., paramstyle: Literal["qmark", "named"] = ..., + result_reuse_configuration: dict[str, Any] | None = ..., boto3_session: boto3.Session | None = ..., athena_cache_settings: typing.AthenaCacheSettings | None = ..., athena_query_wait_polling_delay: float = ..., @@ -52,6 +54,7 @@ def start_query_execution( kms_key: str | None = ..., params: dict[str, Any] | list[str] | None = ..., paramstyle: Literal["qmark", "named"] = ..., + result_reuse_configuration: dict[str, Any] | None = ..., boto3_session: boto3.Session | None = ..., athena_cache_settings: typing.AthenaCacheSettings | None = ..., athena_query_wait_polling_delay: float = ..., diff --git a/awswrangler/athena/_read.py b/awswrangler/athena/_read.py index 34b088a1d..3ffb822b5 100644 --- a/awswrangler/athena/_read.py +++ b/awswrangler/athena/_read.py @@ -320,6 +320,7 @@ def _resolve_query_without_cache_ctas( boto3_session: boto3.Session | None, pyarrow_additional_kwargs: dict[str, Any] | None = None, execution_params: list[str] | None = None, + 
result_reuse_configuration: dict[str, Any] | None = None, dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable", ) -> pd.DataFrame | Iterator[pd.DataFrame]: ctas_query_info: dict[str, str | _QueryMetadata] = create_ctas_table( @@ -339,6 +340,7 @@ def _resolve_query_without_cache_ctas( boto3_session=boto3_session, params=execution_params, paramstyle="qmark", + result_reuse_configuration=result_reuse_configuration, ) fully_qualified_name: str = f'"{ctas_query_info["ctas_database"]}"."{ctas_query_info["ctas_table"]}"' ctas_query_metadata = cast(_QueryMetadata, ctas_query_info["ctas_query_metadata"]) @@ -378,6 +380,7 @@ def _resolve_query_without_cache_unload( boto3_session: boto3.Session | None, pyarrow_additional_kwargs: dict[str, Any] | None = None, execution_params: list[str] | None = None, + result_reuse_configuration: dict[str, Any] | None = None, dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable", ) -> pd.DataFrame | Iterator[pd.DataFrame]: query_metadata = _unload( @@ -395,6 +398,7 @@ def _resolve_query_without_cache_unload( data_source=data_source, athena_query_wait_polling_delay=athena_query_wait_polling_delay, execution_params=execution_params, + result_reuse_configuration=result_reuse_configuration, ) if file_format == "PARQUET": return _fetch_parquet_result( @@ -427,6 +431,7 @@ def _resolve_query_without_cache_regular( s3_additional_kwargs: dict[str, Any] | None, boto3_session: boto3.Session | None, execution_params: list[str] | None = None, + result_reuse_configuration: dict[str, Any] | None = None, dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable", client_request_token: str | None = None, ) -> pd.DataFrame | Iterator[pd.DataFrame]: @@ -444,6 +449,7 @@ def _resolve_query_without_cache_regular( encryption=encryption, kms_key=kms_key, execution_params=execution_params, + result_reuse_configuration=result_reuse_configuration, client_request_token=client_request_token, boto3_session=boto3_session, ) @@ 
-467,7 +473,7 @@ def _resolve_query_without_cache_regular( ) -def _resolve_query_without_cache( +def _resolve_query_without_cache( # noqa: PLR0913 sql: str, database: str, data_source: str | None, @@ -491,6 +497,7 @@ def _resolve_query_without_cache( boto3_session: boto3.Session | None, pyarrow_additional_kwargs: dict[str, Any] | None = None, execution_params: list[str] | None = None, + result_reuse_configuration: dict[str, Any] | None = None, dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable", client_request_token: str | None = None, ) -> pd.DataFrame | Iterator[pd.DataFrame]: @@ -526,6 +533,7 @@ def _resolve_query_without_cache( boto3_session=boto3_session, pyarrow_additional_kwargs=pyarrow_additional_kwargs, execution_params=execution_params, + result_reuse_configuration=result_reuse_configuration, dtype_backend=dtype_backend, ) finally: @@ -554,6 +562,7 @@ def _resolve_query_without_cache( boto3_session=boto3_session, pyarrow_additional_kwargs=pyarrow_additional_kwargs, execution_params=execution_params, + result_reuse_configuration=result_reuse_configuration, dtype_backend=dtype_backend, ) return _resolve_query_without_cache_regular( @@ -572,6 +581,7 @@ def _resolve_query_without_cache( s3_additional_kwargs=s3_additional_kwargs, boto3_session=boto3_session, execution_params=execution_params, + result_reuse_configuration=result_reuse_configuration, dtype_backend=dtype_backend, client_request_token=client_request_token, ) @@ -592,6 +602,7 @@ def _unload( data_source: str | None, athena_query_wait_polling_delay: float, execution_params: list[str] | None, + result_reuse_configuration: dict[str, Any] | None = None, ) -> _QueryMetadata: wg_config: _WorkGroupConfig = _get_workgroup_config(session=boto3_session, workgroup=workgroup) s3_output: str = _get_s3_output(s3_output=path, wg_config=wg_config, boto3_session=boto3_session) @@ -624,6 +635,7 @@ def _unload( kms_key=kms_key, boto3_session=boto3_session, execution_params=execution_params, + 
result_reuse_configuration=result_reuse_configuration, ) except botocore.exceptions.ClientError as ex: msg: str = str(ex) @@ -1104,6 +1116,7 @@ def read_sql_query( boto3_session=boto3_session, pyarrow_additional_kwargs=pyarrow_additional_kwargs, execution_params=execution_params, + result_reuse_configuration=cache_info.result_reuse_configuration, dtype_backend=dtype_backend, client_request_token=client_request_token, ) @@ -1371,6 +1384,7 @@ def unload( data_source: str | None = None, params: dict[str, Any] | list[str] | None = None, paramstyle: Literal["qmark", "named"] = "named", + result_reuse_configuration: dict[str, Any] | None = None, athena_query_wait_polling_delay: float = _QUERY_WAIT_POLLING_DELAY, ) -> _QueryMetadata: """Write query results from a SELECT statement to the specified data format using UNLOAD. @@ -1459,4 +1473,5 @@ def unload( boto3_session=boto3_session, data_source=data_source, execution_params=execution_params, + result_reuse_configuration=result_reuse_configuration, ) diff --git a/awswrangler/athena/_utils.py b/awswrangler/athena/_utils.py index 7a0e74aac..c2bc7559a 100644 --- a/awswrangler/athena/_utils.py +++ b/awswrangler/athena/_utils.py @@ -86,6 +86,7 @@ def _start_query_execution( encryption: str | None = None, kms_key: str | None = None, execution_params: list[str] | None = None, + result_reuse_configuration: dict[str, Any] | None = None, client_request_token: str | None = None, boto3_session: boto3.Session | None = None, ) -> str: @@ -123,6 +124,9 @@ def _start_query_execution( if execution_params: args["ExecutionParameters"] = execution_params + if result_reuse_configuration: + args["ResultReuseConfiguration"] = result_reuse_configuration + client_athena = _utils.client(service_name="athena", session=boto3_session) _logger.debug("Starting query execution with args: \n%s", pprint.pformat(args)) response = _utils.try_it( @@ -649,6 +653,7 @@ def create_ctas_table( execution_params: list[str] | None = None, params: dict[str, Any] | 
list[str] | None = None, paramstyle: Literal["qmark", "named"] = "named", + result_reuse_configuration: dict[str, Any] | None = None, boto3_session: boto3.Session | None = None, ) -> dict[str, str | _QueryMetadata]: """Create a new table populated with the results of a SELECT query. @@ -713,6 +718,8 @@ def create_ctas_table( The syntax style to use for the parameters. Supported values are ``named`` and ``qmark``. The default is ``named``. + result_reuse_configuration + A structure that contains the configuration settings for reusing query results. boto3_session The default boto3 session will be used if **boto3_session** receive ``None``. @@ -828,6 +835,7 @@ def create_ctas_table( kms_key=kms_key, boto3_session=boto3_session, execution_params=execution_params, + result_reuse_configuration=result_reuse_configuration, ) except botocore.exceptions.ClientError as ex: error = ex.response["Error"] diff --git a/tests/unit/test_athena.py b/tests/unit/test_athena.py index d747ae001..285b7a1a8 100644 --- a/tests/unit/test_athena.py +++ b/tests/unit/test_athena.py @@ -32,6 +32,49 @@ pytestmark = pytest.mark.distributed +def test_start_query_execution_with_result_reuse_configuration(glue_database): + sql = "SELECT 1" + result_reuse_configuration = {"ReuseEnabled": True, "MaxAgeInMinutes": 10} + query_execution_id = wr.athena.start_query_execution( + sql=sql, + database=glue_database, + result_reuse_configuration=result_reuse_configuration, + wait=False, + ) + assert isinstance(query_execution_id, str) + + +def test_read_sql_query_with_result_reuse_configuration(glue_database): + sql = "SELECT 1" + result_reuse_configuration = {"ReuseEnabled": True, "MaxAgeInMinutes": 10} + df = wr.athena.read_sql_query( + sql=sql, + database=glue_database, + result_reuse_configuration=result_reuse_configuration, + ) + assert hasattr(df, "query_metadata") + + +def test_read_sql_query_with_result_reuse_configuration_returns_cached_result(glue_database): + sql = "SELECT 1" + 
result_reuse_configuration = {"ReuseEnabled": True, "MaxAgeInMinutes": 10}
+    # First query: should run and cache
+    df1 = wr.athena.read_sql_query(
+        sql=sql,
+        database=glue_database,
+        result_reuse_configuration=result_reuse_configuration,
+    )
+    query_id_1 = getattr(df1, "query_metadata")["QueryExecutionId"]
+    # Second query: should hit cache and return same query_execution_id
+    df2 = wr.athena.read_sql_query(
+        sql=sql,
+        database=glue_database,
+        result_reuse_configuration=result_reuse_configuration,
+    )
+    query_id_2 = getattr(df2, "query_metadata")["QueryExecutionId"]
+    assert query_id_1 == query_id_2, "Expected cached result to return same QueryExecutionId"
+
+
 def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database, glue_ctas_database, kms_key):
     df = get_df_list()
     columns_types, partitions_types = wr.catalog.extract_athena_types(df=df, partition_cols=["par0", "par1"])

From 9da6c5e34561e450b8f0e5c8323165961d787913 Mon Sep 17 00:00:00 2001
From: David Katz <41651296+DavidKatz-il@users.noreply.github.com>
Date: Thu, 16 Oct 2025 12:53:14 +0000
Subject: [PATCH 2/3] reuse query results is not supported for CTAS and UNLOAD

---
 awswrangler/athena/_executions.py |  1 +
 awswrangler/athena/_read.py       | 21 ++++----
 awswrangler/athena/_utils.py      |  4 --
 tests/unit/test_athena.py         | 87 +++++++++++++++++++------------
 4 files changed, 64 insertions(+), 49 deletions(-)

diff --git a/awswrangler/athena/_executions.py b/awswrangler/athena/_executions.py
index 135f826c5..f3297771e 100644
--- a/awswrangler/athena/_executions.py
+++ b/awswrangler/athena/_executions.py
@@ -90,6 +90,7 @@ def start_query_execution(
         - ``qmark``
     result_reuse_configuration
         A structure that contains the configuration settings for reusing query results.
+        See also: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html
     boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``. 
client_request_token diff --git a/awswrangler/athena/_read.py b/awswrangler/athena/_read.py index 3ffb822b5..6d05fa35d 100644 --- a/awswrangler/athena/_read.py +++ b/awswrangler/athena/_read.py @@ -320,7 +320,6 @@ def _resolve_query_without_cache_ctas( boto3_session: boto3.Session | None, pyarrow_additional_kwargs: dict[str, Any] | None = None, execution_params: list[str] | None = None, - result_reuse_configuration: dict[str, Any] | None = None, dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable", ) -> pd.DataFrame | Iterator[pd.DataFrame]: ctas_query_info: dict[str, str | _QueryMetadata] = create_ctas_table( @@ -340,7 +339,6 @@ def _resolve_query_without_cache_ctas( boto3_session=boto3_session, params=execution_params, paramstyle="qmark", - result_reuse_configuration=result_reuse_configuration, ) fully_qualified_name: str = f'"{ctas_query_info["ctas_database"]}"."{ctas_query_info["ctas_table"]}"' ctas_query_metadata = cast(_QueryMetadata, ctas_query_info["ctas_query_metadata"]) @@ -380,7 +378,6 @@ def _resolve_query_without_cache_unload( boto3_session: boto3.Session | None, pyarrow_additional_kwargs: dict[str, Any] | None = None, execution_params: list[str] | None = None, - result_reuse_configuration: dict[str, Any] | None = None, dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable", ) -> pd.DataFrame | Iterator[pd.DataFrame]: query_metadata = _unload( @@ -398,7 +395,6 @@ def _resolve_query_without_cache_unload( data_source=data_source, athena_query_wait_polling_delay=athena_query_wait_polling_delay, execution_params=execution_params, - result_reuse_configuration=result_reuse_configuration, ) if file_format == "PARQUET": return _fetch_parquet_result( @@ -533,7 +529,6 @@ def _resolve_query_without_cache( # noqa: PLR0913 boto3_session=boto3_session, pyarrow_additional_kwargs=pyarrow_additional_kwargs, execution_params=execution_params, - result_reuse_configuration=result_reuse_configuration, dtype_backend=dtype_backend, ) 
finally: @@ -562,7 +557,6 @@ def _resolve_query_without_cache( # noqa: PLR0913 boto3_session=boto3_session, pyarrow_additional_kwargs=pyarrow_additional_kwargs, execution_params=execution_params, - result_reuse_configuration=result_reuse_configuration, dtype_backend=dtype_backend, ) return _resolve_query_without_cache_regular( @@ -602,7 +596,6 @@ def _unload( data_source: str | None, athena_query_wait_polling_delay: float, execution_params: list[str] | None, - result_reuse_configuration: dict[str, Any] | None = None, ) -> _QueryMetadata: wg_config: _WorkGroupConfig = _get_workgroup_config(session=boto3_session, workgroup=workgroup) s3_output: str = _get_s3_output(s3_output=path, wg_config=wg_config, boto3_session=boto3_session) @@ -635,7 +628,6 @@ def _unload( kms_key=kms_key, boto3_session=boto3_session, execution_params=execution_params, - result_reuse_configuration=result_reuse_configuration, ) except botocore.exceptions.ClientError as ex: msg: str = str(ex) @@ -797,6 +789,7 @@ def read_sql_query( athena_query_wait_polling_delay: float = _QUERY_WAIT_POLLING_DELAY, params: dict[str, Any] | list[str] | None = None, paramstyle: Literal["qmark", "named"] = "named", + result_reuse_configuration: dict[str, Any] | None = None, dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable", s3_additional_kwargs: dict[str, Any] | None = None, pyarrow_additional_kwargs: dict[str, Any] | None = None, @@ -992,6 +985,10 @@ def read_sql_query( - ``named`` - ``qmark`` + result_reuse_configuration + A structure that contains the configuration settings for reusing query results. + This parameter is only valid when both `ctas_approach` and `unload_approach` are set to `False`. + See also: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html dtype_backend Which dtype_backend to use, e.g. 
whether a DataFrame should have NumPy arrays, nullable dtypes are used for all dtypes that have a nullable implementation when @@ -1052,6 +1049,10 @@ def read_sql_query( raise exceptions.InvalidArgumentCombination( "Using `client_request_token` is only allowed when `ctas_approach=False` and `unload_approach=False`." ) + if result_reuse_configuration and (ctas_approach or unload_approach): + raise exceptions.InvalidArgumentCombination( + "Using `result_reuse_configuration` is only allowed when `ctas_approach=False` and `unload_approach=False`." + ) chunksize = sys.maxsize if ctas_approach is False and chunksize is True else chunksize # Substitute query parameters if applicable @@ -1116,7 +1117,7 @@ def read_sql_query( boto3_session=boto3_session, pyarrow_additional_kwargs=pyarrow_additional_kwargs, execution_params=execution_params, - result_reuse_configuration=cache_info.result_reuse_configuration, + result_reuse_configuration=result_reuse_configuration, dtype_backend=dtype_backend, client_request_token=client_request_token, ) @@ -1384,7 +1385,6 @@ def unload( data_source: str | None = None, params: dict[str, Any] | list[str] | None = None, paramstyle: Literal["qmark", "named"] = "named", - result_reuse_configuration: dict[str, Any] | None = None, athena_query_wait_polling_delay: float = _QUERY_WAIT_POLLING_DELAY, ) -> _QueryMetadata: """Write query results from a SELECT statement to the specified data format using UNLOAD. 
@@ -1473,5 +1473,4 @@ def unload( boto3_session=boto3_session, data_source=data_source, execution_params=execution_params, - result_reuse_configuration=result_reuse_configuration, ) diff --git a/awswrangler/athena/_utils.py b/awswrangler/athena/_utils.py index c2bc7559a..7ff7589cc 100644 --- a/awswrangler/athena/_utils.py +++ b/awswrangler/athena/_utils.py @@ -653,7 +653,6 @@ def create_ctas_table( execution_params: list[str] | None = None, params: dict[str, Any] | list[str] | None = None, paramstyle: Literal["qmark", "named"] = "named", - result_reuse_configuration: dict[str, Any] | None = None, boto3_session: boto3.Session | None = None, ) -> dict[str, str | _QueryMetadata]: """Create a new table populated with the results of a SELECT query. @@ -718,8 +717,6 @@ def create_ctas_table( The syntax style to use for the parameters. Supported values are ``named`` and ``qmark``. The default is ``named``. - result_reuse_configuration - A structure that contains the configuration settings for reusing query results. boto3_session The default boto3 session will be used if **boto3_session** receive ``None``. 
@@ -835,7 +832,6 @@ def create_ctas_table( kms_key=kms_key, boto3_session=boto3_session, execution_params=execution_params, - result_reuse_configuration=result_reuse_configuration, ) except botocore.exceptions.ClientError as ex: error = ex.response["Error"] diff --git a/tests/unit/test_athena.py b/tests/unit/test_athena.py index 285b7a1a8..2cdd62bcb 100644 --- a/tests/unit/test_athena.py +++ b/tests/unit/test_athena.py @@ -32,47 +32,66 @@ pytestmark = pytest.mark.distributed -def test_start_query_execution_with_result_reuse_configuration(glue_database): - sql = "SELECT 1" - result_reuse_configuration = {"ReuseEnabled": True, "MaxAgeInMinutes": 10} - query_execution_id = wr.athena.start_query_execution( - sql=sql, +def test_start_query_execution_with_result_reuse_configuration(path, glue_database, glue_table): + df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]}) + wr.s3.to_parquet( + df=df, + path=path, + dataset=True, database=glue_database, - result_reuse_configuration=result_reuse_configuration, - wait=False, + table=glue_table, + mode="overwrite", ) - assert isinstance(query_execution_id, str) - -def test_read_sql_query_with_result_reuse_configuration(glue_database): - sql = "SELECT 1" - result_reuse_configuration = {"ReuseEnabled": True, "MaxAgeInMinutes": 10} - df = wr.athena.read_sql_query( - sql=sql, + sql = f'select * from {glue_table}' + result_reuse_configuration = { + "ResultReuseByAgeConfiguration": { + "Enabled": True, + "MaxAgeInMinutes": 1 + } + } + query_execution_result1 = wr.athena.start_query_execution(sql=sql, database=glue_database, result_reuse_configuration=result_reuse_configuration, wait=True) + assert query_execution_result1["Query"] == sql + assert query_execution_result1["ResultReuseConfiguration"] == result_reuse_configuration + assert not query_execution_result1["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"] + + query_execution_result2 = wr.athena.start_query_execution(sql=sql, database=glue_database, 
result_reuse_configuration=result_reuse_configuration, wait=True) + assert query_execution_result2["Query"] == sql + assert query_execution_result2["ResultReuseConfiguration"] == result_reuse_configuration + assert query_execution_result2["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"] + +def test_read_sql_query_with_result_reuse_configuration(path, glue_database, glue_table): + df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]}) + wr.s3.to_parquet( + df=df, + path=path, + dataset=True, database=glue_database, - result_reuse_configuration=result_reuse_configuration, + table=glue_table, + mode="overwrite", ) - assert hasattr(df, "query_metadata") + sql = f'select * from {glue_table}' + result_reuse_configuration = { + "ResultReuseByAgeConfiguration": { + "Enabled": True, + "MaxAgeInMinutes": 1 + } + } + df1 = wr.athena.read_sql_query(sql=sql, database=glue_database, ctas_approach=False, unload_approach=False, result_reuse_configuration=result_reuse_configuration) + df2 = wr.athena.read_sql_query(sql=sql, database=glue_database, ctas_approach=False, unload_approach=False, result_reuse_configuration=result_reuse_configuration) + assert pandas_equals(df1, df2) + assert not df1.query_metadata["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"] + assert df2.query_metadata["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"] + +def test_read_sql_query_with_result_reuse_configuration_error(glue_database): + # default behavior: ctas_approach is True and unload_approach is False + with pytest.raises(wr.exceptions.InvalidArgumentCombination): + wr.athena.read_sql_query(sql='select 1', database=glue_database, result_reuse_configuration={"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}}) -def test_read_sql_query_with_result_reuse_configuration_returns_cached_result(glue_database): - sql = "SELECT 1" - result_reuse_configuration = {"ReuseEnabled": True, "MaxAgeInMinutes": 10} - # First query: should run and cache 
- df1 = wr.athena.read_sql_query( - sql=sql, - database=glue_database, - result_reuse_configuration=result_reuse_configuration, - ) - query_id_1 = getattr(df1, "query_metadata")["QueryExecutionId"] - # Second query: should hit cache and return same query_execution_id - df2 = wr.athena.read_sql_query( - sql=sql, - database=glue_database, - result_reuse_configuration=result_reuse_configuration, - ) - query_id_2 = getattr(df2, "query_metadata")["QueryExecutionId"] - assert query_id_1 == query_id_2, "Expected cached result to return same QueryExecutionId" + # ctas_approach is False and default unload_approach is False + with pytest.raises(wr.exceptions.InvalidArgumentCombination): + wr.athena.read_sql_query(sql='select 1', database=glue_database, ctas_approach=False, unload_approach=True, result_reuse_configuration={"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}}) def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database, glue_ctas_database, kms_key): From cfa86ee70f25dd1e83d4f81cc47a17083c5115a1 Mon Sep 17 00:00:00 2001 From: David Katz <41651296+DavidKatz-il@users.noreply.github.com> Date: Thu, 16 Oct 2025 13:53:19 +0000 Subject: [PATCH 3/3] run fix --- tests/unit/test_athena.py | 60 +++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/tests/unit/test_athena.py b/tests/unit/test_athena.py index 2cdd62bcb..47e444179 100644 --- a/tests/unit/test_athena.py +++ b/tests/unit/test_athena.py @@ -43,23 +43,23 @@ def test_start_query_execution_with_result_reuse_configuration(path, glue_databa mode="overwrite", ) - sql = f'select * from {glue_table}' - result_reuse_configuration = { - "ResultReuseByAgeConfiguration": { - "Enabled": True, - "MaxAgeInMinutes": 1 - } - } - query_execution_result1 = wr.athena.start_query_execution(sql=sql, database=glue_database, result_reuse_configuration=result_reuse_configuration, wait=True) + sql = f"select * from {glue_table}" + 
result_reuse_configuration = {"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}} + query_execution_result1 = wr.athena.start_query_execution( + sql=sql, database=glue_database, result_reuse_configuration=result_reuse_configuration, wait=True + ) assert query_execution_result1["Query"] == sql assert query_execution_result1["ResultReuseConfiguration"] == result_reuse_configuration assert not query_execution_result1["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"] - - query_execution_result2 = wr.athena.start_query_execution(sql=sql, database=glue_database, result_reuse_configuration=result_reuse_configuration, wait=True) + + query_execution_result2 = wr.athena.start_query_execution( + sql=sql, database=glue_database, result_reuse_configuration=result_reuse_configuration, wait=True + ) assert query_execution_result2["Query"] == sql assert query_execution_result2["ResultReuseConfiguration"] == result_reuse_configuration assert query_execution_result2["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"] + def test_read_sql_query_with_result_reuse_configuration(path, glue_database, glue_table): df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]}) wr.s3.to_parquet( @@ -71,27 +71,45 @@ def test_read_sql_query_with_result_reuse_configuration(path, glue_database, glu mode="overwrite", ) - sql = f'select * from {glue_table}' - result_reuse_configuration = { - "ResultReuseByAgeConfiguration": { - "Enabled": True, - "MaxAgeInMinutes": 1 - } - } - df1 = wr.athena.read_sql_query(sql=sql, database=glue_database, ctas_approach=False, unload_approach=False, result_reuse_configuration=result_reuse_configuration) - df2 = wr.athena.read_sql_query(sql=sql, database=glue_database, ctas_approach=False, unload_approach=False, result_reuse_configuration=result_reuse_configuration) + sql = f"select * from {glue_table}" + result_reuse_configuration = {"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}} + df1 = 
wr.athena.read_sql_query( + sql=sql, + database=glue_database, + ctas_approach=False, + unload_approach=False, + result_reuse_configuration=result_reuse_configuration, + ) + df2 = wr.athena.read_sql_query( + sql=sql, + database=glue_database, + ctas_approach=False, + unload_approach=False, + result_reuse_configuration=result_reuse_configuration, + ) assert pandas_equals(df1, df2) assert not df1.query_metadata["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"] assert df2.query_metadata["Statistics"]["ResultReuseInformation"]["ReusedPreviousResult"] + def test_read_sql_query_with_result_reuse_configuration_error(glue_database): # default behavior: ctas_approach is True and unload_approach is False with pytest.raises(wr.exceptions.InvalidArgumentCombination): - wr.athena.read_sql_query(sql='select 1', database=glue_database, result_reuse_configuration={"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}}) + wr.athena.read_sql_query( + sql="select 1", + database=glue_database, + result_reuse_configuration={"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}}, + ) # ctas_approach is False and default unload_approach is False with pytest.raises(wr.exceptions.InvalidArgumentCombination): - wr.athena.read_sql_query(sql='select 1', database=glue_database, ctas_approach=False, unload_approach=True, result_reuse_configuration={"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}}) + wr.athena.read_sql_query( + sql="select 1", + database=glue_database, + ctas_approach=False, + unload_approach=True, + result_reuse_configuration={"ResultReuseByAgeConfiguration": {"Enabled": True, "MaxAgeInMinutes": 1}}, + ) def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database, glue_ctas_database, kms_key):