
Commit b849f1a

Updating all dependencies

1 parent: 90576b2

File tree: 12 files changed (+92 / -60 lines)

awswrangler/pandas.py (27 additions, 4 deletions)

@@ -8,9 +8,11 @@
 from datetime import datetime
 import ast
 
+from botocore.exceptions import ClientError, HTTPClientError  # type: ignore
 import pandas as pd  # type: ignore
 import pyarrow as pa  # type: ignore
 from pyarrow import parquet as pq  # type: ignore
+import tenacity  # type: ignore
 
 from awswrangler import data_types
 from awswrangler.exceptions import (UnsupportedWriteMode, UnsupportedFileFormat, AthenaQueryError, EmptyS3Object,
@@ -880,8 +882,18 @@ def write_csv_dataframe(dataframe, path, preserve_index, compression, fs, extra_
         csv_buffer = bytes(
             dataframe.to_csv(None, header=False, index=preserve_index, compression=compression, **csv_extra_args),
             "utf-8")
+        Pandas._write_csv_to_s3_retrying(fs=fs, path=path, buffer=csv_buffer)
+
+    @staticmethod
+    @tenacity.retry(
+        retry=tenacity.retry_if_exception_type(exception_types=(ClientError, HTTPClientError)),
+        wait=tenacity.wait_random_exponential(multiplier=0.5, max=10),
+        stop=tenacity.stop_after_attempt(max_attempt_number=15),
+        reraise=True,
+    )
+    def _write_csv_to_s3_retrying(fs: Any, path: str, buffer: bytes) -> None:
         with fs.open(path, "wb") as f:
-            f.write(csv_buffer)
+            f.write(buffer)
 
     @staticmethod
     def write_parquet_dataframe(dataframe, path, preserve_index, compression, fs, cast_columns, isolated_dataframe):
@@ -906,18 +918,29 @@ def write_parquet_dataframe(dataframe, path, preserve_index, compression, fs, ca
         for col_name, dtype in cast_columns.items():
             col_index = table.column_names.index(col_name)
             pyarrow_dtype = data_types.athena2pyarrow(dtype)
-            table = table.set_column(col_index, table.column(col_name).cast(pyarrow_dtype))
+            field = pa.field(name=col_name, type=pyarrow_dtype)
+            table = table.set_column(col_index, field, table.column(col_name).cast(pyarrow_dtype))
             logger.debug(f"Casting column {col_name} ({col_index}) to {dtype} ({pyarrow_dtype})")
 
         # Persisting on S3
-        with fs.open(path, "wb") as f:
-            pq.write_table(table, f, compression=compression, coerce_timestamps="ms", flavor="spark")
+        Pandas._write_parquet_to_s3_retrying(fs=fs, path=path, table=table, compression=compression)
 
         # Casting back on Pandas if necessary
         if isolated_dataframe is False:
             for col in casted_in_pandas:
                 dataframe[col] = dataframe[col].astype("Int64")
 
+    @staticmethod
+    @tenacity.retry(
+        retry=tenacity.retry_if_exception_type(exception_types=(ClientError, HTTPClientError)),
+        wait=tenacity.wait_random_exponential(multiplier=0.5, max=10),
+        stop=tenacity.stop_after_attempt(max_attempt_number=15),
+        reraise=True,
+    )
+    def _write_parquet_to_s3_retrying(fs: Any, path: str, table: pa.Table, compression: str) -> None:
+        with fs.open(path, "wb") as f:
+            pq.write_table(table, f, compression=compression, coerce_timestamps="ms", flavor="spark")
+
     def to_redshift(
             self,
             dataframe: pd.DataFrame,
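
Part of the pandas.py change above appears to track the pyarrow 0.14 to 0.15 API change: Table.set_column no longer accepts just an index and a column, and now takes an explicit field describing the replacement column. A minimal sketch of the newer signature, assuming pyarrow >= 0.15 (the table and column names are illustrative, not taken from awswrangler):

import pyarrow as pa

# Build a toy table and cast one column, passing index, field, and column
# to set_column as required since pyarrow 0.15.
table = pa.Table.from_pydict({"price": [1, 2, 3]})
col_index = table.column_names.index("price")
target_type = pa.float64()
field = pa.field(name="price", type=target_type)
table = table.set_column(col_index, field, table.column("price").cast(target_type))
print(table.schema)  # price: double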

awswrangler/s3.py (2 additions, 2 deletions)

@@ -2,7 +2,7 @@
 from math import ceil
 import logging
 
-from botocore.exceptions import ClientError  # type: ignore
+from botocore.exceptions import ClientError, HTTPClientError  # type: ignore
 import s3fs  # type: ignore
 import tenacity  # type: ignore
 
@@ -203,7 +203,7 @@ def list_objects(self, path):
 
     @staticmethod
     @tenacity.retry(
-        retry=tenacity.retry_if_exception_type(exception_types=ClientError),
+        retry=tenacity.retry_if_exception_type(exception_types=(ClientError, HTTPClientError)),
         wait=tenacity.wait_random_exponential(multiplier=0.5, max=10),
         stop=tenacity.stop_after_attempt(max_attempt_number=15),
         reraise=True,
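
pandas.py and s3.py now share the same tenacity policy: retry only on botocore's ClientError and HTTPClientError, wait with capped random exponential backoff, and give up after 15 attempts, re-raising the last error. A small self-contained sketch of how that decorator behaves; flaky_call and the simulated SlowDown error are illustrative only, not part of the library:

import tenacity
from botocore.exceptions import ClientError, HTTPClientError

attempts = {"count": 0}

@tenacity.retry(
    retry=tenacity.retry_if_exception_type(exception_types=(ClientError, HTTPClientError)),
    wait=tenacity.wait_random_exponential(multiplier=0.5, max=10),
    stop=tenacity.stop_after_attempt(max_attempt_number=15),
    reraise=True,
)
def flaky_call():
    # Fails twice with a simulated throttling error, then succeeds;
    # reraise=True means the original exception surfaces if all 15 attempts fail.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ClientError({"Error": {"Code": "SlowDown", "Message": "Reduce your request rate."}}, "PutObject")
    return "ok"

print(flaky_call(), "after", attempts["count"], "attempts")  # ok after 3 attempts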

awswrangler/spark.py (16 additions, 9 deletions)

@@ -1,9 +1,12 @@
 from typing import List, Tuple, Dict
 import logging
+import os
 
 import pandas as pd  # type: ignore
 
-from pyspark import sql
+from pyspark.sql.functions import pandas_udf, PandasUDFType, spark_partition_id
+from pyspark.sql.types import TimestampType
+from pyspark.sql import DataFrame
 
 from awswrangler.exceptions import MissingBatchDetected, UnsupportedFileFormat
 
@@ -35,7 +38,7 @@ def _extract_casts(dtypes):
     def date2timestamp(dataframe):
         for name, dtype in dataframe.dtypes:
             if dtype == "date":
-                dataframe = dataframe.withColumn(name, dataframe[name].cast(sql.types.TimestampType()))
+                dataframe = dataframe.withColumn(name, dataframe[name].cast(TimestampType()))
                 logger.warning(f"Casting column {name} from date to timestamp!")
         return dataframe
 
@@ -93,9 +96,13 @@ def to_redshift(
         spark.conf.set("spark.sql.execution.arrow.enabled", "true")
         session_primitives = self._session.primitives
 
-        @sql.functions.pandas_udf(returnType="objects_paths string",
-                                  functionType=sql.functions.PandasUDFType.GROUPED_MAP)
+        @pandas_udf(returnType="objects_paths string", functionType=PandasUDFType.GROUPED_MAP)
         def write(pandas_dataframe):
+            # Exporting ARROW_PRE_0_15_IPC_FORMAT environment variable for
+            # a temporary workaround while waiting for Apache Arrow updates
+            # https://stackoverflow.com/questions/58273063/pandasudf-and-pyarrow-0-15-0
+            os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
+
             del pandas_dataframe["aws_data_wrangler_internal_partition_id"]
             paths = session_primitives.session.pandas.to_parquet(dataframe=pandas_dataframe,
                                                                  path=path,
@@ -106,7 +113,7 @@ def write(pandas_dataframe):
             return pd.DataFrame.from_dict({"objects_paths": paths})
 
         df_objects_paths = dataframe.repartition(numPartitions=num_partitions) \
-            .withColumn("aws_data_wrangler_internal_partition_id", sql.functions.spark_partition_id()) \
+            .withColumn("aws_data_wrangler_internal_partition_id", spark_partition_id()) \
             .groupby("aws_data_wrangler_internal_partition_id") \
             .apply(write)
 
@@ -255,7 +262,7 @@ def _flatten_struct_column(path: str, dtype: str) -> List[Tuple[str, str]]:
         return cols
 
     @staticmethod
-    def _flatten_struct_dataframe(df: sql.DataFrame, explode_outer: bool = True,
+    def _flatten_struct_dataframe(df: DataFrame, explode_outer: bool = True,
                                   explode_pos: bool = True) -> List[Tuple[str, str, str]]:
         explode: str = "EXPLODE_OUTER" if explode_outer is True else "EXPLODE"
         explode = f"POS{explode}" if explode_pos is True else explode
@@ -294,8 +301,8 @@ def _build_name(name: str, expr: str) -> str:
         return f"{name}_{suffix}".replace(".", "_")
 
     @staticmethod
-    def flatten(dataframe: sql.DataFrame, explode_outer: bool = True, explode_pos: bool = True,
-                name: str = "root") -> Dict[str, sql.DataFrame]:
+    def flatten(dataframe: DataFrame, explode_outer: bool = True, explode_pos: bool = True,
+                name: str = "root") -> Dict[str, DataFrame]:
         """
         Convert a complex nested DataFrame in one (or many) flat DataFrames
         If a columns is a struct it is flatten directly.
@@ -311,7 +318,7 @@ def flatten(dataframe: sql.DataFrame, explode_outer: bool = True, explode_pos: b
                                           explode_pos=explode_pos)
         exprs_arr: List[str] = [x[2] for x in cols_exprs if Spark._is_array_or_map(x[1])]
         exprs: List[str] = [x[2] for x in cols_exprs if not Spark._is_array_or_map(x[1])]
-        dfs: Dict[str, sql.DataFrame] = {name: dataframe.selectExpr(exprs)}
+        dfs: Dict[str, DataFrame] = {name: dataframe.selectExpr(exprs)}
         exprs = [x[2] for x in cols_exprs if not Spark._is_array_or_map(x[1]) and not x[0].endswith("_pos")]
         for expr in exprs_arr:
             df_arr = dataframe.selectExpr(exprs + [expr])
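
The ARROW_PRE_0_15_IPC_FORMAT flag is exported inside the UDF body rather than once on the driver because GROUPED_MAP pandas UDFs execute in Python worker processes on the executors, where the driver's environment is not necessarily propagated. A rough, self-contained sketch of the same pattern, assuming a local SparkSession; the count_rows UDF, its schema, and the column names are made up for illustration:

import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType, spark_partition_id

spark = SparkSession.builder.master("local[2]").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df = spark.range(100).withColumn("partition_id", spark_partition_id())

@pandas_udf(returnType="partition_id long, rows long", functionType=PandasUDFType.GROUPED_MAP)
def count_rows(pdf):
    # Spark 2.4.x + pyarrow >= 0.15 workaround: set the flag in the executor-side
    # Python worker, which is where this function (and the Arrow writer) runs.
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
    return pd.DataFrame({"partition_id": [pdf["partition_id"].iloc[0]], "rows": [len(pdf)]})

df.groupby("partition_id").apply(count_rows).show()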

building/Dockerfile (3 additions, 3 deletions)

@@ -1,12 +1,12 @@
 FROM lambci/lambda:build-python3.6
 
-RUN pip install --upgrade pip
+RUN pip3 install --upgrade pip
 
 ADD requirements.txt /root/
-RUN pip install --upgrade -r /root/requirements.txt
+RUN pip3 install --upgrade -r /root/requirements.txt
 RUN rm -rf /root/requirements.txt
 ADD requirements-dev.txt /root/
-RUN pip install --upgrade -r /root/requirements-dev.txt
+RUN pip3 install --upgrade -r /root/requirements-dev.txt
 RUN rm -rf /root/requirements-dev.txt
 
 ENTRYPOINT ["/bin/sh"]

building/build-lambda-layer.sh (2 additions, 1 deletion)

@@ -7,7 +7,7 @@ cd ~
 # Clone desired Arrow version
 rm -rf arrow dist pyarrow*
 git clone \
-    --branch apache-arrow-0.14.1 \
+    --branch apache-arrow-0.15.1 \
     --single-branch \
     https://github.com/apache/arrow.git
 
@@ -43,6 +43,7 @@ make install
 popd
 
 # Build Pyarrow
+export ARROW_PRE_0_15_IPC_FORMAT=1
 export PYARROW_WITH_FLIGHT=0
 export PYARROW_WITH_GANDIVA=0
 export PYARROW_WITH_ORC=0

requirements-dev.txt (4 additions, 4 deletions)

@@ -1,10 +1,10 @@
 yapf~=0.28.0
 mypy~=0.740
-flake8~=3.7.8
+flake8~=3.7.9
 pytest-cov~=2.8.1
-cfn-lint~=0.24.4
+cfn-lint~=0.25.0
 twine~=2.0.0
 wheel~=0.33.6
-sphinx~=2.2.0
+sphinx~=2.2.1
 pyspark~=2.4.4
-pyspark-stubs~=2.4.0
+pyspark-stubs~=2.4.0.post6

requirements.txt (7 additions, 7 deletions)

@@ -1,8 +1,8 @@
-numpy~=1.17.3
-pandas~=0.25.2
-pyarrow~=0.14.0
-botocore~=1.13.2
-boto3~=1.10.2
-s3fs~=0.3.5
-tenacity~=5.1.1
+numpy~=1.17.4
+pandas~=0.25.3
+pyarrow~=0.15.1
+botocore~=1.13.18
+boto3~=1.10.18
+s3fs~=0.4.0
+tenacity~=6.0.0
 pg8000~=1.13.2

setup.py (7 additions, 7 deletions)

@@ -21,13 +21,13 @@
     packages=find_packages(include=["awswrangler", "awswrangler.*"], exclude=["tests"]),
     python_requires=">=3.6",
    install_requires=[
-        "numpy~=1.17.3",
-        "pandas~=0.25.2",
-        "pyarrow~=0.14.0",
-        "botocore~=1.12.253",
-        "boto3~=1.9.253",
-        "s3fs~=0.3.5",
-        "tenacity~=5.1.1",
+        "numpy~=1.17.4",
+        "pandas~=0.25.3",
+        "pyarrow~=0.15.1",
+        "botocore~=1.13.18",
+        "boto3~=1.10.18",
+        "s3fs~=0.4.0",
+        "tenacity~=6.0.0",
         "pg8000~=1.13.2",
     ],
 )

testing/Dockerfile (2 additions, 2 deletions)

@@ -10,8 +10,8 @@ RUN curl https://pyenv.run | bash
 RUN echo 'eval "$(pyenv init -)"' >> /root/.bashrc
 ENV PATH="/root/.pyenv/bin:$PATH"
 RUN pyenv install 3.6.8
-RUN pyenv install 3.7.3
-RUN pyenv global 3.7.3 3.6.8
+RUN pyenv install 3.7.4
+RUN pyenv global 3.7.4 3.6.8
 ENV PYSPARK_PYTHON=python
 ENV PIP=/root/.pyenv/shims/pip
 RUN $PIP install --upgrade pip

testing/run-tests.sh (1 addition, 1 deletion)

@@ -2,9 +2,9 @@
 set -e
 
 cd ..
-pip install --upgrade -e .
 yapf --in-place --recursive setup.py awswrangler testing/test_awswrangler
 mypy awswrangler
 flake8 setup.py awswrangler testing/test_awswrangler
+pip install --upgrade -e .
 pytest --cov=awswrangler testing/test_awswrangler
 cd testing
