Skip to content

Commit ec85502

Browse files
committed
Add support to Partition Projection. 🚀
1 parent 5e3a533 commit ec85502

File tree

5 files changed

+439
-33
lines changed

5 files changed

+439
-33
lines changed

.pylintrc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ indent-string=' '
331331
max-line-length=120
332332

333333
# Maximum number of lines in a module.
334-
max-module-lines=2500
334+
max-module-lines=4000
335335

336336
# List of optional constructs for which whitespace checking is disabled. `dict-
337337
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
@@ -547,7 +547,7 @@ valid-metaclass-classmethod-first-arg=cls
547547
[DESIGN]
548548

549549
# Maximum number of arguments for function / method.
550-
max-args=15
550+
max-args=20
551551

552552
# Maximum number of attributes for a class (see R0902).
553553
max-attributes=7
@@ -556,10 +556,10 @@ max-attributes=7
556556
max-bool-expr=5
557557

558558
# Maximum number of branch for function / method body.
559-
max-branches=15
559+
max-branches=20
560560

561561
# Maximum number of locals for function / method body.
562-
max-locals=30
562+
max-locals=35
563563

564564
# Maximum number of parents for a class (see R0901).
565565
max-parents=7

awswrangler/catalog.py

Lines changed: 123 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ def create_parquet_table(
9494
columns_comments: Optional[Dict[str, str]] = None,
9595
mode: str = "overwrite",
9696
catalog_versioning: bool = False,
97+
projection_enabled: bool = False,
98+
projection_types: Optional[Dict[str, str]] = None,
99+
projection_ranges: Optional[Dict[str, str]] = None,
100+
projection_values: Optional[Dict[str, str]] = None,
101+
projection_intervals: Optional[Dict[str, str]] = None,
102+
projection_digits: Optional[Dict[str, str]] = None,
97103
boto3_session: Optional[boto3.Session] = None,
98104
) -> None:
99105
"""Create a Parquet Table (Metadata Only) in the AWS Glue Catalog.
@@ -124,6 +130,29 @@ def create_parquet_table(
124130
'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
125131
catalog_versioning : bool
126132
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
133+
projection_enabled : bool
134+
Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html)
135+
projection_types : Optional[Dict[str, str]]
136+
Dictionary of partitions names and Athena projections types.
137+
Valid types: "enum", "integer", "date", "injected"
138+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
139+
(e.g. {'col_name': 'enum', 'col2_name': 'integer'})
140+
projection_ranges: Optional[Dict[str, str]]
141+
Dictionary of partitions names and Athena projections ranges.
142+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
143+
(e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
144+
projection_values: Optional[Dict[str, str]]
145+
Dictionary of partitions names and Athena projections values.
146+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
147+
(e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
148+
projection_intervals: Optional[Dict[str, str]]
149+
Dictionary of partitions names and Athena projections intervals.
150+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
151+
(e.g. {'col_name': '1', 'col2_name': '5'})
152+
projection_digits: Optional[Dict[str, str]]
153+
Dictionary of partitions names and Athena projections digits.
154+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
155+
(e.g. {'col_name': '1', 'col2_name': '2'})
127156
boto3_session : boto3.Session(), optional
128157
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
129158
@@ -150,6 +179,7 @@ def create_parquet_table(
150179
"""
151180
table = sanitize_table_name(table=table)
152181
partitions_types = {} if partitions_types is None else partitions_types
182+
153183
session: boto3.Session = _utils.ensure_session(session=boto3_session)
154184
cat_table_input: Optional[Dict[str, Any]] = _get_table_input(database=database, table=table, boto3_session=session)
155185
table_input: Dict[str, Any]
@@ -188,6 +218,13 @@ def create_parquet_table(
188218
boto3_session=session,
189219
table_input=table_input,
190220
table_exist=table_exist,
221+
partitions_types=partitions_types,
222+
projection_enabled=projection_enabled,
223+
projection_types=projection_types,
224+
projection_ranges=projection_ranges,
225+
projection_values=projection_values,
226+
projection_intervals=projection_intervals,
227+
projection_digits=projection_digits,
191228
)
192229

193230

