Skip to content

Commit e0ddef1

Browse files
authored
Merge branch 'master' into aurora
2 parents 7b9921a + bc9ebda commit e0ddef1

File tree

9 files changed

+731
-28
lines changed

9 files changed

+731
-28
lines changed

awswrangler/glue.py

Lines changed: 244 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
from typing import Dict, Optional
1+
from typing import Dict, Optional, Any, Iterator, List
22
from math import ceil
3+
from itertools import islice
34
import re
45
import logging
56

7+
from pandas import DataFrame # type: ignore
8+
69
from awswrangler import data_types
710
from awswrangler.athena import Athena
811
from awswrangler.exceptions import UnsupportedFileFormat, InvalidSerDe, ApiError, UnsupportedType, UndetectedType, InvalidTable, InvalidArguments
@@ -55,7 +58,29 @@ def metadata_to_glue(self,
5558
mode="append",
5659
compression=None,
5760
cast_columns=None,
58-
extra_args=None):
61+
extra_args=None,
62+
description: Optional[str] = None,
63+
parameters: Optional[Dict[str, str]] = None,
64+
columns_comments: Optional[Dict[str, str]] = None) -> None:
65+
"""
66+
67+
:param dataframe: Pandas Dataframe
68+
:param objects_paths: Files paths on S3
69+
:param preserve_index: Should preserve index on S3?
70+
:param partition_cols: partitions names
71+
:param mode: "append", "overwrite", "overwrite_partitions"
72+
:param cast_columns: Dictionary of columns names and Athena/Glue types to be casted. (E.g. {"col name": "bigint", "col2 name": "int"}) (Only for "parquet" file_format)
73+
:param database: AWS Glue Database name
74+
:param table: AWS Glue table name
75+
:param path: AWS S3 path (E.g. s3://bucket-name/folder_name/)
76+
:param file_format: "csv" or "parquet"
77+
:param compression: None, gzip, snappy, etc
78+
:param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV)
79+
:param description: Table description
80+
:param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]])
81+
:param columns_comments: Columns names and the related comments (Optional[Dict[str, str]])
82+
:return: None
83+
"""
5984
indexes_position = "left" if file_format == "csv" else "right"
6085
schema, partition_cols_schema = Glue._build_schema(dataframe=dataframe,
6186
partition_cols=partition_cols,
@@ -75,7 +100,10 @@ def metadata_to_glue(self,
75100
path=path,
76101
file_format=file_format,
77102
compression=compression,
78-
extra_args=extra_args)
103+
extra_args=extra_args,
104+
description=description,
105+
parameters=parameters,
106+
columns_comments=columns_comments)
79107
if partition_cols:
80108
partitions_tuples = Glue._parse_partitions_tuples(objects_paths=objects_paths,
81109
partition_cols=partition_cols)
@@ -111,7 +139,26 @@ def create_table(self,
111139
file_format,
112140
compression,
113141
partition_cols_schema=None,
114-
extra_args=None):
142+
extra_args=None,
143+
description: Optional[str] = None,
144+
parameters: Optional[Dict[str, str]] = None,
145+
columns_comments: Optional[Dict[str, str]] = None) -> None:
146+
"""
147+
Create Glue table (Catalog)
148+
149+
:param database: AWS Glue Database name
150+
:param table: AWS Glue table name
151+
:param schema: Table schema
152+
:param path: AWS S3 path (E.g. s3://bucket-name/folder_name/)
153+
:param file_format: "csv" or "parquet"
154+
:param compression: None, gzip, snappy, etc
155+
:param partition_cols_schema: Partitions schema
156+
:param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV)
157+
:param description: Table description
158+
:param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]])
159+
:param columns_comments: Columns names and the related comments (Optional[Dict[str, str]])
160+
:return: None
161+
"""
115162
if file_format == "parquet":
116163
table_input = Glue.parquet_table_definition(table, partition_cols_schema, schema, path, compression)
117164
elif file_format == "csv":
@@ -123,6 +170,20 @@ def create_table(self,
123170
extra_args=extra_args)
124171
else:
125172
raise UnsupportedFileFormat(file_format)
173+
if description is not None:
174+
table_input["Description"] = description
175+
if parameters is not None:
176+
for k, v in parameters.items():
177+
table_input["Parameters"][k] = v
178+
if columns_comments is not None:
179+
for col in table_input["StorageDescriptor"]["Columns"]:
180+
name = col["Name"]
181+
if name in columns_comments:
182+
col["Comment"] = columns_comments[name]
183+
for par in table_input["PartitionKeys"]:
184+
name = par["Name"]
185+
if name in columns_comments:
186+
par["Comment"] = columns_comments[name]
126187
self._client_glue.create_table(DatabaseName=database, TableInput=table_input)
127188

