 
 from awswrangler.exceptions import UnsupportedType, UnsupportedFileFormat
 
-
 LOGGER = logging.getLogger(__name__)
 
 
@@ -13,16 +12,16 @@ def __init__(self, session):
         self._session = session
 
     def metadata_to_glue(
-        self,
-        dataframe,
-        path,
-        objects_paths,
-        file_format,
-        database=None,
-        table=None,
-        partition_cols=None,
-        preserve_index=True,
-        mode="append",
+            self,
+            dataframe,
+            path,
+            objects_paths,
+            file_format,
+            database=None,
+            table=None,
+            partition_cols=None,
+            preserve_index=True,
+            mode="append",
     ):
         schema = Glue._build_schema(
             dataframe=dataframe,
@@ -44,8 +43,7 @@ def metadata_to_glue(
         )
         if partition_cols:
             partitions_tuples = Glue._parse_partitions_tuples(
-                objects_paths=objects_paths, partition_cols=partition_cols
-            )
+                objects_paths=objects_paths, partition_cols=partition_cols)
             self.add_partitions(
                 database=database,
                 table=table,
@@ -55,43 +53,43 @@ def metadata_to_glue(
 
     def delete_table_if_exists(self, database, table):
         client = self._session.boto3_session.client(
-            service_name="glue", config=self._session.botocore_config
-        )
+            service_name="glue", config=self._session.botocore_config)
         try:
             client.delete_table(DatabaseName=database, Name=table)
         except client.exceptions.EntityNotFoundException:
             pass
 
     def does_table_exists(self, database, table):
         client = self._session.boto3_session.client(
-            service_name="glue", config=self._session.botocore_config
-        )
+            service_name="glue", config=self._session.botocore_config)
         try:
             client.get_table(DatabaseName=database, Name=table)
             return True
         except client.exceptions.EntityNotFoundException:
             return False
 
-    def create_table(
-        self, database, table, schema, path, file_format, partition_cols=None
-    ):
+    def create_table(self,
+                     database,
+                     table,
+                     schema,
+                     path,
+                     file_format,
+                     partition_cols=None):
         client = self._session.boto3_session.client(
-            service_name="glue", config=self._session.botocore_config
-        )
+            service_name="glue", config=self._session.botocore_config)
         if file_format == "parquet":
             table_input = Glue.parquet_table_definition(
-                table, partition_cols, schema, path
-            )
+                table, partition_cols, schema, path)
         elif file_format == "csv":
-            table_input = Glue.csv_table_definition(table, partition_cols, schema, path)
+            table_input = Glue.csv_table_definition(table, partition_cols,
+                                                    schema, path)
         else:
             raise UnsupportedFileFormat(file_format)
         client.create_table(DatabaseName=database, TableInput=table_input)
 
     def add_partitions(self, database, table, partition_paths, file_format):
         client = self._session.boto3_session.client(
-            service_name="glue", config=self._session.botocore_config
-        )
+            service_name="glue", config=self._session.botocore_config)
         if not partition_paths:
             return None
         partitions = list()
@@ -107,23 +105,24 @@ def add_partitions(self, database, table, partition_paths, file_format):
         for _ in range(pages_num):
             page = partitions[:100]
             del partitions[:100]
-            client.batch_create_partition(
-                DatabaseName=database, TableName=table, PartitionInputList=page
-            )
+            client.batch_create_partition(DatabaseName=database,
+                                          TableName=table,
+                                          PartitionInputList=page)
 
     def get_connection_details(self, name):
         client = self._session.boto3_session.client(
-            service_name="glue", config=self._session.botocore_config
-        )
-        return client.get_connection(Name=name, HidePassword=False)["Connection"]
+            service_name="glue", config=self._session.botocore_config)
+        return client.get_connection(Name=name,
+                                     HidePassword=False)["Connection"]
 
     @staticmethod
     def _build_schema(dataframe, partition_cols, preserve_index):
         if not partition_cols:
             partition_cols = []
         schema_built = []
         if preserve_index:
-            name = str(dataframe.index.name) if dataframe.index.name else "index"
+            name = str(
+                dataframe.index.name) if dataframe.index.name else "index"
             dataframe.index.name = "index"
             dtype = str(dataframe.index.dtype)
             if name not in partition_cols:
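
A note on the add_partitions paging in the hunk above: Glue's BatchCreatePartition API accepts at most 100 entries per PartitionInputList, which is why the loop drains the pending list in slices of 100. A minimal standalone sketch of that batching, with an invented partition list and page-count arithmetic (the pages_num computation itself sits outside this hunk):

import math

# Hypothetical PartitionInput dicts already built for one table.
partitions = [{"Values": [str(day)]} for day in range(250)]

pages_num = int(math.ceil(len(partitions) / 100.0))  # 3 pages for 250 partitions
for _ in range(pages_num):
    page = partitions[:100]  # next batch of at most 100 partitions
    del partitions[:100]     # drop it from the pending list
    # client.batch_create_partition(DatabaseName=..., TableName=..., PartitionInputList=page)
    print(len(page))         # -> 100, 100, 50
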
@@ -168,9 +167,14 @@ def csv_table_definition(table, partition_cols, schema, path):
         if not partition_cols:
             partition_cols = []
         return {
-            "Name": table,
-            "PartitionKeys": [{"Name": x, "Type": "string"} for x in partition_cols],
-            "TableType": "EXTERNAL_TABLE",
+            "Name":
+            table,
+            "PartitionKeys": [{
+                "Name": x,
+                "Type": "string"
+            } for x in partition_cols],
+            "TableType":
+            "EXTERNAL_TABLE",
             "Parameters": {
                 "classification": "csv",
                 "compressionType": "none",
@@ -180,15 +184,22 @@ def csv_table_definition(table, partition_cols, schema, path):
                 "areColumnsQuoted": "false",
             },
             "StorageDescriptor": {
-                "Columns": [{"Name": x[0], "Type": x[1]} for x in schema],
+                "Columns": [{
+                    "Name": x[0],
+                    "Type": x[1]
+                } for x in schema],
                 "Location": path,
                 "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
-                "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
+                "OutputFormat":
+                "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
                 "Compressed": False,
                 "NumberOfBuckets": -1,
                 "SerdeInfo": {
-                    "Parameters": {"field.delim": ","},
-                    "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+                    "Parameters": {
+                        "field.delim": ","
+                    },
+                    "SerializationLibrary":
+                    "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                 },
                 "StoredAsSubDirectories": False,
                 "SortColumns": [],
@@ -210,8 +221,11 @@ def csv_partition_definition(partition):
                 "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                 "Location": partition[0],
                 "SerdeInfo": {
-                    "Parameters": {"field.delim": ","},
-                    "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+                    "Parameters": {
+                        "field.delim": ","
+                    },
+                    "SerializationLibrary":
+                    "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                 },
                 "StoredAsSubDirectories": False,
             },
@@ -223,24 +237,37 @@ def parquet_table_definition(table, partition_cols, schema, path):
         if not partition_cols:
             partition_cols = []
         return {
-            "Name": table,
-            "PartitionKeys": [{"Name": x, "Type": "string"} for x in partition_cols],
-            "TableType": "EXTERNAL_TABLE",
+            "Name":
+            table,
+            "PartitionKeys": [{
+                "Name": x,
+                "Type": "string"
+            } for x in partition_cols],
+            "TableType":
+            "EXTERNAL_TABLE",
             "Parameters": {
                 "classification": "parquet",
                 "compressionType": "none",
                 "typeOfData": "file",
             },
             "StorageDescriptor": {
-                "Columns": [{"Name": x[0], "Type": x[1]} for x in schema],
+                "Columns": [{
+                    "Name": x[0],
+                    "Type": x[1]
+                } for x in schema],
                 "Location": path,
-                "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
-                "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
+                "InputFormat":
+                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+                "OutputFormat":
+                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
                 "Compressed": False,
                 "NumberOfBuckets": -1,
                 "SerdeInfo": {
-                    "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
-                    "Parameters": {"serialization.format": "1"},
+                    "SerializationLibrary":
+                    "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
+                    "Parameters": {
+                        "serialization.format": "1"
+                    },
                 },
                 "StoredAsSubDirectories": False,
                 "SortColumns": [],
@@ -260,8 +287,11 @@ def parquet_partition_definition(partition):
                 "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                 "Location": partition[0],
                 "SerdeInfo": {
-                    "Parameters": {"serialization.format": "1"},
-                    "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
+                    "Parameters": {
+                        "serialization.format": "1"
+                    },
+                    "SerializationLibrary":
+                    "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
                 },
                 "StoredAsSubDirectories": False,
             },
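
For context, the StorageDescriptor fragments returned by the *_partition_definition helpers end up as entries of the PartitionInputList consumed by batch_create_partition, alongside the partition's Values. A hypothetical single entry (bucket, prefix and values invented for illustration):

partition_input = {
    "Values": ["2019", "07"],  # one value per partition column, in order
    "StorageDescriptor": {
        "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
        "Location": "s3://my-bucket/my-table/year=2019/month=07/",
        "SerdeInfo": {
            "Parameters": {"serialization.format": "1"},
            "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
        },
        "StoredAsSubDirectories": False,
    },
}
# client.batch_create_partition(DatabaseName="my_db", TableName="my_table",
#                               PartitionInputList=[partition_input])
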
@@ -271,14 +301,15 @@ def parquet_partition_definition(partition):
     @staticmethod
     def _parse_partitions_tuples(objects_paths, partition_cols):
         paths = {f"{path.rpartition('/')[0]}/" for path in objects_paths}
-        return [
-            (
-                path,
-                Glue._parse_partition_values(path=path, partition_cols=partition_cols),
-            )
-            for path in paths
-        ]
+        return [(
+            path,
+            Glue._parse_partition_values(path=path,
+                                         partition_cols=partition_cols),
+        ) for path in paths]
 
     @staticmethod
     def _parse_partition_values(path, partition_cols):
-        return [re.search(f"/{col}=(.*?)/", path).group(1) for col in partition_cols]
+        return [
+            re.search(f"/{col}=(.*?)/", path).group(1)
+            for col in partition_cols
+        ]
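
Finally, the partition parsing reformatted above is easy to exercise on its own. A minimal sketch with hypothetical S3 paths, mirroring _parse_partitions_tuples and _parse_partition_values (not part of the commit):

import re

objects_paths = [
    "s3://my-bucket/my-table/year=2019/month=07/file0.parquet",
    "s3://my-bucket/my-table/year=2019/month=08/file1.parquet",
]
partition_cols = ["year", "month"]

# Group objects by their key prefix, then pull each column value out of the path.
paths = {f"{path.rpartition('/')[0]}/" for path in objects_paths}
partitions_tuples = [
    (path, [re.search(f"/{col}=(.*?)/", path).group(1) for col in partition_cols])
    for path in paths
]
print(partitions_tuples)
# e.g. [("s3://my-bucket/my-table/year=2019/month=07/", ["2019", "07"]), ...]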