Skip to content

Commit 27b728f

Browse files
committed
Splitting up catalog.py in multiple files.
1 parent 2b3c7eb commit 27b728f

File tree

9 files changed

+2138
-2034
lines changed

9 files changed

+2138
-2034
lines changed

awswrangler/catalog.py

Lines changed: 0 additions & 2033 deletions
This file was deleted.

awswrangler/catalog/__init__.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
"""Amazon Glue Catalog Module."""
2+
3+
from awswrangler.catalog._add import add_csv_partitions, add_parquet_partitions # noqa
4+
from awswrangler.catalog._create import ( # noqa
5+
create_csv_table,
6+
create_database,
7+
create_parquet_table,
8+
overwrite_table_parameters,
9+
upsert_table_parameters,
10+
)
11+
from awswrangler.catalog._delete import delete_database, delete_table_if_exists # noqa
12+
from awswrangler.catalog._get import ( # noqa
13+
databases,
14+
get_columns_comments,
15+
get_connection,
16+
get_csv_partitions,
17+
get_databases,
18+
get_engine,
19+
get_parquet_partitions,
20+
get_partitions,
21+
get_table_description,
22+
get_table_location,
23+
get_table_parameters,
24+
get_table_types,
25+
get_tables,
26+
search_tables,
27+
table,
28+
tables,
29+
)
30+
from awswrangler.catalog._utils import ( # noqa
31+
does_table_exist,
32+
drop_duplicated_columns,
33+
extract_athena_types,
34+
sanitize_column_name,
35+
sanitize_dataframe_columns_names,
36+
sanitize_table_name,
37+
)

awswrangler/catalog/_add.py

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
"""AWS Glue Catalog Delete Module."""
2+
3+
import logging
4+
from typing import Any, Dict, List, Optional
5+
6+
import boto3 # type: ignore
7+
8+
from awswrangler import _utils, exceptions
9+
from awswrangler._config import apply_configs
10+
from awswrangler.catalog._definitions import _csv_partition_definition, _parquet_partition_definition
11+
from awswrangler.catalog._utils import _catalog_id
12+
13+
_logger: logging.Logger = logging.getLogger(__name__)
14+
15+
16+
def _add_partitions(
17+
database: str,
18+
table: str,
19+
boto3_session: Optional[boto3.Session],
20+
inputs: List[Dict[str, Any]],
21+
catalog_id: Optional[str] = None,
22+
):
23+
chunks: List[List[Dict[str, Any]]] = _utils.chunkify(lst=inputs, max_length=100)
24+
client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session)
25+
for chunk in chunks: # pylint: disable=too-many-nested-blocks
26+
res: Dict[str, Any] = client_glue.batch_create_partition(
27+
**_catalog_id(catalog_id=catalog_id, DatabaseName=database, TableName=table, PartitionInputList=chunk)
28+
)
29+
if ("Errors" in res) and res["Errors"]:
30+
for error in res["Errors"]:
31+
if "ErrorDetail" in error:
32+
if "ErrorCode" in error["ErrorDetail"]:
33+
if error["ErrorDetail"]["ErrorCode"] != "AlreadyExistsException":
34+
raise exceptions.ServiceApiError(str(res["Errors"]))
35+
36+
37+
@apply_configs
38+
def add_csv_partitions(
39+
database: str,
40+
table: str,
41+
partitions_values: Dict[str, List[str]],
42+
compression: Optional[str] = None,
43+
sep: str = ",",
44+
boto3_session: Optional[boto3.Session] = None,
45+
) -> None:
46+
"""Add partitions (metadata) to a CSV Table in the AWS Glue Catalog.
47+
48+
Parameters
49+
----------
50+
database : str
51+
Database name.
52+
table : str
53+
Table name.
54+
partitions_values: Dict[str, List[str]]
55+
Dictionary with keys as S3 path locations and values as a list of partitions values as str
56+
(e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}).
57+
compression: str, optional
58+
Compression style (``None``, ``gzip``, etc).
59+
sep : str
60+
String of length 1. Field delimiter for the output file.
61+
boto3_session : boto3.Session(), optional
62+
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
63+
64+
Returns
65+
-------
66+
None
67+
None.
68+
69+
Examples
70+
--------
71+
>>> import awswrangler as wr
72+
>>> wr.catalog.add_csv_partitions(
73+
... database='default',
74+
... table='my_table',
75+
... partitions_values={
76+
... 's3://bucket/prefix/y=2020/m=10/': ['2020', '10'],
77+
... 's3://bucket/prefix/y=2020/m=11/': ['2020', '11'],
78+
... 's3://bucket/prefix/y=2020/m=12/': ['2020', '12']
79+
... }
80+
... )
81+
82+
"""
83+
inputs: List[Dict[str, Any]] = [
84+
_csv_partition_definition(location=k, values=v, compression=compression, sep=sep)
85+
for k, v in partitions_values.items()
86+
]
87+
_add_partitions(database=database, table=table, boto3_session=boto3_session, inputs=inputs)
88+
89+
90+
@apply_configs
91+
def add_parquet_partitions(
92+
database: str,
93+
table: str,
94+
partitions_values: Dict[str, List[str]],
95+
catalog_id: Optional[str] = None,
96+
compression: Optional[str] = None,
97+
boto3_session: Optional[boto3.Session] = None,
98+
) -> None:
99+
"""Add partitions (metadata) to a Parquet Table in the AWS Glue Catalog.
100+
101+
Parameters
102+
----------
103+
database : str
104+
Database name.
105+
table : str
106+
Table name.
107+
partitions_values: Dict[str, List[str]]
108+
Dictionary with keys as S3 path locations and values as a list of partitions values as str
109+
(e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}).
110+
catalog_id : str, optional
111+
The ID of the Data Catalog from which to retrieve Databases.
112+
If none is provided, the AWS account ID is used by default.
113+
compression: str, optional
114+
Compression style (``None``, ``snappy``, ``gzip``, etc).
115+
boto3_session : boto3.Session(), optional
116+
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
117+
118+
Returns
119+
-------
120+
None
121+
None.
122+
123+
Examples
124+
--------
125+
>>> import awswrangler as wr
126+
>>> wr.catalog.add_parquet_partitions(
127+
... database='default',
128+
... table='my_table',
129+
... partitions_values={
130+
... 's3://bucket/prefix/y=2020/m=10/': ['2020', '10'],
131+
... 's3://bucket/prefix/y=2020/m=11/': ['2020', '11'],
132+
... 's3://bucket/prefix/y=2020/m=12/': ['2020', '12']
133+
... }
134+
... )
135+
136+
"""
137+
if partitions_values:
138+
inputs: List[Dict[str, Any]] = [
139+
_parquet_partition_definition(location=k, values=v, compression=compression)
140+
for k, v in partitions_values.items()
141+
]
142+
_add_partitions(
143+
database=database, table=table, boto3_session=boto3_session, inputs=inputs, catalog_id=catalog_id
144+
)

0 commit comments

Comments
 (0)