Commit 58cda50

Fix issue with Hive partitions compatibility. #397
1 parent dad5e07 · commit 58cda50

File tree: 7 files changed, +40 −11 lines

awswrangler/catalog/_add.py

Lines changed: 12 additions & 2 deletions
@@ -43,6 +43,7 @@ def add_csv_partitions(
     compression: Optional[str] = None,
     sep: str = ",",
     boto3_session: Optional[boto3.Session] = None,
+    columns_types: Optional[Dict[str, str]] = None,
 ) -> None:
     """Add partitions (metadata) to a CSV Table in the AWS Glue Catalog.
 
@@ -64,6 +65,10 @@ def add_csv_partitions(
         String of length 1. Field delimiter for the output file.
     boto3_session : boto3.Session(), optional
         Boto3 Session. The default boto3 session will be used if boto3_session receives None.
+    columns_types: Optional[Dict[str, str]]
+        Only required for Hive compatibility.
+        Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
+        P.S. Only materialized columns please, not partition columns.
 
     Returns
     -------
@@ -85,7 +90,7 @@ def add_csv_partitions(
 
     """
     inputs: List[Dict[str, Any]] = [
-        _csv_partition_definition(location=k, values=v, compression=compression, sep=sep)
+        _csv_partition_definition(location=k, values=v, compression=compression, sep=sep, columns_types=columns_types)
         for k, v in partitions_values.items()
     ]
     _add_partitions(database=database, table=table, boto3_session=boto3_session, inputs=inputs, catalog_id=catalog_id)
@@ -99,6 +104,7 @@ def add_parquet_partitions(
     catalog_id: Optional[str] = None,
     compression: Optional[str] = None,
     boto3_session: Optional[boto3.Session] = None,
+    columns_types: Optional[Dict[str, str]] = None,
 ) -> None:
     """Add partitions (metadata) to a Parquet Table in the AWS Glue Catalog.
 
@@ -118,6 +124,10 @@ def add_parquet_partitions(
         Compression style (``None``, ``snappy``, ``gzip``, etc).
     boto3_session : boto3.Session(), optional
         Boto3 Session. The default boto3 session will be used if boto3_session receives None.
+    columns_types: Optional[Dict[str, str]]
+        Only required for Hive compatibility.
+        Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
+        P.S. Only materialized columns please, not partition columns.
 
     Returns
     -------
@@ -141,7 +151,7 @@ def add_parquet_partitions(
     table = sanitize_table_name(table=table)
     if partitions_values:
         inputs: List[Dict[str, Any]] = [
-            _parquet_partition_definition(location=k, values=v, compression=compression)
+            _parquet_partition_definition(location=k, values=v, compression=compression, columns_types=columns_types)
             for k, v in partitions_values.items()
         ]
         _add_partitions(
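
Taken together, the two public functions above now accept and forward columns_types. A minimal usage sketch (database, table, bucket, and types are hypothetical):

import awswrangler as wr

wr.catalog.add_parquet_partitions(
    database="my_db",
    table="my_table",
    partitions_values={"s3://my-bucket/my_table/year=2020/": ["2020"]},
    compression="snappy",
    # New in this commit: declare the materialized (non-partition) columns so
    # Hive-compatible engines see a complete StorageDescriptor per partition.
    columns_types={"col0": "bigint", "col1": "double"},
)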

awswrangler/catalog/_create.py

Lines changed: 2 additions & 2 deletions
@@ -544,7 +544,7 @@ def create_parquet_table(
     path : str
         Amazon S3 path (e.g. s3://bucket/prefix/).
     columns_types: Dict[str, str]
-        Dictionary with keys as column names and vales as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
+        Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
     partitions_types: Dict[str, str], optional
         Dictionary with keys as partition names and values as data types (e.g. {'col2': 'date'}).
     catalog_id : str, optional
@@ -674,7 +674,7 @@ def create_csv_table(
     path : str
         Amazon S3 path (e.g. s3://bucket/prefix/).
     columns_types: Dict[str, str]
-        Dictionary with keys as column names and vales as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
+        Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
     partitions_types: Dict[str, str], optional
         Dictionary with keys as partition names and values as data types (e.g. {'col2': 'date'}).
     compression : str, optional
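
The corrected docstrings describe the same columns_types shape used across the catalog module. For reference, a hedged sketch of creating a table with it (all names invented):

import awswrangler as wr

wr.catalog.create_parquet_table(
    database="my_db",
    table="my_table",
    path="s3://my-bucket/my_table/",
    columns_types={"col0": "bigint", "col1": "double"},  # column name -> Athena/Glue type
    partitions_types={"year": "int"},                    # partition column -> type
)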

awswrangler/catalog/_definitions.py

Lines changed: 20 additions & 4 deletions
@@ -38,9 +38,11 @@ def _parquet_table_definition(
     }
 
 
-def _parquet_partition_definition(location: str, values: List[str], compression: Optional[str]) -> Dict[str, Any]:
+def _parquet_partition_definition(
+    location: str, values: List[str], compression: Optional[str], columns_types: Optional[Dict[str, str]]
+) -> Dict[str, Any]:
     compressed: bool = compression is not None
-    return {
+    definition: Dict[str, Any] = {
         "StorageDescriptor": {
             "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
             "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
@@ -51,9 +53,15 @@ def _parquet_partition_definition(
                 "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
             },
             "StoredAsSubDirectories": False,
+            "NumberOfBuckets": -1,
         },
         "Values": values,
     }
+    if columns_types is not None:
+        definition["StorageDescriptor"]["Columns"] = [
+            {"Name": cname, "Type": dtype} for cname, dtype in columns_types.items()
+        ]
+    return definition
 
 
 def _csv_table_definition(
@@ -106,9 +114,11 @@ def _csv_table_definition(
     }
 
 
-def _csv_partition_definition(location: str, values: List[str], compression: Optional[str], sep: str) -> Dict[str, Any]:
+def _csv_partition_definition(
+    location: str, values: List[str], compression: Optional[str], sep: str, columns_types: Optional[Dict[str, str]]
+) -> Dict[str, Any]:
     compressed: bool = compression is not None
-    return {
+    definition: Dict[str, Any] = {
         "StorageDescriptor": {
             "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
             "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@@ -119,6 +129,12 @@ def _csv_partition_definition(
                 "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
             },
             "StoredAsSubDirectories": False,
+            "NumberOfBuckets": -1,
         },
         "Values": values,
     }
+    if columns_types is not None:
+        definition["StorageDescriptor"]["Columns"] = [
+            {"Name": cname, "Type": dtype} for cname, dtype in columns_types.items()
+        ]
+    return definition
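
These private helpers build the Glue PartitionInput payload. A small illustration of what _parquet_partition_definition now produces with the new argument (location, values, and types are invented; the comments restate the diff above):

from awswrangler.catalog._definitions import _parquet_partition_definition

definition = _parquet_partition_definition(
    location="s3://my-bucket/my_table/year=2020/",
    values=["2020"],
    compression="snappy",
    columns_types={"col0": "bigint", "col1": "double"},
)
# definition["StorageDescriptor"]["NumberOfBuckets"] == -1  (Hive reads -1 as "not bucketed")
# definition["StorageDescriptor"]["Columns"] == [
#     {"Name": "col0", "Type": "bigint"},
#     {"Name": "col1", "Type": "double"},
# ]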

awswrangler/catalog/_utils.py

Lines changed: 1 addition & 1 deletion
@@ -235,7 +235,7 @@ def extract_athena_types(
     Returns
     -------
     Tuple[Dict[str, str], Dict[str, str]]
-        columns_types: Dictionary with keys as column names and vales as
+        columns_types: Dictionary with keys as column names and values as
         data types (e.g. {'col0': 'bigint', 'col1': 'double'}). /
         partitions_types: Dictionary with keys as partition names
         and values as data types (e.g. {'col2': 'date'}).
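
A short example of the corrected return shape, assuming extract_athena_types keeps its usual pandas-based signature:

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({"col0": [1, 2], "col1": [0.1, 0.2], "col2": ["a", "b"]})
columns_types, partitions_types = wr.catalog.extract_athena_types(df=df, partition_cols=["col2"])
# columns_types    -> {'col0': 'bigint', 'col1': 'double'}
# partitions_types -> {'col2': 'string'}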

awswrangler/s3/_read_parquet.py

Lines changed: 1 addition & 1 deletion
@@ -762,7 +762,7 @@ def read_parquet_metadata(
     Returns
     -------
     Tuple[Dict[str, str], Optional[Dict[str, str]]]
-        columns_types: Dictionary with keys as column names and vales as
+        columns_types: Dictionary with keys as column names and values as
         data types (e.g. {'col0': 'bigint', 'col1': 'double'}). /
         partitions_types: Dictionary with keys as partition names
         and values as data types (e.g. {'col2': 'date'}).
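
The same tuple shape applies here, inferred from S3 objects rather than a DataFrame. A hedged sketch (path and dataset layout invented):

import awswrangler as wr

columns_types, partitions_types = wr.s3.read_parquet_metadata(
    path="s3://my-bucket/my_table/", dataset=True
)
# columns_types    -> e.g. {'col0': 'bigint', 'col1': 'double'}
# partitions_types -> e.g. {'year': 'string'} for Hive-style "year=2020/" prefixes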

awswrangler/s3/_write_parquet.py

Lines changed: 3 additions & 1 deletion
@@ -562,6 +562,7 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
                 compression=compression,
                 boto3_session=session,
                 catalog_id=catalog_id,
+                columns_types=columns_types,
             )
     return {"paths": paths, "partitions_values": partitions_values}
 
@@ -700,7 +701,7 @@ def store_parquet_metadata(  # pylint: disable=too-many-arguments
     -------
     Tuple[Dict[str, str], Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]
         The metadata used to create the Glue Table.
-        columns_types: Dictionary with keys as column names and vales as
+        columns_types: Dictionary with keys as column names and values as
         data types (e.g. {'col0': 'bigint', 'col1': 'double'}). /
         partitions_types: Dictionary with keys as partition names
         and values as data types (e.g. {'col2': 'date'}). /
@@ -766,5 +767,6 @@ def store_parquet_metadata(  # pylint: disable=too-many-arguments
         compression=compression,
         boto3_session=session,
         catalog_id=catalog_id,
+        columns_types=columns_types,
     )
     return columns_types, partitions_types, partitions_values
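
With columns_types threaded through, an ordinary dataset write is enough to get the new behavior; the types inferred during the write reach add_parquet_partitions automatically. Sketch (frame and bucket invented):

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({"col0": [1, 2], "col1": [0.1, 0.2], "year": [2020, 2021]})
wr.s3.to_parquet(
    df=df,
    path="s3://my-bucket/my_table/",
    dataset=True,
    database="my_db",
    table="my_table",
    partition_cols=["year"],
)
# Each new partition is now registered with its materialized columns
# (col0, col1), not just its location.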

awswrangler/s3/_write_text.py

Lines changed: 1 addition & 0 deletions
@@ -442,6 +442,7 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals
                 boto3_session=session,
                 sep=sep,
                 catalog_id=catalog_id,
+                columns_types=columns_types,
             )
     return {"paths": paths, "partitions_values": partitions_values}
 
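
The CSV path gets the same treatment, so an equivalent to_csv dataset write (names invented, as above) also registers Hive-compatible partitions:

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({"col0": [1, 2], "col1": ["a", "b"], "year": [2020, 2021]})
wr.s3.to_csv(
    df=df,
    path="s3://my-bucket/my_table_csv/",
    index=False,
    dataset=True,
    database="my_db",
    table="my_table_csv",
    partition_cols=["year"],
)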
