11use core:: str;
2+ use itertools:: izip;
23use std:: collections:: HashMap ;
34use std:: error:: Error ;
45use std:: sync:: Arc ;
@@ -63,7 +64,7 @@ fn create_empty_metadata(
6364}
6465
6566// Clone an arrow schema, assigning sequential field IDs starting from 1
66- fn assign_field_ids ( arrow_schema : Arc < Schema > ) -> Schema {
67+ fn assign_field_ids ( arrow_schema : & Arc < Schema > ) -> Schema {
6768 let mut field_id_counter = 1 ;
6869 let new_fields: Vec < Field > = arrow_schema
6970 . fields
@@ -83,6 +84,36 @@ fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema {
8384 Schema :: new_with_metadata ( new_fields, arrow_schema. metadata . clone ( ) )
8485}
8586
87+ fn is_schema_aligned (
88+ new_arrow_schema : & Arc < Schema > ,
89+ existing_iceberg_schema : & Arc < iceberg:: spec:: Schema > ,
90+ ) -> Result < ( ) , DataLoadingError > {
91+ let old_iceberg_struct = existing_iceberg_schema. as_struct ( ) ;
92+ let old_iceberg_fields = old_iceberg_struct. fields ( ) ;
93+
94+ let new_arrow_schema_with_field_ids = assign_field_ids ( new_arrow_schema) ;
95+ let new_iceberg_schema = Arc :: new ( iceberg:: arrow:: arrow_schema_to_schema (
96+ & new_arrow_schema_with_field_ids,
97+ ) ?) ;
98+ let new_iceberg_struct = new_iceberg_schema. as_struct ( ) ;
99+ let new_iceberg_fields = new_iceberg_struct. fields ( ) ;
100+
101+ if old_iceberg_fields. len ( ) != new_iceberg_fields. len ( ) {
102+ return Err ( DataLoadingError :: BadInputError ( format ! ( "New data is incompatible with existing schema. Old schema has {} fields but new schema has {} fields" , old_iceberg_fields. len( ) , new_iceberg_fields. len( ) ) ) ) ;
103+ }
104+ for ( i, old_iceberg_field, new_iceberg_field) in
105+ izip ! ( 0 .., old_iceberg_fields. iter( ) , new_iceberg_fields. iter( ) )
106+ {
107+ if old_iceberg_field. required && !new_iceberg_field. required {
108+ return Err ( DataLoadingError :: BadInputError ( format ! ( "New data is incompatible with existing schema. Field {} ({}) is required in old schema but not required in new schema" , i, old_iceberg_field. name) ) ) ;
109+ }
110+ if old_iceberg_field. field_type != new_iceberg_field. field_type {
111+ return Err ( DataLoadingError :: BadInputError ( format ! ( "New data is incompatible with existing schema. Field {} ({}) has data type {:?} in old schema but {:?} in new schema" , i, old_iceberg_field. name, old_iceberg_field. field_type, new_iceberg_field. field_type) ) ) ;
112+ }
113+ }
114+ Ok ( ( ) )
115+ }
116+
86117// Create a new TableMetadata object by updating the current snapshot of an existing TableMetadata
87118fn update_metadata_snapshot (
88119 previous_metadata : & TableMetadata ,
@@ -139,10 +170,6 @@ pub async fn record_batches_to_iceberg(
139170 pin_mut ! ( record_batch_stream) ;
140171
141172 let file_io = create_file_io ( target_url. to_string ( ) ) ?;
142- let arrow_schema_with_ids = assign_field_ids ( arrow_schema. clone ( ) ) ;
143- let iceberg_schema = Arc :: new ( iceberg:: arrow:: arrow_schema_to_schema (
144- & arrow_schema_with_ids,
145- ) ?) ;
146173
147174 let version_hint_location = format ! ( "{}/metadata/version-hint.text" , target_url) ;
148175 let version_hint_input = file_io. new_input ( & version_hint_location) ?;
@@ -170,7 +197,7 @@ pub async fn record_batches_to_iceberg(
170197 } else {
171198 None
172199 } ;
173- let ( previous_metadata, previous_metadata_location) = match old_version_hint {
200+ let ( previous_metadata, previous_metadata_location, iceberg_schema ) = match old_version_hint {
174201 Some ( version_hint) => {
175202 let old_metadata_location =
176203 format ! ( "{}/metadata/v{}.metadata.json" , target_url, version_hint) ;
@@ -188,17 +215,21 @@ pub async fn record_batches_to_iceberg(
188215 "Could not parse old metadata file" ,
189216 ) )
190217 } ) ?;
191- if old_metadata. current_schema ( ) != & iceberg_schema {
192- return Err ( DataLoadingError :: IcebergError ( iceberg :: Error :: new (
193- iceberg :: ErrorKind :: FeatureUnsupported ,
194- "Schema changes not supported" ,
195- ) ) ) ;
196- }
197- ( old_metadata , Some ( old_metadata_location ) )
218+ let old_iceberg_schema = old_metadata. current_schema ( ) ;
219+ is_schema_aligned ( & arrow_schema , old_iceberg_schema ) ? ;
220+ (
221+ old_metadata . clone ( ) ,
222+ Some ( old_metadata_location ) ,
223+ old_iceberg_schema . clone ( ) ,
224+ )
198225 }
199226 None => {
227+ let arrow_schema_with_ids = assign_field_ids ( & arrow_schema) ;
228+ let iceberg_schema = Arc :: new ( iceberg:: arrow:: arrow_schema_to_schema (
229+ & arrow_schema_with_ids,
230+ ) ?) ;
200231 let empty_metadata = create_empty_metadata ( & iceberg_schema, target_url. to_string ( ) ) ?;
201- ( empty_metadata, None )
232+ ( empty_metadata, None , iceberg_schema )
202233 }
203234 } ;
204235
@@ -344,3 +375,179 @@ pub async fn record_batches_to_iceberg(
344375
345376 Ok ( ( ) )
346377}
378+
379+ #[ cfg( test) ]
380+ mod tests {
381+ use std:: { collections:: HashMap , sync:: Arc } ;
382+
383+ use arrow_schema:: { DataType , Field , Schema } ;
384+ use iceberg:: spec:: { NestedField , PrimitiveType , Type } ;
385+
386+ use crate :: iceberg_destination:: is_schema_aligned;
387+
388+ #[ test]
389+ fn test_is_schema_aligned_positive ( ) {
390+ let arrow_schema = Schema :: new_with_metadata (
391+ vec ! [
392+ Field :: new( "a" , DataType :: Utf8 , false ) ,
393+ Field :: new( "b" , DataType :: Int32 , false ) ,
394+ Field :: new( "c" , DataType :: Boolean , false ) ,
395+ ] ,
396+ HashMap :: new ( ) ,
397+ ) ;
398+
399+ let iceberg_schema = iceberg:: spec:: Schema :: builder ( )
400+ . with_fields ( vec ! [
401+ NestedField :: optional( 1 , "a" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
402+ NestedField :: required( 2 , "b" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
403+ NestedField :: optional( 3 , "c" , Type :: Primitive ( PrimitiveType :: Boolean ) ) . into( ) ,
404+ ] )
405+ . build ( )
406+ . unwrap ( ) ;
407+
408+ assert ! ( is_schema_aligned( & Arc :: new( arrow_schema) , & Arc :: new( iceberg_schema) ) . is_ok( ) ) ;
409+ }
410+
411+ #[ test]
412+ fn test_is_schema_aligned_positive_renamed ( ) {
413+ let arrow_schema = Schema :: new_with_metadata (
414+ vec ! [
415+ // Fields renamed
416+ Field :: new( "x" , DataType :: Utf8 , false ) ,
417+ Field :: new( "y" , DataType :: Int32 , false ) ,
418+ Field :: new( "z" , DataType :: Boolean , false ) ,
419+ ] ,
420+ HashMap :: new ( ) ,
421+ ) ;
422+
423+ let iceberg_schema = iceberg:: spec:: Schema :: builder ( )
424+ . with_fields ( vec ! [
425+ NestedField :: required( 1 , "a" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
426+ NestedField :: required( 2 , "b" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
427+ NestedField :: required( 3 , "c" , Type :: Primitive ( PrimitiveType :: Boolean ) ) . into( ) ,
428+ ] )
429+ . build ( )
430+ . unwrap ( ) ;
431+
432+ assert ! ( is_schema_aligned( & Arc :: new( arrow_schema) , & Arc :: new( iceberg_schema) ) . is_ok( ) ) ;
433+ }
434+
435+ // OK to insert a non-nullable value into a nullable field
436+ #[ test]
437+ fn test_is_schema_aligned_positive_nonnullable ( ) {
438+ let arrow_schema = Schema :: new_with_metadata (
439+ vec ! [
440+ Field :: new( "a" , DataType :: Utf8 , false ) ,
441+ Field :: new( "b" , DataType :: Int32 , false ) ,
442+ Field :: new( "c" , DataType :: Boolean , false ) ,
443+ ] ,
444+ HashMap :: new ( ) ,
445+ ) ;
446+
447+ let iceberg_schema = iceberg:: spec:: Schema :: builder ( )
448+ . with_fields ( vec ! [
449+ NestedField :: optional( 1 , "a" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
450+ NestedField :: optional( 2 , "b" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
451+ NestedField :: optional( 3 , "c" , Type :: Primitive ( PrimitiveType :: Boolean ) ) . into( ) ,
452+ ] )
453+ . build ( )
454+ . unwrap ( ) ;
455+
456+ assert ! ( is_schema_aligned( & Arc :: new( arrow_schema) , & Arc :: new( iceberg_schema) ) . is_ok( ) ) ;
457+ }
458+
459+ #[ test]
460+ fn test_is_schema_aligned_negative_added_field ( ) {
461+ let arrow_schema = Schema :: new_with_metadata (
462+ vec ! [
463+ Field :: new( "a" , DataType :: Utf8 , false ) ,
464+ Field :: new( "b" , DataType :: Int32 , false ) ,
465+ Field :: new( "c" , DataType :: Boolean , false ) ,
466+ Field :: new( "d" , DataType :: Boolean , false ) , // Added field
467+ ] ,
468+ HashMap :: new ( ) ,
469+ ) ;
470+
471+ let iceberg_schema = iceberg:: spec:: Schema :: builder ( )
472+ . with_fields ( vec ! [
473+ NestedField :: required( 1 , "a" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
474+ NestedField :: required( 2 , "b" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
475+ NestedField :: required( 3 , "c" , Type :: Primitive ( PrimitiveType :: Boolean ) ) . into( ) ,
476+ ] )
477+ . build ( )
478+ . unwrap ( ) ;
479+
480+ assert ! ( is_schema_aligned( & Arc :: new( arrow_schema) , & Arc :: new( iceberg_schema) ) . is_err( ) ) ;
481+ }
482+
483+ #[ test]
484+ fn test_is_schema_aligned_negative_different_type ( ) {
485+ let arrow_schema = Schema :: new_with_metadata (
486+ vec ! [
487+ Field :: new( "a" , DataType :: Utf8 , false ) ,
488+ Field :: new( "b" , DataType :: Int32 , false ) ,
489+ Field :: new( "c" , DataType :: Int32 , false ) , // Mismatched type
490+ ] ,
491+ HashMap :: new ( ) ,
492+ ) ;
493+
494+ let iceberg_schema = iceberg:: spec:: Schema :: builder ( )
495+ . with_fields ( vec ! [
496+ NestedField :: required( 1 , "a" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
497+ NestedField :: required( 2 , "b" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
498+ NestedField :: required( 3 , "c" , Type :: Primitive ( PrimitiveType :: Boolean ) ) . into( ) ,
499+ ] )
500+ . build ( )
501+ . unwrap ( ) ;
502+
503+ assert ! ( is_schema_aligned( & Arc :: new( arrow_schema) , & Arc :: new( iceberg_schema) ) . is_err( ) ) ;
504+ }
505+
506+ #[ test]
507+ fn test_is_schema_aligned_negative_reordered ( ) {
508+ let arrow_schema = Schema :: new_with_metadata (
509+ vec ! [
510+ // Same fields but in wrong order
511+ Field :: new( "b" , DataType :: Int32 , false ) ,
512+ Field :: new( "a" , DataType :: Utf8 , false ) ,
513+ Field :: new( "c" , DataType :: Boolean , false ) ,
514+ ] ,
515+ HashMap :: new ( ) ,
516+ ) ;
517+
518+ let iceberg_schema = iceberg:: spec:: Schema :: builder ( )
519+ . with_fields ( vec ! [
520+ NestedField :: required( 1 , "a" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
521+ NestedField :: required( 2 , "b" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
522+ NestedField :: required( 3 , "c" , Type :: Primitive ( PrimitiveType :: Boolean ) ) . into( ) ,
523+ ] )
524+ . build ( )
525+ . unwrap ( ) ;
526+
527+ assert ! ( is_schema_aligned( & Arc :: new( arrow_schema) , & Arc :: new( iceberg_schema) ) . is_err( ) ) ;
528+ }
529+
530+ // Not allowed to insert a nullable value into a non-nullable field
531+ #[ test]
532+ fn test_is_schema_aligned_negative_nullable ( ) {
533+ let arrow_schema = Schema :: new_with_metadata (
534+ vec ! [
535+ Field :: new( "a" , DataType :: Utf8 , true ) , // Nullable
536+ Field :: new( "b" , DataType :: Int32 , false ) ,
537+ Field :: new( "c" , DataType :: Boolean , false ) ,
538+ ] ,
539+ HashMap :: new ( ) ,
540+ ) ;
541+
542+ let iceberg_schema = iceberg:: spec:: Schema :: builder ( )
543+ . with_fields ( vec ! [
544+ NestedField :: required( 1 , "a" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
545+ NestedField :: required( 2 , "b" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
546+ NestedField :: required( 3 , "c" , Type :: Primitive ( PrimitiveType :: Boolean ) ) . into( ) ,
547+ ] )
548+ . build ( )
549+ . unwrap ( ) ;
550+
551+ assert ! ( is_schema_aligned( & Arc :: new( arrow_schema) , & Arc :: new( iceberg_schema) ) . is_err( ) ) ;
552+ }
553+ }
0 commit comments