diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index 949f84241..c5d92dd3f 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -31,6 +31,7 @@ mod reader;
 pub(crate) mod record_batch_projector;
 pub(crate) mod record_batch_transformer;
 mod value;
+mod partition_value_visitor;
 
 pub use reader::*;
 pub use value::*;
diff --git a/crates/iceberg/src/arrow/partition_value_visitor.rs b/crates/iceberg/src/arrow/partition_value_visitor.rs
new file mode 100644
index 000000000..ad60340b3
--- /dev/null
+++ b/crates/iceberg/src/arrow/partition_value_visitor.rs
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Schema visitor for partition value extraction
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, RecordBatch, StructArray};
+
+use crate::{Error, ErrorKind, Result};
+use crate::arrow::{ArrowArrayAccessor, FieldMatchMode};
+use crate::spec::{
+    ListType, MapType, NestedFieldRef, PartitionField, PrimitiveType, Schema, SchemaRef,
+    SchemaWithPartnerVisitor, StructType, visit_struct_with_partner,
+};
+use crate::transform::{create_transform_function, BoxedTransformFunction};
+
+/// Visitor which extracts partition values from a record batch based on partition fields
+pub struct PartitionValueVisitor {
+    /// Map from source field ids to their partition fields
+    source_id_to_field: HashMap<i32, PartitionField>,
+    /// Match mode for finding columns in the Arrow struct
+    match_mode: FieldMatchMode,
+    /// Stores the partition values temporarily during computation
+    partition_values: Vec<ArrayRef>,
+    /// Transform function of the partition source field currently being visited, if any
+    current_transform_fn: Option<BoxedTransformFunction>,
+}
+
+impl PartitionValueVisitor {
+    /// Creates a new instance of `PartitionValueVisitor`
+    #[allow(dead_code)]
+    pub fn new(partition_fields: Vec<PartitionField>) -> Self {
+        Self::new_with_match_mode(partition_fields, FieldMatchMode::Name)
+    }
+
+    /// Creates a new instance of `PartitionValueVisitor` with an explicit match mode
+    #[allow(dead_code)]
+    pub fn new_with_match_mode(
+        partition_fields: Vec<PartitionField>,
+        match_mode: FieldMatchMode,
+    ) -> Self {
+        Self {
+            source_id_to_field: partition_fields
+                .into_iter()
+                .map(|field| (field.source_id, field))
+                .collect(),
+            match_mode,
+            partition_values: vec![],
+            current_transform_fn: None,
+        }
+    }
+
+    /// Computes partition values for the given schema and record batch
+    #[allow(dead_code)]
+    pub fn compute(
+        &mut self,
+        schema: SchemaRef,
+        batch: RecordBatch,
+    ) -> Result<Vec<ArrayRef>> {
+        self.partition_values = vec![];
+
+        let arrow_accessor = ArrowArrayAccessor::new_with_match_mode(self.match_mode);
+
+        let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef;
+        visit_struct_with_partner(
+            schema.as_struct(),
+            &struct_arr,
+            self,
+            &arrow_accessor,
+        )?;
+
+        Ok(std::mem::take(&mut self.partition_values))
+    }
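+    // Illustrative usage (a sketch added in review, not part of this change;
+    // `partition_fields`, `schema`, and `batch` are hypothetical caller-provided
+    // values):
+    //
+    //     let mut visitor = PartitionValueVisitor::new(partition_fields);
+    //     // One transformed `ArrayRef` per partition field, in the order the
+    //     // source fields are visited in the schema.
+    //     let values: Vec<ArrayRef> = visitor.compute(schema, batch)?;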
+    /// Checks whether the current field is a partition source field; if so,
+    /// creates a transform function for it
+    fn check_and_create_transform_fn(&mut self, field: &NestedFieldRef) -> Result<()> {
+        self.current_transform_fn = match self.source_id_to_field.get(&field.id) {
+            Some(partition_field) => {
+                if field.field_type.is_primitive() {
+                    Some(create_transform_function(&partition_field.transform)?)
+                } else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Cannot partition by non-primitive source field: '{field}'."),
+                    ));
+                }
+            }
+            None => None,
+        };
+
+        Ok(())
+    }
+}
+
+impl SchemaWithPartnerVisitor<ArrayRef> for PartitionValueVisitor {
+    type T = ();
+
+    fn before_struct_field(&mut self, field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
+        self.check_and_create_transform_fn(field)?;
+        Ok(())
+    }
+
+    fn after_struct_field(&mut self, _field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
+        self.current_transform_fn = None;
+        Ok(())
+    }
+
+    fn before_list_element(&mut self, field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
+        self.check_and_create_transform_fn(field)?;
+        Ok(())
+    }
+
+    fn after_list_element(&mut self, _field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
+        self.current_transform_fn = None;
+        Ok(())
+    }
+
+    fn before_map_key(&mut self, field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
+        self.check_and_create_transform_fn(field)?;
+        Ok(())
+    }
+
+    fn after_map_key(&mut self, _field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
+        self.current_transform_fn = None;
+        Ok(())
+    }
+
+    fn before_map_value(&mut self, field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
+        self.check_and_create_transform_fn(field)?;
+        Ok(())
+    }
+
+    fn after_map_value(&mut self, _field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
+        self.current_transform_fn = None;
+        Ok(())
+    }
+
+    fn schema(
+        &mut self,
+        _schema: &Schema,
+        _partner: &ArrayRef,
+        _value: Self::T,
+    ) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn field(
+        &mut self,
+        _field: &NestedFieldRef,
+        _partner: &ArrayRef,
+        _value: Self::T,
+    ) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn r#struct(
+        &mut self,
+        _struct: &StructType,
+        _partner: &ArrayRef,
+        _results: Vec<Self::T>,
+    ) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn list(&mut self, _list: &ListType, _list_arr: &ArrayRef, _value: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn map(
+        &mut self,
+        _map: &MapType,
+        _partner: &ArrayRef,
+        _key_value: Self::T,
+        _value: Self::T,
+    ) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn primitive(&mut self, _p: &PrimitiveType, col: &ArrayRef) -> Result<Self::T> {
+        // If the transform fn is set, we are visiting a partition source field,
+        // so apply the current transform fn and record the result
+        if let Some(transform_fn) = &self.current_transform_fn {
+            self.partition_values.push(transform_fn.transform(col.clone())?);
+        }
+
+        Ok(())
+    }
+}
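+
+// NOTE: an illustrative smoke test sketched in review, not part of the original
+// change. It assumes `PartitionField::builder()` (TypedBuilder) and name-based
+// column matching via the default `FieldMatchMode::Name`; verify against the
+// actual crate APIs before relying on it.
+#[cfg(test)]
+mod tests {
+    use arrow_array::{Array, Int32Array};
+    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
+
+    use super::*;
+    use crate::spec::{NestedField, Transform, Type};
+
+    #[test]
+    fn test_identity_partition_value() {
+        // Table schema with a single int column `id` (field id 1).
+        let schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Identity-partition on `id`.
+        let partition_field = PartitionField::builder()
+            .source_id(1)
+            .field_id(1000)
+            .name("id".to_string())
+            .transform(Transform::Identity)
+            .build();
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "id",
+            DataType::Int32,
+            false,
+        )]));
+        let batch = RecordBatch::try_new(arrow_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+        ])
+        .unwrap();
+
+        let mut visitor = PartitionValueVisitor::new(vec![partition_field]);
+        let values = visitor.compute(schema, batch).unwrap();
+
+        // One output array per partition field; identity keeps values unchanged.
+        assert_eq!(values.len(), 1);
+        assert_eq!(values[0].len(), 3);
+    }
+}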
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
index fcfd11a45..09295feb1 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -19,6 +19,7 @@ pub(crate) mod commit;
 pub(crate) mod expr_to_predicate;
 pub(crate) mod metadata_scan;
 pub(crate) mod scan;
+pub(crate) mod sort;
 pub(crate) mod write;
 
 pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";
diff --git a/crates/integrations/datafusion/src/physical_plan/sort.rs b/crates/integrations/datafusion/src/physical_plan/sort.rs
new file mode 100644
index 000000000..d3926dfe1
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/sort.rs
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::arrow::compute::SortOptions;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::common::Result as DFResult;
+use datafusion::error::DataFusionError;
+use datafusion::execution::TaskContext;
+use datafusion::physical_expr::{LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::expressions::Column;
+use datafusion::physical_plan::sorts::sort::SortExec;
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
+};
+use iceberg::spec::{PartitionSpecRef, Transform};
+use iceberg::table::Table;
+
+/// An execution plan that sorts incoming data by Iceberg partition values.
+///
+/// This execution plan takes input that has already been repartitioned and sorts it by
+/// partition values within each DataFusion partition. This ensures that rows belonging
+/// to the same Iceberg partition are grouped together, allowing a single writer to
+/// process them efficiently in subsequent steps.
+#[derive(Debug)]
+pub struct IcebergPartitionSortExec {
+    table: Table,
+    input: Arc<dyn ExecutionPlan>,
+    sort_exprs: Vec<PhysicalSortExpr>,
+    cache: PlanProperties,
+}
+
+impl IcebergPartitionSortExec {
+    /// Creates a new `IcebergPartitionSortExec`
+    pub fn new(input: Arc<dyn ExecutionPlan>, table: Table) -> DFResult<Self> {
+        // Extract the default partition spec from the table
+        let partition_spec = table.metadata().default_partition_spec();
+
+        // Generate sort expressions from the partition spec
+        let sort_exprs = Self::create_sort_expressions(partition_spec, input.schema())?;
+
+        // Compute plan properties
+        let cache = Self::compute_properties(&input);
+
+        Ok(Self {
+            input,
+            table,
+            sort_exprs,
+            cache,
+        })
+    }
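+    // Illustrative wiring (a sketch added in review, not part of this change;
+    // `repartitioned` and `table` are hypothetical): this node is expected to sit
+    // between a repartition on partition values and the Iceberg write node, e.g.
+    //
+    //     let sorted: Arc<dyn ExecutionPlan> =
+    //         Arc::new(IcebergPartitionSortExec::new(repartitioned, table.clone())?);
+    //     // `sorted` then feeds the write node, so each of its output partitions
+    //     // arrives with rows grouped by Iceberg partition value.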
+    /// Creates sort expressions from the partition spec
+    fn create_sort_expressions(
+        partition_spec: &PartitionSpecRef,
+        schema: ArrowSchemaRef,
+    ) -> DFResult<Vec<PhysicalSortExpr>> {
+        if partition_spec.is_unpartitioned() {
+            return Err(DataFusionError::Execution(
+                "IcebergPartitionSortExec can only be used on partitioned tables".to_string(),
+            ));
+        }
+
+        let mut sort_exprs = Vec::new();
+
+        // For each partition field, create a sort expression
+        for field in partition_spec.fields() {
+            // Skip void transforms as they don't contribute to sorting
+            if matches!(field.transform, Transform::Void) {
+                continue;
+            }
+
+            // TODO: revisit this part; the input schema may not carry field ids, and
+            // using field ids as positional indices is wrong.
+            // Find the column in the schema that corresponds to the source_id.
+            // A full implementation would need to map from the Iceberg schema to the
+            // Arrow schema.
+            let source_id_usize = field.source_id as usize;
+            let schema_len = schema.fields().len();
+            let column_index_and_name = if source_id_usize >= schema_len {
+                None
+            } else {
+                let f = schema.field(source_id_usize);
+                Some((source_id_usize, f.name().to_string()))
+            };
+
+            let column_index_and_name = column_index_and_name.ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Could not find column for source_id: {}",
+                    field.source_id
+                ))
+            })?;
+
+            // TODO: handle the partition value transform as well (unclear how without
+            // field ids).
+            // Create a physical expression based on the transform.
+            // For now, use the source column directly for all transforms.
+            let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new(
+                &column_index_and_name.1,
+                column_index_and_name.0,
+            ));
+
+            // Add to sort expressions
+            sort_exprs.push(PhysicalSortExpr {
+                expr,
+                options: SortOptions::default(), // ascending, nulls last
+            });
+        }
+
+        Ok(sort_exprs)
+    }
+
+    /// Computes plan properties
+    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
+        PlanProperties::new(
+            // Equivalence properties are recalculated by [`SortExec`] according to
+            // the sort expressions
+            input.properties().equivalence_properties().clone(),
+            input.properties().output_partitioning().clone(),
+            EmissionType::Final,
+            Boundedness::Bounded,
+        )
+    }
+}
+
+impl ExecutionPlan for IcebergPartitionSortExec {
+    fn name(&self) -> &str {
+        "IcebergPartitionSortExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        if children.len() != 1 {
+            return Err(DataFusionError::Internal(format!(
+                "IcebergPartitionSortExec expects exactly one child, got {}",
+                children.len()
+            )));
+        }
+
+        // Create a new instance with the new child
+        IcebergPartitionSortExec::new(Arc::clone(&children[0]), self.table.clone())
+            .map(|exec| Arc::new(exec) as Arc<dyn ExecutionPlan>)
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        // Convert Vec<PhysicalSortExpr> to LexOrdering
+        let lex_ordering = LexOrdering::from(self.sort_exprs.clone());
+
+        // preserve_partitioning is always set so that the output partitioning
+        // matches the input partitioning
+        let sort_exec = Arc::new(
+            SortExec::new(lex_ordering, Arc::clone(&self.input)).with_preserve_partitioning(true),
+        );
+
+        // Execute the sort
+        sort_exec.execute(partition, context)
+    }
+}
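+// Design note (editor's addition): rather than sorting itself, `execute` builds a
+// fresh `SortExec` per call and delegates to it. With
+// `with_preserve_partitioning(true)`, each input partition is sorted independently
+// and the partition count is unchanged; conceptually:
+//
+//     for p in 0..input_partition_count {
+//         output[p] = sort(input[p], sort_exprs); // no merge across partitions
+//     }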
+impl DisplayAs for IcebergPartitionSortExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "IcebergPartitionSortExec: table={}",
+                    self.table.identifier()
+                )
+            }
+            DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "IcebergPartitionSortExec: table={}, sort_exprs=[{}]",
+                    self.table.identifier(),
+                    self.sort_exprs
+                        .iter()
+                        .map(|e| format!("{e}"))
+                        .collect::<Vec<_>>()
+                        .join(", ")
+                )
+            }
+            DisplayFormatType::TreeRender => {
+                write!(
+                    f,
+                    "IcebergPartitionSortExec: table={}",
+                    self.table.identifier()
+                )
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {}