diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs
index 0ee0504fa..067049c12 100644
--- a/crates/integrations/datafusion/src/physical_plan/commit.rs
+++ b/crates/integrations/datafusion/src/physical_plan/commit.rs
@@ -136,6 +136,14 @@ impl ExecutionPlan for IcebergCommitExec {
         vec![&self.input]
     }
 
+    fn required_input_distribution(&self) -> Vec<datafusion::physical_plan::Distribution> {
+        vec![datafusion::physical_plan::Distribution::SinglePartition; self.children().len()]
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -262,14 +270,16 @@ mod tests {
     use std::fmt;
     use std::sync::Arc;
 
-    use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
+    use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt64Array};
     use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use datafusion::datasource::MemTable;
     use datafusion::execution::context::TaskContext;
     use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::execution_plan::Boundedness;
     use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
     use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+    use datafusion::prelude::*;
     use futures::StreamExt;
     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
     use iceberg::spec::{
@@ -280,6 +290,7 @@ mod tests {
 
     use super::*;
     use crate::physical_plan::DATA_FILES_COL_NAME;
+    use crate::table::IcebergTableProvider;
 
     // A mock execution plan that returns record batches with serialized data files
     #[derive(Debug)]
@@ -510,4 +521,102 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_datafusion_execution_partitioned_source() -> Result<(), Box<dyn std::error::Error>>
+    {
+        let catalog = Arc::new(
+            MemoryCatalogBuilder::default()
+                .load(
+                    "memory",
+                    HashMap::from([(
+                        MEMORY_CATALOG_WAREHOUSE.to_string(),
+                        "memory://root".to_string(),
+                    )]),
+                )
+                .await?,
+        );
+
+        let namespace = NamespaceIdent::new("test_namespace".to_string());
+        catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()?;
+
+        let table_name = "test_table";
+        let table_creation = TableCreation::builder()
+            .name(table_name.to_string())
+            .schema(schema)
+            .location("memory://root/test_table".to_string())
+            .properties(HashMap::new())
+            .build();
+        let _ = catalog.create_table(&namespace, table_creation).await?;
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let batches: Vec<RecordBatch> = (1..4)
+            .map(|idx| {
+                RecordBatch::try_new(arrow_schema.clone(), vec![
+                    Arc::new(Int32Array::from(vec![idx])) as ArrayRef,
+                    Arc::new(StringArray::from(vec![format!("Name{idx}")])) as ArrayRef,
+                ])
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        // Create DataFusion context with specific partition configuration
+        let mut config = SessionConfig::new();
+        config = config.set_usize("datafusion.execution.target_partitions", 8);
+        let ctx = SessionContext::new_with_config(config);
+
+        // Create multiple partitions - each batch becomes a separate partition
+        let partitions: Vec<Vec<RecordBatch>> =
+            batches.into_iter().map(|batch| vec![batch]).collect();
+        let source_table = Arc::new(MemTable::try_new(Arc::clone(&arrow_schema), partitions)?);
+        ctx.register_table("source_table", source_table)?;
+
+        let iceberg_table_provider = IcebergTableProvider::try_new(
+            catalog.clone(),
+            namespace.clone(),
+            table_name.to_string(),
+        )
+        .await?;
+        ctx.register_table("iceberg_table", Arc::new(iceberg_table_provider))?;
+
+        let insert_plan = ctx
+            .sql("INSERT INTO iceberg_table SELECT * FROM source_table")
+            .await?;
+
+        let physical_plan = insert_plan.create_physical_plan().await?;
+
+        let actual_plan = format!(
+            "{}",
+            datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(false)
+        );
+
+        println!("Physical plan:\n{actual_plan}");
+
+        let expected_plan = "\
+IcebergCommitExec: table=test_namespace.test_table
+  CoalescePartitionsExec
+    IcebergWriteExec: table=test_namespace.test_table
+      DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]";
+
+        assert_eq!(
+            actual_plan.trim(),
+            expected_plan.trim(),
+            "Physical plan does not match expected\n\nExpected:\n{}\n\nActual:\n{}",
+            expected_plan.trim(),
+            actual_plan.trim()
+        );
+
+        Ok(())
+    }
 }
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs
index 625405c95..c2d94a480 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -144,6 +144,16 @@ impl ExecutionPlan for IcebergWriteExec {
         self
     }
 
+    /// Prevents the introduction of additional `RepartitionExec`s that would process the input in parallel.
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // Maintains ordering in the sense that the written files reflect the ordering of the input.
+        vec![true; self.children().len()]
+    }
+
     fn properties(&self) -> &PlanProperties {
         &self.plan_properties
     }
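Note (not part of the patch): the mechanism these overrides rely on is DataFusion's distribution enforcement. When a node reports `Distribution::SinglePartition` via `required_input_distribution`, the `EnforceDistribution` optimizer rule satisfies it by inserting a `CoalescePartitionsExec`, which is exactly the node that appears between IcebergCommitExec and IcebergWriteExec in the expected plan above. Below is a minimal, self-contained sketch of that contract using only DataFusion, assuming a recent release; the table name `t` and the standalone `main` are illustrative, and method paths such as `properties().output_partitioning()` may drift between DataFusion versions.

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

    // Three single-batch partitions, mirroring the MemTable in the test above.
    let partitions: Vec<Vec<RecordBatch>> = (1..4)
        .map(|i| {
            Ok(vec![RecordBatch::try_new(schema.clone(), vec![
                Arc::new(Int32Array::from(vec![i])) as ArrayRef,
            ])?])
        })
        .collect::<Result<_>>()?;

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(MemTable::try_new(schema, partitions)?))?;

    // A bare scan of the MemTable keeps the source's multi-partition output.
    let scan: Arc<dyn ExecutionPlan> =
        ctx.sql("SELECT * FROM t").await?.create_physical_plan().await?;
    println!(
        "scan partitions: {}",
        scan.properties().output_partitioning().partition_count()
    );

    // Coalescing all input partitions into one stream is how `EnforceDistribution`
    // satisfies a `SinglePartition` requirement such as IcebergCommitExec's.
    let coalesced = CoalescePartitionsExec::new(scan);
    assert_eq!(
        coalesced.properties().output_partitioning().partition_count(),
        1
    );
    Ok(())
}

Pairing this with `benefits_from_input_partitioning` returning `false` tells the optimizer not to add a `RepartitionExec` above the write/commit nodes, so the single-partition requirement is met by coalescing rather than by reshuffling.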