
Commit 54823ad (1 parent: d7f1d19)

Add description, parameters and columns' comments as args to Glue tables

File tree

5 files changed: +158 -29 lines

awswrangler/glue.py

Lines changed: 61 additions & 3 deletions

@@ -58,7 +58,29 @@ def metadata_to_glue(self,
                          mode="append",
                          compression=None,
                          cast_columns=None,
-                         extra_args=None):
+                         extra_args=None,
+                         description: Optional[str] = None,
+                         parameters: Optional[Dict[str, str]] = None,
+                         columns_comments: Optional[Dict[str, str]] = None) -> None:
+        """
+
+        :param dataframe: Pandas Dataframe
+        :param objects_paths: Files paths on S3
+        :param preserve_index: Should preserve index on S3?
+        :param partition_cols: partition names
+        :param mode: "append", "overwrite", "overwrite_partitions"
+        :param cast_columns: Dictionary of column names and Athena/Glue types to be cast (E.g. {"col name": "bigint", "col2 name": "int"}) (Only for "parquet" file_format)
+        :param database: AWS Glue Database name
+        :param table: AWS Glue table name
+        :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/)
+        :param file_format: "csv" or "parquet"
+        :param compression: None, gzip, snappy, etc
+        :param extra_args: Extra arguments specific to each file format (E.g. "sep" for CSV)
+        :param description: Table description
+        :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]])
+        :param columns_comments: Column names and the related comments (Optional[Dict[str, str]])
+        :return: None
+        """
         indexes_position = "left" if file_format == "csv" else "right"
         schema, partition_cols_schema = Glue._build_schema(dataframe=dataframe,
                                                            partition_cols=partition_cols,
@@ -78,7 +100,10 @@ def metadata_to_glue(self,
                           path=path,
                           file_format=file_format,
                           compression=compression,
-                          extra_args=extra_args)
+                          extra_args=extra_args,
+                          description=description,
+                          parameters=parameters,
+                          columns_comments=columns_comments)
         if partition_cols:
             partitions_tuples = Glue._parse_partitions_tuples(objects_paths=objects_paths,
                                                               partition_cols=partition_cols)
@@ -114,7 +139,26 @@ def create_table(self,
                     file_format,
                     compression,
                     partition_cols_schema=None,
-                    extra_args=None):
+                    extra_args=None,
+                    description: Optional[str] = None,
+                    parameters: Optional[Dict[str, str]] = None,
+                    columns_comments: Optional[Dict[str, str]] = None) -> None:
+        """
+        Create Glue table (Catalog)
+
+        :param database: AWS Glue Database name
+        :param table: AWS Glue table name
+        :param schema: Table schema
+        :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/)
+        :param file_format: "csv" or "parquet"
+        :param compression: None, gzip, snappy, etc
+        :param partition_cols_schema: Partitions schema
+        :param extra_args: Extra arguments specific to each file format (E.g. "sep" for CSV)
+        :param description: Table description
+        :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]])
+        :param columns_comments: Column names and the related comments (Optional[Dict[str, str]])
+        :return: None
+        """
         if file_format == "parquet":
             table_input = Glue.parquet_table_definition(table, partition_cols_schema, schema, path, compression)
         elif file_format == "csv":
@@ -126,6 +170,20 @@ def create_table(self,
                                                     extra_args=extra_args)
         else:
             raise UnsupportedFileFormat(file_format)
+        if description is not None:
+            table_input["Description"] = description
+        if parameters is not None:
+            for k, v in parameters.items():
+                table_input["Parameters"][k] = v
+        if columns_comments is not None:
+            for col in table_input["StorageDescriptor"]["Columns"]:
+                name = col["Name"]
+                if name in columns_comments:
+                    col["Comment"] = columns_comments[name]
+            for par in table_input["PartitionKeys"]:
+                name = par["Name"]
+                if name in columns_comments:
+                    par["Comment"] = columns_comments[name]
         self._client_glue.create_table(DatabaseName=database, TableInput=table_input)

     def add_partitions(self, database, table, partition_paths, file_format, compression, extra_args=None):
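The new create_table logic mutates the TableInput dict in place before it is sent to the Glue API: Description is a top-level field, Parameters is merged key by key, and column comments are applied to both regular columns and partition keys. Below is a minimal standalone sketch of the same pattern; the helper name apply_table_metadata and the sample values are illustrative, not part of the commit.

from typing import Dict, Optional


def apply_table_metadata(table_input: dict,
                         description: Optional[str] = None,
                         parameters: Optional[Dict[str, str]] = None,
                         columns_comments: Optional[Dict[str, str]] = None) -> dict:
    # Top-level table description.
    if description is not None:
        table_input["Description"] = description
    # Merge key/value pairs into the existing Parameters dict
    # (assumes the table definition already created that key).
    if parameters is not None:
        for k, v in parameters.items():
            table_input["Parameters"][k] = v
    # Attach comments to both regular columns and partition keys.
    if columns_comments is not None:
        for col in table_input["StorageDescriptor"]["Columns"]:
            if col["Name"] in columns_comments:
                col["Comment"] = columns_comments[col["Name"]]
        for par in table_input["PartitionKeys"]:
            if par["Name"] in columns_comments:
                par["Comment"] = columns_comments[par["Name"]]
    return table_input


# Hypothetical sample shaped like a Glue TableInput:
table_input = {
    "Name": "my_table",
    "Parameters": {"classification": "csv"},
    "StorageDescriptor": {"Columns": [{"Name": "name", "Type": "string"}]},
    "PartitionKeys": [{"Name": "dt", "Type": "string"}],
}
apply_table_metadata(table_input,
                     description="demo table",
                     parameters={"team": "data"},
                     columns_comments={"name": "customer name", "dt": "partition date"})

Merging Parameters key by key, rather than replacing the dict, preserves entries such as the classification that the table-definition helpers already set.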

awswrangler/pandas.py

Lines changed: 45 additions & 20 deletions

@@ -627,21 +627,22 @@ def _apply_dates_to_generator(generator, parse_dates):
                 df[col] = df[col].dt.date.replace(to_replace={pd.NaT: None})
             yield df

-    def to_csv(
-            self,
-            dataframe,
-            path,
-            sep=",",
-            serde="OpenCSVSerDe",
-            database: Optional[str] = None,
-            table=None,
-            partition_cols=None,
-            preserve_index=True,
-            mode="append",
-            procs_cpu_bound=None,
-            procs_io_bound=None,
-            inplace=True,
-    ):
+    def to_csv(self,
+               dataframe,
+               path,
+               sep=",",
+               serde="OpenCSVSerDe",
+               database: Optional[str] = None,
+               table=None,
+               partition_cols=None,
+               preserve_index=True,
+               mode="append",
+               procs_cpu_bound=None,
+               procs_io_bound=None,
+               inplace=True,
+               description: Optional[str] = None,
+               parameters: Optional[Dict[str, str]] = None,
+               columns_comments: Optional[Dict[str, str]] = None):
         """
         Write a Pandas Dataframe as CSV files on S3
         Optionally writes metadata on AWS Glue.
@@ -658,6 +659,9 @@ def to_csv(
         :param procs_cpu_bound: Number of cores used for CPU bound tasks
         :param procs_io_bound: Number of cores used for I/O bound tasks
         :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact
+        :param description: Table description
+        :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]])
+        :param columns_comments: Column names and the related comments (Optional[Dict[str, str]])
         :return: List of objects written on S3
         """
         if serde not in Pandas.VALID_CSV_SERDES:
@@ -675,7 +679,10 @@ def to_csv(
                           procs_cpu_bound=procs_cpu_bound,
                           procs_io_bound=procs_io_bound,
                           extra_args=extra_args,
-                          inplace=inplace)
+                          inplace=inplace,
+                          description=description,
+                          parameters=parameters,
+                          columns_comments=columns_comments)

     def to_parquet(self,
                    dataframe,
@@ -689,7 +696,10 @@ def to_parquet(self,
                    procs_cpu_bound=None,
                    procs_io_bound=None,
                    cast_columns=None,
-                   inplace=True):
+                   inplace=True,
+                   description: Optional[str] = None,
+                   parameters: Optional[Dict[str, str]] = None,
+                   columns_comments: Optional[Dict[str, str]] = None):
         """
         Write a Pandas Dataframe as parquet files on S3
         Optionally writes metadata on AWS Glue.
@@ -706,6 +716,9 @@ def to_parquet(self,
         :param procs_io_bound: Number of cores used for I/O bound tasks
         :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted (E.g. {"col name": "bigint", "col2 name": "int"})
         :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact
+        :param description: Table description
+        :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]])
+        :param columns_comments: Column names and the related comments (Optional[Dict[str, str]])
         :return: List of objects written on S3
         """
         return self.to_s3(dataframe=dataframe,
@@ -720,7 +733,10 @@ def to_parquet(self,
                           procs_cpu_bound=procs_cpu_bound,
                           procs_io_bound=procs_io_bound,
                           cast_columns=cast_columns,
-                          inplace=inplace)
+                          inplace=inplace,
+                          description=description,
+                          parameters=parameters,
+                          columns_comments=columns_comments)

     def to_s3(self,
               dataframe: pd.DataFrame,
@@ -736,7 +752,10 @@ def to_s3(self,
              procs_io_bound=None,
              cast_columns=None,
              extra_args=None,
-             inplace: bool = True) -> List[str]:
+             inplace: bool = True,
+             description: Optional[str] = None,
+             parameters: Optional[Dict[str, str]] = None,
+             columns_comments: Optional[Dict[str, str]] = None) -> List[str]:
         """
         Write a Pandas Dataframe on S3
         Optionally writes metadata on AWS Glue.
@@ -755,6 +774,9 @@ def to_s3(self,
         :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted. (E.g. {"col name": "bigint", "col2 name": "int"}) (Only for "parquet" file_format)
         :param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV)
         :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact
+        :param description: Table description
+        :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]])
+        :param columns_comments: Column names and the related comments (Optional[Dict[str, str]])
         :return: List of objects written on S3
         """
         if partition_cols is None:
@@ -810,7 +832,10 @@ def to_s3(self,
                                                 mode=mode,
                                                 compression=compression,
                                                 cast_columns=cast_columns,
-                                                extra_args=extra_args)
+                                                extra_args=extra_args,
+                                                description=description,
+                                                parameters=parameters,
+                                                columns_comments=columns_comments)
         return objects_paths

     def data_to_s3(self,
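With the plumbing above, the three metadata arguments ride along from to_csv and to_parquet through to_s3 and into metadata_to_glue. A hedged usage sketch, assuming the pre-1.0 awswrangler Session API that the tests in this commit use; the bucket and database names are placeholders:

import awswrangler
import pandas as pd

session = awswrangler.Session()  # assumes AWS credentials are configured
df = pd.DataFrame({"name": ["foo", "boo"], "value": [1, 2]})

# "my-bucket" and "my_db" are placeholder names.
session.pandas.to_parquet(dataframe=df,
                          database="my_db",
                          path="s3://my-bucket/my_table/",
                          mode="overwrite",
                          description="Demo table written by awswrangler",
                          parameters={"owner": "data-team"},
                          columns_comments={"name": "item name",
                                            "value": "item value"})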

awswrangler/spark.py

Lines changed: 11 additions & 2 deletions

@@ -164,7 +164,10 @@ def create_glue_table(self,
                           sep=",",
                           partition_by=None,
                           load_partitions=True,
-                          replace_if_exists=True):
+                          replace_if_exists=True,
+                          description: Optional[str] = None,
+                          parameters: Optional[Dict[str, str]] = None,
+                          columns_comments: Optional[Dict[str, str]] = None):
         """
         Create a Glue metadata table pointing for some dataset stored on AWS S3.

@@ -179,6 +182,9 @@ def create_glue_table(self,
         :param table: Glue table name. If not passed, extracted from the path
         :param load_partitions: Load partitions after the table creation
         :param replace_if_exists: Drop table and recreates that if already exists
+        :param description: Table description
+        :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]])
+        :param columns_comments: Column names and the related comments (Optional[Dict[str, str]])
         :return: None
         """
         file_format = file_format.lower()
@@ -210,7 +216,10 @@ def create_glue_table(self,
                                         path=path,
                                         file_format=file_format,
                                         compression=compression,
-                                        extra_args=extra_args)
+                                        extra_args=extra_args,
+                                        description=description,
+                                        parameters=parameters,
+                                        columns_comments=columns_comments)
         if load_partitions:
             self._session.athena.repair_table(database=database, table=table)
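The Spark wrapper simply forwards the same three arguments to Glue.create_table. A hedged sketch of calling it: only the parameters visible in these hunks are certain; the remaining keyword names, the session and spark_df variables, and all values are assumptions for illustration.

# Assumes a configured awswrangler Session and an existing Spark DataFrame.
session.spark.create_glue_table(database="my_db",                  # placeholder
                                table="my_table",                  # optional: extracted from path if omitted
                                path="s3://my-bucket/my_table/",
                                dataframe=spark_df,                # assumed parameter name
                                file_format="parquet",
                                partition_by=["dt"],
                                description="Table registered from Spark",
                                parameters={"owner": "data-team"},
                                columns_comments={"dt": "partition date"})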

testing/test_awswrangler/test_glue.py

Lines changed: 4 additions & 4 deletions

@@ -130,9 +130,9 @@ def test_get_tables_suffix(session, table):


 def test_glue_utils(session, database, table):
-    assert len(session.glue.databases().index) > 1
-    assert len(session.glue.tables().index) > 1
-    assert len(session.glue.table(database=database, name=table).index) > 1
+    assert len(session.glue.databases().index) > 0
+    assert len(session.glue.tables().index) > 0
+    assert len(session.glue.table(database=database, name=table).index) > 0


 def test_glue_tables_full(session, database, table):
@@ -141,4 +141,4 @@ def test_glue_tables_full(session, database, table):
                                     search_text="parquet",
                                     name_contains=table[1:-1],
                                     name_prefix=table[0],
-                                    name_suffix=table[-1]).index) > 1
+                                    name_suffix=table[-1]).index) > 0

testing/test_awswrangler/test_pandas.py

Lines changed: 37 additions & 0 deletions

@@ -1777,3 +1777,40 @@ def test_read_csv_list_iterator(bucket, sample, row_num):
         total_count += count
     wr.s3.delete_listed_objects(objects_paths=paths)
     assert total_count == row_num * n
+
+
+def test_to_csv_metadata(
+        session,
+        bucket,
+        database,
+):
+    session.glue.delete_table_if_exists(table="test_to_csv_metadata", database=database)
+    assert len(session.glue.tables(database=database, search_text="boo bar").index) == 0
+    dataframe = pd.read_csv("data_samples/nano.csv")
+    session.pandas.to_csv(dataframe=dataframe,
+                          database=database,
+                          path=f"s3://{bucket}/test_to_csv_metadata/",
+                          preserve_index=False,
+                          mode="overwrite",
+                          sep="|",
+                          description="foo boo bar",
+                          parameters={
+                              "123": "345",
+                              "678": "910"
+                          },
+                          columns_comments={
+                              "name": "zoo",
+                              "value": "zaa"
+                          })
+    dataframe2 = None
+    for counter in range(10):
+        sleep(1)
+        dataframe2 = session.pandas.read_sql_athena(ctas_approach=False,
+                                                    sql="select * from test_to_csv_metadata",
+                                                    database=database)
+        if len(dataframe.index) == len(dataframe2.index):
+            break
+    assert len(dataframe.index) == len(dataframe2.index)
+    assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
+    assert len(session.glue.tables(database=database, search_text="boo bar").index) == 1
+    assert len(session.glue.tables(database=database, search_text="value").index) > 0
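The test verifies the metadata indirectly through session.glue.tables' search text. One way to inspect it directly (not part of this commit) is to read the table back with boto3, whose get_table response carries the Description, Parameters, and per-column Comment fields that create_table wrote; "my_db" below is a placeholder for the test database:

import boto3

glue = boto3.client("glue")
response = glue.get_table(DatabaseName="my_db", Name="test_to_csv_metadata")
table = response["Table"]

print(table.get("Description"))             # "foo boo bar"
print(table.get("Parameters"))              # contains {"123": "345", "678": "910"}
for col in table["StorageDescriptor"]["Columns"]:
    print(col["Name"], col.get("Comment"))  # e.g. name -> zoo, value -> zaa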
