
Commit 94b3805

Merge pull request #177 from awslabs/csv-decompression
Add csv decompression for s3.read_csv #175
2 parents 2e018ec + c6f076b commit 94b3805

File tree

2 files changed (+76, -3 lines)

awswrangler/s3.py

Lines changed: 7 additions & 2 deletions
@@ -16,6 +16,7 @@
 import pyarrow.lib  # type: ignore
 import pyarrow.parquet  # type: ignore
 import s3fs  # type: ignore
+from pandas.io.common import infer_compression  # type: ignore

 from awswrangler import _data_types, _utils, catalog, exceptions

@@ -1450,7 +1451,9 @@ def _read_text_chunksize(
     fs: s3fs.S3FileSystem = _utils.get_fs(session=boto3_session, s3_additional_kwargs=s3_additional_kwargs)
     for path in paths:
         _logger.debug(f"path: {path}")
-        with fs.open(path, "r") as f:
+        if pandas_args.get("compression", "infer") == "infer":
+            pandas_args["compression"] = infer_compression(path, compression="infer")
+        with fs.open(path, "rb") as f:
             reader: pandas.io.parsers.TextFileReader = parser_func(f, chunksize=chunksize, **pandas_args)
             for df in reader:
                 yield df
@@ -1464,7 +1467,9 @@ def _read_text_full(
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
 ) -> pd.DataFrame:
     fs: s3fs.S3FileSystem = _utils.get_fs(session=boto3_session, s3_additional_kwargs=s3_additional_kwargs)
-    with fs.open(path, "r") as f:
+    if pandas_args.get("compression", "infer") == "infer":
+        pandas_args["compression"] = infer_compression(path, compression="infer")
+    with fs.open(path, "rb") as f:
         return parser_func(f, **pandas_args)

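For reference, the core of the change: when the caller leaves compression at its default of "infer", the codec is derived from the object key's suffix via pandas' infer_compression, and the object is opened in binary mode ("rb") so pandas can decompress the stream itself. Below is a minimal local sketch of that logic; read_csv_inferring_compression is a hypothetical helper operating on a local path, not part of awswrangler.

# Sketch only: mirrors the inference step added to _read_text_chunksize / _read_text_full,
# but against a local file instead of an s3fs file object.
import pandas as pd
from pandas.io.common import infer_compression  # maps a suffix to "gzip", "bz2", "xz", "zip" or None


def read_csv_inferring_compression(path: str, **pandas_args) -> pd.DataFrame:
    if pandas_args.get("compression", "infer") == "infer":
        # e.g. "data.csv.gz" -> "gzip", "data.csv" -> None
        pandas_args["compression"] = infer_compression(path, compression="infer")
    # Binary mode: pandas receives the raw bytes and applies the codec itself.
    with open(path, "rb") as f:
        return pd.read_csv(f, **pandas_args)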

testing/test_awswrangler/test_data_lake.py

Lines changed: 69 additions & 1 deletion
@@ -1,5 +1,9 @@
+import bz2
 import datetime
+import gzip
 import logging
+import lzma
+from io import BytesIO, TextIOWrapper

 import boto3
 import pandas as pd
@@ -846,7 +850,7 @@ def test_athena_types(bucket, database):


 def test_parquet_catalog_columns(bucket, database):
-    path = f"s3://{bucket}/test_parquet_catalog_columns /"
+    path = f"s3://{bucket}/test_parquet_catalog_columns/"
     paths = wr.s3.to_parquet(
         df=get_df_csv()[["id", "date", "timestamp", "par0", "par1"]],
         path=path,
@@ -889,3 +893,67 @@ def test_parquet_catalog_columns(bucket, database):

     wr.s3.delete_objects(path=path)
     assert wr.catalog.delete_table_if_exists(database=database, table="test_parquet_catalog_columns") is True
+
+
+@pytest.mark.parametrize("compression", [None, "gzip", "snappy"])
+def test_parquet_compress(bucket, database, compression):
+    path = f"s3://{bucket}/test_parquet_compress_{compression}/"
+    paths = wr.s3.to_parquet(
+        df=get_df(),
+        path=path,
+        compression=compression,
+        dataset=True,
+        database=database,
+        table=f"test_parquet_compress_{compression}",
+        mode="overwrite",
+    )["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+    df2 = wr.athena.read_sql_table(f"test_parquet_compress_{compression}", database)
+    ensure_data_types(df2)
+    df2 = wr.s3.read_parquet(path=path)
+    wr.s3.delete_objects(path=path)
+    assert wr.catalog.delete_table_if_exists(database=database, table=f"test_parquet_compress_{compression}") is True
+    ensure_data_types(df2)
+
+
+@pytest.mark.parametrize("compression", ["gzip", "bz2", "xz"])
+def test_csv_compress(bucket, compression):
+    path = f"s3://{bucket}/test_csv_compress_{compression}/"
+    wr.s3.delete_objects(path=path)
+    df = get_df_csv()
+    if compression == "gzip":
+        buffer = BytesIO()
+        with gzip.GzipFile(mode="w", fileobj=buffer) as zipped_file:
+            df.to_csv(TextIOWrapper(zipped_file, "utf8"), index=False, header=None)
+        s3_resource = boto3.resource("s3")
+        s3_object = s3_resource.Object(bucket, f"test_csv_compress_{compression}/test.csv.gz")
+        s3_object.put(Body=buffer.getvalue())
+        file_path = f"s3://{bucket}/test_csv_compress_{compression}/test.csv.gz"
+    elif compression == "bz2":
+        buffer = BytesIO()
+        with bz2.BZ2File(mode="w", filename=buffer) as zipped_file:
+            df.to_csv(TextIOWrapper(zipped_file, "utf8"), index=False, header=None)
+        s3_resource = boto3.resource("s3")
+        s3_object = s3_resource.Object(bucket, f"test_csv_compress_{compression}/test.csv.bz2")
+        s3_object.put(Body=buffer.getvalue())
+        file_path = f"s3://{bucket}/test_csv_compress_{compression}/test.csv.bz2"
+    elif compression == "xz":
+        buffer = BytesIO()
+        with lzma.LZMAFile(mode="w", filename=buffer) as zipped_file:
+            df.to_csv(TextIOWrapper(zipped_file, "utf8"), index=False, header=None)
+        s3_resource = boto3.resource("s3")
+        s3_object = s3_resource.Object(bucket, f"test_csv_compress_{compression}/test.csv.xz")
+        s3_object.put(Body=buffer.getvalue())
+        file_path = f"s3://{bucket}/test_csv_compress_{compression}/test.csv.xz"
+    else:
+        file_path = f"s3://{bucket}/test_csv_compress_{compression}/test.csv"
+        wr.s3.to_csv(df=df, path=file_path, index=False, header=None)
+
+    wr.s3.wait_objects_exist(paths=[file_path])
+    df2 = wr.s3.read_csv(path=[file_path], names=df.columns)
+    assert len(df2.index) == 3
+    assert len(df2.columns) == 10
+    dfs = wr.s3.read_csv(path=[file_path], names=df.columns, chunksize=1)
+    for df3 in dfs:
+        assert len(df3.columns) == 10
+    wr.s3.delete_objects(path=path)
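
For users, the net effect is that wr.s3.read_csv now handles gzip-, bz2-, and xz-compressed CSVs transparently, for both full and chunked reads, since compression defaults to "infer". A short usage sketch follows; the bucket, key, and column names are placeholders rather than values from this commit.

import awswrangler as wr

# Placeholder location; any gzip/bz2/xz-compressed CSV on S3 works the same way.
path = "s3://my-bucket/folder/data.csv.gz"

# The ".gz" suffix is enough for a full read...
df = wr.s3.read_csv(path=[path], names=["c0", "c1", "c2"])

# ...and for chunked reads as well.
for chunk in wr.s3.read_csv(path=[path], names=["c0", "c1", "c2"], chunksize=1):
    print(len(chunk.index), len(chunk.columns))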
