Skip to content

Commit 1643297

Browse files
committed
create table exploration
1 parent d78f656 commit 1643297

File tree

9 files changed

+686
-31
lines changed

9 files changed

+686
-31
lines changed
Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Arrow schema field ID assignment using breadth-first traversal
19+
20+
use std::sync::Arc;
21+
22+
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
23+
24+
use super::get_field_doc;
25+
use crate::error::Result;
26+
use crate::spec::{
27+
ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type,
28+
};
29+
use crate::{Error, ErrorKind};
30+
31+
/// Helper for assigning field IDs using breadth-first traversal.
32+
///
33+
/// This struct implements BFS traversal to assign field IDs level-by-level,
34+
/// similar to how `ReassignFieldIds` works in the spec module. All fields at
35+
/// one level are assigned IDs before descending to nested fields.
36+
pub(super) struct ArrowSchemaIdAssigner {
37+
next_id: i32,
38+
}
39+
40+
impl ArrowSchemaIdAssigner {
41+
pub(super) fn new(start_id: i32) -> Self {
42+
Self { next_id: start_id }
43+
}
44+
45+
fn next_field_id(&mut self) -> i32 {
46+
let id = self.next_id;
47+
self.next_id += 1;
48+
id
49+
}
50+
51+
pub(super) fn convert_schema(&mut self, schema: &ArrowSchema) -> Result<Schema> {
52+
let fields = self.convert_fields(schema.fields())?;
53+
Schema::builder().with_fields(fields).build()
54+
}
55+
56+
fn convert_fields(&mut self, fields: &Fields) -> Result<Vec<NestedFieldRef>> {
57+
// First pass: convert all fields at this level and assign IDs
58+
let fields_with_types: Vec<_> = fields
59+
.iter()
60+
.map(|field| {
61+
let id = self.next_field_id();
62+
let field_type = arrow_type_to_primitive_or_placeholder(field.data_type())?;
63+
Ok((field, id, field_type))
64+
})
65+
.collect::<Result<Vec<_>>>()?;
66+
67+
// Second pass: recursively process nested types
68+
fields_with_types
69+
.into_iter()
70+
.map(|(field, id, field_type)| {
71+
let final_type = self.process_nested_type(field.data_type(), field_type)?;
72+
let doc = get_field_doc(field);
73+
Ok(Arc::new(NestedField {
74+
id,
75+
doc,
76+
name: field.name().clone(),
77+
required: !field.is_nullable(),
78+
field_type: Box::new(final_type),
79+
initial_default: None,
80+
write_default: None,
81+
}))
82+
})
83+
.collect()
84+
}
85+
86+
fn process_nested_type(&mut self, arrow_type: &DataType, placeholder: Type) -> Result<Type> {
87+
match arrow_type {
88+
DataType::Struct(fields) => {
89+
let nested_fields = self.convert_fields(fields)?;
90+
Ok(Type::Struct(StructType::new(nested_fields)))
91+
}
92+
DataType::List(element_field)
93+
| DataType::LargeList(element_field)
94+
| DataType::FixedSizeList(element_field, _) => {
95+
let element_id = self.next_field_id();
96+
let element_type =
97+
arrow_type_to_primitive_or_placeholder(element_field.data_type())?;
98+
let final_element_type =
99+
self.process_nested_type(element_field.data_type(), element_type)?;
100+
101+
let doc = get_field_doc(element_field);
102+
let mut element = NestedField::list_element(
103+
element_id,
104+
final_element_type,
105+
!element_field.is_nullable(),
106+
);
107+
if let Some(doc) = doc {
108+
element = element.with_doc(doc);
109+
}
110+
Ok(Type::List(ListType {
111+
element_field: Arc::new(element),
112+
}))
113+
}
114+
DataType::Map(field, _) => match field.data_type() {
115+
DataType::Struct(fields) if fields.len() == 2 => {
116+
let key_field = &fields[0];
117+
let value_field = &fields[1];
118+
119+
let key_id = self.next_field_id();
120+
let key_type = arrow_type_to_primitive_or_placeholder(key_field.data_type())?;
121+
let final_key_type =
122+
self.process_nested_type(key_field.data_type(), key_type)?;
123+
124+
let value_id = self.next_field_id();
125+
let value_type =
126+
arrow_type_to_primitive_or_placeholder(value_field.data_type())?;
127+
let final_value_type =
128+
self.process_nested_type(value_field.data_type(), value_type)?;
129+
130+
let key_doc = get_field_doc(key_field);
131+
let mut key = NestedField::map_key_element(key_id, final_key_type);
132+
if let Some(doc) = key_doc {
133+
key = key.with_doc(doc);
134+
}
135+
136+
let value_doc = get_field_doc(value_field);
137+
let mut value = NestedField::map_value_element(
138+
value_id,
139+
final_value_type,
140+
!value_field.is_nullable(),
141+
);
142+
if let Some(doc) = value_doc {
143+
value = value.with_doc(doc);
144+
}
145+
146+
Ok(Type::Map(MapType {
147+
key_field: Arc::new(key),
148+
value_field: Arc::new(value),
149+
}))
150+
}
151+
_ => Err(Error::new(
152+
ErrorKind::DataInvalid,
153+
"Map field must have struct type with 2 fields",
154+
)),
155+
},
156+
_ => Ok(placeholder), // Primitive type, return as-is
157+
}
158+
}
159+
}
160+
161+
/// Convert Arrow type to Iceberg type for primitives, or return a placeholder for complex types
162+
fn arrow_type_to_primitive_or_placeholder(ty: &DataType) -> Result<Type> {
163+
match ty {
164+
DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
165+
DataType::Int8 | DataType::Int16 | DataType::Int32 => {
166+
Ok(Type::Primitive(PrimitiveType::Int))
167+
}
168+
DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)),
169+
DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)),
170+
DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
171+
DataType::UInt64 => Err(Error::new(
172+
ErrorKind::DataInvalid,
173+
"UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for full uint64 range.",
174+
)),
175+
DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
176+
DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
177+
DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| {
178+
Error::new(
179+
ErrorKind::DataInvalid,
180+
"Failed to create decimal type".to_string(),
181+
)
182+
.with_source(e)
183+
}),
184+
DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)),
185+
DataType::Time64(unit) if unit == &TimeUnit::Microsecond => {
186+
Ok(Type::Primitive(PrimitiveType::Time))
187+
}
188+
DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => {
189+
Ok(Type::Primitive(PrimitiveType::Timestamp))
190+
}
191+
DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => {
192+
Ok(Type::Primitive(PrimitiveType::TimestampNs))
193+
}
194+
DataType::Timestamp(unit, Some(zone))
195+
if unit == &TimeUnit::Microsecond
196+
&& (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
197+
{
198+
Ok(Type::Primitive(PrimitiveType::Timestamptz))
199+
}
200+
DataType::Timestamp(unit, Some(zone))
201+
if unit == &TimeUnit::Nanosecond
202+
&& (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
203+
{
204+
Ok(Type::Primitive(PrimitiveType::TimestamptzNs))
205+
}
206+
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
207+
Ok(Type::Primitive(PrimitiveType::Binary))
208+
}
209+
DataType::FixedSizeBinary(width) => {
210+
Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64)))
211+
}
212+
DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => {
213+
Ok(Type::Primitive(PrimitiveType::String))
214+
}
215+
// For complex types, return a placeholder that will be replaced
216+
DataType::Struct(_)
217+
| DataType::List(_)
218+
| DataType::LargeList(_)
219+
| DataType::FixedSizeList(_, _)
220+
| DataType::Map(_, _) => {
221+
Ok(Type::Primitive(PrimitiveType::Boolean)) // Placeholder
222+
}
223+
other => Err(Error::new(
224+
ErrorKind::DataInvalid,
225+
format!("Unsupported Arrow data type: {other}"),
226+
)),
227+
}
228+
}
229+
230+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::DEFAULT_MAP_FIELD_NAME;

    /// Converts an ID-less Arrow schema (as produced by e.g. a DataFusion
    /// `CREATE TABLE`) and checks the IDs, names, nullability and types of the
    /// resulting Iceberg schema, including one level of struct/list/map nesting.
    #[test]
    fn test_arrow_schema_to_schema_with_assigned_ids() {
        // Nested building blocks, named so the top-level schema stays readable.
        let address_type = DataType::Struct(Fields::from(vec![
            Field::new("street", DataType::Utf8, false),
            Field::new("city", DataType::Utf8, false),
            Field::new("zip", DataType::Int32, true),
        ]));
        let tags_type = DataType::List(Arc::new(Field::new("element", DataType::Utf8, false)));
        let map_entries = Field::new(
            DEFAULT_MAP_FIELD_NAME,
            DataType::Struct(Fields::from(vec![
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
            ])),
            false,
        );
        let properties_type = DataType::Map(Arc::new(map_entries), false);

        let arrow_schema = ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("address", address_type, true),
            Field::new("tags", tags_type, true),
            Field::new("properties", properties_type, false),
            Field::new("value", DataType::Float64, false),
        ]);

        // Convert with IDs starting at 1.
        let mut assigner = ArrowSchemaIdAssigner::new(1);
        let iceberg_schema = assigner.convert_schema(&arrow_schema).unwrap();
        let fields = iceberg_schema.as_struct().fields();

        // Top-level fields take IDs 1-6; nested fields start at 7.
        // Each entry is (expected id, expected name, expected `required`).
        let expected_top_level = [
            (1, "id", true),
            (2, "name", false),
            (3, "address", false),
            (4, "tags", false),
            (5, "properties", true),
            (6, "value", true),
        ];
        assert_eq!(fields.len(), 6);
        for (field, &(id, name, required)) in fields.iter().zip(expected_top_level.iter()) {
            assert_eq!(field.id, id);
            assert_eq!(field.name, name);
            assert_eq!(field.required, required);
        }

        // Primitive top-level types.
        assert!(matches!(
            fields[0].field_type.as_ref(),
            Type::Primitive(PrimitiveType::Int)
        ));
        assert!(matches!(
            fields[1].field_type.as_ref(),
            Type::Primitive(PrimitiveType::String)
        ));
        assert!(matches!(
            fields[5].field_type.as_ref(),
            Type::Primitive(PrimitiveType::Double)
        ));

        // address: struct children are numbered right after the top level (7, 8, 9).
        match fields[2].field_type.as_ref() {
            Type::Struct(struct_type) => {
                let nested: Vec<(i32, &str)> = struct_type
                    .fields()
                    .iter()
                    .map(|nested_field| (nested_field.id, nested_field.name.as_str()))
                    .collect();
                assert_eq!(nested, vec![(7, "street"), (8, "city"), (9, "zip")]);
            }
            _ => panic!("Expected struct type for address field"),
        }

        // tags: the list element follows the struct children.
        match fields[3].field_type.as_ref() {
            Type::List(list_type) => {
                assert_eq!(list_type.element_field.id, 10);
                assert!(list_type.element_field.required);
            }
            _ => panic!("Expected list type for tags field"),
        }

        // properties: map key and value come last.
        match fields[4].field_type.as_ref() {
            Type::Map(map_type) => {
                assert_eq!(map_type.key_field.id, 11);
                assert_eq!(map_type.value_field.id, 12);
                assert!(!map_type.value_field.required);
            }
            _ => panic!("Expected map type for properties field"),
        }
    }
}

crates/iceberg/src/arrow/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! Conversion between Iceberg and Arrow schema
1919
20+
mod id_assigner;
2021
mod schema;
2122
pub use schema::*;
2223

0 commit comments

Comments
 (0)