
Commit 462d689

Fixing small bugs on Athena Cache
1 parent 19f06df commit 462d689

File tree

1 file changed: +11 -4 lines


awswrangler/athena.py

Lines changed: 11 additions & 4 deletions
@@ -374,8 +374,7 @@ def _fix_csv_types(df: pd.DataFrame, parse_dates: List[str], binaries: List[str]
     return df
 
 
-# pylint: disable=too-many-branches,too-many-locals,too-many-return-statements,too-many-statements,broad-except
-def read_sql_query(
+def read_sql_query(  # pylint: disable=too-many-branches,too-many-locals,too-many-return-statements,too-many-statements
     sql: str,
     database: str,
     ctas_approach: bool = True,
@@ -519,6 +518,7 @@ def read_sql_query(
                 use_threads=use_threads,
                 session=session,
             )
+    # pylint: disable=broad-except
     except Exception as e:  # pragma: no cover
         _logger.error(e)
         # if there is anything wrong with the cache, just fallback to the usual path
@@ -839,6 +839,7 @@ def read_sql_table(
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     max_cache_seconds: int = 0,
+    max_cache_query_inspections: int = 50,
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     """Extract the full table AWS Athena and return the results as a Pandas DataFrame.
 
@@ -929,6 +930,10 @@ def read_sql_table(
         If cached results are valid, wrangler ignores the `ctas_approach`, `s3_output`, `encryption`, `kms_key`,
         `keep_files` and `ctas_temp_table_name` params.
         If reading cached data fails for any reason, execution falls back to the usual query run path.
+    max_cache_query_inspections : int
+        Max number of queries that will be inspected from the history to try to find some result to reuse.
+        The bigger the number of inspections, the bigger will be the latency for non-cached queries.
+        Only takes effect if max_cache_seconds > 0.
 
     Returns
     -------
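
Note: a minimal usage sketch of the new parameter from the caller's side, assuming an existing Glue database and table (the names "my_db" and "my_table" are hypothetical; only max_cache_seconds and max_cache_query_inspections come from this change):

    import awswrangler as wr

    # Reuse a cached result from the last 15 minutes of query history if one matches,
    # inspecting at most 100 past executions before falling back to a fresh run.
    df = wr.athena.read_sql_table(
        table="my_table",
        database="my_db",
        max_cache_seconds=900,
        max_cache_query_inspections=100,
    )
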
@@ -957,6 +962,7 @@ def read_sql_table(
         use_threads=use_threads,
         boto3_session=boto3_session,
         max_cache_seconds=max_cache_seconds,
+        max_cache_query_inspections=max_cache_query_inspections,
     )
 
 
@@ -975,7 +981,7 @@ def _get_last_query_executions(
     args: Dict[str, str] = {}
     if workgroup is not None:
         args["WorkGroup"] = workgroup
-    paginator = client_athena.get_paginator("get_query_results")
+    paginator = client_athena.get_paginator("list_query_executions")
     for page in paginator.paginate(**args):
         query_execution_id_list: List[str] = page["QueryExecutionIds"]
         execution_data = client_athena.batch_get_query_execution(QueryExecutionIds=query_execution_id_list)
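
Note: the paginator name matters because this helper needs query execution IDs from the account's history, while "get_query_results" pages through the rows of a single query instead. A rough standalone sketch of the same boto3 call pattern (the workgroup value is hypothetical, not part of this change):

    import boto3

    client_athena = boto3.client("athena")
    args = {"WorkGroup": "primary"}  # hypothetical workgroup filter

    # "list_query_executions" pages through recent execution IDs (up to 50 per page);
    # the IDs are then resolved to full execution details in a single batch call.
    paginator = client_athena.get_paginator("list_query_executions")
    for page in paginator.paginate(**args):
        ids = page["QueryExecutionIds"]
        details = client_athena.batch_get_query_execution(QueryExecutionIds=ids)
        for execution in details["QueryExecutions"]:
            print(execution["QueryExecutionId"], execution["Status"]["State"])
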
@@ -1019,9 +1025,10 @@ def _check_for_cached_results(
     """
     num_executions_inspected: int = 0
     if max_cache_seconds > 0:  # pylint: disable=too-many-nested-blocks
+        current_timestamp = datetime.datetime.now(datetime.timezone.utc)
+        print(current_timestamp)
         for query_executions in _get_last_query_executions(boto3_session=session, workgroup=workgroup):
             cached_queries: List[Dict[str, Any]] = _sort_successful_executions_data(query_executions=query_executions)
-            current_timestamp = datetime.datetime.utcnow()
             comparable_sql: str = _prepare_query_string_for_comparison(sql)
 
             # this could be mapreduced, but it is only 50 items long, tops
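
Note: besides hoisting the timestamp out of the loop, this change makes it timezone-aware. boto3 returns aware datetime objects for fields such as a query's submission time, and Python refuses to mix aware and naive values in comparisons or subtraction. A small illustration of that failure mode, independent of the wrangler code:

    import datetime

    aware = datetime.datetime(2020, 5, 1, tzinfo=datetime.timezone.utc)  # like a boto3 timestamp
    naive = datetime.datetime.utcnow()                                   # no tzinfo attached

    try:
        print(naive - aware)
    except TypeError as err:
        print(err)  # can't subtract offset-naive and offset-aware datetimes

    # Two aware timestamps compare and subtract cleanly:
    print(datetime.datetime.now(datetime.timezone.utc) - aware)
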
