Commit 3a6c537

Allow sanitize_columns=False when dataset=True and database=None. #380
1 parent f324483 commit 3a6c537

4 files changed (+52, -21 lines)
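This change lets dataset writes skip column-name sanitization when the dataset is not being registered in the Glue Catalog. A minimal usage sketch of the new behaviour (bucket/prefix and DataFrame contents are hypothetical):

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"camelCase": [1, 2], "Par": ["a", "b"]})

# dataset=True without database/table: column names such as "camelCase" are now kept as-is.
wr.s3.to_parquet(
    df,
    path="s3://my-bucket/my-prefix/",
    dataset=True,
    sanitize_columns=False,
)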

awswrangler/s3/_write.py

Lines changed: 3 additions & 3 deletions
@@ -52,7 +52,7 @@ def _validate_args(
     if dataset is False:
         if path.endswith("/"):
             raise exceptions.InvalidArgumentValue(
-                "If <dataset=False>, the argument <path> should be a object path, not a directory."
+                "If <dataset=False>, the argument <path> should be a file path, not a directory."
             )
         if partition_cols:
             raise exceptions.InvalidArgumentCombination("Please, pass dataset=True to be able to use partition_cols.")
@@ -66,8 +66,8 @@ def _validate_args(
             )
     elif (database is None) != (table is None):
         raise exceptions.InvalidArgumentCombination(
-            "Arguments database and table must be passed together. If you want to store your dataset in the Glue "
-            "Catalog, please ensure you are passing both."
+            "Arguments database and table must be passed together. If you want to store your dataset metadata in "
+            "the Glue Catalog, please ensure you are passing both."
         )
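The validation above still requires database and table to be supplied as a pair when cataloging; a short sketch of that failure mode, assuming the standard awswrangler exceptions module (path and database name are hypothetical):

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"c0": [0, 1]})

try:
    # database without table (or vice versa) is rejected by _validate_args.
    wr.s3.to_parquet(df, path="s3://my-bucket/prefix/", dataset=True, database="my_db")
except wr.exceptions.InvalidArgumentCombination as ex:
    print(ex)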
awswrangler/s3/_write_parquet.py

Lines changed: 13 additions & 10 deletions
@@ -225,14 +225,14 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
 ) -> Dict[str, Union[List[str], Dict[str, List[str]]]]:
     """Write Parquet file or dataset on Amazon S3.
 
-    The concept of Dataset goes beyond the simple idea of files and enable more
-    complex features like partitioning, casting and catalog integration (Amazon Athena/AWS Glue Catalog).
+    The concept of Dataset goes beyond the simple idea of ordinary files and enable more
+    complex features like partitioning and catalog integration (Amazon Athena/AWS Glue Catalog).
 
     Note
     ----
-    If `dataset=True` The table name and all column names will be automatically sanitized using
-    `wr.catalog.sanitize_table_name` and `wr.catalog.sanitize_column_name`.
-    Please, pass `sanitize_columns=True` to force the same behaviour for `dataset=False`.
+    If `database` and `table` arguments are passed, the table name and all column names
+    will be automatically sanitized using `wr.catalog.sanitize_table_name` and `wr.catalog.sanitize_column_name`.
+    Please, pass `sanitize_columns=True` to enforce this behaviour always.
 
     Note
     ----
@@ -267,12 +267,15 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
         "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging".
         e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMY_KEY_ARN'}
     sanitize_columns : bool
-        True to sanitize columns names or False to keep it as is.
-        True value is forced if `dataset=True`.
+        True to sanitize columns names (using `wr.catalog.sanitize_table_name` and `wr.catalog.sanitize_column_name`)
+        or False to keep it as is.
+        True value behaviour is enforced if `database` and `table` arguments are passed.
     dataset : bool
-        If True store a parquet dataset instead of a single file.
+        If True store a parquet dataset instead of a ordinary file(s)
         If True, enable all follow arguments:
-        partition_cols, mode, database, table, description, parameters, columns_comments, .
+        partition_cols, mode, database, table, description, parameters, columns_comments, concurrent_partitioning,
+        catalog_versioning, projection_enabled, projection_types, projection_ranges, projection_values,
+        projection_intervals, projection_digits, catalog_id, schema_evolution.
     partition_cols: List[str], optional
         List of column names that will be used to create partitions. Only takes effect if dataset=True.
     concurrent_partitioning: bool
@@ -470,7 +473,7 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
 
     # Sanitize table to respect Athena's standards
-    if (sanitize_columns is True) or (dataset is True):
+    if (sanitize_columns is True) or (database is not None and table is not None):
         df, dtype, partition_cols = _sanitize(df=df, dtype=dtype, partition_cols=partition_cols)
 
     # Evaluating dtype
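The sanitization named in the docstring is what the new test below exercises end to end; a small sketch of the helpers it references, with expected outputs taken from the assertions in tests/test_s3_parquet.py:

import awswrangler as wr

# Per the test expectations: "C0" -> "c0", "camelCase" -> "camel_case", "c**--2" -> "c_2"
print(wr.catalog.sanitize_column_name("camelCase"))
print(wr.catalog.sanitize_column_name("c**--2"))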

awswrangler/s3/_write_text.py

Lines changed: 10 additions & 8 deletions
@@ -88,14 +88,14 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals
 ) -> Dict[str, Union[List[str], Dict[str, List[str]]]]:
     """Write CSV file or dataset on Amazon S3.
 
-    The concept of Dataset goes beyond the simple idea of files and enable more
-    complex features like partitioning, casting and catalog integration (Amazon Athena/AWS Glue Catalog).
+    The concept of Dataset goes beyond the simple idea of ordinary files and enable more
+    complex features like partitioning and catalog integration (Amazon Athena/AWS Glue Catalog).
 
     Note
     ----
-    If `dataset=True` The table name and all column names will be automatically sanitized using
-    `wr.catalog.sanitize_table_name` and `wr.catalog.sanitize_column_name`.
-    Please, pass `sanitize_columns=True` to force the same behaviour for `dataset=False`.
+    If `database` and `table` arguments are passed, the table name and all column names
+    will be automatically sanitized using `wr.catalog.sanitize_table_name` and `wr.catalog.sanitize_column_name`.
+    Please, pass `sanitize_columns=True` to enforce this behaviour always.
 
     Note
     ----
@@ -142,9 +142,11 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals
         True to sanitize columns names or False to keep it as is.
         True value is forced if `dataset=True`.
     dataset : bool
-        If True store a parquet dataset instead of a single file.
+        If True store a parquet dataset instead of a ordinary file(s)
         If True, enable all follow arguments:
-        partition_cols, mode, database, table, description, parameters, columns_comments, .
+        partition_cols, mode, database, table, description, parameters, columns_comments, concurrent_partitioning,
+        catalog_versioning, projection_enabled, projection_types, projection_ranges, projection_values,
+        projection_intervals, projection_digits, catalog_id, schema_evolution.
     partition_cols: List[str], optional
         List of column names that will be used to create partitions. Only takes effect if dataset=True.
     concurrent_partitioning: bool
@@ -358,7 +360,7 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
 
     # Sanitize table to respect Athena's standards
-    if (sanitize_columns is True) or (dataset is True):
+    if (sanitize_columns is True) or (database is not None and table is not None):
         df, dtype, partition_cols = _sanitize(df=df, dtype=dtype, partition_cols=partition_cols)
 
     # Evaluating dtype
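Both to_parquet and to_csv now gate sanitization on the same condition. A standalone restatement of the changed if-statement (illustration only, not the library's code):

from typing import Optional

def should_sanitize(sanitize_columns: bool, database: Optional[str], table: Optional[str]) -> bool:
    # Sanitize when explicitly requested, or when the dataset is also being
    # registered in the Glue Catalog (both database and table supplied).
    return sanitize_columns or (database is not None and table is not None)

# dataset=True alone no longer forces sanitization:
assert should_sanitize(False, None, None) is False
assert should_sanitize(False, "my_db", "my_table") is True
assert should_sanitize(True, None, None) is True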

tests/test_s3_parquet.py

Lines changed: 26 additions & 0 deletions
@@ -323,3 +323,29 @@ def test_multi_index_recovery_nameless(path, use_threads):
     wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
     df2 = wr.s3.read_parquet(f"{path}*.parquet", use_threads=use_threads)
     assert df.reset_index().equals(df2.reset_index())
+
+
+def test_to_parquet_dataset_sanitize(path):
+    df = pd.DataFrame({"C0": [0, 1], "camelCase": [2, 3], "c**--2": [4, 5], "Par": ["a", "b"]})
+
+    paths = wr.s3.to_parquet(df, path, dataset=True, partition_cols=["Par"], sanitize_columns=False)["paths"]
+    wr.s3.wait_objects_exist(paths)
+    df2 = wr.s3.read_parquet(path, dataset=True)
+    assert df.shape == df2.shape
+    assert list(df2.columns) == ["C0", "camelCase", "c**--2", "Par"]
+    assert df2.C0.sum() == 1
+    assert df2.camelCase.sum() == 5
+    assert df2["c**--2"].sum() == 9
+    assert df2.Par.to_list() == ["a", "b"]
+
+    paths = wr.s3.to_parquet(df, path, dataset=True, partition_cols=["par"], sanitize_columns=True, mode="overwrite")[
+        "paths"
+    ]
+    wr.s3.wait_objects_exist(paths)
+    df2 = wr.s3.read_parquet(path, dataset=True)
+    assert df.shape == df2.shape
+    assert list(df2.columns) == ["c0", "camel_case", "c_2", "par"]
+    assert df2.c0.sum() == 1
+    assert df2.camel_case.sum() == 5
+    assert df2.c_2.sum() == 9
+    assert df2.par.to_list() == ["a", "b"]
