+import bz2
 import datetime
+import gzip
 import logging
+import lzma
+from io import BytesIO, TextIOWrapper
 
 import boto3
 import pandas as pd
@@ -846,7 +850,7 @@ def test_athena_types(bucket, database):
 
 
 def test_parquet_catalog_columns(bucket, database):
-    path = f"s3://{bucket}/test_parquet_catalog_columns /"
+    path = f"s3://{bucket}/test_parquet_catalog_columns/"
     paths = wr.s3.to_parquet(
         df=get_df_csv()[["id", "date", "timestamp", "par0", "par1"]],
         path=path,
@@ -889,3 +893,67 @@ def test_parquet_catalog_columns(bucket, database):
 
     wr.s3.delete_objects(path=path)
     assert wr.catalog.delete_table_if_exists(database=database, table="test_parquet_catalog_columns") is True
+
+
+@pytest.mark.parametrize("compression", [None, "gzip", "snappy"])
+def test_parquet_compress(bucket, database, compression):
+    path = f"s3://{bucket}/test_parquet_compress_{compression}/"
+    paths = wr.s3.to_parquet(
+        df=get_df(),
+        path=path,
+        compression=compression,
+        dataset=True,
+        database=database,
+        table=f"test_parquet_compress_{compression}",
+        mode="overwrite",
+    )["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+    df2 = wr.athena.read_sql_table(f"test_parquet_compress_{compression}", database)
+    ensure_data_types(df2)
+    df2 = wr.s3.read_parquet(path=path)
+    wr.s3.delete_objects(path=path)
+    assert wr.catalog.delete_table_if_exists(database=database, table=f"test_parquet_compress_{compression}") is True
+    ensure_data_types(df2)
+
+
+@pytest.mark.parametrize("compression", ["gzip", "bz2", "xz"])
+def test_csv_compress(bucket, compression):
+    path = f"s3://{bucket}/test_csv_compress_{compression}/"
+    wr.s3.delete_objects(path=path)
+    df = get_df_csv()
+    if compression == "gzip":
+        buffer = BytesIO()
+        with gzip.GzipFile(mode="w", fileobj=buffer) as zipped_file:
+            df.to_csv(TextIOWrapper(zipped_file, "utf8"), index=False, header=None)
+        s3_resource = boto3.resource("s3")
+        s3_object = s3_resource.Object(bucket, f"test_csv_compress_{compression}/test.csv.gz")
+        s3_object.put(Body=buffer.getvalue())
+        file_path = f"s3://{bucket}/test_csv_compress_{compression}/test.csv.gz"
+    elif compression == "bz2":
+        buffer = BytesIO()
+        with bz2.BZ2File(mode="w", filename=buffer) as zipped_file:
+            df.to_csv(TextIOWrapper(zipped_file, "utf8"), index=False, header=None)
+        s3_resource = boto3.resource("s3")
+        s3_object = s3_resource.Object(bucket, f"test_csv_compress_{compression}/test.csv.bz2")
+        s3_object.put(Body=buffer.getvalue())
+        file_path = f"s3://{bucket}/test_csv_compress_{compression}/test.csv.bz2"
+    elif compression == "xz":
+        buffer = BytesIO()
+        with lzma.LZMAFile(mode="w", filename=buffer) as zipped_file:
+            df.to_csv(TextIOWrapper(zipped_file, "utf8"), index=False, header=None)
+        s3_resource = boto3.resource("s3")
+        s3_object = s3_resource.Object(bucket, f"test_csv_compress_{compression}/test.csv.xz")
+        s3_object.put(Body=buffer.getvalue())
+        file_path = f"s3://{bucket}/test_csv_compress_{compression}/test.csv.xz"
+    else:
+        file_path = f"s3://{bucket}/test_csv_compress_{compression}/test.csv"
+        wr.s3.to_csv(df=df, path=file_path, index=False, header=None)
+
+    wr.s3.wait_objects_exist(paths=[file_path])
+    df2 = wr.s3.read_csv(path=[file_path], names=df.columns)
+    assert len(df2.index) == 3
+    assert len(df2.columns) == 10
+    dfs = wr.s3.read_csv(path=[file_path], names=df.columns, chunksize=1)
+    for df3 in dfs:
+        assert len(df3.columns) == 10
+    wr.s3.delete_objects(path=path)
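A note on the `test_csv_compress` branches above: each compressed case builds the CSV in memory with the matching stdlib module and uploads the raw bytes with boto3, which keeps the uploaded fixture independent of awswrangler's own writers. Below is a minimal sketch of that pattern under stated assumptions; the bucket, key, and DataFrame are placeholders and are not part of this PR.

```python
import bz2
import gzip
import lzma
from io import BytesIO, TextIOWrapper

import boto3
import pandas as pd

BUCKET = "my-test-bucket"  # placeholder bucket name
KEY = "demo/test.csv.gz"   # suffix matches the chosen compressor

df = pd.DataFrame({"id": [1, 2, 3], "name": ["foo", "boo", "bar"]})

# The three stdlib compressors used by the parametrized test; each one
# wraps an in-memory BytesIO instead of a file on disk.
compressors = {
    "gzip": lambda buf: gzip.GzipFile(mode="w", fileobj=buf),
    "bz2": lambda buf: bz2.BZ2File(buf, mode="w"),
    "xz": lambda buf: lzma.LZMAFile(buf, mode="w"),
}

buffer = BytesIO()
with compressors["gzip"](buffer) as compressed:
    # TextIOWrapper bridges pandas' text CSV output to the binary compressor.
    wrapper = TextIOWrapper(compressed, "utf8")
    df.to_csv(wrapper, index=False, header=None)
    wrapper.flush()  # push any buffered text into the compressor

# Read the buffer only after the `with` block closes the compressor,
# so the gzip trailer has been written.
boto3.resource("s3").Object(BUCKET, KEY).put(Body=buffer.getvalue())
```

The object keys keep the usual suffixes (`.gz`, `.bz2`, `.xz`), presumably so the compression can be inferred from the file extension when the test reads the object back.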