Skip to content

Commit 6aa22bc

Browse files
feat: Add s3.read_orc and s3.to_orc (#2312)
1 parent 311563e commit 6aa22bc

File tree

23 files changed

+3324
-396
lines changed

23 files changed

+3324
-396
lines changed

awswrangler/catalog/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
11
"""Amazon Glue Catalog Module."""
22

3-
from awswrangler.catalog._add import add_column, add_csv_partitions, add_json_partitions, add_parquet_partitions # noqa
3+
from awswrangler.catalog._add import (
4+
add_column,
5+
add_csv_partitions,
6+
add_json_partitions,
7+
add_orc_partitions,
8+
add_parquet_partitions,
9+
)
10+
11+
# noqa
412
from awswrangler.catalog._create import ( # noqa
513
_create_csv_table,
614
_create_json_table,
715
_create_parquet_table,
816
create_csv_table,
917
create_database,
1018
create_json_table,
19+
create_orc_table,
1120
create_parquet_table,
1221
overwrite_table_parameters,
1322
upsert_table_parameters,
@@ -54,6 +63,7 @@
5463
"add_csv_partitions",
5564
"add_json_partitions",
5665
"add_parquet_partitions",
66+
"add_orc_partitions",
5767
"does_table_exist",
5868
"delete_column",
5969
"drop_duplicated_columns",
@@ -68,6 +78,7 @@
6878
"create_csv_table",
6979
"create_database",
7080
"create_parquet_table",
81+
"create_orc_table",
7182
"create_json_table",
7283
"overwrite_table_parameters",
7384
"upsert_table_parameters",

awswrangler/catalog/_add.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
_check_column_type,
1212
_csv_partition_definition,
1313
_json_partition_definition,
14+
_orc_partition_definition,
1415
_parquet_partition_definition,
1516
_update_table_definition,
1617
)
@@ -294,6 +295,84 @@ def add_parquet_partitions(
294295
)
295296

296297

298+
@apply_configs
def add_orc_partitions(
    database: str,
    table: str,
    partitions_values: Dict[str, List[str]],
    bucketing_info: Optional[typing.BucketingInfoTuple] = None,
    catalog_id: Optional[str] = None,
    compression: Optional[str] = None,
    boto3_session: Optional[boto3.Session] = None,
    columns_types: Optional[Dict[str, str]] = None,
    partitions_parameters: Optional[Dict[str, str]] = None,
) -> None:
    """Register partitions (metadata only) on an ORC Table in the AWS Glue Catalog.

    Parameters
    ----------
    database : str
        Database name.
    table : str
        Table name. The name is sanitized before it is used.
    partitions_values: Dict[str, List[str]]
        Mapping of S3 path locations to the list of partition values (as str) found at that location
        (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}).
        When the mapping is empty nothing is added.
    bucketing_info: Tuple[List[str], int], optional
        Tuple whose first element is the list of column names used for bucketing and whose second element
        is the number of buckets.
        Only `str`, `int` and `bool` are supported as column data types for bucketing.
    catalog_id : str, optional
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    compression: str, optional
        Compression style (``None``, ``snappy``, ``zlib``, etc).
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receive None.
    columns_types: Optional[Dict[str, str]]
        Only required for Hive compatibility.
        Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
        P.S. Only materialized columns please, not partition columns.
    partitions_parameters: Optional[Dict[str, str]]
        Dictionary with key-value pairs defining partition parameters.

    Returns
    -------
    None
        None.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.add_orc_partitions(
    ...     database='default',
    ...     table='my_table',
    ...     partitions_values={
    ...         's3://bucket/prefix/y=2020/m=10/': ['2020', '10'],
    ...         's3://bucket/prefix/y=2020/m=11/': ['2020', '11'],
    ...         's3://bucket/prefix/y=2020/m=12/': ['2020', '12']
    ...     }
    ... )

    """
    table = sanitize_table_name(table=table)

    # Nothing to register: leave the catalog untouched.
    if not partitions_values:
        return

    # One partition definition per S3 location, in the mapping's iteration order.
    inputs: List[Dict[str, Any]] = []
    for location, values in partitions_values.items():
        definition = _orc_partition_definition(
            location=location,
            values=values,
            bucketing_info=bucketing_info,
            compression=compression,
            columns_types=columns_types,
            partitions_parameters=partitions_parameters,
        )
        inputs.append(definition)

    _add_partitions(
        database=database,
        table=table,
        boto3_session=boto3_session,
        inputs=inputs,
        catalog_id=catalog_id,
    )
374+
375+
297376
@apply_configs
298377
def add_column(
299378
database: str,

awswrangler/catalog/_create.py

Lines changed: 230 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@
77

88
from awswrangler import _utils, exceptions, typing
99
from awswrangler._config import apply_configs
10-
from awswrangler.catalog._definitions import _csv_table_definition, _json_table_definition, _parquet_table_definition
10+
from awswrangler.catalog._definitions import (
11+
_csv_table_definition,
12+
_json_table_definition,
13+
_orc_table_definition,
14+
_parquet_table_definition,
15+
)
1116
from awswrangler.catalog._delete import delete_all_partitions, delete_table_if_exists
1217
from awswrangler.catalog._get import _get_table_input
1318
from awswrangler.catalog._utils import _catalog_id, _transaction_id, sanitize_column_name, sanitize_table_name
@@ -335,6 +340,68 @@ def _create_parquet_table(
335340
)
336341

337342

343+
def _create_orc_table(
    database: str,
    table: str,
    path: str,
    columns_types: Dict[str, str],
    table_type: Optional[str],
    partitions_types: Optional[Dict[str, str]],
    bucketing_info: Optional[typing.BucketingInfoTuple],
    catalog_id: Optional[str],
    compression: Optional[str],
    description: Optional[str],
    parameters: Optional[Dict[str, str]],
    columns_comments: Optional[Dict[str, str]],
    mode: str,
    catalog_versioning: bool,
    transaction_id: Optional[str],
    athena_partition_projection_settings: Optional[typing.AthenaPartitionProjectionSettings],
    boto3_session: Optional[boto3.Session],
    catalog_table_input: Optional[Dict[str, Any]],
) -> None:
    """Prepare the table input for an ORC table and pass it on to ``_create_table``.

    When ``catalog_table_input`` is given and ``mode`` is ``"append"`` or ``"overwrite_partitions"``,
    that existing input is reused and passed through ``_update_table_input`` together with
    ``columns_types``; if that call reports a change, ``mode`` is switched to ``"update"``.
    In every other case a new input is built with ``_orc_table_definition``.
    """
    table = sanitize_table_name(table=table)
    if partitions_types is None:
        partitions_types = {}
    _logger.debug("catalog_table_input: %s", catalog_table_input)

    table_input: Dict[str, Any]
    if (catalog_table_input is None) or (mode not in ("append", "overwrite_partitions")):
        # No reusable catalog entry for this mode: build a brand new definition.
        table_input = _orc_table_definition(
            table=table,
            path=path,
            columns_types=columns_types,
            table_type=table_type,
            partitions_types=partitions_types,
            bucketing_info=bucketing_info,
            compression=compression,
        )
    else:
        # Reuse the input fetched from the catalog.
        table_input = catalog_table_input
        if _update_table_input(table_input, columns_types):
            mode = "update"

    table_exist: bool = catalog_table_input is not None
    _logger.debug("table_exist: %s", table_exist)

    _create_table(
        database=database,
        table=table,
        description=description,
        parameters=parameters,
        columns_comments=columns_comments,
        mode=mode,
        catalog_versioning=catalog_versioning,
        boto3_session=boto3_session,
        table_input=table_input,
        table_type=table_type,
        table_exist=table_exist,
        partitions_types=partitions_types,
        transaction_id=transaction_id,
        athena_partition_projection_settings=athena_partition_projection_settings,
        catalog_id=catalog_id,
    )
403+
404+
338405
def _create_csv_table( # pylint: disable=too-many-arguments,too-many-locals
339406
database: str,
340407
table: str,
@@ -827,6 +894,168 @@ def create_parquet_table(
827894
)
828895

829896

897+
@apply_configs
def create_orc_table(
    database: str,
    table: str,
    path: str,
    columns_types: Dict[str, str],
    table_type: Optional[str] = None,
    partitions_types: Optional[Dict[str, str]] = None,
    bucketing_info: Optional[typing.BucketingInfoTuple] = None,
    catalog_id: Optional[str] = None,
    compression: Optional[str] = None,
    description: Optional[str] = None,
    parameters: Optional[Dict[str, str]] = None,
    columns_comments: Optional[Dict[str, str]] = None,
    mode: Literal["overwrite", "append"] = "overwrite",
    catalog_versioning: bool = False,
    transaction_id: Optional[str] = None,
    athena_partition_projection_settings: Optional[typing.AthenaPartitionProjectionSettings] = None,
    boto3_session: Optional[boto3.Session] = None,
) -> None:
    """Create an ORC Table (Metadata Only) in the AWS Glue Catalog.

    'https://docs.aws.amazon.com/athena/latest/ug/data-types.html'

    Parameters
    ----------
    database : str
        Database name.
    table : str
        Table name.
    path : str
        Amazon S3 path (e.g. s3://bucket/prefix/).
    columns_types: Dict[str, str]
        Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
    table_type: str, optional
        The type of the Glue Table (EXTERNAL_TABLE, GOVERNED...). Set to EXTERNAL_TABLE if None
    partitions_types: Dict[str, str], optional
        Dictionary with keys as partition names and values as data types (e.g. {'col2': 'date'}).
    bucketing_info: Tuple[List[str], int], optional
        Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the
        second element.
        Only `str`, `int` and `bool` are supported as column data types for bucketing.
    catalog_id : str, optional
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    compression: str, optional
        Compression style (``None``, ``snappy``, ``zlib``, etc).
    description: str, optional
        Table description
    parameters: Dict[str, str], optional
        Key/value pairs to tag the table.
    columns_comments: Dict[str, str], optional
        Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}).
    mode: str
        'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
    catalog_versioning : bool
        If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
    transaction_id: str, optional
        The ID of the transaction (i.e. used with GOVERNED tables).
    athena_partition_projection_settings: typing.AthenaPartitionProjectionSettings, optional
        Parameters of the Athena Partition Projection (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html).
        AthenaPartitionProjectionSettings is a `TypedDict`, meaning the passed parameter can be instantiated either as an
        instance of AthenaPartitionProjectionSettings or as a regular Python dict.

        Following projection parameters are supported:

        .. list-table:: Projection Parameters
           :header-rows: 1

           * - Name
             - Type
             - Description
           * - projection_types
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections types.
               Valid types: "enum", "integer", "date", "injected"
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': 'enum', 'col2_name': 'integer'})
           * - projection_ranges
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections ranges.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
           * - projection_values
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections values.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
           * - projection_intervals
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections intervals.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '1', 'col2_name': '5'})
           * - projection_digits
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections digits.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '1', 'col2_name': '2'})
           * - projection_formats
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections formats.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_date': 'yyyy-MM-dd', 'col2_timestamp': 'yyyy-MM-dd HH:mm:ss'})
           * - projection_storage_location_template
             - Optional[str]
             - Value which allows Athena to properly map partition values if the S3 file locations do not follow
               a typical `.../column=value/...` pattern.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-setting-up.html
               (e.g. s3://bucket/table_root/a=${a}/${b}/some_static_subdirectory/${c}/)
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receive None.

    Returns
    -------
    None
        None.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.create_orc_table(
    ...     database='default',
    ...     table='my_table',
    ...     path='s3://bucket/prefix/',
    ...     columns_types={'col0': 'bigint', 'col1': 'double'},
    ...     partitions_types={'col2': 'date'},
    ...     compression='snappy',
    ...     description='My own table!',
    ...     parameters={'source': 'postgresql'},
    ...     columns_comments={'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}
    ... )

    """
    # Look up any existing catalog entry first; the private helper uses it to
    # decide between reusing the current table input and building a new one.
    catalog_table_input: Optional[Dict[str, Any]] = _get_table_input(
        database=database,
        table=table,
        boto3_session=boto3_session,
        transaction_id=transaction_id,
        catalog_id=catalog_id,
    )
    _create_orc_table(
        database=database,
        table=table,
        path=path,
        columns_types=columns_types,
        table_type=table_type,
        partitions_types=partitions_types,
        bucketing_info=bucketing_info,
        catalog_id=catalog_id,
        compression=compression,
        description=description,
        parameters=parameters,
        columns_comments=columns_comments,
        mode=mode,
        catalog_versioning=catalog_versioning,
        transaction_id=transaction_id,
        athena_partition_projection_settings=athena_partition_projection_settings,
        boto3_session=boto3_session,
        catalog_table_input=catalog_table_input,
    )
1057+
1058+
8301059
@apply_configs
8311060
def create_csv_table( # pylint: disable=too-many-arguments,too-many-locals
8321061
database: str,

0 commit comments

Comments
 (0)