Skip to content

Commit 57b6133

Browse files
authored
Merge pull request #266 from awslabs/dev
Releasing version 1.3.0
2 parents 34608fa + a660e59 commit 57b6133

18 files changed

+1641
-101
lines changed

.pylintrc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ indent-string=' '
331331
max-line-length=120
332332

333333
# Maximum number of lines in a module.
334-
max-module-lines=2500
334+
max-module-lines=4000
335335

336336
# List of optional constructs for which whitespace checking is disabled. `dict-
337337
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
@@ -547,7 +547,7 @@ valid-metaclass-classmethod-first-arg=cls
547547
[DESIGN]
548548

549549
# Maximum number of arguments for function / method.
550-
max-args=15
550+
max-args=20
551551

552552
# Maximum number of attributes for a class (see R0902).
553553
max-attributes=7
@@ -556,10 +556,10 @@ max-attributes=7
556556
max-bool-expr=5
557557

558558
# Maximum number of branch for function / method body.
559-
max-branches=15
559+
max-branches=20
560560

561561
# Maximum number of locals for function / method body.
562-
max-locals=30
562+
max-locals=35
563563

564564
# Maximum number of parents for a class (see R0901).
565565
max-parents=7

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
![AWS Data Wrangler](docs/source/_static/logo2.png?raw=true "AWS Data Wrangler")
55

