
Commit 31ba2a0

Prepend prefix to to_parquet created files (#622)
1 parent 32ec1d9 commit 31ba2a0
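
For context, here is a minimal usage sketch of the new filename_prefix argument, based on the docstring and tests added in this commit. The bucket path and data are placeholders, and the expected object name in the comment follows the prefix-plus-uuid behaviour shown in the diffs below.

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"col": [1, 2, 3], "col2": ["A", "A", "B"]})

# filename_prefix only takes effect with dataset=True; a uuid4 hex is still
# appended after the prefix, so object names remain unique.
paths = wr.s3.to_parquet(
    df=df,
    path="s3://my-bucket/my-dataset/",  # placeholder dataset root
    dataset=True,
    filename_prefix="my_prefix",
)["paths"]
# Expected object names: my_prefix<32-char-hex>.snappy.parquet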

File tree

6 files changed: +128 -15 lines changed


awswrangler/s3/_write_dataset.py

Lines changed: 14 additions & 7 deletions
@@ -1,7 +1,6 @@
 """Amazon S3 Write Dataset (PRIVATE)."""
 
 import logging
-import uuid
 from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 
 import boto3
@@ -24,12 +23,12 @@ def _to_partitions(
     mode: str,
     partition_cols: List[str],
     bucketing_info: Optional[Tuple[List[str], int]],
+    filename_prefix: str,
     boto3_session: boto3.Session,
     **func_kwargs: Any,
 ) -> Tuple[List[str], Dict[str, List[str]]]:
     partitions_values: Dict[str, List[str]] = {}
     proxy: _WriteProxy = _WriteProxy(use_threads=concurrent_partitioning)
-    filename_prefix = uuid.uuid4().hex
 
     for keys, subgroup in df.groupby(by=partition_cols, observed=True):
         subgroup = subgroup.drop(partition_cols, axis="columns")
@@ -60,6 +59,7 @@ def _to_partitions(
                 func=func,
                 df=subgroup,
                 path_root=prefix,
+                filename_prefix=filename_prefix,
                 boto3_session=boto3_session,
                 use_threads=use_threads,
                 **func_kwargs,
@@ -74,25 +74,23 @@ def _to_buckets(
     df: pd.DataFrame,
     path_root: str,
     bucketing_info: Tuple[List[str], int],
+    filename_prefix: str,
     boto3_session: boto3.Session,
     use_threads: bool,
     proxy: Optional[_WriteProxy] = None,
-    filename_prefix: Optional[str] = None,
     **func_kwargs: Any,
 ) -> List[str]:
     _proxy: _WriteProxy = proxy if proxy else _WriteProxy(use_threads=False)
     bucket_number_series = df.apply(
         lambda row: _get_bucket_number(bucketing_info[1], [row[col_name] for col_name in bucketing_info[0]]),
         axis="columns",
     )
-    if filename_prefix is None:
-        filename_prefix = uuid.uuid4().hex
     for bucket_number, subgroup in df.groupby(by=bucket_number_series, observed=True):
         _proxy.write(
             func=func,
             df=subgroup,
             path_root=path_root,
-            filename=f"{filename_prefix}_bucket-{bucket_number:05d}",
+            filename_prefix=f"{filename_prefix}_bucket-{bucket_number:05d}",
             boto3_session=boto3_session,
             use_threads=use_threads,
             **func_kwargs,
@@ -133,6 +131,7 @@ def _to_dataset(
     concurrent_partitioning: bool,
     df: pd.DataFrame,
     path_root: str,
+    filename_prefix: str,
     index: bool,
     use_threads: bool,
     mode: str,
@@ -168,6 +167,7 @@ def _to_dataset(
             use_threads=use_threads,
             mode=mode,
             bucketing_info=bucketing_info,
+            filename_prefix=filename_prefix,
             partition_cols=partition_cols,
             boto3_session=boto3_session,
             index=index,
@@ -180,13 +180,20 @@ def _to_dataset(
             path_root=path_root,
             use_threads=use_threads,
             bucketing_info=bucketing_info,
+            filename_prefix=filename_prefix,
             boto3_session=boto3_session,
             index=index,
             **func_kwargs,
         )
     else:
         paths = func(
-            df=df, path_root=path_root, use_threads=use_threads, boto3_session=boto3_session, index=index, **func_kwargs
+            df=df,
+            path_root=path_root,
+            filename_prefix=filename_prefix,
+            use_threads=use_threads,
+            boto3_session=boto3_session,
+            index=index,
+            **func_kwargs,
         )
     _logger.debug("paths: %s", paths)
     _logger.debug("partitions_values: %s", partitions_values)
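
A minimal sketch of the naming scheme implied by the _to_buckets change above, with illustrative values only (the combined prefix is built in to_parquet/to_csv, see the later diffs):

# Illustrative values; the real prefix is "<user prefix><uuid4 hex>".
filename_prefix = "my_prefix0f1e2d3c"
bucket_number = 0
print(f"{filename_prefix}_bucket-{bucket_number:05d}")
# -> my_prefix0f1e2d3c_bucket-00000
# _to_parquet/_to_text then append the format and compression extension,
# e.g. ".snappy.parquet" or ".csv".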

awswrangler/s3/_write_parquet.py

Lines changed: 7 additions & 4 deletions
@@ -150,13 +150,11 @@ def _to_parquet(
     use_threads: bool,
     path: Optional[str] = None,
     path_root: Optional[str] = None,
-    filename: Optional[str] = None,
+    filename_prefix: Optional[str] = uuid.uuid4().hex,
     max_rows_by_file: Optional[int] = 0,
 ) -> List[str]:
     if path is None and path_root is not None:
-        if filename is None:
-            filename = uuid.uuid4().hex
-        file_path: str = f"{path_root}{filename}{compression_ext}.parquet"
+        file_path: str = f"{path_root}{filename_prefix}{compression_ext}.parquet"
     elif path is not None and path_root is None:
         file_path = path
     else:
@@ -207,6 +205,7 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
     s3_additional_kwargs: Optional[Dict[str, Any]] = None,
     sanitize_columns: bool = False,
     dataset: bool = False,
+    filename_prefix: Optional[str] = None,
     partition_cols: Optional[List[str]] = None,
     bucketing_info: Optional[Tuple[List[str], int]] = None,
     concurrent_partitioning: bool = False,
@@ -283,6 +282,8 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
         partition_cols, mode, database, table, description, parameters, columns_comments, concurrent_partitioning,
         catalog_versioning, projection_enabled, projection_types, projection_ranges, projection_values,
         projection_intervals, projection_digits, catalog_id, schema_evolution.
+    filename_prefix: str, optional
+        If dataset=True, add a filename prefix to the output files.
     partition_cols: List[str], optional
         List of column names that will be used to create partitions. Only takes effect if dataset=True.
     bucketing_info: Tuple[List[str], int], optional
@@ -499,6 +500,7 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
     dtype = dtype if dtype else {}
     partitions_values: Dict[str, List[str]] = {}
     mode = "append" if mode is None else mode
+    filename_prefix = filename_prefix + uuid.uuid4().hex if filename_prefix else uuid.uuid4().hex
     cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
 
@@ -560,6 +562,7 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
             concurrent_partitioning=concurrent_partitioning,
             df=df,
             path_root=path,  # type: ignore
+            filename_prefix=filename_prefix,
             index=index,
             compression=compression,
             compression_ext=compression_ext,
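
The behavioural core is the prefix composition added before the write. A short sketch of what that one line produces, with illustrative values:

import uuid

filename_prefix = "my_prefix"  # user-supplied; may be None
filename_prefix = filename_prefix + uuid.uuid4().hex if filename_prefix else uuid.uuid4().hex
# With a prefix:    "my_prefix<32-char hex>"
# Without a prefix: "<32-char hex>"  (same as the previous behaviour)
# _to_parquet then builds f"{path_root}{filename_prefix}{compression_ext}.parquet"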

awswrangler/s3/_write_text.py

Lines changed: 7 additions & 4 deletions
@@ -37,16 +37,14 @@ def _to_text(
     s3_additional_kwargs: Optional[Dict[str, str]],
     path: Optional[str] = None,
     path_root: Optional[str] = None,
-    filename: Optional[str] = None,
+    filename_prefix: Optional[str] = uuid.uuid4().hex,
     **pandas_kwargs: Any,
 ) -> List[str]:
     if df.empty is True:
         raise exceptions.EmptyDataFrame()
     if path is None and path_root is not None:
-        if filename is None:
-            filename = uuid.uuid4().hex
         file_path: str = (
-            f"{path_root}{filename}.{file_format}{_COMPRESSION_2_EXT.get(pandas_kwargs.get('compression'))}"
+            f"{path_root}{filename_prefix}.{file_format}{_COMPRESSION_2_EXT.get(pandas_kwargs.get('compression'))}"
         )
     elif path is not None and path_root is None:
         file_path = path
@@ -83,6 +81,7 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-statements
     s3_additional_kwargs: Optional[Dict[str, Any]] = None,
     sanitize_columns: bool = False,
     dataset: bool = False,
+    filename_prefix: Optional[str] = None,
     partition_cols: Optional[List[str]] = None,
     bucketing_info: Optional[Tuple[List[str], int]] = None,
     concurrent_partitioning: bool = False,
@@ -165,6 +164,8 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-statements
         partition_cols, mode, database, table, description, parameters, columns_comments, concurrent_partitioning,
         catalog_versioning, projection_enabled, projection_types, projection_ranges, projection_values,
         projection_intervals, projection_digits, catalog_id, schema_evolution.
+    filename_prefix: str, optional
+        If dataset=True, add a filename prefix to the output files.
     partition_cols: List[str], optional
         List of column names that will be used to create partitions. Only takes effect if dataset=True.
     bucketing_info: Tuple[List[str], int], optional
@@ -403,6 +404,7 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-statements
     dtype = dtype if dtype else {}
     partitions_values: Dict[str, List[str]] = {}
     mode = "append" if mode is None else mode
+    filename_prefix = filename_prefix + uuid.uuid4().hex if filename_prefix else uuid.uuid4().hex
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
 
     # Sanitize table to respect Athena's standards
@@ -480,6 +482,7 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-statements
             index=index,
             sep=sep,
             compression=compression,
+            filename_prefix=filename_prefix,
             use_threads=use_threads,
             partition_cols=partition_cols,
             bucketing_info=bucketing_info,
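
For the text writers, the object name follows the pattern in the first hunk above. A sketch with placeholder values; the empty-string extension for uncompressed output is an assumption, consistent with the test below that expects a bare ".csv":

path_root = "s3://my-bucket/my-dataset/col2=A/"  # placeholder partition prefix
filename_prefix = "my_prefix0f1e2d3c"            # user prefix + uuid4 hex
file_format = "csv"
compression_ext = ""                             # assumed "" when compression is None
file_path = f"{path_root}{filename_prefix}.{file_format}{compression_ext}"
# -> s3://my-bucket/my-dataset/col2=A/my_prefix0f1e2d3c.csv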

tests/conftest.py

Lines changed: 11 additions & 0 deletions
@@ -271,3 +271,14 @@ def timestream_database_and_table():
     yield name
     wr.timestream.delete_table(name, name)
     wr.timestream.delete_database(name)
+
+
+@pytest.fixture(scope="function")
+def compare_filename_prefix():
+    def assert_filename_prefix(filename, filename_prefix, test_prefix):
+        if filename_prefix:
+            assert filename.startswith(test_prefix)
+        else:
+            assert not filename.startswith(test_prefix)
+
+    return assert_filename_prefix
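
A short sketch of how a test can consume the fixture above; the test body is illustrative only and not part of this commit:

def test_prefix_assertion(compare_filename_prefix):
    # Passes: a prefix was requested and the name starts with it.
    compare_filename_prefix("my_prefix0f1e2d3c.snappy.parquet", "my_prefix", "my_prefix")
    # Passes: no prefix was requested and the name does not start with it.
    compare_filename_prefix("0f1e2d3c.snappy.parquet", None, "my_prefix")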

tests/test_s3_parquet.py

Lines changed: 43 additions & 0 deletions
@@ -192,6 +192,49 @@ def test_to_parquet_file_dtype(path, use_threads):
     assert str(df2.c1.dtype) == "string"
 
 
+@pytest.mark.parametrize("filename_prefix", [None, "my_prefix"])
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_to_parquet_filename_prefix(compare_filename_prefix, path, filename_prefix, use_threads):
+    test_prefix = "my_prefix"
+    df = pd.DataFrame({"col": [1, 2, 3], "col2": ["A", "A", "B"]})
+    file_path = f"{path}0.parquet"
+
+    # If Dataset is False, parquet file should never start with prefix
+    filename = wr.s3.to_parquet(
+        df=df, path=file_path, dataset=False, filename_prefix=filename_prefix, use_threads=use_threads
+    )["paths"][0].split("/")[-1]
+    assert not filename.startswith(test_prefix)
+
+    # If Dataset is True, parquet file starts with prefix if one is supplied
+    filename = wr.s3.to_parquet(
+        df=df, path=path, dataset=True, filename_prefix=filename_prefix, use_threads=use_threads
+    )["paths"][0].split("/")[-1]
+    compare_filename_prefix(filename, filename_prefix, test_prefix)
+
+    # Partitioned
+    filename = wr.s3.to_parquet(
+        df=df,
+        path=path,
+        dataset=True,
+        filename_prefix=filename_prefix,
+        partition_cols=["col2"],
+        use_threads=use_threads,
+    )["paths"][0].split("/")[-1]
+    compare_filename_prefix(filename, filename_prefix, test_prefix)
+
+    # Bucketing
+    filename = wr.s3.to_parquet(
+        df=df,
+        path=path,
+        dataset=True,
+        filename_prefix=filename_prefix,
+        bucketing_info=(["col2"], 2),
+        use_threads=use_threads,
+    )["paths"][0].split("/")[-1]
+    compare_filename_prefix(filename, filename_prefix, test_prefix)
+    assert filename.endswith("bucket-00000.snappy.parquet")
+
+
 def test_read_parquet_map_types(path):
     df = pd.DataFrame({"c0": [0, 1, 1, 2]}, dtype=np.int8)
     file_path = f"{path}0.parquet"

tests/test_s3_text.py

Lines changed: 46 additions & 0 deletions
@@ -130,6 +130,52 @@ def test_json(path):
     assert df1.equals(wr.s3.read_json(path=[path0, path1], use_threads=True))
 
 
+@pytest.mark.parametrize("filename_prefix", [None, "my_prefix"])
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_to_text_filename_prefix(compare_filename_prefix, path, filename_prefix, use_threads):
+    test_prefix = "my_prefix"
+    df = pd.DataFrame({"col": [1, 2, 3], "col2": ["A", "A", "B"]})
+
+    # If Dataset is False, csv/json file should never start with prefix
+    file_path = f"{path}0.json"
+    filename = wr.s3.to_json(df=df, path=file_path, use_threads=use_threads)[0].split("/")[-1]
+    assert not filename.startswith(test_prefix)
+    file_path = f"{path}0.csv"
+    filename = wr.s3.to_csv(
+        df=df, path=file_path, dataset=False, filename_prefix=filename_prefix, use_threads=use_threads
+    )["paths"][0].split("/")[-1]
+    assert not filename.startswith(test_prefix)
+
+    # If Dataset is True, csv file starts with prefix if one is supplied
+    filename = wr.s3.to_csv(df=df, path=path, dataset=True, filename_prefix=filename_prefix, use_threads=use_threads)[
+        "paths"
+    ][0].split("/")[-1]
+    compare_filename_prefix(filename, filename_prefix, test_prefix)
+
+    # Partitioned
+    filename = wr.s3.to_csv(
+        df=df,
+        path=path,
+        dataset=True,
+        filename_prefix=filename_prefix,
+        partition_cols=["col2"],
+        use_threads=use_threads,
+    )["paths"][0].split("/")[-1]
+    compare_filename_prefix(filename, filename_prefix, test_prefix)
+
+    # Bucketing
+    filename = wr.s3.to_csv(
+        df=df,
+        path=path,
+        dataset=True,
+        filename_prefix=filename_prefix,
+        bucketing_info=(["col2"], 2),
+        use_threads=use_threads,
+    )["paths"][0].split("/")[-1]
+    compare_filename_prefix(filename, filename_prefix, test_prefix)
+    assert filename.endswith("bucket-00000.csv")
+
+
 def test_fwf(path):
     text = "1 Herfelingen27-12-18\n2 Lambusart14-06-18\n3Spormaggiore15-04-18"
     client_s3 = boto3.client("s3")
