Commit cc5618c

Add suffix filters for read_parquet_table() #495
1 parent ecc7e97 commit cc5618c

2 files changed: +39 -0 lines changed


awswrangler/s3/_read_parquet.py

Lines changed: 10 additions & 0 deletions
@@ -614,6 +614,8 @@ def read_parquet(
 def read_parquet_table(
     table: str,
     database: str,
+    filename_suffix: Union[str, List[str], None] = None,
+    filename_ignore_suffix: Union[str, List[str], None] = None,
     catalog_id: Optional[str] = None,
     partition_filter: Optional[Callable[[Dict[str, str]], bool]] = None,
     columns: Optional[List[str]] = None,
@@ -655,6 +657,12 @@ def read_parquet_table(
         AWS Glue Catalog table name.
     database : str
         AWS Glue Catalog database name.
+    filename_suffix : Union[str, List[str], None]
+        Suffix or List of suffixes to be read (e.g. [".gz.parquet", ".snappy.parquet"]).
+        If None, will try to read all files. (default)
+    filename_ignore_suffix : Union[str, List[str], None]
+        Suffix or List of suffixes for S3 keys to be ignored (e.g. [".csv", "_SUCCESS"]).
+        If None, will try to read all files. (default)
     catalog_id : str, optional
         The ID of the Data Catalog from which to retrieve Databases.
         If none is provided, the AWS account ID is used by default.
@@ -741,6 +749,8 @@ def read_parquet_table(
     return _data_types.cast_pandas_with_athena_types(
         df=read_parquet(
             path=path,
+            path_suffix=filename_suffix,
+            path_ignore_suffix=filename_ignore_suffix,
             partition_filter=partition_filter,
             columns=columns,
             validate_schema=validate_schema,
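
A minimal usage sketch of the new arguments, assuming a hypothetical Glue database and table whose S3 location also contains some non-Parquet objects:

import awswrangler as wr

# Hypothetical database/table names; the table's S3 location may also hold
# non-data objects such as "_SUCCESS" markers or stray CSV exports.
df = wr.s3.read_parquet_table(
    database="my_database",
    table="my_table",
    filename_suffix=".snappy.parquet",    # only read keys ending with this suffix
    filename_ignore_suffix="_SUCCESS",    # skip keys ending with this suffix
)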

tests/test_athena_parquet.py

Lines changed: 29 additions & 0 deletions
@@ -707,3 +707,32 @@ def test_to_parquet_nested_structs(glue_database, glue_table, path):
     wr.s3.to_parquet(df=df, path=path, dataset=True, database=glue_database, table=glue_table)
     df3 = wr.athena.read_sql_query(sql=f"SELECT * FROM {glue_table}", database=glue_database)
     assert df3.shape == (2, 2)
+
+
+def test_ignore_empty_files(glue_database, glue_table, path):
+    df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "boo"]})
+    bucket, directory = wr._utils.parse_path(path)
+    wr.s3.to_parquet(df=df, path=path, dataset=True, database=glue_database, table=glue_table)
+    boto3.client("s3").put_object(Body=b"", Bucket=bucket, Key=f"{directory}to_be_ignored")
+    df2 = wr.athena.read_sql_query(sql=f"SELECT * FROM {glue_table}", database=glue_database)
+    assert df2.shape == df.shape
+    df3 = wr.s3.read_parquet_table(database=glue_database, table=glue_table)
+    assert df3.shape == df.shape
+
+
+def test_suffix(glue_database, glue_table, path):
+    df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "boo"]})
+    bucket, directory = wr._utils.parse_path(path)
+    wr.s3.to_parquet(df=df, path=path, dataset=True, database=glue_database, table=glue_table)
+    boto3.client("s3").put_object(Body=b"garbage", Bucket=bucket, Key=f"{directory}to_be_ignored")
+    df2 = wr.s3.read_parquet_table(database=glue_database, table=glue_table, filename_suffix=".parquet")
+    assert df2.shape == df.shape
+
+
+def test_ignore_suffix(glue_database, glue_table, path):
+    df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "boo"]})
+    bucket, directory = wr._utils.parse_path(path)
+    wr.s3.to_parquet(df=df, path=path, dataset=True, database=glue_database, table=glue_table)
+    boto3.client("s3").put_object(Body=b"garbage", Bucket=bucket, Key=f"{directory}to_be_ignored")
+    df2 = wr.s3.read_parquet_table(database=glue_database, table=glue_table, filename_ignore_suffix="ignored")
+    assert df2.shape == df.shape
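
For comparison: as the call site in awswrangler/s3/_read_parquet.py above shows, read_parquet_table forwards these keywords to wr.s3.read_parquet as path_suffix / path_ignore_suffix, so the same filtering can presumably be applied to a plain S3 prefix directly (bucket and prefix below are placeholders):

import awswrangler as wr

# Read only compressed Parquet objects under a prefix, skipping marker files.
df = wr.s3.read_parquet(
    path="s3://my-bucket/my-prefix/",
    path_suffix=[".gz.parquet", ".snappy.parquet"],
    path_ignore_suffix="_SUCCESS",
)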
