 import logging
 from datetime import datetime, date
 
-from awswrangler.exceptions import UnsupportedType, UnsupportedFileFormat
+from awswrangler.exceptions import UnsupportedType, UnsupportedFileFormat, PartitionColumnTypeNotFound
 
 logger = logging.getLogger(__name__)
@@ -111,10 +111,11 @@ def metadata_to_glue(self,
                          mode="append",
                          cast_columns=None,
                          extra_args=None):
-        schema = Glue._build_schema(dataframe=dataframe,
-                                    partition_cols=partition_cols,
-                                    preserve_index=preserve_index,
-                                    cast_columns=cast_columns)
+        schema, partition_cols_schema = Glue._build_schema(
+            dataframe=dataframe,
+            partition_cols=partition_cols,
+            preserve_index=preserve_index,
+            cast_columns=cast_columns)
         table = table if table else Glue._parse_table_name(path)
         table = table.lower().replace(".", "_")
         if mode == "overwrite":
@@ -124,13 +125,14 @@ def metadata_to_glue(self,
         self.create_table(database=database,
                           table=table,
                           schema=schema,
-                          partition_cols=partition_cols,
+                          partition_cols_schema=partition_cols_schema,
                           path=path,
                           file_format=file_format,
                           extra_args=extra_args)
         if partition_cols:
             partitions_tuples = Glue._parse_partitions_tuples(
                 objects_paths=objects_paths, partition_cols=partition_cols)
+            print(partitions_tuples)
             self.add_partitions(
                 database=database,
                 table=table,
@@ -157,14 +159,13 @@ def create_table(self,
                      schema,
                      path,
                      file_format,
-                     partition_cols=None,
+                     partition_cols_schema=None,
                      extra_args=None):
         if file_format == "parquet":
-            table_input = Glue.parquet_table_definition(
-                table, partition_cols, schema, path)
+            table_input = Glue.parquet_table_definition(table, partition_cols_schema, schema, path)
         elif file_format == "csv":
             table_input = Glue.csv_table_definition(table,
-                                                    partition_cols,
+                                                    partition_cols_schema,
                                                     schema,
                                                     path,
                                                     extra_args=extra_args)
@@ -189,6 +190,9 @@ def add_partitions(self, database, table, partition_paths, file_format):
         for _ in range(pages_num):
             page = partitions[:100]
             del partitions[:100]
+            print(database)
+            print(table)
+            print(page)
             self._client_glue.batch_create_partition(DatabaseName=database,
                                                      TableName=table,
                                                      PartitionInputList=page)
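The paging in this hunk exists because Glue's BatchCreatePartition API accepts at most 100 entries in PartitionInputList per request. A minimal standalone sketch of the same chunking pattern, assuming `partitions` is already a list of Glue PartitionInput dicts (the function name is illustrative):

import boto3

def create_partitions_paged(database, table, partitions):
    """Create Glue partitions in pages of 100, the maximum
    batch_create_partition accepts per request."""
    client = boto3.client("glue")
    while partitions:
        page = partitions[:100]   # take the next page of up to 100 entries
        del partitions[:100]      # consume it from the pending list
        client.batch_create_partition(DatabaseName=database,
                                      TableName=table,
                                      PartitionInputList=page)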
@@ -206,25 +210,32 @@ def _build_schema(dataframe,
         if not partition_cols:
             partition_cols = []
         schema_built = []
+        partition_cols_schema_built = []
         if preserve_index:
             name = str(
                 dataframe.index.name) if dataframe.index.name else "index"
             dataframe.index.name = "index"
             dtype = str(dataframe.index.dtype)
+            athena_type = Glue.type_pandas2athena(dtype)
             if name not in partition_cols:
-                athena_type = Glue.type_pandas2athena(dtype)
                 schema_built.append((name, athena_type))
+            else:
+                partition_cols_schema_built.append((name, athena_type))
         for col in dataframe.columns:
             name = str(col)
             if cast_columns and name in cast_columns:
                 dtype = cast_columns[name]
             else:
                 dtype = str(dataframe[name].dtype)
+            athena_type = Glue.type_pandas2athena(dtype)
             if name not in partition_cols:
-                athena_type = Glue.type_pandas2athena(dtype)
                 schema_built.append((name, athena_type))
+            else:
+                partition_cols_schema_built.append((name, athena_type))
         logger.debug(f"schema_built:\n{schema_built}")
-        return schema_built
+        logger.debug(
+            f"partition_cols_schema_built:\n{partition_cols_schema_built}")
+        return schema_built, partition_cols_schema_built
 
     @staticmethod
     def _parse_table_name(path):
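With this hunk, `_build_schema` returns two lists of `(column_name, athena_type)` tuples: one for regular data columns and one for partition columns. A small sketch of the new contract; it assumes the class is importable as `awswrangler.glue.Glue` and that `type_pandas2athena` maps `float64` to `double` and `int64` to `bigint`, and the column names are illustrative:

import pandas as pd
from awswrangler.glue import Glue

df = pd.DataFrame({
    "value": [1.0, 2.0],    # regular data column
    "year": [2019, 2020],   # declared as a partition column below
})

schema, partition_cols_schema = Glue._build_schema(dataframe=df,
                                                   partition_cols=["year"],
                                                   preserve_index=False,
                                                   cast_columns=None)
# schema                -> [("value", "double")]
# partition_cols_schema -> [("year", "bigint")]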
@@ -233,17 +244,17 @@ def _parse_table_name(path):
         return path.rpartition("/")[2]
 
     @staticmethod
-    def csv_table_definition(table, partition_cols, schema, path, extra_args):
+    def csv_table_definition(table, partition_cols_schema, schema, path, extra_args):
         sep = extra_args["sep"] if "sep" in extra_args else ","
-        if not partition_cols:
-            partition_cols = []
+        if not partition_cols_schema:
+            partition_cols_schema = []
         return {
             "Name":
             table,
             "PartitionKeys": [{
-                "Name": x,
-                "Type": "string"
-            } for x in partition_cols],
+                "Name": x[0],
+                "Type": x[1]
+            } for x in partition_cols_schema],
             "TableType":
             "EXTERNAL_TABLE",
             "Parameters": {
@@ -304,16 +315,17 @@ def csv_partition_definition(partition):
         }
 
     @staticmethod
-    def parquet_table_definition(table, partition_cols, schema, path):
-        if not partition_cols:
-            partition_cols = []
+    def parquet_table_definition(table, partition_cols_schema,
+                                 schema, path):
+        if not partition_cols_schema:
+            partition_cols_schema = []
         return {
             "Name":
             table,
             "PartitionKeys": [{
-                "Name": x,
-                "Type": "string"
-            } for x in partition_cols],
+                "Name": x[0],
+                "Type": x[1]
+            } for x in partition_cols_schema],
             "TableType":
             "EXTERNAL_TABLE",
             "Parameters": {
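The net effect in both the CSV and Parquet table definitions is that partition keys are no longer hardcoded to "string" but carry the Athena type inferred for each column. A quick illustration of the new comprehension with assumed inputs:

partition_cols_schema = [("year", "bigint"), ("region", "string")]

partition_keys = [{
    "Name": x[0],
    "Type": x[1]
} for x in partition_cols_schema]

print(partition_keys)
# [{'Name': 'year', 'Type': 'bigint'}, {'Name': 'region', 'Type': 'string'}]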