@@ -715,7 +715,7 @@ def _sanitized_spark_field_name(name):
715715 return _SPARK_DISALLOWED_CHARS .sub ('_' , name )
716716
717717
718- def _sanitize_field_recursive (field ):
718+ def _sanitize_spark_field_recursive (field ):
719719 """
720720 Recursively sanitize field names in struct types for Spark compatibility.
721721
@@ -729,14 +729,16 @@ def _sanitize_field_recursive(field):
729729 type_changed = False
730730
731731 if pa .types .is_struct (field .type ):
732- sanitized_fields = [_sanitize_field_recursive (f ) for f in field .type ]
733- if any (changed for _ , changed in sanitized_fields ):
734- sanitized_type = pa .struct ([f for f , _ in sanitized_fields ])
732+ sanitized_fields , changed = zip (
733+ * [_sanitize_spark_field_recursive (f ) for f in field .type ])
734+ if any (changed ):
735+ sanitized_type = pa .struct (sanitized_fields )
735736 type_changed = True
736737 elif pa .types .is_list (field .type ) or pa .types .is_large_list (field .type ):
737738 # Sanitize the value field of list types
738739 value_field = field .type .value_field
739- sanitized_value_field , value_changed = _sanitize_field_recursive (value_field )
740+ sanitized_value_field , value_changed = _sanitize_spark_field_recursive (
741+ value_field )
740742 if value_changed :
741743 if pa .types .is_list (field .type ):
742744 sanitized_type = pa .list_ (sanitized_value_field )
@@ -747,16 +749,17 @@ def _sanitize_field_recursive(field):
747749 # Sanitize the value field of fixed_size_list types
748750 value_field = field .type .value_field
749751 list_size = field .type .list_size
750- sanitized_value_field , value_changed = _sanitize_field_recursive (value_field )
752+ sanitized_value_field , value_changed = _sanitize_spark_field_recursive (
753+ value_field )
751754 if value_changed :
752755 sanitized_type = pa .list_ (sanitized_value_field , list_size )
753756 type_changed = True
754757 elif pa .types .is_map (field .type ):
755758 # Sanitize both key and item fields of map types
756759 key_field = field .type .key_field
757760 item_field = field .type .item_field
758- sanitized_key_field , key_changed = _sanitize_field_recursive (key_field )
759- sanitized_item_field , item_changed = _sanitize_field_recursive (item_field )
761+ sanitized_key_field , key_changed = _sanitize_spark_field_recursive (key_field )
762+ sanitized_item_field , item_changed = _sanitize_spark_field_recursive (item_field )
760763 if key_changed or item_changed :
761764 sanitized_type = pa .map_ (sanitized_key_field , sanitized_item_field ,
762765 keys_sorted = field .type .keys_sorted )
@@ -777,7 +780,7 @@ def _sanitize_schema(schema, flavor):
777780 schema_changed = False
778781
779782 for field in schema :
780- sanitized_field , changed = _sanitize_field_recursive (field )
783+ sanitized_field , changed = _sanitize_spark_field_recursive (field )
781784 sanitized_fields .append (sanitized_field )
782785 schema_changed = schema_changed or changed
783786
0 commit comments