128189
def add_partitions(self, database, table, partition_paths, file_format, compression, extra_args=None):
@@ -390,3 +451,182 @@ def get_table_location(self, database: str, table: str):
390451
return res["Table"]["StorageDescriptor"]["Location"]
391452
except KeyError:
392453
raise InvalidTable(f"{database}.{table}")
454+
455+
def get_databases(self, catalog_id: Optional[str] = None) -> Iterator[Dict[str, Any]]:
    """
    Yield every Glue database, transparently walking all result pages.

    :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default.
    :return: Iterator[Dict[str, Any]] of Databases
    """
    # Only pass CatalogId when explicitly given, so the API default applies otherwise.
    kwargs: Dict[str, Any] = {} if catalog_id is None else {"CatalogId": catalog_id}
    paginator = self._client_glue.get_paginator("get_databases")
    for page in paginator.paginate(**kwargs):
        yield from page["DatabaseList"]
470+
471+
def get_tables(self,
               catalog_id: Optional[str] = None,
               database: Optional[str] = None,
               name_contains: Optional[str] = None,
               name_prefix: Optional[str] = None,
               name_suffix: Optional[str] = None) -> Iterator[Dict[str, Any]]:
    """
    Yield Glue tables, optionally restricted to one database and filtered by name patterns.

    :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default.
    :param database: Filter a specific database
    :param name_contains: Select by a specific string on table name
    :param name_prefix: Select by a specific prefix on table name
    :param name_suffix: Select by a specific suffix on table name
    :return: Iterator[Dict[str, Any]] of Tables
    """
    paginator = self._client_glue.get_paginator("get_tables")
    kwargs: Dict[str, str] = {}
    if catalog_id is not None:
        kwargs["CatalogId"] = catalog_id
    has_prefix = name_prefix is not None
    has_suffix = name_suffix is not None
    has_contains = name_contains is not None
    # Build the Glue "Expression" filter; the first matching combination wins.
    # NOTE(review): prefix+contains without suffix falls through to the
    # contains-only expression — kept as-is to preserve existing behavior.
    if has_prefix and has_suffix and has_contains:
        kwargs["Expression"] = f"{name_prefix}.*{name_contains}.*{name_suffix}"
    elif has_prefix and has_suffix:
        kwargs["Expression"] = f"{name_prefix}.*{name_suffix}"
    elif has_contains:
        kwargs["Expression"] = f".*{name_contains}.*"
    elif has_prefix:
        kwargs["Expression"] = f"{name_prefix}.*"
    elif has_suffix:
        kwargs["Expression"] = f".*{name_suffix}"
    if database is not None:
        database_names = [database]
    else:
        database_names = [d["Name"] for d in self.get_databases(catalog_id=catalog_id)]
    for database_name in database_names:
        kwargs["DatabaseName"] = database_name
        for page in paginator.paginate(**kwargs):
            yield from page["TableList"]
