Skip to content

Commit 2cc8938

Browse files
committed
Refactoring Glue partitions pagination.
1 parent 95364c8 commit 2cc8938

File tree

1 file changed

+40
-12
lines changed

1 file changed

+40
-12
lines changed

awswrangler/catalog.py

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ def create_parquet_table(
259259

260260
session: boto3.Session = _utils.ensure_session(session=boto3_session)
261261
cat_table_input: Optional[Dict[str, Any]] = _get_table_input(database=database, table=table, boto3_session=session)
262+
_logger.debug("cat_table_input: %s", cat_table_input)
262263
table_input: Dict[str, Any]
263264
if (cat_table_input is not None) and (mode in ("append", "overwrite_partitions")):
264265
table_input = cat_table_input
@@ -284,6 +285,7 @@ def create_parquet_table(
284285
compression=compression,
285286
)
286287
table_exist: bool = cat_table_input is not None
288+
_logger.debug("table_exist: %s", table_exist)
287289
_create_table(
288290
database=database,
289291
table=table,
@@ -1221,6 +1223,8 @@ def _create_table( # pylint: disable=too-many-branches,too-many-statements
12211223
if name in columns_comments:
12221224
par["Comment"] = columns_comments[name]
12231225

1226+
_logger.debug("table_input: %s", table_input)
1227+
12241228
session: boto3.Session = _utils.ensure_session(session=boto3_session)
12251229
client_glue: boto3.client = _utils.client(service_name="glue", session=session)
12261230
skip_archive: bool = not catalog_versioning
@@ -1229,12 +1233,16 @@ def _create_table( # pylint: disable=too-many-branches,too-many-statements
12291233
f"{mode} is not a valid mode. It must be 'overwrite', 'append' or 'overwrite_partitions'."
12301234
)
12311235
if (table_exist is True) and (mode == "overwrite"):
1236+
_logger.debug("Fetching existing partitions...")
12321237
partitions_values: List[List[str]] = list(
12331238
_get_partitions(database=database, table=table, boto3_session=session).values()
12341239
)
1240+
_logger.debug("Number of old partitions: %s", len(partitions_values))
1241+
_logger.debug("Deleting existing partitions...")
12351242
client_glue.batch_delete_partition(
12361243
DatabaseName=database, TableName=table, PartitionsToDelete=[{"Values": v} for v in partitions_values]
12371244
)
1245+
_logger.debug("Updating table...")
12381246
client_glue.update_table(DatabaseName=database, TableInput=table_input, SkipArchive=skip_archive)
12391247
elif (table_exist is True) and (mode in ("append", "overwrite_partitions", "update")):
12401248
if parameters is not None:
@@ -1514,26 +1522,46 @@ def _get_partitions(
15141522
boto3_session: Optional[boto3.Session] = None,
15151523
) -> Dict[str, List[str]]:
15161524
client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session)
1517-
paginator = client_glue.get_paginator("get_partitions")
1518-
args: Dict[str, Any] = {}
1525+
1526+
args: Dict[str, Any] = {
1527+
"DatabaseName": database,
1528+
"TableName": table,
1529+
"MaxResults": 1_000,
1530+
"Segment": {"SegmentNumber": 0, "TotalSegments": 1},
1531+
}
15191532
if expression is not None:
15201533
args["Expression"] = expression
15211534
if catalog_id is not None:
15221535
args["CatalogId"] = catalog_id
1523-
response_iterator = paginator.paginate(
1524-
DatabaseName=database, TableName=table, PaginationConfig={"PageSize": 1000}, **args
1525-
)
1536+
15261537
partitions_values: Dict[str, List[str]] = {}
1527-
for page in response_iterator:
1528-
if (page is not None) and ("Partitions" in page):
1529-
for partition in page["Partitions"]:
1530-
location: Optional[str] = partition["StorageDescriptor"].get("Location")
1531-
if location is not None:
1532-
values: List[str] = partition["Values"]
1533-
partitions_values[location] = values
1538+
_logger.debug("Starting pagination...")
1539+
1540+
response: Dict[str, Any] = client_glue.get_partitions(**args)
1541+
token: Optional[str] = _append_partitions(partitions_values=partitions_values, response=response)
1542+
while token is not None:
1543+
args["NextToken"] = response["NextToken"]
1544+
response = client_glue.get_partitions(**args)
1545+
token = _append_partitions(partitions_values=partitions_values, response=response)
1546+
1547+
_logger.debug("Pagination done.")
15341548
return partitions_values
15351549

15361550

1551+
def _append_partitions(partitions_values: Dict[str, List[str]], response: Dict[str, Any]) -> Optional[str]:
1552+
_logger.debug("response: %s", response)
1553+
token: Optional[str] = response.get("NextToken", None)
1554+
if (response is not None) and ("Partitions" in response):
1555+
for partition in response["Partitions"]:
1556+
location: Optional[str] = partition["StorageDescriptor"].get("Location")
1557+
if location is not None:
1558+
values: List[str] = partition["Values"]
1559+
partitions_values[location] = values
1560+
else:
1561+
token = None # pragma: no cover
1562+
return token
1563+
1564+
15371565
def extract_athena_types(
15381566
df: pd.DataFrame,
15391567
index: bool = False,

0 commit comments

Comments
 (0)