Skip to content

Commit fe6f50b

Browse files
committed
Remove delete_table operations from catalog._create_table() and add catalog_versioning arg. #198
1 parent 10ea9e8 commit fe6f50b

File tree

3 files changed

+99
-7
lines changed

3 files changed

+99
-7
lines changed

awswrangler/catalog.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def create_parquet_table(
9393
parameters: Optional[Dict[str, str]] = None,
9494
columns_comments: Optional[Dict[str, str]] = None,
9595
mode: str = "overwrite",
96+
catalog_versioning: bool = False,
9697
boto3_session: Optional[boto3.Session] = None,
9798
) -> None:
9899
"""Create a Parquet Table (Metadata Only) in the AWS Glue Catalog.
@@ -121,6 +122,8 @@ def create_parquet_table(
121122
Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}).
122123
mode: str
123124
'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
125+
catalog_versioning : bool
126+
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
124127
boto3_session : boto3.Session(), optional
125128
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
126129
@@ -157,6 +160,7 @@ def create_parquet_table(
157160
parameters=parameters,
158161
columns_comments=columns_comments,
159162
mode=mode,
163+
catalog_versioning=catalog_versioning,
160164
boto3_session=boto3_session,
161165
table_input=table_input,
162166
)
@@ -865,6 +869,7 @@ def create_csv_table(
865869
parameters: Optional[Dict[str, str]] = None,
866870
columns_comments: Optional[Dict[str, str]] = None,
867871
mode: str = "overwrite",
872+
catalog_versioning: bool = False,
868873
sep: str = ",",
869874
boto3_session: Optional[boto3.Session] = None,
870875
) -> None:
@@ -884,16 +889,18 @@ def create_csv_table(
884889
Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
885890
partitions_types: Dict[str, str], optional
886891
Dictionary with keys as partition names and values as data types (e.g. {'col2': 'date'}).
887-
compression: str, optional
892+
compression : str, optional
888893
Compression style (``None``, ``gzip``, etc).
889-
description: str, optional
894+
description : str, optional
890895
Table description
891-
parameters: Dict[str, str], optional
896+
parameters : Dict[str, str], optional
892897
Key/value pairs to tag the table.
893898
columns_comments: Dict[str, str], optional
894899
Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}).
895-
mode: str
900+
mode : str
896901
'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
902+
catalog_versioning : bool
903+
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
897904
sep : str
898905
String of length 1. Field delimiter for the output file.
899906
boto3_session : boto3.Session(), optional
@@ -937,6 +944,7 @@ def create_csv_table(
937944
parameters=parameters,
938945
columns_comments=columns_comments,
939946
mode=mode,
947+
catalog_versioning=catalog_versioning,
940948
boto3_session=boto3_session,
941949
table_input=table_input,
942950
)
@@ -949,6 +957,7 @@ def _create_table(
949957
parameters: Optional[Dict[str, str]],
950958
columns_comments: Optional[Dict[str, str]],
951959
mode: str,
960+
catalog_versioning: bool,
952961
boto3_session: Optional[boto3.Session],
953962
table_input: Dict[str, Any],
954963
):
@@ -967,10 +976,14 @@ def _create_table(
967976
if name in columns_comments:
968977
par["Comment"] = columns_comments[name]
969978
session: boto3.Session = _utils.ensure_session(session=boto3_session)
979+
client_glue: boto3.client = _utils.client(service_name="glue", session=session)
970980
exist: bool = does_table_exist(database=database, table=table, boto3_session=session)
971-
if (mode == "overwrite") or (exist is False):
972-
delete_table_if_exists(database=database, table=table, boto3_session=session)
973-
client_glue: boto3.client = _utils.client(service_name="glue", session=session)
981+
if mode not in ("overwrite", "append"): # pragma: no cover
982+
raise exceptions.InvalidArgument(f"{mode} is not a valid mode. It must be 'overwrite' or 'append'.")
983+
if (exist is True) and (mode == "overwrite"):
984+
skip_archive: bool = not catalog_versioning
985+
client_glue.update_table(DatabaseName=database, TableInput=table_input, SkipArchive=skip_archive)
986+
elif exist is False:
974987
client_glue.create_table(DatabaseName=database, TableInput=table_input)
975988

976989

awswrangler/s3.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ def to_csv( # pylint: disable=too-many-arguments
433433
dataset: bool = False,
434434
partition_cols: Optional[List[str]] = None,
435435
mode: Optional[str] = None,
436+
catalog_versioning: bool = False,
436437
database: Optional[str] = None,
437438
table: Optional[str] = None,
438439
dtype: Optional[Dict[str, str]] = None,
@@ -483,6 +484,8 @@ def to_csv( # pylint: disable=too-many-arguments
483484
List of column names that will be used to create partitions. Only takes effect if dataset=True.
484485
mode: str, optional
485486
``append`` (Default), ``overwrite``, ``overwrite_partitions``. Only takes effect if dataset=True.
487+
catalog_versioning : bool
488+
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
486489
database : str, optional
487490
Glue/Athena catalog: Database name.
488491
table : str, optional
@@ -677,6 +680,7 @@ def to_csv( # pylint: disable=too-many-arguments
677680
columns_comments=columns_comments,
678681
boto3_session=session,
679682
mode="overwrite",
683+
catalog_versioning=catalog_versioning,
680684
sep=sep,
681685
)
682686
if partitions_values:
@@ -846,6 +850,7 @@ def to_parquet( # pylint: disable=too-many-arguments
846850
dataset: bool = False,
847851
partition_cols: Optional[List[str]] = None,
848852
mode: Optional[str] = None,
853+
catalog_versioning: bool = False,
849854
database: Optional[str] = None,
850855
table: Optional[str] = None,
851856
dtype: Optional[Dict[str, str]] = None,
@@ -893,6 +898,8 @@ def to_parquet( # pylint: disable=too-many-arguments
893898
List of column names that will be used to create partitions. Only takes effect if dataset=True.
894899
mode: str, optional
895900
``append`` (Default), ``overwrite``, ``overwrite_partitions``. Only takes effect if dataset=True.
901+
catalog_versioning : bool
902+
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
896903
database : str, optional
897904
Glue/Athena catalog: Database name.
898905
table : str, optional
@@ -1092,6 +1099,7 @@ def to_parquet( # pylint: disable=too-many-arguments
10921099
columns_comments=columns_comments,
10931100
boto3_session=session,
10941101
mode="overwrite",
1102+
catalog_versioning=catalog_versioning,
10951103
)
10961104
if partitions_values:
10971105
_logger.debug("partitions_values:\n%s", partitions_values)
@@ -1838,6 +1846,7 @@ def store_parquet_metadata(
18381846
columns_comments: Optional[Dict[str, str]] = None,
18391847
compression: Optional[str] = None,
18401848
mode: str = "overwrite",
1849+
catalog_versioning: bool = False,
18411850
boto3_session: Optional[boto3.Session] = None,
18421851
) -> Tuple[Dict[str, str], Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]:
18431852
"""Infer and store parquet metadata on AWS Glue Catalog.
@@ -1879,6 +1888,8 @@ def store_parquet_metadata(
18791888
Compression style (``None``, ``snappy``, ``gzip``, etc).
18801889
mode: str
18811890
'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
1891+
catalog_versioning : bool
1892+
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
18821893
boto3_session : boto3.Session(), optional
18831894
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
18841895
@@ -1924,6 +1935,7 @@ def store_parquet_metadata(
19241935
parameters=parameters,
19251936
columns_comments=columns_comments,
19261937
mode=mode,
1938+
catalog_versioning=catalog_versioning,
19271939
boto3_session=session,
19281940
)
19291941
partitions_values: Dict[str, List[str]] = _data_types.athena_partitions_from_pyarrow_partitions(

testing/test_awswrangler/test_data_lake.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,3 +1281,70 @@ def test_athena_nested(bucket, database):
12811281
df2 = wr.athena.read_sql_query(sql=f"SELECT c0, c1, c2, c4 FROM {table}", database=database)
12821282
assert len(df2.index) == 2
12831283
assert len(df2.columns) == 4
1284+
1285+
1286+
def test_catalog_versioning(bucket, database):
1287+
table = "test_catalog_versioning"
1288+
wr.catalog.delete_table_if_exists(database=database, table=table)
1289+
path = f"s3://{bucket}/{table}/"
1290+
wr.s3.delete_objects(path=path)
1291+
1292+
# Version 0
1293+
df = pd.DataFrame({"c0": [1, 2]})
1294+
paths = wr.s3.to_parquet(df=df, path=path, dataset=True, database=database, table=table, mode="overwrite")["paths"]
1295+
wr.s3.wait_objects_exist(paths=paths, use_threads=False)
1296+
df = wr.athena.read_sql_table(table=table, database=database)
1297+
assert len(df.index) == 2
1298+
assert len(df.columns) == 1
1299+
assert str(df.c0.dtype).startswith("Int")
1300+
1301+
# Version 1
1302+
df = pd.DataFrame({"c1": ["foo", "boo"]})
1303+
paths = wr.s3.to_parquet(
1304+
df=df, path=path, dataset=True, database=database, table=table, mode="overwrite", catalog_versioning=True
1305+
)["paths"]
1306+
wr.s3.wait_objects_exist(paths=paths, use_threads=False)
1307+
df = wr.athena.read_sql_table(table=table, database=database)
1308+
assert len(df.index) == 2
1309+
assert len(df.columns) == 1
1310+
assert str(df.c1.dtype) == "string"
1311+
1312+
# Version 2
1313+
df = pd.DataFrame({"c1": [1.0, 2.0]})
1314+
paths = wr.s3.to_csv(
1315+
df=df,
1316+
path=path,
1317+
dataset=True,
1318+
database=database,
1319+
table=table,
1320+
mode="overwrite",
1321+
catalog_versioning=True,
1322+
index=False,
1323+
)["paths"]
1324+
wr.s3.wait_objects_exist(paths=paths, use_threads=False)
1325+
df = wr.athena.read_sql_table(table=table, database=database)
1326+
assert len(df.index) == 2
1327+
assert len(df.columns) == 1
1328+
assert str(df.c1.dtype).startswith("float")
1329+
1330+
# Version 3 (removing version 2)
1331+
df = pd.DataFrame({"c1": [True, False]})
1332+
paths = wr.s3.to_csv(
1333+
df=df,
1334+
path=path,
1335+
dataset=True,
1336+
database=database,
1337+
table=table,
1338+
mode="overwrite",
1339+
catalog_versioning=False,
1340+
index=False,
1341+
)["paths"]
1342+
wr.s3.wait_objects_exist(paths=paths, use_threads=False)
1343+
df = wr.athena.read_sql_table(table=table, database=database)
1344+
assert len(df.index) == 2
1345+
assert len(df.columns) == 1
1346+
assert str(df.c1.dtype).startswith("boolean")
1347+
1348+
# Cleaning Up
1349+
wr.catalog.delete_table_if_exists(database=database, table=table)
1350+
wr.s3.delete_objects(path=path)

0 commit comments

Comments
 (0)