Skip to content

Commit 38e3233

Browse files
committed
PR comments
1 parent 0d674a4 commit 38e3233

File tree

4 files changed

+138
-122
lines changed

4 files changed

+138
-122
lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use crate::expr::{BoundPredicate, BoundReference};
5656
use crate::io::{FileIO, FileMetadata, FileRead};
5757
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
5858
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
59-
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveLiteral, PrimitiveType, Schema, Type};
59+
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
6060
use crate::utils::available_parallelism;
6161
use crate::{Error, ErrorKind};
6262

@@ -279,10 +279,7 @@ impl ArrowReader {
279279
// column re-ordering, partition constants, and virtual field addition (like _file)
280280
let mut record_batch_transformer_builder =
281281
RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids())
282-
.with_constant(
283-
RESERVED_FIELD_ID_FILE,
284-
PrimitiveLiteral::String(task.data_file_path.clone()),
285-
)?;
282+
.with_reserved_field(RESERVED_FIELD_ID_FILE, task.data_file_path.clone())?;
286283

287284
if let (Some(partition_spec), Some(partition_data)) =
288285
(task.partition_spec.clone(), task.partition.clone())

crates/iceberg/src/arrow/record_batch_transformer.rs

Lines changed: 100 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@ use arrow_schema::{
3131
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
3232

3333
use crate::arrow::schema_to_arrow_schema;
34-
use crate::metadata_columns::get_metadata_field;
34+
use crate::metadata_columns::{get_metadata_field, metadata_field_primitive_type};
3535
use crate::spec::{
36-
Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform,
36+
Datum, Literal, PartitionSpec, PrimitiveLiteral, PrimitiveType, Schema as IcebergSchema,
37+
Struct, Transform,
3738
};
3839
use crate::{Error, ErrorKind, Result};
3940

40-
/// Build a map of field ID to constant value for identity-partitioned fields.
41+
/// Build a map of field ID to constant value (as Datum) for identity-partitioned fields.
4142
///
4243
/// Implements Iceberg spec "Column Projection" rule #1: use partition metadata constants
4344
/// only for identity-transformed fields. Non-identity transforms (bucket, truncate, year, etc.)
@@ -54,20 +55,32 @@ use crate::{Error, ErrorKind, Result};
5455
fn constants_map(
5556
partition_spec: &PartitionSpec,
5657
partition_data: &Struct,
57-
) -> HashMap<i32, PrimitiveLiteral> {
58+
schema: &IcebergSchema,
59+
) -> Result<HashMap<i32, Datum>> {
5860
let mut constants = HashMap::new();
5961

6062
for (pos, field) in partition_spec.fields().iter().enumerate() {
6163
// Only identity transforms should use constant values from partition metadata
6264
if matches!(field.transform, Transform::Identity) {
6365
// Get the partition value for this field
6466
if let Some(Literal::Primitive(value)) = &partition_data[pos] {
65-
constants.insert(field.source_id, value.clone());
67+
// Get the field from schema to extract its type
68+
let iceberg_field = schema.field_by_id(field.source_id).ok_or(Error::new(
69+
ErrorKind::Unexpected,
70+
format!("Field {} not found in schema", field.source_id),
71+
))?;
72+
73+
// Extract the primitive type from the field
74+
if let crate::spec::Type::Primitive(prim_type) = &*iceberg_field.field_type {
75+
// Create a Datum from the primitive type and value
76+
let datum = Datum::new(prim_type.clone(), value.clone());
77+
constants.insert(field.source_id, datum);
78+
}
6679
}
6780
}
6881
}
6982

70-
constants
83+
Ok(constants)
7184
}
7285

7386
/// Indicates how a particular column in a processed RecordBatch should
@@ -153,7 +166,7 @@ enum SchemaComparison {
153166
pub(crate) struct RecordBatchTransformerBuilder {
154167
snapshot_schema: Arc<IcebergSchema>,
155168
projected_iceberg_field_ids: Vec<i32>,
156-
constant_fields: HashMap<i32, (DataType, PrimitiveLiteral)>,
169+
constant_fields: HashMap<i32, Datum>,
157170
}
158171

159172
impl RecordBatchTransformerBuilder {
@@ -173,11 +186,34 @@ impl RecordBatchTransformerBuilder {
173186
///
174187
/// # Arguments
175188
/// * `field_id` - The field ID to associate with the constant
176-
/// * `value` - The constant value for this field
177-
pub(crate) fn with_constant(mut self, field_id: i32, value: PrimitiveLiteral) -> Result<Self> {
178-
let arrow_type = RecordBatchTransformer::primitive_literal_to_arrow_type(&value)?;
179-
self.constant_fields.insert(field_id, (arrow_type, value));
180-
Ok(self)
189+
/// * `datum` - The constant value (with type) for this field
190+
pub(crate) fn with_constant(mut self, field_id: i32, datum: Datum) -> Self {
191+
self.constant_fields.insert(field_id, datum);
192+
self
193+
}
194+
195+
/// Add a reserved/metadata field with a constant string value.
196+
/// This is a convenience method for reserved fields like _file that automatically
197+
/// handles type extraction from the field definition.
198+
///
199+
/// # Arguments
200+
/// * `field_id` - The reserved field ID (e.g., RESERVED_FIELD_ID_FILE)
201+
/// * `value` - The constant string value for this field
202+
///
203+
/// # Returns
204+
/// Self for method chaining, or an error if the field ID is not a valid metadata field or the field's type is not primitive
205+
pub(crate) fn with_reserved_field(self, field_id: i32, value: String) -> Result<Self> {
206+
// Get the Iceberg field definition
207+
let iceberg_field = get_metadata_field(field_id)?;
208+
209+
// Extract the primitive type from the field
210+
let prim_type = metadata_field_primitive_type(&iceberg_field)?;
211+
212+
// Create a Datum with the extracted type and value
213+
let datum = Datum::new(prim_type, PrimitiveLiteral::String(value));
214+
215+
// Add the constant field
216+
Ok(self.with_constant(field_id, datum))
181217
}
182218

183219
/// Set partition spec and data together for identifying identity-transformed partition columns.
@@ -190,13 +226,13 @@ impl RecordBatchTransformerBuilder {
190226
partition_spec: Arc<PartitionSpec>,
191227
partition_data: Struct,
192228
) -> Result<Self> {
193-
// Compute partition constants for identity-transformed fields
194-
let partition_constants = constants_map(&partition_spec, &partition_data);
229+
// Compute partition constants for identity-transformed fields; constants_map returns them as typed Datum values
230+
let partition_constants =
231+
constants_map(&partition_spec, &partition_data, &self.snapshot_schema)?;
195232

196-
// Add partition constants to constant_fields (compute REE types from literals)
197-
for (field_id, value) in partition_constants {
198-
let arrow_type = RecordBatchTransformer::primitive_literal_to_arrow_type(&value)?;
199-
self.constant_fields.insert(field_id, (arrow_type, value));
233+
// Add partition constants to constant_fields
234+
for (field_id, datum) in partition_constants {
235+
self.constant_fields.insert(field_id, datum);
200236
}
201237

202238
Ok(self)
@@ -246,10 +282,10 @@ impl RecordBatchTransformerBuilder {
246282
pub(crate) struct RecordBatchTransformer {
247283
snapshot_schema: Arc<IcebergSchema>,
248284
projected_iceberg_field_ids: Vec<i32>,
249-
// Pre-computed constant field information: field_id -> (arrow_type, value)
285+
// Pre-computed constant field information: field_id -> Datum
250286
// Includes both virtual/metadata fields (like _file) and identity-partitioned fields
251-
// Avoids type conversions during batch processing
252-
constant_fields: HashMap<i32, (DataType, PrimitiveLiteral)>,
287+
// Datum holds both the Iceberg type and the value
288+
constant_fields: HashMap<i32, Datum>,
253289

254290
// BatchTransform gets lazily constructed based on the schema of
255291
// the first RecordBatch we receive from the file
@@ -310,7 +346,7 @@ impl RecordBatchTransformer {
310346
source_schema: &ArrowSchemaRef,
311347
snapshot_schema: &IcebergSchema,
312348
projected_iceberg_field_ids: &[i32],
313-
constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>,
349+
constant_fields: &HashMap<i32, Datum>,
314350
) -> Result<BatchTransform> {
315351
let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
316352
let field_id_to_mapped_schema_map =
@@ -325,19 +361,28 @@ impl RecordBatchTransformer {
325361
if constant_fields.contains_key(field_id) {
326362
// For metadata/virtual fields (like _file), get name from metadata_columns
327363
// For partition fields, get name from schema (they exist in schema)
328-
if let Ok(field) = get_metadata_field(*field_id) {
329-
// This is a metadata/virtual field - use the predefined field
330-
Ok(field)
364+
if let Ok(iceberg_field) = get_metadata_field(*field_id) {
365+
// This is a metadata/virtual field - convert Iceberg field to Arrow
366+
let arrow_type =
367+
Self::datum_to_arrow_type(constant_fields.get(field_id).unwrap());
368+
let arrow_field =
369+
Field::new(&iceberg_field.name, arrow_type, !iceberg_field.required)
370+
.with_metadata(HashMap::from([(
371+
PARQUET_FIELD_ID_META_KEY.to_string(),
372+
iceberg_field.id.to_string(),
373+
)]));
374+
Ok(Arc::new(arrow_field))
331375
} else {
332376
// This is a partition constant field (exists in schema but uses constant value)
333377
let field = &field_id_to_mapped_schema_map
334378
.get(field_id)
335379
.ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
336380
.0;
337-
let (arrow_type, _) = constant_fields.get(field_id).unwrap();
381+
let datum = constant_fields.get(field_id).unwrap();
382+
let arrow_type = Self::datum_to_arrow_type(datum);
338383
// Use the Arrow type derived from the constant's Datum (run-end encoded for constants)
339384
let constant_field =
340-
Field::new(field.name(), arrow_type.clone(), field.is_nullable())
385+
Field::new(field.name(), arrow_type, field.is_nullable())
341386
.with_metadata(field.metadata().clone());
342387
Ok(Arc::new(constant_field))
343388
}
@@ -420,7 +465,7 @@ impl RecordBatchTransformer {
420465
snapshot_schema: &IcebergSchema,
421466
projected_iceberg_field_ids: &[i32],
422467
field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
423-
constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>,
468+
constant_fields: &HashMap<i32, Datum>,
424469
) -> Result<Vec<ColumnSource>> {
425470
let field_id_to_source_schema_map =
426471
Self::build_field_id_to_arrow_schema_map(source_schema)?;
@@ -432,10 +477,11 @@ impl RecordBatchTransformer {
432477
// Constant fields always use their pre-computed constant values, regardless of whether
433478
// they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata
434479
// is authoritative and should be preferred over file data.
435-
if let Some((arrow_type, value)) = constant_fields.get(field_id) {
480+
if let Some(datum) = constant_fields.get(field_id) {
481+
let arrow_type = Self::datum_to_arrow_type(datum);
436482
return Ok(ColumnSource::Add {
437-
value: Some(value.clone()),
438-
target_type: arrow_type.clone(),
483+
value: Some(datum.literal().clone()),
484+
target_type: arrow_type,
439485
});
440486
}
441487

@@ -791,10 +837,10 @@ impl RecordBatchTransformer {
791837
}
792838
}
793839

794-
/// Converts a PrimitiveLiteral to its corresponding Arrow DataType.
795-
/// This is used for constant fields to determine the Arrow type.
840+
/// Converts a Datum (Iceberg type + primitive literal) to its corresponding Arrow DataType.
841+
/// Uses the PrimitiveType from the Datum to determine the correct Arrow type.
796842
/// For constant values, we use Run-End Encoding for all types to save memory.
797-
fn primitive_literal_to_arrow_type(literal: &PrimitiveLiteral) -> Result<DataType> {
843+
fn datum_to_arrow_type(datum: &Datum) -> DataType {
798844
// Helper to create REE type with the given values type
799845
// Note: values field is nullable as Arrow expects this when building the
800846
// final Arrow schema with `RunArray::try_new`.
@@ -804,23 +850,27 @@ impl RecordBatchTransformer {
804850
DataType::RunEndEncoded(run_ends_field, values_field)
805851
};
806852

807-
Ok(match literal {
808-
PrimitiveLiteral::Boolean(_) => make_ree(DataType::Boolean),
809-
PrimitiveLiteral::Int(_) => make_ree(DataType::Int32),
810-
PrimitiveLiteral::Long(_) => make_ree(DataType::Int64),
811-
PrimitiveLiteral::Float(_) => make_ree(DataType::Float32),
812-
PrimitiveLiteral::Double(_) => make_ree(DataType::Float64),
813-
PrimitiveLiteral::String(_) => make_ree(DataType::Utf8),
814-
PrimitiveLiteral::Binary(_) => make_ree(DataType::Binary),
815-
PrimitiveLiteral::Int128(_) => make_ree(DataType::Decimal128(38, 0)),
816-
PrimitiveLiteral::UInt128(_) => make_ree(DataType::Decimal128(38, 0)),
817-
PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => {
818-
return Err(Error::new(
819-
ErrorKind::Unexpected,
820-
"Cannot create arrow type for AboveMax/BelowMin literal",
821-
));
853+
// Match on the PrimitiveType from the Datum to determine the Arrow type
854+
match datum.data_type() {
855+
PrimitiveType::Boolean => make_ree(DataType::Boolean),
856+
PrimitiveType::Int => make_ree(DataType::Int32),
857+
PrimitiveType::Long => make_ree(DataType::Int64),
858+
PrimitiveType::Float => make_ree(DataType::Float32),
859+
PrimitiveType::Double => make_ree(DataType::Float64),
860+
PrimitiveType::Date => make_ree(DataType::Date32),
861+
PrimitiveType::Time => make_ree(DataType::Int64),
862+
PrimitiveType::Timestamp => make_ree(DataType::Int64),
863+
PrimitiveType::Timestamptz => make_ree(DataType::Int64),
864+
PrimitiveType::TimestampNs => make_ree(DataType::Int64),
865+
PrimitiveType::TimestamptzNs => make_ree(DataType::Int64),
866+
PrimitiveType::String => make_ree(DataType::Utf8),
867+
PrimitiveType::Uuid => make_ree(DataType::Binary),
868+
PrimitiveType::Fixed(_) => make_ree(DataType::Binary),
869+
PrimitiveType::Binary => make_ree(DataType::Binary),
870+
PrimitiveType::Decimal { precision, scale } => {
871+
make_ree(DataType::Decimal128(*precision as u8, *scale as i8))
822872
}
823-
})
873+
}
824874
}
825875
}
826876

crates/iceberg/src/metadata_columns.rs

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,11 @@
2222
//! during reading. Examples include the _file column (file path) and future
2323
//! columns like partition values or row numbers.
2424
25-
use std::collections::HashMap;
2625
use std::sync::Arc;
2726

28-
use arrow_schema::{DataType, Field};
2927
use once_cell::sync::Lazy;
30-
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
3128

29+
use crate::spec::{NestedField, NestedFieldRef, PrimitiveType, Type};
3230
use crate::{Error, ErrorKind, Result};
3331

3432
/// Reserved field ID for the file path (_file) column per Iceberg spec
@@ -37,40 +35,52 @@ pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1;
3735
/// Reserved column name for the file path metadata column
3836
pub const RESERVED_COL_NAME_FILE: &str = "_file";
3937

40-
/// Lazy-initialized Arrow Field definition for the _file metadata column.
41-
/// Uses Run-End Encoding for memory efficiency.
42-
static FILE_FIELD: Lazy<Arc<Field>> = Lazy::new(|| {
43-
let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
44-
let values_field = Arc::new(Field::new("values", DataType::Utf8, true));
45-
Arc::new(
46-
Field::new(
47-
RESERVED_COL_NAME_FILE,
48-
DataType::RunEndEncoded(run_ends_field, values_field),
49-
false,
50-
)
51-
.with_metadata(HashMap::from([(
52-
PARQUET_FIELD_ID_META_KEY.to_string(),
53-
RESERVED_FIELD_ID_FILE.to_string(),
54-
)])),
55-
)
38+
/// Lazy-initialized Iceberg field definition for the _file metadata column.
39+
/// This field represents the file path as a required string field.
40+
static FILE_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
41+
Arc::new(NestedField::required(
42+
RESERVED_FIELD_ID_FILE,
43+
RESERVED_COL_NAME_FILE,
44+
Type::Primitive(PrimitiveType::String),
45+
))
5646
});
5747

58-
/// Returns the Arrow Field definition for the _file metadata column.
48+
/// Returns the Iceberg field definition for the _file metadata column.
5949
///
6050
/// # Returns
61-
/// A reference to the _file field definition (RunEndEncoded type)
62-
pub fn file_field() -> &'static Arc<Field> {
51+
/// A reference to the _file field definition as an Iceberg NestedField
52+
pub fn file_field() -> &'static NestedFieldRef {
6353
&FILE_FIELD
6454
}
6555

66-
/// Returns the Arrow Field definition for a metadata field ID.
56+
/// Extracts the primitive type from a metadata field.
57+
///
58+
/// # Arguments
59+
/// * `field` - The metadata field
60+
///
61+
/// # Returns
62+
/// The PrimitiveType of the field, or an error if the field is not a primitive type
63+
pub fn metadata_field_primitive_type(field: &NestedFieldRef) -> Result<PrimitiveType> {
64+
field
65+
.field_type
66+
.as_primitive_type()
67+
.cloned()
68+
.ok_or_else(|| {
69+
Error::new(
70+
ErrorKind::Unexpected,
71+
format!("Metadata field '{}' must be a primitive type", field.name),
72+
)
73+
})
74+
}
75+
76+
/// Returns the Iceberg field definition for a metadata field ID.
6777
///
6878
/// # Arguments
6979
/// * `field_id` - The metadata field ID
7080
///
7181
/// # Returns
72-
/// The Arrow Field definition for the metadata column, or an error if not a metadata field
73-
pub fn get_metadata_field(field_id: i32) -> Result<Arc<Field>> {
82+
/// The Iceberg field definition for the metadata column, or an error if not a metadata field
83+
pub fn get_metadata_field(field_id: i32) -> Result<NestedFieldRef> {
7484
match field_id {
7585
RESERVED_FIELD_ID_FILE => Ok(Arc::clone(file_field())),
7686
_ if is_metadata_field(field_id) => {

0 commit comments

Comments
 (0)