Skip to content

Commit 2fe891d

Browse files
committed
Updated implementation of “overwrite_files” mode
1 parent 74e51c7 commit 2fe891d

File tree

6 files changed

+16
-15
lines changed

6 files changed

+16
-15
lines changed

awswrangler/_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,7 @@ def block_waiting_available_thread(seq: Sequence[Future], max_workers: int) -> N
833833

834834
def check_schema_changes(columns_types: dict[str, str], table_input: dict[str, Any] | None, mode: str) -> None:
835835
"""Check schema changes."""
836-
if (table_input is not None) and (mode in ("append", "overwrite_partitions")):
836+
if (table_input is not None) and (mode in ("append", "overwrite_partitions", "overwrite_files")):
837837
catalog_cols: dict[str, str] = {x["Name"]: x["Type"] for x in table_input["StorageDescriptor"]["Columns"]}
838838
for c, t in columns_types.items():
839839
if c not in catalog_cols:

awswrangler/catalog/_create.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def _update_if_necessary(
3131
if value is not None:
3232
if key not in dic or dic[key] != value:
3333
dic[key] = value
34-
if mode in ("append", "overwrite_partitions"):
34+
if mode in ("append", "overwrite_partitions", "overwrite_files"):
3535
return "update"
3636
return mode
3737

@@ -150,9 +150,10 @@ def _create_table( # noqa: PLR0912,PLR0915
150150

151151
client_glue = _utils.client(service_name="glue", session=boto3_session)
152152
skip_archive: bool = not catalog_versioning
153-
if mode not in ("overwrite", "append", "overwrite_partitions", "update"):
153+
if mode not in ("overwrite", "append", "overwrite_partitions", "overwrite_files", "update"):
154154
raise exceptions.InvalidArgument(
155-
f"{mode} is not a valid mode. It must be 'overwrite', 'append' or 'overwrite_partitions'."
155+
f"{mode} is not a valid mode. It must be 'overwrite', "
156+
f"'append', 'overwrite_partitions' or 'overwrite_files'."
156157
)
157158
args: dict[str, Any] = _catalog_id(
158159
catalog_id=catalog_id,
@@ -304,7 +305,7 @@ def _create_parquet_table(
304305
_logger.debug("catalog_table_input: %s", catalog_table_input)
305306

306307
table_input: dict[str, Any]
307-
if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")):
308+
if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions", "overwrite_files")):
308309
table_input = catalog_table_input
309310

310311
is_table_updated = _update_table_input(table_input, columns_types)
@@ -366,7 +367,7 @@ def _create_orc_table(
366367
_logger.debug("catalog_table_input: %s", catalog_table_input)
367368

368369
table_input: dict[str, Any]
369-
if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")):
370+
if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions", "overwrite_files")):
370371
table_input = catalog_table_input
371372

372373
is_table_updated = _update_table_input(table_input, columns_types)
@@ -436,7 +437,7 @@ def _create_csv_table(
436437
_utils.check_schema_changes(columns_types=columns_types, table_input=catalog_table_input, mode=mode)
437438

438439
table_input: dict[str, Any]
439-
if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")):
440+
if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions", "overwrite_files")):
440441
table_input = catalog_table_input
441442

442443
is_table_updated = _update_table_input(table_input, columns_types, allow_reorder=False)
@@ -508,7 +509,7 @@ def _create_json_table(
508509
table_input: dict[str, Any]
509510
if schema_evolution is False:
510511
_utils.check_schema_changes(columns_types=columns_types, table_input=catalog_table_input, mode=mode)
511-
if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")):
512+
if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions", "overwrite_files")):
512513
table_input = catalog_table_input
513514

514515
is_table_updated = _update_table_input(table_input, columns_types)
@@ -1098,7 +1099,7 @@ def create_csv_table(
10981099
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
10991100
schema_evolution
11001101
If True allows schema evolution (new or missing columns), otherwise an exception will be raised.
1101-
(Only considered if dataset=True and mode in ("append", "overwrite_partitions"))
1102+
(Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files"))
11021103
Related tutorial:
11031104
https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html
11041105
sep
@@ -1278,7 +1279,7 @@ def create_json_table(
12781279
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
12791280
schema_evolution
12801281
If True allows schema evolution (new or missing columns), otherwise an exception will be raised.
1281-
(Only considered if dataset=True and mode in ("append", "overwrite_partitions"))
1282+
(Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files"))
12821283
Related tutorial:
12831284
https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html
12841285
serde_library

awswrangler/s3/_write.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def _extract_dtypes_from_table_input(table_input: dict[str, Any]) -> dict[str, s
4747
def _apply_dtype(
4848
df: pd.DataFrame, dtype: dict[str, str], catalog_table_input: dict[str, Any] | None, mode: str
4949
) -> pd.DataFrame:
50-
if mode in ("append", "overwrite_partitions"):
50+
if mode in ("append", "overwrite_partitions", "overwrite_files"):
5151
if catalog_table_input is not None:
5252
catalog_types: dict[str, str] | None = _extract_dtypes_from_table_input(table_input=catalog_table_input)
5353
if catalog_types is not None:

awswrangler/s3/_write_orc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ def to_orc(
414414
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
415415
schema_evolution
416416
If True allows schema evolution (new or missing columns), otherwise an exception will be raised. True by default.
417-
(Only considered if dataset=True and mode in ("append", "overwrite_partitions"))
417+
(Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files"))
418418
Related tutorial:
419419
https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html
420420
database

awswrangler/s3/_write_parquet.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ def to_parquet(
444444
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
445445
schema_evolution
446446
If True allows schema evolution (new or missing columns), otherwise an exception will be raised. True by default.
447-
(Only considered if dataset=True and mode in ("append", "overwrite_partitions"))
447+
(Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files"))
448448
Related tutorial:
449449
https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html
450450
database

awswrangler/s3/_write_text.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def to_csv( # noqa: PLR0912,PLR0915
179179
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
180180
schema_evolution
181181
If True allows schema evolution (new or missing columns), otherwise an exception will be raised.
182-
(Only considered if dataset=True and mode in ("append", "overwrite_partitions")). False by default.
182+
(Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files")). False by default.
183183
Related tutorial:
184184
https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html
185185
database
@@ -724,7 +724,7 @@ def to_json( # noqa: PLR0912,PLR0915
724724
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
725725
schema_evolution
726726
If True allows schema evolution (new or missing columns), otherwise an exception will be raised.
727-
(Only considered if dataset=True and mode in ("append", "overwrite_partitions"))
727+
(Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files"))
728728
Related tutorial:
729729
https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html
730730
database

0 commit comments

Comments
 (0)