Merged
8 changes: 8 additions & 0 deletions awswrangler/s3/_read_parquet.py
@@ -311,6 +311,14 @@ def _read_parquet(
        itertools.repeat(schema),
        itertools.repeat(decryption_properties),
    )
    # When the first table in a dataset is empty, its inferred schema may not
    # be compatible with the other tables, which raises an exception when
    # concatenating them further down the line. As a workaround, filter out
    # empty tables, unless every table is empty; in that case the schemas are
    # all compatible and no filtering is needed.
    should_filter_out = any(len(table) > 0 for table in tables)
    if should_filter_out:
        tables = [table for table in tables if len(table) > 0]
    return _utils.table_refs_to_df(tables, kwargs=arrow_kwargs)
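For context, one way the mismatch described in the comment can surface is sketched below, using plain pyarrow rather than the library's own code path; the null-typed column is only an illustrative stand-in for whatever schema the empty file is read with.

import pyarrow as pa

# Hypothetical tables: an empty table whose column was inferred as Arrow's
# null type, next to a table with a concrete int64 column.
empty = pa.table({"id": pa.array([], type=pa.null())})
filled = pa.table({"id": pa.array([1, 2], type=pa.int64())})

try:
    pa.concat_tables([empty, filled])  # raises: schemas are not identical
except pa.ArrowInvalid as exc:
    print(exc)

# The workaround above in a nutshell: keep only non-empty tables,
# unless every table is empty.
tables = [t for t in (empty, filled) if len(t) > 0] or [empty, filled]
print(pa.concat_tables(tables).to_pandas())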


41 changes: 41 additions & 0 deletions tests/unit/test_moto.py
@@ -485,6 +485,47 @@ def test_s3_delete_object_success(moto_s3_client: "S3Client") -> None:
    wr.s3.read_parquet(path=path, dataset=True)


@pytest.mark.parametrize("chunked", [True, False])
def test_s3_parquet_empty_table(moto_s3_client: "S3Client", chunked: bool) -> None:
    path = "s3://bucket/file.parquet"

    r_df = pd.DataFrame({"id": []}, dtype=pd.Int64Dtype())
    wr.s3.to_parquet(df=r_df, path=path)

    df = wr.s3.read_parquet(path, chunked=chunked)
    if chunked:
        df = pd.concat(list(df))

    pd.testing.assert_frame_equal(r_df, df, check_dtype=True)


def test_s3_dataset_empty_table(moto_s3_client: "S3Client") -> None:
    """Test that a dataset split into multiple parquet files whose first
    file is an empty table still loads properly.
    """
    partition_col, partition_val = "col0", "1"
    dataset = f"{partition_col}={partition_val}"
    s3_key = f"s3://bucket/{dataset}"

    dtypes = {"id": "string[python]"}
    df1 = pd.DataFrame({"id": []}).astype(dtypes)
    df2 = pd.DataFrame({"id": ["1"] * 2}).astype(dtypes)
    df3 = pd.DataFrame({"id": ["1"] * 3}).astype(dtypes)

    dataframes = [df1, df2, df3]
    r_df = pd.concat(dataframes, ignore_index=True)
    r_df = r_df.assign(col0=pd.Categorical([partition_val] * len(r_df)))

    for i, df in enumerate(dataframes):
        wr.s3.to_parquet(
            df=df,
            path=f"{s3_key}/part{i}.parquet",
        )

    result_df = wr.s3.read_parquet(path=s3_key, dataset=True)
    pd.testing.assert_frame_equal(result_df, r_df, check_dtype=True)


def test_s3_raise_delete_object_exception_success(moto_s3_client: "S3Client") -> None:
    path = "s3://bucket/test.parquet"
    wr.s3.to_parquet(df=get_df_list(), path=path, index=False, dataset=True, partition_cols=["par0", "par1"])