Skip to content

Commit 8d38a28

Browse files
committed
Improve IcebergCommitExec to correctly specify properties schema (#22)
1 parent dc34928 commit 8d38a28

File tree

2 files changed

+117
-1
lines changed

2 files changed

+117
-1
lines changed

crates/integrations/datafusion/src/physical_plan/commit.rs

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,14 @@ impl ExecutionPlan for IcebergCommitExec {
136136
vec![&self.input]
137137
}
138138

139+
/// The commit node must observe every serialized data file in a single
/// stream, so each child is required to deliver exactly one partition.
fn required_input_distribution(&self) -> Vec<datafusion::physical_plan::Distribution> {
    self.children()
        .iter()
        .map(|_| datafusion::physical_plan::Distribution::SinglePartition)
        .collect()
}
142+
143+
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
144+
vec![false]
145+
}
146+
139147
fn with_new_children(
140148
self: Arc<Self>,
141149
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -262,14 +270,16 @@ mod tests {
262270
use std::fmt;
263271
use std::sync::Arc;
264272

265-
use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
273+
use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt64Array};
266274
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
275+
use datafusion::datasource::MemTable;
267276
use datafusion::execution::context::TaskContext;
268277
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
269278
use datafusion::physical_plan::common::collect;
270279
use datafusion::physical_plan::execution_plan::Boundedness;
271280
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
272281
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
282+
use datafusion::prelude::*;
273283
use futures::StreamExt;
274284
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
275285
use iceberg::spec::{
@@ -280,6 +290,7 @@ mod tests {
280290

281291
use super::*;
282292
use crate::physical_plan::DATA_FILES_COL_NAME;
293+
use crate::table::IcebergTableProvider;
283294

284295
// A mock execution plan that returns record batches with serialized data files
285296
#[derive(Debug)]
@@ -510,4 +521,99 @@ mod tests {
510521

511522
Ok(())
512523
}
524+
525+
/// End-to-end check that an INSERT from a multi-partition source plans a
/// `CoalescePartitionsExec` between the write and commit nodes, so that
/// `IcebergCommitExec` receives a single partition as it requires.
#[tokio::test]
async fn test_datafusion_execution_partitioned_source() -> Result<(), Box<dyn std::error::Error>>
{
    // In-memory catalog backed by a memory:// warehouse — no disk I/O.
    let catalog = Arc::new(
        MemoryCatalogBuilder::default()
            .load(
                "memory",
                HashMap::from([(
                    MEMORY_CATALOG_WAREHOUSE.to_string(),
                    "memory://root".to_string(),
                )]),
            )
            .await?,
    );

    let namespace = NamespaceIdent::new("test_namespace".to_string());
    catalog.create_namespace(&namespace, HashMap::new()).await?;

    // Two-column Iceberg schema: id (int, required) and name (string, required).
    let schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()?;

    let table_name = "test_table";
    let table_creation = TableCreation::builder()
        .name(table_name.to_string())
        .schema(schema)
        .location("memory://root/test_table".to_string())
        .properties(HashMap::new())
        .build();
    let _ = catalog.create_table(&namespace, table_creation).await?;

    // Arrow schema mirroring the Iceberg schema above.
    let arrow_schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Three single-row batches (ids 1..=3); collect short-circuits on error.
    let batches: Vec<RecordBatch> = (1..4)
        .map(|idx| {
            RecordBatch::try_new(arrow_schema.clone(), vec![
                Arc::new(Int32Array::from(vec![idx])) as ArrayRef,
                Arc::new(StringArray::from(vec![format!("Name{idx}")])) as ArrayRef,
            ])
        })
        .collect::<Result<_, _>>()?;

    // Create DataFusion context with specific partition configuration
    let mut config = SessionConfig::new();
    config = config.set_usize("datafusion.execution.target_partitions", 8);
    let ctx = SessionContext::new_with_config(config);

    // Create multiple partitions - each batch becomes a separate partition
    let partitions: Vec<Vec<RecordBatch>> =
        batches.into_iter().map(|batch| vec![batch]).collect();
    let source_table = Arc::new(MemTable::try_new(Arc::clone(&arrow_schema), partitions)?);
    ctx.register_table("source_table", source_table)?;

    // Register the Iceberg table as the INSERT target.
    let table_ident = TableIdent::from_strs(vec!["test_namespace", "test_table"])?;
    let iceberg_table_provider =
        IcebergTableProvider::try_new(catalog.clone(), table_ident).await?;
    ctx.register_table("iceberg_table", Arc::new(iceberg_table_provider))?;

    let insert_plan = ctx
        .sql("INSERT INTO iceberg_table SELECT * FROM source_table")
        .await?;

    let physical_plan = insert_plan.create_physical_plan().await?;

    // Render the plan tree for comparison against the expected shape.
    let actual_plan = format!(
        "{}",
        datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(false)
    );

    println!("Physical plan:\n{actual_plan}");

    // NOTE(review): interior indentation reconstructed to DataFusion's
    // 2-spaces-per-level `indent(false)` output — confirm against a live run.
    let expected_plan = "\
IcebergCommitExec: table=test_namespace.test_table
  CoalescePartitionsExec
    IcebergWriteExec: table=test_namespace.test_table
      DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]";

    assert_eq!(
        actual_plan.trim(),
        expected_plan.trim(),
        "Physical plan does not match expected\n\nExpected:\n{}\n\nActual:\n{}",
        expected_plan.trim(),
        actual_plan.trim()
    );

    Ok(())
}
513619
}

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,16 @@ impl ExecutionPlan for IcebergWriteExec {
144144
self
145145
}
146146

147+
/// Prevents the introduction of additional `RepartitionExec` and processing input in parallel.
148+
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
149+
vec![false]
150+
}
151+
152+
fn maintains_input_order(&self) -> Vec<bool> {
153+
// Maintains ordering in the sense that the written file will reflect the ordering of the input.
154+
vec![true; self.children().len()]
155+
}
156+
147157
fn properties(&self) -> &PlanProperties {
148158
&self.plan_properties
149159
}

0 commit comments

Comments
 (0)