Skip to content

Commit 3e49124

Browse files
Add get_query_executions generating DataFrames from Athena query executions detail (#1676)
* Add get_query_executions generating DataFrames from Athena query executions detail Co-authored-by: Lucas Hanson <[email protected]>
1 parent c5ac479 commit 3e49124

File tree

4 files changed

+126
-0
lines changed

4 files changed

+126
-0
lines changed

awswrangler/athena/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
get_named_query_statement,
1010
get_query_columns_types,
1111
get_query_execution,
12+
get_query_executions,
1213
get_work_group,
14+
list_query_executions,
1315
repair_table,
1416
show_create_table,
1517
start_query_execution,
@@ -24,10 +26,12 @@
2426
"describe_table",
2527
"get_query_columns_types",
2628
"get_query_execution",
29+
"get_query_executions",
2730
"get_query_results",
2831
"get_named_query_statement",
2932
"get_work_group",
3033
"generate_create_query",
34+
"list_query_executions",
3135
"repair_table",
3236
"create_ctas_table",
3337
"show_create_table",

awswrangler/athena/_utils.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,3 +1144,104 @@ def get_query_execution(query_execution_id: str, boto3_session: Optional[boto3.S
11441144
QueryExecutionId=query_execution_id,
11451145
)
11461146
return cast(Dict[str, Any], response["QueryExecution"])
1147+
1148+
1149+
def get_query_executions(
    query_execution_ids: List[str], return_unprocessed: bool = False, boto3_session: Optional[boto3.Session] = None
) -> Union[Tuple[pd.DataFrame, pd.DataFrame], pd.DataFrame]:
    """From specified query execution IDs, return a DataFrame of query execution details.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.batch_get_query_execution

    Parameters
    ----------
    query_execution_ids : List[str]
        Athena query execution IDs.
    return_unprocessed : bool
        True to also return the query execution IDs that could not be processed.
        False to only return the DataFrame of query execution details.
        Default is False.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receive None.

    Returns
    -------
    DataFrame
        Query execution details, one row per processed query execution.
        Returned alone when ``return_unprocessed`` is False.

    Tuple[DataFrame, DataFrame]
        Returned when ``return_unprocessed`` is True: the query execution details
        followed by a DataFrame describing the unprocessed query execution IDs.

    Examples
    --------
    >>> import awswrangler as wr
    >>> query_executions_df = wr.athena.get_query_executions(
    ...     query_execution_ids=['query-execution-id', 'query-execution-id1']
    ... )

    >>> query_executions_df, unprocessed_query_executions_df = wr.athena.get_query_executions(
    ...     query_execution_ids=['query-execution-id', 'query-execution-id1'],
    ...     return_unprocessed=True,
    ... )

    """
    # IDs are submitted in slices of at most 50 per batch_get_query_execution call.
    chunked_size: int = 50
    query_executions: List[Dict[str, Any]] = []
    unprocessed_query_execution: List[Dict[str, str]] = []
    client_athena: boto3.client = _utils.client(service_name="athena", session=boto3_session)
    for i in range(0, len(query_execution_ids), chunked_size):
        response = client_athena.batch_get_query_execution(QueryExecutionIds=query_execution_ids[i : i + chunked_size])
        query_executions.extend(response["QueryExecutions"])
        unprocessed_query_execution.extend(response["UnprocessedQueryExecutionIds"])
    if return_unprocessed:
        return pd.json_normalize(query_executions), pd.json_normalize(unprocessed_query_execution)
    if unprocessed_query_execution:
        # The caller did not ask for the unprocessed IDs, so at least make the loss visible.
        # The separating space matters: adjacent literals are concatenated verbatim.
        _logger.warning(
            "Some of query execution ids are unable to be processed. "
            "Set return_unprocessed to True to get unprocessed query execution ids"
        )
    return pd.json_normalize(query_executions)
1198+
1199+
1200+
def list_query_executions(workgroup: Optional[str] = None, boto3_session: Optional[boto3.Session] = None) -> List[str]:
    """Fetch the IDs of the query executions run in a workgroup.

    Pages through the Athena ``list_query_executions`` API until no ``NextToken``
    is returned, retrying each page request on ``ThrottlingException``.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.list_query_executions

    Parameters
    ----------
    workgroup : str, optional
        The name of the workgroup from which the query execution IDs are being returned.
        If not specified, the ``WorkGroup`` argument is omitted from the request
        (Athena then lists the queries in the primary workgroup).
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receive None.

    Returns
    -------
    List[str]
        List of query execution IDs.

    Examples
    --------
    >>> import awswrangler as wr
    >>> res = wr.athena.list_query_executions(workgroup='workgroup-name')

    """
    client_athena: boto3.client = _utils.client(service_name="athena", session=boto3_session)
    # NOTE(review): "base" travels through try_it(**request_args) together with the API
    # arguments; presumably it is try_it's own backoff setting - confirm against _utils.try_it.
    request_args: Dict[str, Any] = {"base": 1}
    if workgroup:
        request_args["WorkGroup"] = workgroup
    execution_ids: List[str] = []
    while True:
        response: Dict[str, Any] = _utils.try_it(
            f=client_athena.list_query_executions,
            ex=botocore.exceptions.ClientError,
            ex_code="ThrottlingException",
            max_num_tries=5,
            **request_args,
        )
        execution_ids += response["QueryExecutionIds"]
        if "NextToken" not in response:
            # Last page reached.
            return execution_ids
        # Ask for the following page on the next iteration.
        request_args["NextToken"] = response["NextToken"]

docs/source/api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,11 @@ Amazon Athena
119119
generate_create_query
120120
get_query_columns_types
121121
get_query_execution
122+
get_query_executions
122123
get_query_results
123124
get_named_query_statement
124125
get_work_group
126+
list_query_executions
125127
read_sql_query
126128
read_sql_table
127129
repair_table

tests/test_athena.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,3 +1233,22 @@ def test_athena_generate_create_query(path, glue_database, glue_table):
12331233
)
12341234
wr.athena.start_query_execution(sql=query, database=glue_database, wait=True)
12351235
assert query == wr.athena.generate_create_query(database=glue_database, table=glue_table)
1236+
1237+
1238+
def test_get_query_execution(workgroup0, workgroup1):
    """Exercise list_query_executions, get_query_execution and get_query_executions together."""
    # Collect the execution IDs known to both test workgroups.
    ids_wg0 = wr.athena.list_query_executions(workgroup=workgroup0)
    ids_wg1 = wr.athena.list_query_executions(workgroup=workgroup1)
    known_ids = ids_wg0 + ids_wg1
    assert known_ids

    # Single lookup and batch lookup with the default (DataFrame-only) return.
    detail = wr.athena.get_query_execution(query_execution_id=known_ids[0])
    details_df = wr.athena.get_query_executions(known_ids)
    assert isinstance(details_df, pd.DataFrame)
    assert isinstance(detail, dict)
    assert set(known_ids) & set(details_df["QueryExecutionId"].values.tolist())

    # Appending bogus IDs must surface them in the unprocessed DataFrame.
    bogus_ids = ["aaa", "bbb"]
    details_df, unprocessed_df = wr.athena.get_query_executions(known_ids + bogus_ids, return_unprocessed=True)
    assert isinstance(unprocessed_df, pd.DataFrame)
    assert set(known_ids) & set(details_df["QueryExecutionId"].values.tolist())
    assert {"aaa", "bbb"} & set(unprocessed_df["QueryExecutionId"].values.tolist())

0 commit comments

Comments
 (0)