@@ -174,16 +174,21 @@ def _commit(self) -> UpdatesAndRequirements:
174
174
return updates , requirements
175
175
176
176
def _apply (self ) -> PartitionSpec :
177
- def _check_and_add_partition_name (schema : Schema , name : str , source_id : int , partition_names : Set [str ]) -> None :
177
+ def _check_and_add_partition_name (
178
+ schema : Schema , name : str , source_id : int , transform : Transform [Any , Any ], partition_names : Set [str ]
179
+ ) -> None :
178
180
try :
179
181
field = schema .find_field (name )
180
182
except ValueError :
181
183
field = None
182
184
183
- if source_id is not None and field is not None and field .field_id != source_id :
184
- raise ValueError (f"Cannot create identity partition from a different field in the schema { name } " )
185
- elif field is not None and source_id != field .field_id :
186
- raise ValueError (f"Cannot create partition from name that exists in schema { name } " )
185
+ if field is not None :
186
+ if isinstance (transform , (IdentityTransform , VoidTransform )):
187
+ # For identity transforms allow name conflict only if sourced from the same schema field
188
+ if field .field_id != source_id :
189
+ raise ValueError (f"Cannot create identity partition from a different field in the schema: { name } " )
190
+ else :
191
+ raise ValueError (f"Cannot create partition from name that exists in schema: { name } " )
187
192
if not name :
188
193
raise ValueError ("Undefined name" )
189
194
if name in partition_names :
@@ -193,7 +198,7 @@ def _check_and_add_partition_name(schema: Schema, name: str, source_id: int, par
193
198
def _add_new_field (
194
199
schema : Schema , source_id : int , field_id : int , name : str , transform : Transform [Any , Any ], partition_names : Set [str ]
195
200
) -> PartitionField :
196
- _check_and_add_partition_name (schema , name , source_id , partition_names )
201
+ _check_and_add_partition_name (schema , name , source_id , transform , partition_names )
197
202
return PartitionField (source_id , field_id , transform , name )
198
203
199
204
partition_fields = []
@@ -244,6 +249,13 @@ def _add_new_field(
244
249
partition_fields .append (new_field )
245
250
246
251
for added_field in self ._adds :
252
+ _check_and_add_partition_name (
253
+ self ._transaction .table_metadata .schema (),
254
+ added_field .name ,
255
+ added_field .source_id ,
256
+ added_field .transform ,
257
+ partition_names ,
258
+ )
247
259
new_field = PartitionField (
248
260
source_id = added_field .source_id ,
249
261
field_id = added_field .field_id ,
0 commit comments