Skip to content

Commit 0fe945f

Browse files
committed
PR comments
1 parent 300d938 commit 0fe945f

File tree

3 files changed

+242
-204
lines changed

3 files changed

+242
-204
lines changed

crates/iceberg/src/arrow/record_batch_transformer.rs

Lines changed: 22 additions & 201 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,15 @@ use std::collections::HashMap;
1919
use std::sync::Arc;
2020

2121
use arrow_array::{
22-
Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array,
23-
Float32Array, Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions,
24-
RunArray, StringArray, StructArray,
22+
Array as ArrowArray, ArrayRef, Int32Array, RecordBatch, RecordBatchOptions, RunArray,
2523
};
26-
use arrow_buffer::NullBuffer;
2724
use arrow_cast::cast;
2825
use arrow_schema::{
2926
DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef,
3027
};
3128
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
3229

30+
use crate::arrow::value::{create_primitive_array_repeated, create_primitive_array_single_element};
3331
use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema};
3432
use crate::metadata_columns::get_metadata_field;
3533
use crate::spec::{
@@ -85,8 +83,13 @@ fn constants_map(
8583
// Handle both None (null) and Some(Literal::Primitive) cases
8684
match &partition_data[pos] {
8785
None => {
88-
// Null partition values are skipped - they cannot be represented as a constant Datum
89-
// The field will be read from the data file instead (or produce null from missing data)
86+
return Err(Error::new(
87+
ErrorKind::Unexpected,
88+
format!(
89+
"Partition field {} has null value for identity transform",
90+
field.source_id
91+
),
92+
));
9093
}
9194
Some(Literal::Primitive(value)) => {
9295
// Create a Datum from the primitive type and value
@@ -365,8 +368,11 @@ impl RecordBatchTransformer {
365368
// For partition fields, get name from schema (they exist in schema)
366369
if let Ok(iceberg_field) = get_metadata_field(*field_id) {
367370
// This is a metadata/virtual field - convert Iceberg field to Arrow
368-
let arrow_type =
369-
datum_to_arrow_type_with_ree(constant_fields.get(field_id).unwrap());
371+
let datum = constant_fields.get(field_id).ok_or(Error::new(
372+
ErrorKind::Unexpected,
373+
"constant field not found",
374+
))?;
375+
let arrow_type = datum_to_arrow_type_with_ree(datum);
370376
let arrow_field =
371377
Field::new(&iceberg_field.name, arrow_type, !iceberg_field.required)
372378
.with_metadata(HashMap::from([(
@@ -380,7 +386,10 @@ impl RecordBatchTransformer {
380386
.get(field_id)
381387
.ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
382388
.0;
383-
let datum = constant_fields.get(field_id).unwrap();
389+
let datum = constant_fields.get(field_id).ok_or(Error::new(
390+
ErrorKind::Unexpected,
391+
"constant field not found",
392+
))?;
384393
let arrow_type = datum_to_arrow_type_with_ree(datum);
385394
// Use the type from constant_fields (REE for constants)
386395
let constant_field =
@@ -639,203 +648,15 @@ impl RecordBatchTransformer {
639648
))
640649
};
641650

642-
// Create the values array based on the literal value
643-
let values_array: ArrayRef = match (values_field.data_type(), prim_lit) {
644-
(DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {
645-
Arc::new(BooleanArray::from(vec![*v]))
646-
}
647-
(DataType::Boolean, None) => {
648-
Arc::new(BooleanArray::from(vec![Option::<bool>::None]))
649-
}
650-
(DataType::Int32, Some(PrimitiveLiteral::Int(v))) => {
651-
Arc::new(Int32Array::from(vec![*v]))
652-
}
653-
(DataType::Int32, None) => Arc::new(Int32Array::from(vec![Option::<i32>::None])),
654-
(DataType::Date32, Some(PrimitiveLiteral::Int(v))) => {
655-
Arc::new(Date32Array::from(vec![*v]))
656-
}
657-
(DataType::Date32, None) => Arc::new(Date32Array::from(vec![Option::<i32>::None])),
658-
(DataType::Int64, Some(PrimitiveLiteral::Long(v))) => {
659-
Arc::new(Int64Array::from(vec![*v]))
660-
}
661-
(DataType::Int64, None) => Arc::new(Int64Array::from(vec![Option::<i64>::None])),
662-
(DataType::Float32, Some(PrimitiveLiteral::Float(v))) => {
663-
Arc::new(Float32Array::from(vec![v.0]))
664-
}
665-
(DataType::Float32, None) => {
666-
Arc::new(Float32Array::from(vec![Option::<f32>::None]))
667-
}
668-
(DataType::Float64, Some(PrimitiveLiteral::Double(v))) => {
669-
Arc::new(Float64Array::from(vec![v.0]))
670-
}
671-
(DataType::Float64, None) => {
672-
Arc::new(Float64Array::from(vec![Option::<f64>::None]))
673-
}
674-
(DataType::Utf8, Some(PrimitiveLiteral::String(v))) => {
675-
Arc::new(StringArray::from(vec![v.as_str()]))
676-
}
677-
(DataType::Utf8, None) => Arc::new(StringArray::from(vec![Option::<&str>::None])),
678-
(DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => {
679-
Arc::new(BinaryArray::from_vec(vec![v.as_slice()]))
680-
}
681-
(DataType::Binary, None) => {
682-
Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None]))
683-
}
684-
(DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(v))) => {
685-
Arc::new(arrow_array::Decimal128Array::from(vec![{ *v }]))
686-
}
687-
(DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(v))) => {
688-
Arc::new(arrow_array::Decimal128Array::from(vec![*v as i128]))
689-
}
690-
(DataType::Decimal128(_, _), None) => {
691-
Arc::new(arrow_array::Decimal128Array::from(vec![
692-
Option::<i128>::None,
693-
]))
694-
}
695-
(DataType::Struct(fields), None) => {
696-
// Create a single-element StructArray with nulls
697-
let null_arrays: Vec<ArrayRef> = fields
698-
.iter()
699-
.map(|f| {
700-
// Recursively create null arrays for struct fields
701-
// For primitive fields in structs, use simple null arrays (not REE within struct)
702-
match f.data_type() {
703-
DataType::Boolean => {
704-
Arc::new(BooleanArray::from(vec![Option::<bool>::None]))
705-
as ArrayRef
706-
}
707-
DataType::Int32 | DataType::Date32 => {
708-
Arc::new(Int32Array::from(vec![Option::<i32>::None]))
709-
}
710-
DataType::Int64 => {
711-
Arc::new(Int64Array::from(vec![Option::<i64>::None]))
712-
}
713-
DataType::Float32 => {
714-
Arc::new(Float32Array::from(vec![Option::<f32>::None]))
715-
}
716-
DataType::Float64 => {
717-
Arc::new(Float64Array::from(vec![Option::<f64>::None]))
718-
}
719-
DataType::Utf8 => {
720-
Arc::new(StringArray::from(vec![Option::<&str>::None]))
721-
}
722-
DataType::Binary => {
723-
Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None]))
724-
}
725-
_ => panic!("Unsupported struct field type: {:?}", f.data_type()),
726-
}
727-
})
728-
.collect();
729-
Arc::new(arrow_array::StructArray::new(
730-
fields.clone(),
731-
null_arrays,
732-
Some(arrow_buffer::NullBuffer::new_null(1)),
733-
))
734-
}
735-
_ => {
736-
return Err(Error::new(
737-
ErrorKind::Unexpected,
738-
format!(
739-
"Unsupported constant type combination: {:?} with {:?}",
740-
values_field.data_type(),
741-
prim_lit
742-
),
743-
));
744-
}
745-
};
651+
// Create the values array using the helper function
652+
let values_array =
653+
create_primitive_array_single_element(values_field.data_type(), prim_lit)?;
746654

747655
// Wrap in Run-End Encoding
748656
create_ree_array(values_array)
749657
} else {
750658
// Non-REE type (simple arrays for non-constant fields)
751-
Ok(match (target_type, prim_lit) {
752-
(DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
753-
Arc::new(BooleanArray::from(vec![*value; num_rows]))
754-
}
755-
(DataType::Boolean, None) => {
756-
let vals: Vec<Option<bool>> = vec![None; num_rows];
757-
Arc::new(BooleanArray::from(vals))
758-
}
759-
(DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
760-
Arc::new(Int32Array::from(vec![*value; num_rows]))
761-
}
762-
(DataType::Int32, None) => {
763-
let vals: Vec<Option<i32>> = vec![None; num_rows];
764-
Arc::new(Int32Array::from(vals))
765-
}
766-
(DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
767-
Arc::new(Date32Array::from(vec![*value; num_rows]))
768-
}
769-
(DataType::Date32, None) => {
770-
let vals: Vec<Option<i32>> = vec![None; num_rows];
771-
Arc::new(Date32Array::from(vals))
772-
}
773-
(DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
774-
Arc::new(Int64Array::from(vec![*value; num_rows]))
775-
}
776-
(DataType::Int64, None) => {
777-
let vals: Vec<Option<i64>> = vec![None; num_rows];
778-
Arc::new(Int64Array::from(vals))
779-
}
780-
(DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
781-
Arc::new(Float32Array::from(vec![value.0; num_rows]))
782-
}
783-
(DataType::Float32, None) => {
784-
let vals: Vec<Option<f32>> = vec![None; num_rows];
785-
Arc::new(Float32Array::from(vals))
786-
}
787-
(DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
788-
Arc::new(Float64Array::from(vec![value.0; num_rows]))
789-
}
790-
(DataType::Float64, None) => {
791-
let vals: Vec<Option<f64>> = vec![None; num_rows];
792-
Arc::new(Float64Array::from(vals))
793-
}
794-
(DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
795-
Arc::new(StringArray::from(vec![value.clone(); num_rows]))
796-
}
797-
(DataType::Utf8, None) => {
798-
let vals: Vec<Option<String>> = vec![None; num_rows];
799-
Arc::new(StringArray::from(vals))
800-
}
801-
(DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
802-
Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
803-
}
804-
(DataType::Binary, None) => {
805-
let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
806-
Arc::new(BinaryArray::from_opt_vec(vals))
807-
}
808-
(DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(value))) => {
809-
Arc::new(Decimal128Array::from(vec![*value; num_rows]))
810-
}
811-
(DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(value))) => {
812-
Arc::new(Decimal128Array::from(vec![*value as i128; num_rows]))
813-
}
814-
(DataType::Decimal128(_, _), None) => {
815-
let vals: Vec<Option<i128>> = vec![None; num_rows];
816-
Arc::new(Decimal128Array::from(vals))
817-
}
818-
(DataType::Struct(fields), None) => {
819-
// Create a StructArray filled with nulls
820-
let null_arrays: Vec<ArrayRef> = fields
821-
.iter()
822-
.map(|field| Self::create_column(field.data_type(), &None, num_rows))
823-
.collect::<Result<Vec<_>>>()?;
824-
825-
Arc::new(StructArray::new(
826-
fields.clone(),
827-
null_arrays,
828-
Some(NullBuffer::new_null(num_rows)),
829-
))
830-
}
831-
(DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
832-
(dt, _) => {
833-
return Err(Error::new(
834-
ErrorKind::Unexpected,
835-
format!("unexpected target column type {}", dt),
836-
));
837-
}
838-
})
659+
create_primitive_array_repeated(target_type, prim_lit, num_rows)
839660
}
840661
}
841662
}

crates/iceberg/src/arrow/schema.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,12 +1032,15 @@ impl TryFrom<&crate::spec::Schema> for ArrowSchema {
10321032
/// Arrow DataType with Run-End Encoding applied
10331033
///
10341034
/// # Example
1035-
/// ```ignore
1035+
/// ```
1036+
/// use iceberg::arrow::datum_to_arrow_type_with_ree;
1037+
/// use iceberg::spec::Datum;
1038+
///
10361039
/// let datum = Datum::string("test_file.parquet");
10371040
/// let ree_type = datum_to_arrow_type_with_ree(&datum);
10381041
/// // Returns: RunEndEncoded(Int32, Utf8)
10391042
/// ```
1040-
pub(crate) fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
1043+
pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
10411044
// Helper to create REE type with the given values type.
10421045
// Note: values field is nullable as Arrow expects this when building the
10431046
// final Arrow schema with `RunArray::try_new`.

0 commit comments

Comments
 (0)