Commit b3837c6

Remove in memory copy of DataFrame for to_parquet and to_csv.
1 parent: bd9ab94

3 files changed: 16 additions, 12 deletions

awswrangler/catalog.py

Lines changed: 12 additions & 5 deletions
@@ -886,6 +886,10 @@ def sanitize_table_name(table: str) -> str:
 def drop_duplicated_columns(df: pd.DataFrame) -> pd.DataFrame:
     """Drop all repeated columns (duplicated names).
 
+    Note
+    ----
+    This transformation will run `inplace` and will make changes in the original DataFrame.
+
     Note
     ----
     It is different from Panda's drop_duplicates() function which considers the column values.
@@ -912,11 +916,14 @@ def drop_duplicated_columns(df: pd.DataFrame) -> pd.DataFrame:
     1  2
 
     """
-    duplicated_cols = df.columns.duplicated()
-    duplicated_cols_names: List[str] = list(df.columns[duplicated_cols])
-    if len(duplicated_cols_names) > 0:
-        _logger.warning("Dropping repeated columns: %s", duplicated_cols_names)
-    return df.loc[:, ~duplicated_cols]
+    duplicated = df.columns.duplicated()
+    if duplicated.any():
+        _logger.warning("Dropping duplicated columns...")
+        columns = df.columns.values
+        columns[duplicated] = "AWSDataWranglerDuplicatedMarker"
+        df.columns = columns
+        df.drop(columns="AWSDataWranglerDuplicatedMarker", inplace=True)
+    return df
 
 
 def get_connection(
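
The rewritten helper mutates the caller's DataFrame instead of returning a boolean-sliced copy, which is what removes the extra in-memory DataFrame for the write paths. A minimal usage sketch of the post-commit behavior (hypothetical data; assumes this version of awswrangler imported as wr):

    import pandas as pd
    import awswrangler as wr

    # Hypothetical frame with a repeated column name "a".
    df = pd.DataFrame([[1, 2, 3]], columns=["a", "a", "b"])

    out = wr.catalog.drop_duplicated_columns(df=df)

    assert out is df                       # same object returned: the drop happened in place
    assert list(df.columns) == ["a", "b"]  # second "a" was renamed to the marker and dropped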

awswrangler/s3/_write.py

Lines changed: 2 additions & 2 deletions
@@ -443,7 +443,7 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals
     df = catalog.sanitize_dataframe_columns_names(df=df)
     partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols]
     dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()}
-    df = catalog.drop_duplicated_columns(df=df)
+    catalog.drop_duplicated_columns(df=df)
 
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
     fs: s3fs.S3FileSystem = _utils.get_fs(session=session, s3_additional_kwargs=s3_additional_kwargs)
@@ -829,7 +829,7 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
     df = catalog.sanitize_dataframe_columns_names(df=df)
     partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols]
     dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()}
-    df = catalog.drop_duplicated_columns(df=df)
+    catalog.drop_duplicated_columns(df=df)
 
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
     cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
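
In to_csv and to_parquet the helper is now called only for its side effect, so its return value is no longer rebound to df. The practical consequence is that the DataFrame handed to these writers is no longer shielded by an intermediate copy. A hedged sketch of what a caller might observe (hypothetical bucket and path; assumes AWS credentials and the sanitize_columns flag exercised by the tests below):

    import pandas as pd
    import awswrangler as wr

    df = pd.DataFrame({"C0": [0, 1], "camelCase": [2, 3], "c**--2": [4, 5]})

    # Hypothetical path. With sanitize_columns=True the column names are rewritten to
    # catalog-friendly form; with the in-memory copy removed, that rewrite and the
    # duplicate drop may act on the caller's own DataFrame rather than a private copy.
    wr.s3.to_parquet(df, "s3://my-bucket/example/0.parquet", sanitize_columns=True)

    print(df.columns.tolist())  # may now show the sanitized names, not the originals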

testing/test_awswrangler/test_data_lake2.py

Lines changed: 2 additions & 5 deletions
@@ -103,11 +103,10 @@ def test_json_chunksize(path):
 def test_parquet_cast_string(path):
     df = pd.DataFrame({"id": [1, 2, 3], "value": ["foo", "boo", "bar"]})
     path_file = f"{path}0.parquet"
-    wr.s3.to_parquet(df, path_file, dtype={"id": "string"})
+    wr.s3.to_parquet(df, path_file, dtype={"id": "string"}, sanitize_columns=False)
     wr.s3.wait_objects_exist([path_file])
     df2 = wr.s3.read_parquet(path_file)
     assert str(df2.id.dtypes) == "string"
-    df2["id"] = df2["id"].astype(int)
     assert df.shape == df2.shape
     for col, row in tuple(itertools.product(df.columns, range(3))):
         assert df[col].iloc[row] == df2[col].iloc[row]
@@ -123,8 +122,6 @@ def test_parquet_cast_string_dataset(path, partition_cols):
     df2 = wr.s3.read_parquet(path, dataset=True).sort_values("id", ignore_index=True)
     assert str(df2.id.dtypes) == "string"
     assert str(df2.c3.dtypes) == "string"
-    df2["id"] = df2["id"].astype(int)
-    df2["c3"] = df2["c3"].astype(float)
     assert df.shape == df2.shape
     for col, row in tuple(itertools.product(df.columns, range(3))):
         assert df[col].iloc[row] == df2[col].iloc[row]
@@ -158,7 +155,7 @@ def test_athena_undefined_column(database):
 def test_to_parquet_file_sanitize(path):
     df = pd.DataFrame({"C0": [0, 1], "camelCase": [2, 3], "c**--2": [4, 5]})
     path_file = f"{path}0.parquet"
-    wr.s3.to_parquet(df, path_file)
+    wr.s3.to_parquet(df, path_file, sanitize_columns=True)
     wr.s3.wait_objects_exist([path_file])
     df2 = wr.s3.read_parquet(path_file)
     assert df.shape == df2.shape
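
The updated tests pass sanitize_columns explicitly instead of relying on the writer's default, and drop the post-read casts back to int/float. A hedged sketch of the opt-out mode used by the cast-string tests (hypothetical path; assumes AWS credentials):

    import pandas as pd
    import awswrangler as wr

    df = pd.DataFrame({"id": [1, 2, 3], "value": ["foo", "boo", "bar"]})

    # Hypothetical path. sanitize_columns=False keeps the caller's column names exactly
    # as given, so the dtype override below matches a real column.
    path_file = "s3://my-bucket/cast/0.parquet"
    wr.s3.to_parquet(df, path_file, dtype={"id": "string"}, sanitize_columns=False)
    wr.s3.wait_objects_exist([path_file])

    df2 = wr.s3.read_parquet(path_file)
    assert str(df2.id.dtypes) == "string"  # same assertion as test_parquet_cast_string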
