111 changes: 110 additions & 1 deletion crates/integrations/datafusion/src/physical_plan/commit.rs
@@ -136,6 +136,14 @@ impl ExecutionPlan for IcebergCommitExec {
vec![&self.input]
}

/// Require each input to be a single partition, so the commit sees every
/// serialized data file before producing one table snapshot.
fn required_input_distribution(&self) -> Vec<datafusion::physical_plan::Distribution> {
vec![datafusion::physical_plan::Distribution::SinglePartition; self.children().len()]
}

/// The commit only aggregates data-file metadata, so parallelizing its
/// input across partitions yields no benefit.
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![false]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -262,14 +270,16 @@ mod tests {
use std::fmt;
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::datasource::MemTable;
use datafusion::execution::context::TaskContext;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::execution_plan::Boundedness;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion::prelude::*;
use futures::StreamExt;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::{
@@ -280,6 +290,7 @@

use super::*;
use crate::physical_plan::DATA_FILES_COL_NAME;
use crate::table::IcebergTableProvider;

// A mock execution plan that returns record batches with serialized data files
#[derive(Debug)]
Expand Down Expand Up @@ -510,4 +521,102 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_datafusion_execution_partitioned_source() -> Result<(), Box<dyn std::error::Error>>
{
let catalog = Arc::new(
MemoryCatalogBuilder::default()
.load(
"memory",
HashMap::from([(
MEMORY_CATALOG_WAREHOUSE.to_string(),
"memory://root".to_string(),
)]),
)
.await?,
);

let namespace = NamespaceIdent::new("test_namespace".to_string());
catalog.create_namespace(&namespace, HashMap::new()).await?;

let schema = Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()?;

let table_name = "test_table";
let table_creation = TableCreation::builder()
.name(table_name.to_string())
.schema(schema)
.location("memory://root/test_table".to_string())
.properties(HashMap::new())
.build();
let _ = catalog.create_table(&namespace, table_creation).await?;

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));

let batches: Vec<RecordBatch> = (1..4)
.map(|idx| {
RecordBatch::try_new(arrow_schema.clone(), vec![
Arc::new(Int32Array::from(vec![idx])) as ArrayRef,
Arc::new(StringArray::from(vec![format!("Name{idx}")])) as ArrayRef,
])
})
.collect::<Result<_, _>>()?;

// Create DataFusion context with specific partition configuration
let mut config = SessionConfig::new();
config = config.set_usize("datafusion.execution.target_partitions", 8);
let ctx = SessionContext::new_with_config(config);

// Create multiple partitions - each batch becomes a separate partition
let partitions: Vec<Vec<RecordBatch>> =
batches.into_iter().map(|batch| vec![batch]).collect();
let source_table = Arc::new(MemTable::try_new(Arc::clone(&arrow_schema), partitions)?);
ctx.register_table("source_table", source_table)?;

let iceberg_table_provider = IcebergTableProvider::try_new(
catalog.clone(),
namespace.clone(),
table_name.to_string(),
)
.await?;
ctx.register_table("iceberg_table", Arc::new(iceberg_table_provider))?;

let insert_plan = ctx
.sql("INSERT INTO iceberg_table SELECT * FROM source_table")
.await?;

let physical_plan = insert_plan.create_physical_plan().await?;

let actual_plan = format!(
"{}",
datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(false)
);

println!("Physical plan:\n{actual_plan}");

let expected_plan = "\
IcebergCommitExec: table=test_namespace.test_table
CoalescePartitionsExec
IcebergWriteExec: table=test_namespace.test_table
DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]";

assert_eq!(
actual_plan.trim(),
expected_plan.trim(),
"Physical plan does not match expected\n\nExpected:\n{}\n\nActual:\n{}",
expected_plan.trim(),
actual_plan.trim()
);

Ok(())
}
}
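
For context, here is a minimal standalone sketch (plain DataFusion against a hypothetical table `t`; no Iceberg involved, and not code from this PR) of why the `SinglePartition` requirement above materializes as the `CoalescePartitionsExec` in the expected plan: DataFusion's `EnforceDistribution` pass satisfies a single-partition requirement over a multi-partition input by inserting exactly that operator.

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    // Three single-batch partitions, mirroring the MemTable layout in the test above.
    let partitions: Vec<Vec<RecordBatch>> = (1..4)
        .map(|i| {
            Ok(vec![RecordBatch::try_new(schema.clone(), vec![
                Arc::new(Int32Array::from(vec![i])) as ArrayRef,
            ])?])
        })
        .collect::<datafusion::error::Result<_>>()?;
    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(MemTable::try_new(schema, partitions)?))?;

    let scan = ctx.sql("SELECT * FROM t").await?.create_physical_plan().await?;
    // With the default config this plain scan keeps its three input partitions.
    println!("scan partitions: {}", scan.output_partitioning().partition_count());

    // EnforceDistribution satisfies a parent's SinglePartition requirement by
    // placing exactly this operator above the multi-partition input.
    let coalesced: Arc<dyn ExecutionPlan> = Arc::new(CoalescePartitionsExec::new(scan));
    assert_eq!(coalesced.output_partitioning().partition_count(), 1);
    Ok(())
}
```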
10 changes: 10 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/write.rs
@@ -144,6 +144,16 @@ impl ExecutionPlan for IcebergWriteExec {
self
}

/// Returning `false` prevents the optimizer from introducing an additional
/// `RepartitionExec` to process this node's input in parallel.
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![false]
}

fn maintains_input_order(&self) -> Vec<bool> {
// Order is maintained in the sense that each written file reflects the
// row order of its input partition.
vec![true; self.children().len()]
}

fn properties(&self) -> &PlanProperties {
&self.plan_properties
}
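
As a counterpoint to `benefits_from_input_partitioning` returning `false`, the sketch below (hypothetical table `t`, plain DataFusion, not part of this PR) shows the behavior being suppressed: an aggregate, which does benefit from partitioned input, typically gets a round-robin `RepartitionExec` inserted beneath it when `target_partitions` is greater than one, whereas a write node returning `false` never will.

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema.clone(), vec![
        Arc::new(Int32Array::from(vec![1, 2, 2, 3])) as ArrayRef,
    ])?;
    let ctx = SessionContext::new_with_config(SessionConfig::new().with_target_partitions(8));
    ctx.register_table("t", Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))?;

    let plan = ctx
        .sql("SELECT id, count(*) FROM t GROUP BY id")
        .await?
        .create_physical_plan()
        .await?;
    // Look for a RepartitionExec (RoundRobinBatch) under the aggregate in the
    // printed tree; a node that opts out via benefits_from_input_partitioning,
    // like IcebergWriteExec above, does not get one inserted beneath it.
    println!("{}", displayable(plan.as_ref()).indent(false));
    Ok(())
}
```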