
Commit 778b386

Author: David Cournapeau (committed)
BUG: fix read_parquet with dataset=True when the first partition is empty.
When reading a set of parquet files with dataset=True, if the first partition is empty, the current logic for dtype inference fails, raising an exception such as:

```
pyarrow.lib.ArrowTypeError: Unable to merge: Field col0 has incompatible types: dictionary<values=null, indices=int32, ordered=0> vs dictionary<values=string, indices=int32, ordered=0>
```

To fix this, we filter out empty tables before merging them into a single DataFrame.
Parent: 635f6d5 · Commit: 778b386
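As a minimal sketch of the idea (illustrative data and a plain `pa.concat_tables` call, not awswrangler's actual code path), dropping zero-row tables before merging avoids unifying a schema inferred from an empty partition with the schema of the real data:

```python
import pyarrow as pa

# Illustrative tables only: an empty first "partition" whose column types were
# inferred from zero rows, followed by two non-empty partitions.
t_empty = pa.table({"id": pa.array([], type=pa.string())})
t_part1 = pa.table({"id": ["1", "1"]})
t_part2 = pa.table({"id": ["1", "1", "1"]})

tables = [t_empty, t_part1, t_part2]
# Same filter as the one-line change below: skip empty tables before merging.
tables = [table for table in tables if len(table) > 0]
merged = pa.concat_tables(tables)
print(merged.num_rows)  # 5
```

The one-line change in awswrangler/s3/_read_parquet.py below applies the same filter to the tables before they are combined into the result DataFrame.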

File tree

2 files changed (+28, -0 lines)


awswrangler/s3/_read_parquet.py

Lines changed: 1 addition & 0 deletions
@@ -308,6 +308,7 @@ def _read_parquet(
         itertools.repeat(schema),
         itertools.repeat(decryption_properties),
     )
+    tables = [table for table in tables if len(table) > 0]
     return _utils.table_refs_to_df(tables, kwargs=arrow_kwargs)
tests/unit/test_moto.py

Lines changed: 27 additions & 0 deletions
@@ -485,6 +485,33 @@ def test_s3_delete_object_success(moto_s3_client: "S3Client") -> None:
     wr.s3.read_parquet(path=path, dataset=True)


+def test_s3_dataset_empty_table(moto_s3_client: "S3Client") -> None:
+    """ Test that a dataset split into multiple parquet files whose first
+    partition is an empty table still loads properly.
+    """
+    partition_col, partition_val = "col0", "1"
+    dataset = f"{partition_col}={partition_val}"
+    s3_key = f's3://bucket/{dataset}'
+
+    dtypes = {"id": "string[python]"}
+    df1 = pd.DataFrame({"id": []}).astype(dtypes)
+    df2 = pd.DataFrame({"id": ['1'] * 2}).astype(dtypes)
+    df3 = pd.DataFrame({"id": ['1'] * 3}).astype(dtypes)
+
+    dataframes = [df1, df2, df3]
+    r_df = pd.concat(dataframes, ignore_index=True)
+    r_df = r_df.assign(col0=pd.Categorical([partition_val] * len(r_df)))
+
+    for i, df in enumerate(dataframes):
+        wr.s3.to_parquet(
+            df=df,
+            path=f"{s3_key}/part{i}.parquet",
+        )
+
+    result_df = wr.s3.read_parquet(path=s3_key, dataset=True)
+    pd.testing.assert_frame_equal(result_df, r_df, check_dtype=True)
+
+
 def test_s3_raise_delete_object_exception_success(moto_s3_client: "S3Client") -> None:
     path = "s3://bucket/test.parquet"
     wr.s3.to_parquet(df=get_df_list(), path=path, index=False, dataset=True, partition_cols=["par0", "par1"])
