@@ -252,6 +252,29 @@ def _overwrite_table_parameters(
252252 return parameters
253253
254254
255+ def _update_table_input (table_input : Dict [str , Any ], columns_types : Dict [str , str ], allow_reorder : bool = True ) -> bool :
256+ column_updated = False
257+
258+ catalog_cols : Dict [str , str ] = {x ["Name" ]: x ["Type" ] for x in table_input ["StorageDescriptor" ]["Columns" ]}
259+
260+ if not allow_reorder :
261+ for catalog_key , frame_key in zip (catalog_cols , columns_types ):
262+ if catalog_key != frame_key :
263+ raise exceptions .InvalidArgumentValue (f"Column { frame_key } is out of order." )
264+
265+ for c , t in columns_types .items ():
266+ if c not in catalog_cols :
267+ _logger .debug ("New column %s with type %s." , c , t )
268+ table_input ["StorageDescriptor" ]["Columns" ].append ({"Name" : c , "Type" : t })
269+ column_updated = True
270+ elif t != catalog_cols [c ]: # Data type change detected!
271+ raise exceptions .InvalidArgumentValue (
272+ f"Data type change detected on column { c } (Old type: { catalog_cols [c ]} / New type { t } )."
273+ )
274+
275+ return column_updated
276+
277+
255278def _create_parquet_table (
256279 database : str ,
257280 table : str ,
@@ -282,19 +305,14 @@ def _create_parquet_table(
282305 table = sanitize_table_name (table = table )
283306 partitions_types = {} if partitions_types is None else partitions_types
284307 _logger .debug ("catalog_table_input: %s" , catalog_table_input )
308+
285309 table_input : Dict [str , Any ]
286310 if (catalog_table_input is not None ) and (mode in ("append" , "overwrite_partitions" )):
287311 table_input = catalog_table_input
288- catalog_cols : Dict [str , str ] = {x ["Name" ]: x ["Type" ] for x in table_input ["StorageDescriptor" ]["Columns" ]}
289- for c , t in columns_types .items ():
290- if c not in catalog_cols :
291- _logger .debug ("New column %s with type %s." , c , t )
292- table_input ["StorageDescriptor" ]["Columns" ].append ({"Name" : c , "Type" : t })
293- mode = "update"
294- elif t != catalog_cols [c ]: # Data type change detected!
295- raise exceptions .InvalidArgumentValue (
296- f"Data type change detected on column { c } (Old type: { catalog_cols [c ]} / New type { t } )."
297- )
312+
313+ is_table_updated = _update_table_input (table_input , columns_types )
314+ if is_table_updated :
315+ mode = "update"
298316 else :
299317 table_input = _parquet_table_definition (
300318 table = table ,
@@ -368,11 +386,18 @@ def _create_csv_table( # pylint: disable=too-many-arguments,too-many-locals
368386 table = sanitize_table_name (table = table )
369387 partitions_types = {} if partitions_types is None else partitions_types
370388 _logger .debug ("catalog_table_input: %s" , catalog_table_input )
371- table_input : Dict [ str , Any ]
389+
372390 if schema_evolution is False :
373391 _utils .check_schema_changes (columns_types = columns_types , table_input = catalog_table_input , mode = mode )
392+
393+ table_input : Dict [str , Any ]
374394 if (catalog_table_input is not None ) and (mode in ("append" , "overwrite_partitions" )):
375395 table_input = catalog_table_input
396+
397+ is_table_updated = _update_table_input (table_input , columns_types , allow_reorder = False )
398+ if is_table_updated :
399+ mode = "update"
400+
376401 else :
377402 table_input = _csv_table_definition (
378403 table = table ,
@@ -415,7 +440,7 @@ def _create_csv_table( # pylint: disable=too-many-arguments,too-many-locals
415440 )
416441
417442
418- def _create_json_table ( # pylint: disable=too-many-arguments
443+ def _create_json_table ( # pylint: disable=too-many-arguments,too-many-locals
419444 database : str ,
420445 table : str ,
421446 path : str ,
@@ -453,6 +478,11 @@ def _create_json_table( # pylint: disable=too-many-arguments
453478 _utils .check_schema_changes (columns_types = columns_types , table_input = catalog_table_input , mode = mode )
454479 if (catalog_table_input is not None ) and (mode in ("append" , "overwrite_partitions" )):
455480 table_input = catalog_table_input
481+
482+ is_table_updated = _update_table_input (table_input , columns_types )
483+ if is_table_updated :
484+ mode = "update"
485+
456486 else :
457487 table_input = _json_table_definition (
458488 table = table ,
0 commit comments