
Commit 34608fa

Merge pull request #249 from awslabs/dev: Bumping version to 1.2.0

2 parents: 4f962d5 + c9ec52c

34 files changed (+1762 / -342 lines)

.github/workflows/static-checking.yml

Lines changed: 1 addition & 1 deletion

```diff
@@ -25,7 +25,7 @@ jobs:
       with:
         python-version: ${{ matrix.python-version }}
     - name: Setup Environment
-      run: ./setup-dev-env.sh
+      run: ./requirements.sh
     - name: CloudFormation Lint
       run: cfn-lint -t testing/cloudformation.yaml
     - name: Documentation Lint
```

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion

```diff
@@ -82,7 +82,7 @@ We may ask you to sign a [Contributor License Agreement (CLA)](http://en.wikiped
 
 * Then run the command bellow to install all dependencies:
 
-`./setup-dev-env.sh`
+`./requirements.sh`
 
 * Go to the ``testing`` directory
 
```

README.md

Lines changed: 3 additions & 9 deletions

```diff
@@ -1,17 +1,9 @@
 # AWS Data Wrangler
 *Pandas on AWS*
 
----
-
-**NOTE**
-
-Due the new major version `1.0.0` with breaking changes, please make sure that all your old projects has dependencies frozen on the desired version (e.g. `pip install awswrangler==0.3.2`). You can always read the legacy docs [here](https://aws-data-wrangler.readthedocs.io/en/legacy/).
-
----
-
 ![AWS Data Wrangler](docs/source/_static/logo2.png?raw=true "AWS Data Wrangler")
 
-[![Release](https://img.shields.io/badge/release-1.1.2-brightgreen.svg)](https://pypi.org/project/awswrangler/)
+[![Release](https://img.shields.io/badge/release-1.2.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
 [![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler)
 [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
 [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
@@ -84,6 +76,7 @@ df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine)
 - [11 - CSV Datasets](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/11%20-%20CSV%20Datasets.ipynb)
 - [12 - CSV Crawler](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/12%20-%20CSV%20Crawler.ipynb)
 - [13 - Merging Datasets on S3](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/13%20-%20Merging%20Datasets%20on%20S3.ipynb)
+- [14 - Schema Evolution](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/14%20-%20Schema%20Evolution.ipynb)
 - [15 - EMR](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/15%20-%20EMR.ipynb)
 - [16 - EMR & Docker](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/16%20-%20EMR%20%26%20Docker.ipynb)
 - [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/latest/api.html)
@@ -95,3 +88,4 @@ df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine)
 - [CloudWatch Logs](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#cloudwatch-logs)
 - [**License**](https://github.com/awslabs/aws-data-wrangler/blob/master/LICENSE)
 - [**Contributing**](https://github.com/awslabs/aws-data-wrangler/blob/master/CONTRIBUTING.md)
+- [**Legacy Docs** (pre-1.0.0)](https://aws-data-wrangler.readthedocs.io/en/legacy/)
```

awswrangler/__metadata__.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -7,5 +7,5 @@
 
 __title__ = "awswrangler"
 __description__ = "Pandas on AWS."
-__version__ = "1.1.2"
+__version__ = "1.2.0"
 __license__ = "Apache License 2.0"
```

awswrangler/_data_types.py

Lines changed: 18 additions & 17 deletions

```diff
@@ -376,25 +376,11 @@ def athena_types_from_pyarrow_schema(
     _logger.debug("columns_types: %s", columns_types)
     partitions_types: Optional[Dict[str, str]] = None
     if partitions is not None:
-        partitions_types = {p.name: pyarrow2athena(p.dictionary.type) for p in partitions}
+        partitions_types = {p.name: pyarrow2athena(p.dictionary.type) for p in partitions}  # pragma: no cover
     _logger.debug("partitions_types: %s", partitions_types)
     return columns_types, partitions_types
 
 
-def athena_partitions_from_pyarrow_partitions(
-    path: str, partitions: pyarrow.parquet.ParquetPartitions
-) -> Dict[str, List[str]]:
-    """Extract the related Athena partitions values from any PyArrow Partitions."""
-    path = path if path[-1] == "/" else f"{path}/"
-    partitions_values: Dict[str, List[str]] = {}
-    names: List[str] = [p.name for p in partitions]
-    for values in zip(*[p.keys for p in partitions]):
-        suffix: str = "/".join([f"{n}={v}" for n, v in zip(names, values)])
-        suffix = suffix if suffix[-1] == "/" else f"{suffix}/"
-        partitions_values[f"{path}{suffix}"] = list(values)
-    return partitions_values
-
-
 def cast_pandas_with_athena_types(df: pd.DataFrame, dtype: Dict[str, str]) -> pd.DataFrame:
     """Cast columns in a Pandas DataFrame."""
     for col, athena_type in dtype.items():
```
```diff
@@ -410,10 +396,25 @@ def cast_pandas_with_athena_types(df: pd.DataFrame, dtype: Dict[str, str]) -> pd
             df[col] = (
                 df[col]
                 .astype("string")
-                .apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", " ", "<NA>") else None)
+                .apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
             )
+        elif pandas_type == "string":
+            curr_type: str = str(df[col].dtypes)
+            if curr_type.startswith("int") or curr_type.startswith("float"):
+                df[col] = df[col].astype(str).astype("string")
+            else:
+                df[col] = df[col].astype("string")
         else:
-            df[col] = df[col].astype(pandas_type)
+            try:
+                df[col] = df[col].astype(pandas_type)
+            except TypeError as ex:
+                if "object cannot be converted to an IntegerDtype" not in str(ex):
+                    raise ex  # pragma: no cover
+                df[col] = (
+                    df[col]
+                    .apply(lambda x: int(x) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
+                    .astype(pandas_type)
+                )
     return df
 
 
```
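The new `elif`/`try` branches make casts tolerant of object columns that hold missing values. A minimal sketch of the behavior, assuming the internal Athena-to-Pandas mapping sends `bigint` to the nullable `Int64` dtype; `cast_pandas_with_athena_types` lives in the private `awswrangler._data_types` module, so the direct import here is for illustration only:

```python
import pandas as pd

from awswrangler._data_types import cast_pandas_with_athena_types  # private helper

# An object column mixing ints and None used to fail on .astype("Int64")
# with "object cannot be converted to an IntegerDtype"; the new fallback
# converts each element first, mapping None-like values to None before
# the nullable-integer cast.
df = pd.DataFrame(
    {
        "c0": pd.Series([1, None, 3], dtype="object"),  # int/None mix
        "c1": pd.Series([1, 2, 3], dtype="int64"),      # numeric -> string now goes via str()
    }
)
df = cast_pandas_with_athena_types(df=df, dtype={"c0": "bigint", "c1": "string"})
print(df.dtypes)  # expected: c0 -> Int64 (nullable), c1 -> string
```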

awswrangler/_utils.py

Lines changed: 62 additions & 2 deletions

```diff
@@ -3,6 +3,7 @@
 import logging
 import math
 import os
+import random
 from typing import Any, Dict, Generator, List, Optional, Tuple
 
 import boto3  # type: ignore
```
```diff
@@ -11,14 +12,20 @@
 import psycopg2  # type: ignore
 import s3fs  # type: ignore
 
-logger: logging.Logger = logging.getLogger(__name__)
+from awswrangler import exceptions
+
+_logger: logging.Logger = logging.getLogger(__name__)
 
 
 def ensure_session(session: Optional[boto3.Session] = None) -> boto3.Session:
     """Ensure that a valid boto3.Session will be returned."""
     if session is not None:
         return session
-    return boto3.Session()
+    # Ensure the boto3's default session is used so that its parameters can be
+    # set via boto3.setup_default_session()
+    if boto3.DEFAULT_SESSION is not None:
+        return boto3.DEFAULT_SESSION
+    return boto3.Session()  # pragma: no cover
 
 
 def client(service_name: str, session: Optional[boto3.Session] = None) -> boto3.client:
```
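With this change, configuration applied through `boto3.setup_default_session()` flows into every call that omits `boto3_session`. A small sketch (again importing the private helper purely for illustration):

```python
import boto3

from awswrangler._utils import ensure_session  # private helper

# Parameters set on boto3's default session (region, profile, credentials)
# are now picked up automatically instead of being discarded by a fresh
# boto3.Session().
boto3.setup_default_session(region_name="us-east-1")

session = ensure_session()
assert session is boto3.DEFAULT_SESSION
assert session.region_name == "us-east-1"

# An explicitly passed session still takes precedence.
explicit = boto3.Session(region_name="eu-west-1")
assert ensure_session(session=explicit) is explicit
```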
```diff
@@ -124,6 +131,8 @@ def chunkify(lst: List[Any], num_chunks: int = 1, max_length: Optional[int] = No
     [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
 
     """
+    if not lst:
+        return []  # pragma: no cover
     n: int = num_chunks if max_length is None else int(math.ceil((float(len(lst)) / float(max_length))))
     np_chunks = np.array_split(lst, n)
     return [arr.tolist() for arr in np_chunks if len(arr) > 0]
```
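The guard simply short-circuits empty input; the docstring examples otherwise still hold, as this quick sketch of the private helper shows:

```python
from awswrangler._utils import chunkify  # private helper

print(chunkify(list(range(13)), num_chunks=3))  # [[0, 1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]
print(chunkify(list(range(13)), max_length=4))  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
print(chunkify([], num_chunks=3))               # [] via the new guard (no np.array_split on empty input)
```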
```diff
@@ -179,3 +188,54 @@ def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session
     session: boto3.Session = ensure_session(session=boto3_session)
     client_ec2: boto3.client = client(service_name="ec2", session=session)
     return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:9]
+
+
+def extract_partitions_from_paths(
+    path: str, paths: List[str]
+) -> Tuple[Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]:
+    """Extract partitions from Amazon S3 paths."""
+    path = path if path.endswith("/") else f"{path}/"
+    partitions_types: Dict[str, str] = {}
+    partitions_values: Dict[str, List[str]] = {}
+    for p in paths:
+        if path not in p:
+            raise exceptions.InvalidArgumentValue(
+                f"Object {p} is not under the root path ({path})."
+            )  # pragma: no cover
+        path_wo_filename: str = p.rpartition("/")[0] + "/"
+        if path_wo_filename not in partitions_values:
+            path_wo_prefix: str = p.replace(f"{path}/", "")
+            dirs: List[str] = [x for x in path_wo_prefix.split("/") if (x != "") and ("=" in x)]
+            if dirs:
+                values_tups: List[Tuple[str, str]] = [tuple(x.split("=")[:2]) for x in dirs]  # type: ignore
+                values_dics: Dict[str, str] = dict(values_tups)
+                p_values: List[str] = list(values_dics.values())
+                p_types: Dict[str, str] = {x: "string" for x in values_dics.keys()}
+                if not partitions_types:
+                    partitions_types = p_types
+                if p_values:
+                    partitions_types = p_types
+                    partitions_values[path_wo_filename] = p_values
+                elif p_types != partitions_types:  # pragma: no cover
+                    raise exceptions.InvalidSchemaConvergence(
+                        f"At least two different partitions schema detected: {partitions_types} and {p_types}"
+                    )
+    if not partitions_types:
+        return None, None
+    return partitions_types, partitions_values
+
+
+def list_sampling(lst: List[Any], sampling: float) -> List[Any]:
+    """Random List sampling."""
+    if sampling > 1.0 or sampling <= 0.0:  # pragma: no cover
+        raise exceptions.InvalidArgumentValue(f"Argument <sampling> must be [0.0 < value <= 1.0]. {sampling} received.")
+    _len: int = len(lst)
+    if _len == 0:
+        return []  # pragma: no cover
+    num_samples: int = int(round(_len * sampling))
+    num_samples = _len if num_samples > _len else num_samples
+    num_samples = 1 if num_samples < 1 else num_samples
+    _logger.debug("_len: %s", _len)
+    _logger.debug("sampling: %s", sampling)
+    _logger.debug("num_samples: %s", num_samples)
+    return random.sample(population=lst, k=num_samples)
```
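These two helpers take over from the PyArrow-based `athena_partitions_from_pyarrow_partitions` removed from `_data_types.py` above: partitions are now inferred straight from the S3 key names. A sketch using a hypothetical bucket layout (the bucket and file names are placeholders, and the imports are from a private module):

```python
from awswrangler._utils import extract_partitions_from_paths, list_sampling  # private helpers

# Hive-style key=value directories under the root path become partition
# columns (always typed "string") plus the concrete values per leaf prefix.
types, values = extract_partitions_from_paths(
    path="s3://my-bucket/dataset/",  # hypothetical root path
    paths=[
        "s3://my-bucket/dataset/year=2020/month=04/file0.parquet",
        "s3://my-bucket/dataset/year=2020/month=05/file1.parquet",
    ],
)
print(types)   # {'year': 'string', 'month': 'string'}
print(values)  # {'s3://my-bucket/dataset/year=2020/month=04/': ['2020', '04'],
               #  's3://my-bucket/dataset/year=2020/month=05/': ['2020', '05']}

# list_sampling draws a uniform random subset; sampling must be in (0.0, 1.0].
print(len(list_sampling(lst=list(range(10)), sampling=0.5)))  # 5
```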

awswrangler/athena.py

Lines changed: 17 additions & 2 deletions

```diff
@@ -282,6 +282,8 @@ def repair_table(
 
     """
    query = f"MSCK REPAIR TABLE `{table}`;"
+    if (database is not None) and (not database.startswith("`")):
+        database = f"`{database}`"
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
     query_id = start_query_execution(
         sql=query,
```
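With the quoting in place, database names that need escaping (hyphens, reserved words) no longer break `MSCK REPAIR TABLE`. A hedged usage sketch with placeholder names, assuming `repair_table` returns the query's final state as in the public API:

```python
import awswrangler as wr

# "my-database" would previously reach Athena unquoted; it is now wrapped
# in backticks automatically (unless the caller already quoted it).
state = wr.athena.repair_table(table="my_table", database="my-database")
print(state)  # e.g. "SUCCEEDED"
```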
```diff
@@ -492,7 +494,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals,too-man
         path: str = f"{_s3_output}/{name}"
         ext_location: str = "\n" if wg_config["enforced"] is True else f",\n    external_location = '{path}'\n"
         sql = (
-            f"CREATE TABLE {name}\n"
+            f'CREATE TABLE "{name}"\n'
             f"WITH(\n"
             f"    format = 'Parquet',\n"
             f"    parquet_compression = 'SNAPPY'"
```
```diff
@@ -512,7 +514,20 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals,too-man
             boto3_session=session,
         )
         _logger.debug("query_id: %s", query_id)
-        query_response: Dict[str, Any] = wait_query(query_execution_id=query_id, boto3_session=session)
+        try:
+            query_response: Dict[str, Any] = wait_query(query_execution_id=query_id, boto3_session=session)
+        except exceptions.QueryFailed as ex:
+            if ctas_approach is True:
+                if "Column name not specified" in str(ex):
+                    raise exceptions.InvalidArgumentValue(
+                        "Please, define all columns names in your query. (E.g. 'SELECT MAX(col1) AS max_col1, ...')"
+                    )
+                if "Column type is unknown" in str(ex):
+                    raise exceptions.InvalidArgumentValue(
+                        "Please, define all columns types in your query. "
+                        "(E.g. 'SELECT CAST(NULL AS INTEGER) AS MY_COL, ...')"
+                    )
+            raise ex  # pragma: no cover
         if query_response["QueryExecution"]["Status"]["State"] in ["FAILED", "CANCELLED"]:  # pragma: no cover
             reason: str = query_response["QueryExecution"]["Status"]["StateChangeReason"]
             message_error: str = f"Query error: {reason}"
```
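The translated errors tell the caller exactly how to fix a CTAS-incompatible query: every projected column needs an explicit name and a concrete type. A hedged sketch with placeholder table and database names:

```python
import awswrangler as wr

# Aggregates must be aliased and NULL literals cast; otherwise the new
# handler raises InvalidArgumentValue instead of an opaque QueryFailed.
df = wr.athena.read_sql_query(
    sql="SELECT MAX(col1) AS max_col1, CAST(NULL AS INTEGER) AS empty_col FROM my_table",
    database="my_database",
    ctas_approach=True,  # results are unloaded via CREATE TABLE AS
)
print(df.dtypes)
```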
