
Commit 2164b68

committed
Add sanitize_columns arg in to_parquet and to_csv. #278 #279
1 parent d0c8614 commit 2164b68
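
In practice the new flag is opt-in for single-file writes. A minimal usage sketch (the bucket path is a placeholder, not part of the commit):

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({"FooBoo": [1, 2, 3]})

# Default: a single-file write keeps column names exactly as given.
wr.s3.to_parquet(df, path="s3://my-bucket/file0.parquet")

# Opt in to Athena-style sanitization for a single file.
wr.s3.to_csv(df, path="s3://my-bucket/file0.csv", sanitize_columns=True, index=False)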

File tree

2 files changed (+53, -16 lines)


awswrangler/s3.py

Lines changed: 30 additions & 16 deletions
@@ -429,7 +429,7 @@ def size_objects(
     return size_dict


-def to_csv(  # pylint: disable=too-many-arguments
+def to_csv(  # pylint: disable=too-many-arguments,too-many-locals
     df: pd.DataFrame,
     path: str,
     sep: str = ",",
@@ -438,6 +438,7 @@ def to_csv(  # pylint: disable=too-many-arguments
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
+    sanitize_columns: bool = False,
     dataset: bool = False,
     partition_cols: Optional[List[str]] = None,
     mode: Optional[str] = None,
@@ -464,8 +465,9 @@ def to_csv(  # pylint: disable=too-many-arguments

     Note
     ----
-    The table name and all column names will be automatically sanitize using
+    If `dataset=True`, the table name and all column names will be automatically sanitized using
     `wr.catalog.sanitize_table_name` and `wr.catalog.sanitize_column_name`.
+    Pass `sanitize_columns=True` to force the same behaviour when `dataset=False`.

     Note
     ----
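
For a concrete sense of what the sanitizers named in this Note produce, the new test at the bottom of this diff round-trips "FooBoo" to "foo_boo"; a sketch based on that behaviour:

import awswrangler as wr

# CamelCase becomes Athena-friendly snake_case, per test_sanitize_columns below.
assert wr.catalog.sanitize_column_name("FooBoo") == "foo_boo"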
@@ -495,13 +497,16 @@ def to_csv(  # pylint: disable=too-many-arguments
     s3_additional_kwargs:
         Forward to s3fs, useful for server side encryption
         https://s3fs.readthedocs.io/en/latest/#serverside-encryption
-    dataset: bool
+    sanitize_columns : bool
+        True to sanitize column names or False to keep them as is.
+        True value is forced if `dataset=True`.
+    dataset : bool
         If True store a parquet dataset instead of a single file.
         If True, enable all follow arguments:
         partition_cols, mode, database, table, description, parameters, columns_comments, .
     partition_cols: List[str], optional
         List of column names that will be used to create partitions. Only takes effect if dataset=True.
-    mode: str, optional
+    mode : str, optional
         ``append`` (Default), ``overwrite``, ``overwrite_partitions``. Only takes effect if dataset=True.
     catalog_versioning : bool
         If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
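
The "forced if `dataset=True`" rule means the flag cannot opt a dataset write out of sanitization. A sketch (placeholder bucket path, not from the commit):

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({"FooBoo": [1, 2, 3]})

# dataset=True forces sanitization even with the flag off, so the
# stored column ends up as "foo_boo", not "FooBoo".
wr.s3.to_csv(df, path="s3://my-bucket/prefix/", dataset=True, sanitize_columns=False)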
@@ -662,13 +667,16 @@ def to_csv(  # pylint: disable=too-many-arguments
     if df.empty is True:
         raise exceptions.EmptyDataFrame()

-    # Sanitize table to respect Athena's standards
     partition_cols = partition_cols if partition_cols else []
     dtype = dtype if dtype else {}
     partitions_values: Dict[str, List[str]] = {}
-    df = catalog.sanitize_dataframe_columns_names(df=df)
-    partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols]
-    dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()}
+
+    # Sanitize table to respect Athena's standards
+    if (sanitize_columns is True) or (dataset is True):
+        df = catalog.sanitize_dataframe_columns_names(df=df)
+        partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols]
+        dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()}
+        df = catalog.drop_duplicated_columns(df=df)

     session: boto3.Session = _utils.ensure_session(session=boto3_session)
     fs: s3fs.S3FileSystem = _utils.get_fs(session=session, s3_additional_kwargs=s3_additional_kwargs)
@@ -703,7 +711,6 @@ def to_csv(  # pylint: disable=too-many-arguments
         if catalog_types is not None:
             for k, v in catalog_types.items():
                 dtype[k] = v
-        df = catalog.drop_duplicated_columns(df=df)
         paths, partitions_values = _to_csv_dataset(
             df=df,
             path=path,
@@ -906,6 +913,7 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
+    sanitize_columns: bool = False,
     dataset: bool = False,
     partition_cols: Optional[List[str]] = None,
     mode: Optional[str] = None,
@@ -931,8 +939,9 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals

     Note
     ----
-    The table name and all column names will be automatically sanitize using
+    If `dataset=True`, the table name and all column names will be automatically sanitized using
     `wr.catalog.sanitize_table_name` and `wr.catalog.sanitize_column_name`.
+    Pass `sanitize_columns=True` to force the same behaviour when `dataset=False`.

     Note
     ----
@@ -960,7 +969,10 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
     s3_additional_kwargs:
         Forward to s3fs, useful for server side encryption
         https://s3fs.readthedocs.io/en/latest/#serverside-encryption
-    dataset: bool
+    sanitize_columns : bool
+        True to sanitize column names or False to keep them as is.
+        True value is forced if `dataset=True`.
+    dataset : bool
         If True store a parquet dataset instead of a single file.
         If True, enable all follow arguments:
         partition_cols, mode, database, table, description, parameters, columns_comments, .
@@ -1127,14 +1139,16 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
     if df.empty is True:
         raise exceptions.EmptyDataFrame()

-    # Sanitize table to respect Athena's standards
     partition_cols = partition_cols if partition_cols else []
     dtype = dtype if dtype else {}
     partitions_values: Dict[str, List[str]] = {}
-    df = catalog.sanitize_dataframe_columns_names(df=df)
-    partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols]
-    dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()}
-    df = catalog.drop_duplicated_columns(df=df)
+
+    # Sanitize table to respect Athena's standards
+    if (sanitize_columns is True) or (dataset is True):
+        df = catalog.sanitize_dataframe_columns_names(df=df)
+        partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols]
+        dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()}
+        df = catalog.drop_duplicated_columns(df=df)

     session: boto3.Session = _utils.ensure_session(session=boto3_session)
     cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
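
The net effect of the s3.py hunks above is a single gate shared by to_csv and to_parquet: sanitize when asked, always sanitize for dataset writes. As a standalone sketch (the helper name is hypothetical, not in the commit):

def _should_sanitize(sanitize_columns: bool, dataset: bool) -> bool:
    # Hypothetical restatement of the new gate:
    # `if (sanitize_columns is True) or (dataset is True):`
    return sanitize_columns or dataset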

testing/test_awswrangler/test_data_lake2.py

Lines changed: 23 additions & 0 deletions
@@ -463,3 +463,26 @@ def test_list_wrong_path(path):
     wrong_path = path.replace("s3://", "")
     with pytest.raises(wr.exceptions.InvalidArgumentValue):
         wr.s3.list_objects(wrong_path)
+
+
+@pytest.mark.parametrize("sanitize_columns,col", [(True, "foo_boo"), (False, "FooBoo")])
+def test_sanitize_columns(path, sanitize_columns, col):
+    df = pd.DataFrame({"FooBoo": [1, 2, 3]})
+
+    # Parquet
+    file_path = f"{path}0.parquet"
+    wr.s3.to_parquet(df, path=file_path, sanitize_columns=sanitize_columns)
+    wr.s3.wait_objects_exist([file_path])
+    df = wr.s3.read_parquet(file_path)
+    assert len(df.index) == 3
+    assert len(df.columns) == 1
+    assert df.columns == [col]
+
+    # CSV
+    file_path = f"{path}0.csv"
+    wr.s3.to_csv(df, path=file_path, sanitize_columns=sanitize_columns, index=False)
+    wr.s3.wait_objects_exist([file_path])
+    df = wr.s3.read_csv(file_path)
+    assert len(df.index) == 3
+    assert len(df.columns) == 1
+    assert df.columns == [col]
