
Commit 9e59a0a (parent: 197f06f)

Fix s3.to_parquet() overwriting with different partition schema

File tree

2 files changed: +47 lines, -1 line

awswrangler/catalog.py

Lines changed: 6 additions & 0 deletions

@@ -982,6 +982,12 @@ def _create_table(
         raise exceptions.InvalidArgument(f"{mode} is not a valid mode. It must be 'overwrite' or 'append'.")
     if (exist is True) and (mode == "overwrite"):
         skip_archive: bool = not catalog_versioning
+        partitions_values: List[List[str]] = list(
+            _get_partitions(database=database, table=table, boto3_session=session).values()
+        )
+        client_glue.batch_delete_partition(
+            DatabaseName=database, TableName=table, PartitionsToDelete=[{"Values": v} for v in partitions_values]
+        )
         client_glue.update_table(DatabaseName=database, TableInput=table_input, SkipArchive=skip_archive)
     elif exist is False:
         client_glue.create_table(DatabaseName=database, TableInput=table_input)
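For context, the following is a rough standalone sketch of the cleanup these new lines perform, written directly against boto3. The database and table names are placeholders, and the get_partitions pagination loop stands in for awswrangler's _get_partitions helper, whose body is not part of this diff.

import boto3

glue = boto3.client("glue")
database = "my_database"  # placeholder
table = "my_table"        # placeholder

# Collect the values of every partition currently registered for the table
# in the Glue Data Catalog.
paginator = glue.get_paginator("get_partitions")
partitions_values = [
    partition["Values"]
    for page in paginator.paginate(DatabaseName=database, TableName=table)
    for partition in page["Partitions"]
]

# Drop those partitions before the table definition is overwritten, so no
# entries built against the old partition schema survive the overwrite.
# (BatchDeletePartition accepts at most 25 partitions per request, so a very
# large table would need this call chunked.)
if partitions_values:
    glue.batch_delete_partition(
        DatabaseName=database,
        TableName=table,
        PartitionsToDelete=[{"Values": v} for v in partitions_values],
    )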

testing/test_awswrangler/test_data_lake.py

Lines changed: 41 additions & 1 deletion

@@ -1370,8 +1370,8 @@ def test_copy_replacing_filename(bucket):
 
 
 def test_unsigned_parquet(bucket, database, external_schema):
-    path = f"s3://{bucket}/test_unsigned_parquet/"
     table = "test_unsigned_parquet"
+    path = f"s3://{bucket}/{table}/"
     wr.s3.delete_objects(path=path)
     df = pd.DataFrame({"c0": [0, 0, (2 ** 8) - 1], "c1": [0, 0, (2 ** 16) - 1], "c2": [0, 0, (2 ** 32) - 1]})
     df["c0"] = df.c0.astype("uint8")

@@ -1436,3 +1436,43 @@ def test_parquet_uint64(bucket):
     assert df.c3.max() == (2 ** 64) - 1
     assert df.c4.astype("uint8").sum() == 3
     wr.s3.delete_objects(path=path)
+
+
+def test_parquet_overwrite_partition_cols(bucket, database, external_schema):
+    table = "test_parquet_overwrite_partition_cols"
+    path = f"s3://{bucket}/{table}/"
+    wr.s3.delete_objects(path=path)
+    df = pd.DataFrame({"c0": [1, 2, 1, 2], "c1": [1, 2, 1, 2], "c2": [2, 1, 2, 1]})
+
+    paths = wr.s3.to_parquet(
+        df=df, path=path, dataset=True, database=database, table=table, mode="overwrite", partition_cols=["c2"]
+    )["paths"]
+    wr.s3.wait_objects_exist(paths=paths, use_threads=False)
+    df = wr.athena.read_sql_table(table=table, database=database)
+    assert len(df.index) == 4
+    assert len(df.columns) == 3
+    assert df.c0.sum() == 6
+    assert df.c1.sum() == 6
+    assert df.c2.sum() == 6
+
+    paths = wr.s3.to_parquet(
+        df=df, path=path, dataset=True, database=database, table=table, mode="overwrite", partition_cols=["c1", "c2"]
+    )["paths"]
+    wr.s3.wait_objects_exist(paths=paths, use_threads=False)
+    df = wr.athena.read_sql_table(table=table, database=database)
+    assert len(df.index) == 4
+    assert len(df.columns) == 3
+    assert df.c0.sum() == 6
+    assert df.c1.sum() == 6
+    assert df.c2.sum() == 6
+
+    engine = wr.catalog.get_engine("aws-data-wrangler-redshift")
+    df = wr.db.read_sql_table(con=engine, table=table, schema=external_schema)
+    assert len(df.index) == 4
+    assert len(df.columns) == 3
+    assert df.c0.sum() == 6
+    assert df.c1.sum() == 6
+    assert df.c2.sum() == 6
+
+    wr.s3.delete_objects(path=path)
+    wr.catalog.delete_table_if_exists(database=database, table=table)
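For reference, the user-facing scenario this test pins down looks roughly like the snippet below; the bucket, database, and table names are placeholders. Before this fix, overwriting an existing dataset with a different partition_cols list left the old partition entries in the Glue catalog, so reads through Athena or Redshift Spectrum could hit a partition schema mismatch; the catalog.py change above clears them before the table definition is replaced.

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({"c0": [1, 2, 1, 2], "c1": [1, 2, 1, 2], "c2": [2, 1, 2, 1]})
path = "s3://my-bucket/my_table/"  # placeholder location

# First write: dataset partitioned by c2 only.
wr.s3.to_parquet(
    df=df, path=path, dataset=True, database="my_database", table="my_table",
    mode="overwrite", partition_cols=["c2"],
)

# Second write: same table, but a different partition schema (c1, c2).
# With the fix, the stale c2-only partitions are dropped from the catalog
# before the table definition is overwritten.
wr.s3.to_parquet(
    df=df, path=path, dataset=True, database="my_database", table="my_table",
    mode="overwrite", partition_cols=["c1", "c2"],
)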

0 commit comments