6-
[![Release](https://img.shields.io/badge/release-1.2.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
6+
[![Release](https://img.shields.io/badge/release-1.3.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
77
[![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler)
88
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
99
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
@@ -79,6 +79,7 @@ df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine)
7979
- [14 - Schema Evolution](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/14%20-%20Schema%20Evolution.ipynb)
8080
- [15 - EMR](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/15%20-%20EMR.ipynb)
8181
- [16 - EMR & Docker](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/16%20-%20EMR%20%26%20Docker.ipynb)
82+
- [17 - Partition Projection](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/17%20-%20Partition%20Projection.ipynb)
8283
- [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/latest/api.html)
8384
- [Amazon S3](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#amazon-s3)
8485
- [AWS Glue Catalog](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#aws-glue-catalog)

awswrangler/__metadata__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@
77

88
__title__ = "awswrangler"
99
__description__ = "Pandas on AWS."
10-
__version__ = "1.2.0"
10+
__version__ = "1.3.0"
1111
__license__ = "Apache License 2.0"

awswrangler/_data_types.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
def athena2pyarrow(dtype: str) -> pa.DataType: # pylint: disable=too-many-return-statements
2323
"""Athena to PyArrow data types conversion."""
24-
dtype = dtype.lower()
24+
dtype = dtype.lower().replace(" ", "")
2525
if dtype == "tinyint":
2626
return pa.int8()
2727
if dtype == "smallint":
@@ -47,6 +47,12 @@ def athena2pyarrow(dtype: str) -> pa.DataType: # pylint: disable=too-many-retur
4747
if dtype.startswith("decimal"):
4848
precision, scale = dtype.replace("decimal(", "").replace(")", "").split(sep=",")
4949
return pa.decimal128(precision=int(precision), scale=int(scale))
50+
if dtype.startswith("array"):
51+
return pa.large_list(athena2pyarrow(dtype=dtype[6:-1]))
52+
if dtype.startswith("struct"):
53+
return pa.struct([(f.split(":", 1)[0], athena2pyarrow(f.split(":", 1)[1])) for f in dtype[7:-1].split(",")])
54+
if dtype.startswith("map"): # pragma: no cover
55+
return pa.map_(athena2pyarrow(dtype[4:-1].split(",", 1)[0]), athena2pyarrow(dtype[4:-1].split(",", 1)[1]))
5056
raise exceptions.UnsupportedType(f"Unsupported Athena type: {dtype}") # pragma: no cover
5157

5258

@@ -77,8 +83,6 @@ def athena2pandas(dtype: str) -> str: # pylint: disable=too-many-branches,too-m
7783
return "decimal"
7884
if dtype in ("binary", "varbinary"):
7985
return "bytes"
80-
if dtype == "array": # pragma: no cover
81-
return "list"
8286
raise exceptions.UnsupportedType(f"Unsupported Athena type: {dtype}") # pragma: no cover
8387

8488

@@ -143,9 +147,9 @@ def pyarrow2athena(dtype: pa.DataType) -> str: # pylint: disable=too-many-branc
143147
if pa.types.is_list(dtype):
144148
return f"array<{pyarrow2athena(dtype=dtype.value_type)}>"
145149
if pa.types.is_struct(dtype):
146-
return f"struct<{', '.join([f'{f.name}:{pyarrow2athena(dtype=f.type)}' for f in dtype])}>"
150+
return f"struct<{','.join([f'{f.name}:{pyarrow2athena(dtype=f.type)}' for f in dtype])}>"
147151
if pa.types.is_map(dtype): # pragma: no cover
148-
return f"map<{pyarrow2athena(dtype=dtype.key_type)},{pyarrow2athena(dtype=dtype.item_type)}>"
152+
return f"map<{pyarrow2athena(dtype=dtype.key_type)}, {pyarrow2athena(dtype=dtype.item_type)}>"
149153
if dtype == pa.null():
150154
raise exceptions.UndetectedType("We can not infer the data type from an entire null object column")
151155
raise exceptions.UnsupportedType(f"Unsupported Pyarrow type: {dtype}") # pragma: no cover
@@ -321,7 +325,7 @@ def athena_types_from_pandas(
321325
athena_columns_types: Dict[str, str] = {}
322326
for k, v in pa_columns_types.items():
323327
if v is None:
324-
athena_columns_types[k] = casts[k]
328+
athena_columns_types[k] = casts[k].replace(" ", "")
325329
else:
326330
athena_columns_types[k] = pyarrow2athena(dtype=v)
327331
_logger.debug("athena_columns_types: %s", athena_columns_types)
@@ -341,12 +345,12 @@ def athena_types_from_pandas_partitioned(
341345
df=df, index=index, dtype=dtype, index_left=index_left
342346
)
343347
columns_types: Dict[str, str] = {}
348+
for col, typ in athena_columns_types.items():
349+
if col not in partitions:
350+
columns_types[col] = typ
344351
partitions_types: Dict[str, str] = {}
345-
for k, v in athena_columns_types.items():
346-
if k in partitions:
347-
partitions_types[k] = v
348-
else:
349-
columns_types[k] = v
352+
for par in partitions:
353+
partitions_types[par] = athena_columns_types[par]
350354
return columns_types, partitions_types
351355

352356

@@ -384,7 +388,12 @@ def athena_types_from_pyarrow_schema(
384388
def cast_pandas_with_athena_types(df: pd.DataFrame, dtype: Dict[str, str]) -> pd.DataFrame:
385389
"""Cast columns in a Pandas DataFrame."""
386390
for col, athena_type in dtype.items():
387-
if col in df.columns:
391+
if (
392+
(col in df.columns)
393+
and (not athena_type.startswith("array"))
394+
and (not athena_type.startswith("struct"))
395+
and (not athena_type.startswith("map"))
396+
):
388397
pandas_type: str = athena2pandas(dtype=athena_type)
389398
if pandas_type == "datetime64":
390399
df[col] = pd.to_datetime(df[col])

awswrangler/_utils.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,24 @@ def get_account_id(boto3_session: Optional[boto3.Session] = None) -> str:
183183
return client(service_name="sts", session=session).get_caller_identity().get("Account")
184184

185185

186-
def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session] = None) -> str:
186+
def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session] = None) -> str: # pragma: no cover
187187
"""Extract region from Subnet ID."""
188188
session: boto3.Session = ensure_session(session=boto3_session)
189189
client_ec2: boto3.client = client(service_name="ec2", session=session)
190-
return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:9]
190+
return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:-1]
191+
192+
193+
def get_region_from_session(boto3_session: Optional[boto3.Session] = None, default_region: Optional[str] = None) -> str:
194+
"""Extract region from session."""
195+
session: boto3.Session = ensure_session(session=boto3_session)
196+
region: Optional[str] = session.region_name
197+
if region is not None:
198+
return region
199+
if default_region is not None: # pragma: no cover
200+
return default_region
201+
raise exceptions.InvalidArgument(
202+
"There is no region_name defined on boto3, please configure it."
203+
) # pragma: no cover
191204

192205

193206
def extract_partitions_from_paths(

awswrangler/catalog.py

Lines changed: 123 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ def create_parquet_table(
9494
columns_comments: Optional[Dict[str, str]] = None,
9595
mode: str = "overwrite",
9696
catalog_versioning: bool = False,
97+
projection_enabled: bool = False,
98+
projection_types: Optional[Dict[str, str]] = None,
99+
projection_ranges: Optional[Dict[str, str]] = None,
100+
projection_values: Optional[Dict[str, str]] = None,
101+
projection_intervals: Optional[Dict[str, str]] = None,
102+
projection_digits: Optional[Dict[str, str]] = None,
97103
boto3_session: Optional[boto3.Session] = None,
98104
) -> None:
99105
"""Create a Parquet Table (Metadata Only) in the AWS Glue Catalog.
@@ -124,6 +130,29 @@ def create_parquet_table(
124130
'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
125131
catalog_versioning : bool
126132
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
133+
projection_enabled : bool
134+
Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html)
135+
projection_types : Optional[Dict[str, str]]
136+
Dictionary of partitions names and Athena projections types.
137+
Valid types: "enum", "integer", "date", "injected"
138+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
139+
(e.g. {'col_name': 'enum', 'col2_name': 'integer'})
140+
projection_ranges: Optional[Dict[str, str]]
141+
Dictionary of partitions names and Athena projections ranges.
142+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
143+
(e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
144+
projection_values: Optional[Dict[str, str]]
145+
Dictionary of partitions names and Athena projections values.
146+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
147+
(e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
148+
projection_intervals: Optional[Dict[str, str]]
149+
Dictionary of partitions names and Athena projections intervals.
150+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
151+
(e.g. {'col_name': '1', 'col2_name': '5'})
152+
projection_digits: Optional[Dict[str, str]]
153+
Dictionary of partitions names and Athena projections digits.
154+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
155+
(e.g. {'col_name': '1', 'col2_name': '2'})
127156
boto3_session : boto3.Session(), optional
128157
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
129158
@@ -150,6 +179,7 @@ def create_parquet_table(
150179
"""
151180
table = sanitize_table_name(table=table)
152181
partitions_types = {} if partitions_types is None else partitions_types
182+
153183
session: boto3.Session = _utils.ensure_session(session=boto3_session)
154184
cat_table_input: Optional[Dict[str, Any]] = _get_table_input(database=database, table=table, boto3_session=session)
155185
table_input: Dict[str, Any]
@@ -188,6 +218,13 @@ def create_parquet_table(
188218
boto3_session=session,
189219
table_input=table_input,
190220
table_exist=table_exist,
221+
partitions_types=partitions_types,
222+
projection_enabled=projection_enabled,
223+
projection_types=projection_types,
224+
projection_ranges=projection_ranges,
225+
projection_values=projection_values,
226+
projection_intervals=projection_intervals,
227+
projection_digits=projection_digits,
191228
)
192229

193230

@@ -903,6 +940,12 @@ def create_csv_table(
903940
catalog_versioning: bool = False,
904941
sep: str = ",",
905942
boto3_session: Optional[boto3.Session] = None,
943+
projection_enabled: bool = False,
944+
projection_types: Optional[Dict[str, str]] = None,
945+
projection_ranges: Optional[Dict[str, str]] = None,
946+
projection_values: Optional[Dict[str, str]] = None,
947+
projection_intervals: Optional[Dict[str, str]] = None,
948+
projection_digits: Optional[Dict[str, str]] = None,
906949
) -> None:
907950
"""Create a CSV Table (Metadata Only) in the AWS Glue Catalog.
908951
@@ -934,6 +977,29 @@ def create_csv_table(
934977
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
935978
sep : str
936979
String of length 1. Field delimiter for the output file.
980+
projection_enabled : bool
981+
Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html)
982+
projection_types : Optional[Dict[str, str]]
983+
Dictionary of partitions names and Athena projections types.
984+
Valid types: "enum", "integer", "date", "injected"
985+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
986+
(e.g. {'col_name': 'enum', 'col2_name': 'integer'})
987+
projection_ranges: Optional[Dict[str, str]]
988+
Dictionary of partitions names and Athena projections ranges.
989+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
990+
(e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
991+
projection_values: Optional[Dict[str, str]]
992+
Dictionary of partitions names and Athena projections values.
993+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
994+
(e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
995+
projection_intervals: Optional[Dict[str, str]]
996+
Dictionary of partitions names and Athena projections intervals.
997+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
998+
(e.g. {'col_name': '1', 'col2_name': '5'})
999+
projection_digits: Optional[Dict[str, str]]
1000+
Dictionary of partitions names and Athena projections digits.
1001+
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
1002+
(e.g. {'col_name': '1', 'col2_name': '2'})
9371003
boto3_session : boto3.Session(), optional
9381004
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
9391005
@@ -980,27 +1046,77 @@ def create_csv_table(
9801046
boto3_session=session,
9811047
table_input=table_input,
9821048
table_exist=does_table_exist(database=database, table=table, boto3_session=session),
1049+
partitions_types=partitions_types,
1050+
projection_enabled=projection_enabled,
1051+
projection_types=projection_types,
1052+
projection_ranges=projection_ranges,
1053+
projection_values=projection_values,
1054+
projection_intervals=projection_intervals,
1055+
projection_digits=projection_digits,
9831056
)
9841057

9851058

986-
def _create_table(
1059+
def _create_table( # pylint: disable=too-many-branches,too-many-statements
9871060
database: str,
9881061
table: str,
9891062
description: Optional[str],
9901063
parameters: Optional[Dict[str, str]],
991-
columns_comments: Optional[Dict[str, str]],
9921064
mode: str,
9931065
catalog_versioning: bool,
9941066
boto3_session: Optional[boto3.Session],
9951067
table_input: Dict[str, Any],
9961068
table_exist: bool,
1069+
projection_enabled: bool,
1070+
partitions_types: Optional[Dict[str, str]] = None,
1071+
columns_comments: Optional[Dict[str, str]] = None,
1072+
projection_types: Optional[Dict[str, str]] = None,
1073+
projection_ranges: Optional[Dict[str, str]] = None,
1074+
projection_values: Optional[Dict[str, str]] = None,
1075+
projection_intervals: Optional[Dict[str, str]] = None,
1076+
projection_digits: Optional[Dict[str, str]] = None,
9971077
):
1078+
# Description
9981079
if description is not None:
9991080
table_input["Description"] = description
1000-
if parameters is not None:
1001-
for k, v in parameters.items():
1002-
table_input["Parameters"][k] = v
1003-
if columns_comments is not None:
1081+
1082+
# Parameters & Projection
1083+
parameters = parameters if parameters else {}
1084+
partitions_types = partitions_types if partitions_types else {}
1085+
projection_types = projection_types if projection_types else {}
1086+
projection_ranges = projection_ranges if projection_ranges else {}
1087+
projection_values = projection_values if projection_values else {}
1088+
projection_intervals = projection_intervals if projection_intervals else {}
1089+
projection_digits = projection_digits if projection_digits else {}
1090+
projection_types = {sanitize_column_name(k): v for k, v in projection_types.items()}
1091+
projection_ranges = {sanitize_column_name(k): v for k, v in projection_ranges.items()}
1092+
projection_values = {sanitize_column_name(k): v for k, v in projection_values.items()}
1093+
projection_intervals = {sanitize_column_name(k): v for k, v in projection_intervals.items()}
1094+
projection_digits = {sanitize_column_name(k): v for k, v in projection_digits.items()}
1095+
for k, v in partitions_types.items():
1096+
if v == "date":
1097+
table_input["Parameters"][f"projection.{k}.format"] = "yyyy-MM-dd"
1098+
elif v == "timestamp":
1099+
table_input["Parameters"][f"projection.{k}.format"] = "yyyy-MM-dd HH:mm:ss"
1100+
table_input["Parameters"][f"projection.{k}.interval.unit"] = "SECONDS"
1101+
table_input["Parameters"][f"projection.{k}.interval"] = "1"
1102+
for k, v in projection_types.items():
1103+
table_input["Parameters"][f"projection.{k}.type"] = v
1104+
for k, v in projection_ranges.items():
1105+
table_input["Parameters"][f"projection.{k}.range"] = v
1106+
for k, v in projection_values.items():
1107+
table_input["Parameters"][f"projection.{k}.values"] = v
1108+
for k, v in projection_intervals.items():
1109+
table_input["Parameters"][f"projection.{k}.interval"] = str(v)
1110+
for k, v in projection_digits.items():
1111+
table_input["Parameters"][f"projection.{k}.digits"] = str(v)
1112+
parameters["projection.enabled"] = "true" if projection_enabled is True else "false"
1113+
for k, v in parameters.items():
1114+
table_input["Parameters"][k] = v
1115+
1116+
# Column comments
1117+
columns_comments = columns_comments if columns_comments else {}
1118+
columns_comments = {sanitize_column_name(k): v for k, v in columns_comments.items()}
1119+
if columns_comments:
10041120
for col in table_input["StorageDescriptor"]["Columns"]:
10051121
name: str = col["Name"]
10061122
if name in columns_comments:
@@ -1009,6 +1125,7 @@ def _create_table(
10091125
name = par["Name"]
10101126
if name in columns_comments:
10111127
par["Comment"] = columns_comments[name]
1128+
10121129
session: boto3.Session = _utils.ensure_session(session=boto3_session)
10131130
client_glue: boto3.client = _utils.client(service_name="glue", session=session)
10141131
skip_archive: bool = not catalog_versioning

0 commit comments

Comments
 (0)