Commit 51ee97c

Merge pull request #21 from awslabs/to-csv-sep
Add *sep* argument for to_csv function
2 parents f2d50b7 + 5bc343f commit 51ee97c

File tree

13 files changed: +145 additions, -63 deletions


awswrangler/cloudwatchlogs.py

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ def start_query(self,
         :param limit: The maximum number of log events to return in the query.
         :return: Query ID
         """
+        logger.debug(f"log_group_names: {log_group_names}")
         start_timestamp = int(1000 * start_time.timestamp())
         end_timestamp = int(1000 * end_time.timestamp())
         logger.debug(f"start_timestamp: {start_timestamp}")

awswrangler/glue.py

Lines changed: 20 additions & 15 deletions
@@ -109,7 +109,8 @@ def metadata_to_glue(self,
                          partition_cols=None,
                          preserve_index=True,
                          mode="append",
-                         cast_columns=None):
+                         cast_columns=None,
+                         extra_args=None):
         schema = Glue._build_schema(dataframe=dataframe,
                                     partition_cols=partition_cols,
                                     preserve_index=preserve_index,
@@ -120,14 +121,13 @@ def metadata_to_glue(self,
             self.delete_table_if_exists(database=database, table=table)
         exists = self.does_table_exists(database=database, table=table)
         if not exists:
-            self.create_table(
-                database=database,
-                table=table,
-                schema=schema,
-                partition_cols=partition_cols,
-                path=path,
-                file_format=file_format,
-            )
+            self.create_table(database=database,
+                              table=table,
+                              schema=schema,
+                              partition_cols=partition_cols,
+                              path=path,
+                              file_format=file_format,
+                              extra_args=extra_args)
         if partition_cols:
             partitions_tuples = Glue._parse_partitions_tuples(
                 objects_paths=objects_paths, partition_cols=partition_cols)
@@ -157,13 +157,17 @@ def create_table(self,
                      schema,
                      path,
                      file_format,
-                     partition_cols=None):
+                     partition_cols=None,
+                     extra_args=None):
         if file_format == "parquet":
             table_input = Glue.parquet_table_definition(
                 table, partition_cols, schema, path)
         elif file_format == "csv":
-            table_input = Glue.csv_table_definition(table, partition_cols,
-                                                    schema, path)
+            table_input = Glue.csv_table_definition(table,
+                                                    partition_cols,
+                                                    schema,
+                                                    path,
+                                                    extra_args=extra_args)
         else:
             raise UnsupportedFileFormat(file_format)
         self._client_glue.create_table(DatabaseName=database,
@@ -229,7 +233,8 @@ def _parse_table_name(path):
         return path.rpartition("/")[2]
 
     @staticmethod
-    def csv_table_definition(table, partition_cols, schema, path):
+    def csv_table_definition(table, partition_cols, schema, path, extra_args):
+        sep = extra_args["sep"] if "sep" in extra_args else ","
         if not partition_cols:
             partition_cols = []
         return {
@@ -245,7 +250,7 @@ def csv_table_definition(table, partition_cols, schema, path):
                 "classification": "csv",
                 "compressionType": "none",
                 "typeOfData": "file",
-                "delimiter": ",",
+                "delimiter": sep,
                 "columnsOrdered": "true",
                 "areColumnsQuoted": "false",
             },
@@ -262,7 +267,7 @@ def csv_table_definition(table, partition_cols, schema, path):
                 "NumberOfBuckets": -1,
                 "SerdeInfo": {
                     "Parameters": {
-                        "field.delim": ","
+                        "field.delim": sep
                     },
                     "SerializationLibrary":
                     "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",

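The Glue change above boils down to one fallback: csv_table_definition now reads the separator from extra_args, defaults to a comma, and reuses the result for both the table's "delimiter" parameter and the SerDe's "field.delim". A minimal standalone sketch of that fallback, illustrative only and not the library's code (the None guard is an addition made for this example):

def pick_delimiter(extra_args=None):
    # Prefer extra_args["sep"], otherwise fall back to a comma, mirroring the
    # expression in csv_table_definition; the None guard is specific to this sketch.
    extra_args = extra_args or {}
    return extra_args["sep"] if "sep" in extra_args else ","


print(pick_delimiter())              # ','
print(pick_delimiter({"sep": "|"}))  # '|'
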
awswrangler/pandas.py

Lines changed: 71 additions & 45 deletions
@@ -433,6 +433,7 @@ def to_csv(
             self,
             dataframe,
             path,
+            sep=",",
             database=None,
             table=None,
             partition_cols=None,
@@ -447,6 +448,7 @@ def to_csv(
 
         :param dataframe: Pandas Dataframe
         :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/
+        :param sep: Same as pandas.to_csv()
         :param database: AWS Glue Database name
         :param table: AWS Glue table name
         :param partition_cols: List of columns names that will be partitions on S3
@@ -456,18 +458,18 @@ def to_csv(
         :param procs_io_bound: Number of cores used for I/O bound tasks
         :return: List of objects written on S3
         """
-        return self.to_s3(
-            dataframe=dataframe,
-            path=path,
-            file_format="csv",
-            database=database,
-            table=table,
-            partition_cols=partition_cols,
-            preserve_index=preserve_index,
-            mode=mode,
-            procs_cpu_bound=procs_cpu_bound,
-            procs_io_bound=procs_io_bound,
-        )
+        extra_args = {"sep": sep}
+        return self.to_s3(dataframe=dataframe,
+                          path=path,
+                          file_format="csv",
+                          database=database,
+                          table=table,
+                          partition_cols=partition_cols,
+                          preserve_index=preserve_index,
+                          mode=mode,
+                          procs_cpu_bound=procs_cpu_bound,
+                          procs_io_bound=procs_io_bound,
+                          extra_args=extra_args)
 
     def to_parquet(self,
                    dataframe,
@@ -519,7 +521,8 @@ def to_s3(self,
               mode="append",
               procs_cpu_bound=None,
               procs_io_bound=None,
-              cast_columns=None):
+              cast_columns=None,
+              extra_args=None):
         """
         Write a Pandas Dataframe on S3
         Optionally writes metadata on AWS Glue.
@@ -535,6 +538,7 @@ def to_s3(self,
         :param procs_cpu_bound: Number of cores used for CPU bound tasks
         :param procs_io_bound: Number of cores used for I/O bound tasks
         :param cast_columns: Dictionary of columns indexes and Arrow types to be casted. (E.g. {2: "int64", 5: "int32"}) (Only for "parquet" file_format)
+        :param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV)
         :return: List of objects written on S3
         """
         if dataframe.empty:
@@ -554,7 +558,8 @@ def to_s3(self,
                                         mode=mode,
                                         procs_cpu_bound=procs_cpu_bound,
                                         procs_io_bound=procs_io_bound,
-                                        cast_columns=cast_columns)
+                                        cast_columns=cast_columns,
+                                        extra_args=extra_args)
         if database:
             self._session.glue.metadata_to_glue(dataframe=dataframe,
                                                 path=path,
@@ -565,7 +570,8 @@ def to_s3(self,
                                                 preserve_index=preserve_index,
                                                 file_format=file_format,
                                                 mode=mode,
-                                                cast_columns=cast_columns)
+                                                cast_columns=cast_columns,
+                                                extra_args=extra_args)
         return objects_paths
 
     def data_to_s3(self,
@@ -577,7 +583,8 @@ def data_to_s3(self,
                    mode="append",
                    procs_cpu_bound=None,
                    procs_io_bound=None,
-                   cast_columns=None):
+                   cast_columns=None,
+                   extra_args=None):
         if not procs_cpu_bound:
             procs_cpu_bound = self._session.procs_cpu_bound
         if not procs_io_bound:
@@ -601,7 +608,8 @@ def data_to_s3(self,
                 target=self._data_to_s3_dataset_writer_remote,
                 args=(send_pipe, dataframe.iloc[bounder[0]:bounder[1], :],
                       path, partition_cols, preserve_index,
-                      self._session.primitives, file_format, cast_columns),
+                      self._session.primitives, file_format, cast_columns,
+                      extra_args),
             )
             proc.daemon = False
             proc.start()
@@ -619,7 +627,8 @@ def data_to_s3(self,
                 preserve_index=preserve_index,
                 session_primitives=self._session.primitives,
                 file_format=file_format,
-                cast_columns=cast_columns)
+                cast_columns=cast_columns,
+                extra_args=extra_args)
         if mode == "overwrite_partitions" and partition_cols:
             if procs_io_bound > procs_cpu_bound:
                 num_procs = floor(
@@ -639,7 +648,8 @@ def _data_to_s3_dataset_writer(dataframe,
                                    preserve_index,
                                    session_primitives,
                                    file_format,
-                                   cast_columns=None):
+                                   cast_columns=None,
+                                   extra_args=None):
         objects_paths = []
         if not partition_cols:
             object_path = Pandas._data_to_s3_object_writer(
@@ -648,7 +658,8 @@ def _data_to_s3_dataset_writer(dataframe,
                 preserve_index=preserve_index,
                 session_primitives=session_primitives,
                 file_format=file_format,
-                cast_columns=cast_columns)
+                cast_columns=cast_columns,
+                extra_args=extra_args)
             objects_paths.append(object_path)
         else:
             for keys, subgroup in dataframe.groupby(partition_cols):
@@ -665,21 +676,21 @@ def _data_to_s3_dataset_writer(dataframe,
                     preserve_index=preserve_index,
                     session_primitives=session_primitives,
                     file_format=file_format,
-                    cast_columns=cast_columns)
+                    cast_columns=cast_columns,
+                    extra_args=extra_args)
                 objects_paths.append(object_path)
         return objects_paths
 
     @staticmethod
-    def _data_to_s3_dataset_writer_remote(
-            send_pipe,
-            dataframe,
-            path,
-            partition_cols,
-            preserve_index,
-            session_primitives,
-            file_format,
-            cast_columns=None,
-    ):
+    def _data_to_s3_dataset_writer_remote(send_pipe,
+                                          dataframe,
+                                          path,
+                                          partition_cols,
+                                          preserve_index,
+                                          session_primitives,
+                                          file_format,
+                                          cast_columns=None,
+                                          extra_args=None):
         send_pipe.send(
             Pandas._data_to_s3_dataset_writer(
                 dataframe=dataframe,
@@ -688,7 +699,8 @@ def _data_to_s3_dataset_writer_remote(
                 preserve_index=preserve_index,
                 session_primitives=session_primitives,
                 file_format=file_format,
-                cast_columns=cast_columns))
+                cast_columns=cast_columns,
+                extra_args=extra_args))
         send_pipe.close()
 
     @staticmethod
@@ -697,7 +709,8 @@ def _data_to_s3_object_writer(dataframe,
                                   preserve_index,
                                   session_primitives,
                                   file_format,
-                                  cast_columns=None):
+                                  cast_columns=None,
+                                  extra_args=None):
         fs = s3.get_fs(session_primitives=session_primitives)
         fs = pyarrow.filesystem._ensure_filesystem(fs)
         s3.mkdir_if_not_exists(fs, path)
@@ -713,27 +726,40 @@ def _data_to_s3_object_writer(dataframe,
                 path=object_path,
                 preserve_index=preserve_index,
                 fs=fs,
-                cast_columns=cast_columns)
+                cast_columns=cast_columns,
+                extra_args=extra_args)
         elif file_format == "csv":
-            Pandas.write_csv_dataframe(
-                dataframe=dataframe,
-                path=object_path,
-                preserve_index=preserve_index,
-                fs=fs,
-            )
+            Pandas.write_csv_dataframe(dataframe=dataframe,
+                                       path=object_path,
+                                       preserve_index=preserve_index,
+                                       fs=fs,
+                                       extra_args=extra_args)
         return object_path
 
     @staticmethod
-    def write_csv_dataframe(dataframe, path, preserve_index, fs):
+    def write_csv_dataframe(dataframe,
+                            path,
+                            preserve_index,
+                            fs,
+                            extra_args=None):
+        csv_extra_args = {}
+        if "sep" in extra_args:
+            csv_extra_args["sep"] = extra_args["sep"]
         csv_buffer = bytes(
-            dataframe.to_csv(None, header=False, index=preserve_index),
-            "utf-8")
+            dataframe.to_csv(None,
+                             header=False,
+                             index=preserve_index,
+                             **csv_extra_args), "utf-8")
         with fs.open(path, "wb") as f:
             f.write(csv_buffer)
 
     @staticmethod
-    def write_parquet_dataframe(dataframe, path, preserve_index, fs,
-                                cast_columns):
+    def write_parquet_dataframe(dataframe,
+                                path,
+                                preserve_index,
+                                fs,
+                                cast_columns,
+                                extra_args=None):
         if not cast_columns:
             cast_columns = {}
         casted_in_pandas = []

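Taken together, the pandas.py changes thread the new sep through extra_args from Pandas.to_csv down to write_csv_dataframe, so the chosen separator lands both in the CSV objects on S3 and, via metadata_to_glue, in the Glue catalog entry. A hypothetical usage sketch, assuming the 0.x Session API implied by this diff; the bucket, database, and table names are placeholders:

import pandas as pd

import awswrangler

session = awswrangler.Session()  # assumes AWS credentials are already configured
df = pd.DataFrame({"id": [1, 2], "name": ["foo", "boo"]})

# Write a pipe-delimited CSV dataset and register it in Glue; the same "sep"
# ends up in the files and in the table's "field.delim" SerDe parameter.
session.pandas.to_csv(dataframe=df,
                      path="s3://example-bucket/dataset/",  # placeholder bucket
                      sep="|",
                      database="example_db",   # placeholder Glue database
                      table="example_table",   # placeholder Glue table
                      preserve_index=False,
                      mode="overwrite")
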
building/build-docs.sh

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 cd ..
 sphinx-apidoc --separate -f -H "API Reference" -o docs/source/api awswrangler/

building/build-glue-egg.sh

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 cd ..
 rm -rf *.egg-info build dist/*.egg

building/build-image.sh

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 cp ../requirements.txt .
 cp ../requirements-dev.txt .

building/build-lambda-layer.sh

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 # Go to home
 cd ~

building/deploy-source.sh

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 cd ..
 rm -rf *.egg-info dist/*.tar.gz

building/publish.sh

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 cd ..
 rm -fr build dist .egg awswrangler.egg-info

testing/run-tests.sh

Lines changed: 2 additions & 0 deletions
@@ -1,5 +1,7 @@
 #!/bin/bash
 
+set -e
+
 cd ..
 rm -rf *.pytest_cache
 yapf --in-place --recursive setup.py awswrangler testing/test_awswrangler