511+
512+
def tables(self,
           limit: int = 100,
           catalog_id: Optional[str] = None,
           database: Optional[str] = None,
           search_text: Optional[str] = None,
           name_contains: Optional[str] = None,
           name_prefix: Optional[str] = None,
           name_suffix: Optional[str] = None) -> DataFrame:
    """
    Get a Dataframe with tables filtered by a search term, prefix, suffix.

    :param limit: Max number of tables
    :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default.
    :param database: Glue database name
    :param search_text: Select only tables with the given string in table's properties
    :param name_contains: Select by a specific string on table name
    :param name_prefix: Select only tables with the given string in the name prefix
    :param name_suffix: Select only tables with the given string in the name suffix
    :return: Pandas Dataframe filled by formatted infos
    """
    if search_text is None:
        # Name-based filtering is pushed down to the Glue API; just cap the count.
        iterator = self.get_tables(catalog_id=catalog_id,
                                   database=database,
                                   name_contains=name_contains,
                                   name_prefix=name_prefix,
                                   name_suffix=name_suffix)
        selected: List[Dict[str, Any]] = list(islice(iterator, limit))
    else:
        # search_tables() cannot push these filters down, so apply them client-side.
        selected = list(self.search_tables(text=search_text, catalog_id=catalog_id))
        if database is not None:
            selected = [t for t in selected if t["DatabaseName"] == database]
        if name_contains is not None:
            selected = [t for t in selected if name_contains in t["Name"]]
        if name_prefix is not None:
            selected = [t for t in selected if t["Name"].startswith(name_prefix)]
        if name_suffix is not None:
            selected = [t for t in selected if t["Name"].endswith(name_suffix)]
        selected = selected[:limit]

    df_dict: Dict[str, List] = {"Database": [], "Table": [], "Description": [], "Columns": [], "Partitions": []}
    for tbl in selected:
        df_dict["Database"].append(tbl["DatabaseName"])
        df_dict["Table"].append(tbl["Name"])
        df_dict["Description"].append(tbl.get("Description", ""))
        df_dict["Columns"].append(", ".join(c["Name"] for c in tbl["StorageDescriptor"]["Columns"]))
        df_dict["Partitions"].append(", ".join(p["Name"] for p in tbl["PartitionKeys"]))
    return DataFrame(data=df_dict)
562+
563+
def search_tables(self, text: str, catalog_id: Optional[str] = None):
    """
    Yield tables whose properties match a search string, following pagination tokens.

    :param text: Select only tables with the given string in table's properties.
    :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default.
    :return: Iterator of tables
    """
    args: Dict[str, Any] = {"SearchText": text}
    if catalog_id is not None:
        args["CatalogId"] = catalog_id
    while True:
        response = self._client_glue.search_tables(**args)
        yield from response["TableList"]
        # search_tables has no paginator, so follow NextToken manually.
        if "NextToken" not in response:
            break
        args["NextToken"] = response["NextToken"]
582+
583+
def databases(self, limit: int = 100, catalog_id: Optional[str] = None) -> DataFrame:
    """
    Get up to ``limit`` databases formatted as a Pandas Dataframe.

    :param limit: Max number of databases
    :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default.
    :return: Pandas Dataframe filled by formatted infos
    """
    df_dict: Dict[str, List] = {"Database": [], "Description": []}
    for db in islice(self.get_databases(catalog_id=catalog_id), limit):
        df_dict["Database"].append(db["Name"])
        df_dict["Description"].append(db.get("Description", ""))
    return DataFrame(data=df_dict)
601+
602+
def table(self, database: str, name: str, catalog_id: Optional[str] = None) -> DataFrame:
    """
    Get table details (columns and partition keys) as a Pandas Dataframe.

    :param database: Glue database name
    :param name: Table name
    :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default.
    :return: Pandas Dataframe filled by formatted infos
    """
    args: Dict[str, str] = {"DatabaseName": database, "Name": name}
    if catalog_id is not None:
        args["CatalogId"] = catalog_id
    table: Dict[str, Any] = self._client_glue.get_table(**args)["Table"]
    df_dict: Dict[str, List] = {"Column Name": [], "Type": [], "Partition": [], "Comment": []}
    # Regular columns first, then partition keys — mirrors Glue's table layout.
    for col in table["StorageDescriptor"]["Columns"]:
        df_dict["Column Name"].append(col["Name"])
        df_dict["Type"].append(col["Type"])
        df_dict["Partition"].append(False)
        # BUG FIX: the comment belongs to the column dict, not the table dict.
        # The previous code checked/read table["Comment"], so per-column
        # comments were silently dropped.
        df_dict["Comment"].append(col.get("Comment", ""))
    for par in table["PartitionKeys"]:
        df_dict["Column Name"].append(par["Name"])
        df_dict["Type"].append(par["Type"])
        df_dict["Partition"].append(True)
        df_dict["Comment"].append(par.get("Comment", ""))
    return DataFrame(data=df_dict)

0 commit comments

Comments
 (0)