@@ -903,6 +940,12 @@ def create_csv_table(
903940
catalog_versioning: bool = False,
904941
sep: str = ",",
905942
boto3_session: Optional[boto3.Session] = None,
943+
projection_enabled: bool = False,
944+
projection_types: Optional[Dict[str, str]] = None,
945+
projection_ranges: Optional[Dict[str, str]] = None,
946+
projection_values: Optional[Dict[str, str]] = None,
947+
projection_intervals: Optional[Dict[str, str]] = None,
948+
projection_digits: Optional[Dict[str, str]] = None,
906949
) -> None:
907950
"""Create a CSV Table (Metadata Only) in the AWS Glue Catalog.
908951
@@ -934,6 +977,29 @@ def create_csv_table(
934977
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
935978
sep : str
936979
String of length 1. Field delimiter for the output file.
980+
projection_enabled : bool
981+
Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html)
982+
projection_types : Optional[Dict[str, str]]
983+
Dictionary of partitions names and Athena projections types.
984+
Valid types: "enum", "integer", "date", "injected"
985+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
986+
(e.g. {'col_name': 'enum', 'col2_name': 'integer'})
987+
projection_ranges: Optional[Dict[str, str]]
988+
Dictionary of partitions names and Athena projections ranges.
989+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
990+
(e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
991+
projection_values: Optional[Dict[str, str]]
992+
Dictionary of partitions names and Athena projections values.
993+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
994+
(e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
995+
projection_intervals: Optional[Dict[str, str]]
996+
Dictionary of partitions names and Athena projections intervals.
997+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
998+
(e.g. {'col_name': '1', 'col2_name': '5'})
999+
projection_digits: Optional[Dict[str, str]]
1000+
Dictionary of partitions names and Athena projections digits.
1001+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
1002+
(e.g. {'col_name': '1', 'col2_name': '2'})
9371003
boto3_session : boto3.Session(), optional
9381004
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
9391005
@@ -980,27 +1046,77 @@ def create_csv_table(
9801046
boto3_session=session,
9811047
table_input=table_input,
9821048
table_exist=does_table_exist(database=database, table=table, boto3_session=session),
1049+
partitions_types=partitions_types,
1050+
projection_enabled=projection_enabled,
1051+
projection_types=projection_types,
1052+
projection_ranges=projection_ranges,
1053+
projection_values=projection_values,
1054+
projection_intervals=projection_intervals,
1055+
projection_digits=projection_digits,
9831056
)
9841057

9851058

986-
def _create_table(
1059+
def _create_table( # pylint: disable=too-many-branches,too-many-statements
9871060
database: str,
9881061
table: str,
9891062
description: Optional[str],
9901063
parameters: Optional[Dict[str, str]],
991-
columns_comments: Optional[Dict[str, str]],
9921064
mode: str,
9931065
catalog_versioning: bool,
9941066
boto3_session: Optional[boto3.Session],
9951067
table_input: Dict[str, Any],
9961068
table_exist: bool,
1069+
projection_enabled: bool,
1070+
partitions_types: Optional[Dict[str, str]] = None,
1071+
columns_comments: Optional[Dict[str, str]] = None,
1072+
projection_types: Optional[Dict[str, str]] = None,
1073+
projection_ranges: Optional[Dict[str, str]] = None,
1074+
projection_values: Optional[Dict[str, str]] = None,
1075+
projection_intervals: Optional[Dict[str, str]] = None,
1076+
projection_digits: Optional[Dict[str, str]] = None,
9971077
):
1078+
# Description
9981079
if description is not None:
9991080
table_input["Description"] = description
1000-
if parameters is not None:
1001-
for k, v in parameters.items():
1002-
table_input["Parameters"][k] = v
1003-
if columns_comments is not None:
1081+
1082+
# Parameters & Projection
1083+
parameters = parameters if parameters else {}
1084+
partitions_types = partitions_types if partitions_types else {}
1085+
projection_types = projection_types if projection_types else {}
1086+
projection_ranges = projection_ranges if projection_ranges else {}
1087+
projection_values = projection_values if projection_values else {}
1088+
projection_intervals = projection_intervals if projection_intervals else {}
1089+
projection_digits = projection_digits if projection_digits else {}
1090+
projection_types = {sanitize_column_name(k): v for k, v in projection_types.items()}
1091+
projection_ranges = {sanitize_column_name(k): v for k, v in projection_ranges.items()}
1092+
projection_values = {sanitize_column_name(k): v for k, v in projection_values.items()}
1093+
projection_intervals = {sanitize_column_name(k): v for k, v in projection_intervals.items()}
1094+
projection_digits = {sanitize_column_name(k): v for k, v in projection_digits.items()}
1095+
for k, v in partitions_types.items():
1096+
if v == "date":
1097+
table_input["Parameters"][f"projection.{k}.format"] = "yyyy-MM-dd"
1098+
elif v == "timestamp":
1099+
table_input["Parameters"][f"projection.{k}.format"] = "yyyy-MM-dd HH:mm:ss"
1100+
table_input["Parameters"][f"projection.{k}.interval.unit"] = "SECONDS"
1101+
table_input["Parameters"][f"projection.{k}.interval"] = "1"
1102+
for k, v in projection_types.items():
1103+
table_input["Parameters"][f"projection.{k}.type"] = v
1104+
for k, v in projection_ranges.items():
1105+
table_input["Parameters"][f"projection.{k}.range"] = v
1106+
for k, v in projection_values.items():
1107+
table_input["Parameters"][f"projection.{k}.values"] = v
1108+
for k, v in projection_intervals.items():
1109+
table_input["Parameters"][f"projection.{k}.interval"] = str(v)
1110+
for k, v in projection_digits.items():
1111+
table_input["Parameters"][f"projection.{k}.digits"] = str(v)
1112+
parameters["projection.enabled"] = "true" if projection_enabled is True else "false"
1113+
for k, v in parameters.items():
1114+
table_input["Parameters"][k] = v
1115+
1116+
# Column comments
1117+
columns_comments = columns_comments if columns_comments else {}
1118+
columns_comments = {sanitize_column_name(k): v for k, v in columns_comments.items()}
1119+
if columns_comments:
10041120
for col in table_input["StorageDescriptor"]["Columns"]:
10051121
name: str = col["Name"]
10061122
if name in columns_comments:
@@ -1009,6 +1125,7 @@ def _create_table(
10091125
name = par["Name"]
10101126
if name in columns_comments:
10111127
par["Comment"] = columns_comments[name]
1128+
10121129
session: boto3.Session = _utils.ensure_session(session=boto3_session)
10131130
client_glue: boto3.client = _utils.client(service_name="glue", session=session)
10141131
skip_archive: bool = not catalog_versioning

0 commit comments

Comments
 (0)