Commit 678262b

Cleaning up written files if catalog write failed.

1 parent 630b420 · commit 678262b
4 files changed: +97 −59 lines changed

awswrangler/s3/_write_parquet.py

Lines changed: 33 additions & 27 deletions
@@ -14,6 +14,7 @@
 from awswrangler import _data_types, _utils, catalog, exceptions
 from awswrangler._config import apply_configs
+from awswrangler.s3._delete import delete_objects
 from awswrangler.s3._fs import open_s3_object
 from awswrangler.s3._read_parquet import _read_parquet_metadata
 from awswrangler.s3._write import _COMPRESSION_2_EXT, _apply_dtype, _sanitize, _validate_args
@@ -531,39 +532,44 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
             max_rows_by_file=max_rows_by_file,
         )
         if (database is not None) and (table is not None):
-            catalog._create_parquet_table(  # pylint: disable=protected-access
-                database=database,
-                table=table,
-                path=path,
-                columns_types=columns_types,
-                partitions_types=partitions_types,
-                compression=compression,
-                description=description,
-                parameters=parameters,
-                columns_comments=columns_comments,
-                boto3_session=session,
-                mode=mode,
-                catalog_versioning=catalog_versioning,
-                projection_enabled=projection_enabled,
-                projection_types=projection_types,
-                projection_ranges=projection_ranges,
-                projection_values=projection_values,
-                projection_intervals=projection_intervals,
-                projection_digits=projection_digits,
-                catalog_id=catalog_id,
-                catalog_table_input=catalog_table_input,
-            )
-            if partitions_values and (regular_partitions is True):
-                _logger.debug("partitions_values:\n%s", partitions_values)
-                catalog.add_parquet_partitions(
-                    database=database,
-                    table=table,
-                    partitions_values=partitions_values,
-                    compression=compression,
-                    boto3_session=session,
-                    catalog_id=catalog_id,
-                    columns_types=columns_types,
-                )
+            try:
+                catalog._create_parquet_table(  # pylint: disable=protected-access
+                    database=database,
+                    table=table,
+                    path=path,
+                    columns_types=columns_types,
+                    partitions_types=partitions_types,
+                    compression=compression,
+                    description=description,
+                    parameters=parameters,
+                    columns_comments=columns_comments,
+                    boto3_session=session,
+                    mode=mode,
+                    catalog_versioning=catalog_versioning,
+                    projection_enabled=projection_enabled,
+                    projection_types=projection_types,
+                    projection_ranges=projection_ranges,
+                    projection_values=projection_values,
+                    projection_intervals=projection_intervals,
+                    projection_digits=projection_digits,
+                    catalog_id=catalog_id,
+                    catalog_table_input=catalog_table_input,
+                )
+                if partitions_values and (regular_partitions is True):
+                    _logger.debug("partitions_values:\n%s", partitions_values)
+                    catalog.add_parquet_partitions(
+                        database=database,
+                        table=table,
+                        partitions_values=partitions_values,
+                        compression=compression,
+                        boto3_session=session,
+                        catalog_id=catalog_id,
+                        columns_types=columns_types,
+                    )
+            except Exception:
+                _logger.debug("Catalog write failed, cleaning up S3 (paths: %s).", paths)
+                delete_objects(path=paths, use_threads=use_threads, boto3_session=session)
+                raise
     return {"paths": paths, "partitions_values": partitions_values}

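Both writers now follow the same compensation pattern: the data files are written to S3 first, the Glue catalog is updated second, and if the catalog step raises, the freshly written objects are deleted before the exception is re-raised. Below is a minimal runnable sketch of the idea; write_files, register_in_catalog, and delete_objects are illustrative stubs, not awswrangler's internals.

import logging

_logger = logging.getLogger(__name__)


def write_files(df, path):
    # Stand-in for the S3 write step: pretend three objects were uploaded.
    return [f"{path}file_{i}.parquet" for i in range(3)]


def register_in_catalog(database, table, paths):
    # Stand-in for catalog._create_parquet_table / catalog.add_parquet_partitions.
    if database == "foo":
        raise RuntimeError("EntityNotFoundException: Database foo not found")


def delete_objects(paths):
    # Stand-in for awswrangler.s3.delete_objects.
    print(f"deleting {len(paths)} orphaned objects")


def write_dataset(df, path, database, table):
    paths = write_files(df, path)  # objects exist on S3 before the catalog is touched
    try:
        register_in_catalog(database, table, paths)
    except Exception:
        # Compensating action: remove the orphaned files, then re-raise so the
        # caller still sees the original catalog error.
        _logger.debug("Catalog write failed, cleaning up S3 (paths: %s).", paths)
        delete_objects(paths)
        raise
    return {"paths": paths}
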
awswrangler/s3/_write_text.py

Lines changed: 38 additions & 32 deletions
@@ -10,6 +10,7 @@
 from awswrangler import _data_types, _utils, catalog, exceptions
 from awswrangler._config import apply_configs
+from awswrangler.s3._delete import delete_objects
 from awswrangler.s3._fs import open_s3_object
 from awswrangler.s3._write import _apply_dtype, _sanitize, _validate_args
 from awswrangler.s3._write_dataset import _to_dataset
@@ -406,44 +407,49 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals
             date_format="%Y-%m-%d %H:%M:%S.%f",
         )
         if (database is not None) and (table is not None):
-            columns_types, partitions_types = _data_types.athena_types_from_pandas_partitioned(
-                df=df, index=index, partition_cols=partition_cols, dtype=dtype, index_left=True
-            )
-            catalog._create_csv_table(  # pylint: disable=protected-access
-                database=database,
-                table=table,
-                path=path,
-                columns_types=columns_types,
-                partitions_types=partitions_types,
-                description=description,
-                parameters=parameters,
-                columns_comments=columns_comments,
-                boto3_session=session,
-                mode=mode,
-                catalog_versioning=catalog_versioning,
-                sep=sep,
-                projection_enabled=projection_enabled,
-                projection_types=projection_types,
-                projection_ranges=projection_ranges,
-                projection_values=projection_values,
-                projection_intervals=projection_intervals,
-                projection_digits=projection_digits,
-                catalog_table_input=catalog_table_input,
-                catalog_id=catalog_id,
-                compression=None,
-                skip_header_line_count=None,
-            )
-            if partitions_values and (regular_partitions is True):
-                _logger.debug("partitions_values:\n%s", partitions_values)
-                catalog.add_csv_partitions(
-                    database=database,
-                    table=table,
-                    partitions_values=partitions_values,
-                    boto3_session=session,
-                    sep=sep,
-                    catalog_id=catalog_id,
-                    columns_types=columns_types,
-                )
+            try:
+                columns_types, partitions_types = _data_types.athena_types_from_pandas_partitioned(
+                    df=df, index=index, partition_cols=partition_cols, dtype=dtype, index_left=True
+                )
+                catalog._create_csv_table(  # pylint: disable=protected-access
+                    database=database,
+                    table=table,
+                    path=path,
+                    columns_types=columns_types,
+                    partitions_types=partitions_types,
+                    description=description,
+                    parameters=parameters,
+                    columns_comments=columns_comments,
+                    boto3_session=session,
+                    mode=mode,
+                    catalog_versioning=catalog_versioning,
+                    sep=sep,
+                    projection_enabled=projection_enabled,
+                    projection_types=projection_types,
+                    projection_ranges=projection_ranges,
+                    projection_values=projection_values,
+                    projection_intervals=projection_intervals,
+                    projection_digits=projection_digits,
+                    catalog_table_input=catalog_table_input,
+                    catalog_id=catalog_id,
+                    compression=None,
+                    skip_header_line_count=None,
+                )
+                if partitions_values and (regular_partitions is True):
+                    _logger.debug("partitions_values:\n%s", partitions_values)
+                    catalog.add_csv_partitions(
+                        database=database,
+                        table=table,
+                        partitions_values=partitions_values,
+                        boto3_session=session,
+                        sep=sep,
+                        catalog_id=catalog_id,
+                        columns_types=columns_types,
+                    )
+            except Exception:
+                _logger.debug("Catalog write failed, cleaning up S3 (paths: %s).", paths)
+                delete_objects(path=paths, use_threads=use_threads, boto3_session=session)
+                raise
     return {"paths": paths, "partitions_values": partitions_values}

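Since the except block re-raises, callers of wr.s3.to_csv and wr.s3.to_parquet still see the original catalog error; the only behavioral change is that no orphaned objects are left under the target prefix. A hedged usage sketch follows (the bucket name and DataFrame are made up; Glue's EntityNotFoundException derives from botocore's ClientError, so catching ClientError covers it):

import awswrangler as wr
import pandas as pd
from botocore.exceptions import ClientError

df = pd.DataFrame({"c0": [1, 2, 3]})

try:
    # The database does not exist: the CSV files are uploaded first, the
    # catalog call then fails, and this commit removes the uploads before
    # the exception propagates.
    wr.s3.to_csv(df, "s3://my-bucket/prefix/", dataset=True, table="t", database="missing_db")
except ClientError as ex:
    # After this commit, s3://my-bucket/prefix/ should hold no orphaned files.
    print("catalog registration failed:", ex)
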
tests/test_athena_csv.py

Lines changed: 13 additions & 0 deletions
@@ -1,5 +1,7 @@
 import logging
+import time
 
+import boto3
 import pandas as pd
 import pytest
 
@@ -386,3 +388,14 @@ def test_mixed_types_column(path, glue_table, glue_database, use_threads):
     wr.s3.to_csv(
         df, path, index=False, dataset=True, table=glue_table, database=glue_database, partition_cols=["par"]
     )
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_failing_catalog(path, glue_table, glue_database, use_threads):
+    df = pd.DataFrame({"c0": [1, 2, 3]})
+    try:
+        wr.s3.to_csv(df, path, dataset=True, table=glue_table, database="foo")
+    except boto3.client("glue").exceptions.EntityNotFoundException:
+        pass
+    time.sleep(3)
+    assert len(wr.s3.list_objects(path)) == 0

tests/test_athena_parquet.py

Lines changed: 13 additions & 0 deletions
@@ -1,7 +1,9 @@
 import datetime
 import logging
 import math
+import time
 
+import boto3
 import numpy as np
 import pandas as pd
 import pytest
@@ -645,3 +647,14 @@ def test_mixed_types_column(path, glue_table, glue_database, use_threads):
     df["par"] = df["par"].astype("string")
     with pytest.raises(TypeError):
         wr.s3.to_parquet(df, path, dataset=True, table=glue_table, database=glue_database, partition_cols=["par"])
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_failing_catalog(path, glue_table, glue_database, use_threads):
+    df = pd.DataFrame({"c0": [1, 2, 3]})
+    try:
+        wr.s3.to_parquet(df, path, max_rows_by_file=1, dataset=True, table=glue_table, database="foo")
+    except boto3.client("glue").exceptions.EntityNotFoundException:
+        pass
+    time.sleep(3)
+    assert len(wr.s3.list_objects(path)) == 0
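Both tests pause with time.sleep(3) before listing, presumably to give the S3 listing a moment to reflect the deletions. If flakiness showed up, a polling wait would be a sturdier drop-in; this is only a suggested sketch, not part of the commit:

import time

import awswrangler as wr


def wait_until_empty(path, timeout=30.0, interval=1.0):
    # Poll wr.s3.list_objects until the prefix is empty or the timeout expires.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if len(wr.s3.list_objects(path)) == 0:
            return True
        time.sleep(interval)
    return False


# In the tests, the final sleep-and-assert could become:
#     assert wait_until_empty(path)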
