@@ -22,6 +22,7 @@ use arrow_buffer::NullBuffer;
22
22
use arrow_schema:: { DataType , Field , FieldRef , Fields , Schema , SchemaRef } ;
23
23
use parquet:: arrow:: PARQUET_FIELD_ID_META_KEY ;
24
24
25
+ use crate :: arrow:: schema:: schema_to_arrow_schema;
25
26
use crate :: error:: Result ;
26
27
use crate :: spec:: Schema as IcebergSchema ;
27
28
use crate :: { Error , ErrorKind } ;
@@ -79,22 +80,21 @@ impl RecordBatchProjector {
79
80
} )
80
81
}
81
82
82
- /// Create RecordBatchProjector using Iceberg schema for field mapping .
83
+ /// Create a `RecordBatchProjector` from an Iceberg schema.
83
84
///
84
- /// This constructor is more flexible and works with any Arrow schema by using
85
- /// the Iceberg schema to map field names to field IDs .
85
+ /// This constructor converts the Iceberg schema to an Arrow schema with field ID metadata,
86
+ /// then uses the standard field ID lookup for projection.
86
87
///
87
88
/// # Arguments
88
- /// * `original_schema` - The original Arrow schema (doesn't need field ID metadata)
89
- /// * `iceberg_schema` - The Iceberg schema for field ID mapping
89
+ /// * `iceberg_schema` - The Iceberg schema; it is converted to an Arrow schema carrying field ID metadata
90
90
/// * `target_field_ids` - The field IDs to project
91
- pub fn from_iceberg_schema_mapping (
92
- original_schema : SchemaRef ,
91
+ pub fn from_iceberg_schema (
93
92
iceberg_schema : Arc < IcebergSchema > ,
94
93
target_field_ids : & [ i32 ] ,
95
94
) -> Result < Self > {
95
+ let arrow_schema_with_ids = Arc :: new ( schema_to_arrow_schema ( & iceberg_schema) ?) ;
96
+
96
97
let field_id_fetch_func = |field : & Field | -> Result < Option < i64 > > {
97
- // First try to get field ID from metadata (Parquet case)
98
98
if let Some ( value) = field. metadata ( ) . get ( PARQUET_FIELD_ID_META_KEY ) {
99
99
let field_id = value. parse :: < i32 > ( ) . map_err ( |e| {
100
100
Error :: new (
@@ -104,49 +104,16 @@ impl RecordBatchProjector {
104
104
. with_context ( "value" , value)
105
105
. with_source ( e)
106
106
} ) ?;
107
- return Ok ( Some ( field_id as i64 ) ) ;
108
- }
109
-
110
- // Fallback: use Iceberg schema's built-in field lookup
111
- if let Some ( iceberg_field) = iceberg_schema. field_by_name ( field. name ( ) ) {
112
- return Ok ( Some ( iceberg_field. id as i64 ) ) ;
113
- }
114
-
115
- // Additional fallback: for nested fields, we need to search recursively
116
- fn find_field_id_in_struct (
117
- struct_type : & crate :: spec:: StructType ,
118
- field_name : & str ,
119
- ) -> Option < i32 > {
120
- for field in struct_type. fields ( ) {
121
- if field. name == field_name {
122
- return Some ( field. id ) ;
123
- }
124
- if let crate :: spec:: Type :: Struct ( nested_struct) = & * field. field_type {
125
- if let Some ( nested_id) = find_field_id_in_struct ( nested_struct, field_name)
126
- {
127
- return Some ( nested_id) ;
128
- }
129
- }
130
- }
131
- None
132
- }
133
-
134
- // Search in nested structs
135
- for iceberg_field in iceberg_schema. as_struct ( ) . fields ( ) {
136
- if let crate :: spec:: Type :: Struct ( struct_type) = & * iceberg_field. field_type {
137
- if let Some ( nested_id) = find_field_id_in_struct ( struct_type, field. name ( ) ) {
138
- return Ok ( Some ( nested_id as i64 ) ) ;
139
- }
140
- }
107
+ Ok ( Some ( field_id as i64 ) )
108
+ } else {
109
+ Ok ( None )
141
110
}
142
-
143
- Ok ( None )
144
111
} ;
145
112
146
113
let searchable_field_func = |_field : & Field | -> bool { true } ;
147
114
148
115
Self :: new (
149
- original_schema ,
116
+ arrow_schema_with_ids ,
150
117
target_field_ids,
151
118
field_id_fetch_func,
152
119
searchable_field_func,
@@ -242,6 +209,7 @@ mod test {
242
209
use arrow_schema:: { DataType , Field , Fields , Schema } ;
243
210
244
211
use crate :: arrow:: record_batch_projector:: RecordBatchProjector ;
212
+ use crate :: spec:: { NestedField , PrimitiveType , Schema as IcebergSchema , Type } ;
245
213
use crate :: { Error , ErrorKind } ;
246
214
247
215
#[ test]
@@ -369,4 +337,25 @@ mod test {
369
337
RecordBatchProjector :: new ( schema. clone ( ) , & [ 3 ] , field_id_fetch_func, |_| true ) ;
370
338
assert ! ( projector. is_ok( ) ) ;
371
339
}
340
+
341
// Checks that `RecordBatchProjector::from_iceberg_schema` builds a projector
// directly from an Iceberg schema and a list of target field IDs.
+ #[ test]
342
+ fn test_from_iceberg_schema ( ) {
343
// Three top-level primitive fields: id (field ID 1, required Int),
// name (field ID 2, required String) and age (field ID 3, optional Int).
+ let iceberg_schema = IcebergSchema :: builder ( )
344
+ . with_schema_id ( 0 )
345
+ . with_fields ( vec ! [
346
+ NestedField :: required( 1 , "id" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
347
+ NestedField :: required( 2 , "name" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
348
+ NestedField :: optional( 3 , "age" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
349
+ ] )
350
+ . build ( )
351
+ . unwrap ( ) ;
352
+
353
// Project field IDs 1 and 3 only, leaving out field ID 2 ("name").
+ let projector =
354
+ RecordBatchProjector :: from_iceberg_schema ( Arc :: new ( iceberg_schema) , & [ 1 , 3 ] ) . unwrap ( ) ;
355
+
356
// Exactly two fields are selected, and the projected schema lists them
// in the requested order: "id" first, then "age".
+ assert_eq ! ( projector. field_indices. len( ) , 2 ) ;
357
+ assert_eq ! ( projector. projected_schema_ref( ) . fields( ) . len( ) , 2 ) ;
358
+ assert_eq ! ( projector. projected_schema_ref( ) . field( 0 ) . name( ) , "id" ) ;
359
+ assert_eq ! ( projector. projected_schema_ref( ) . field( 1 ) . name( ) , "age" ) ;
360
+ }
372
361
}
0 commit comments