Skip to content

Commit 128acc8

Browse files
committed
Improve IcebergCommitExec to correctly specify properties schema (#22)
1 parent cdb2321 commit 128acc8

File tree

2 files changed

+117
-1
lines changed

2 files changed

+117
-1
lines changed

crates/integrations/datafusion/src/physical_plan/commit.rs

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ impl ExecutionPlan for IcebergCommitExec {
139139
vec![&self.input]
140140
}
141141

142+
fn required_input_distribution(&self) -> Vec<datafusion::physical_plan::Distribution> {
143+
vec![datafusion::physical_plan::Distribution::SinglePartition; self.children().len()]
144+
}
145+
146+
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
147+
vec![false]
148+
}
149+
142150
fn with_new_children(
143151
self: Arc<Self>,
144152
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -265,14 +273,16 @@ mod tests {
265273
use std::fmt;
266274
use std::sync::Arc;
267275

268-
use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
276+
use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt64Array};
269277
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
278+
use datafusion::datasource::MemTable;
270279
use datafusion::execution::context::TaskContext;
271280
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
272281
use datafusion::physical_plan::common::collect;
273282
use datafusion::physical_plan::execution_plan::Boundedness;
274283
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
275284
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
285+
use datafusion::prelude::*;
276286
use futures::StreamExt;
277287
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
278288
use iceberg::spec::{
@@ -283,6 +293,7 @@ mod tests {
283293

284294
use super::*;
285295
use crate::physical_plan::DATA_FILES_COL_NAME;
296+
use crate::table::IcebergTableProvider;
286297

287298
// A mock execution plan that returns record batches with serialized data files
288299
#[derive(Debug)]
@@ -513,4 +524,99 @@ mod tests {
513524

514525
Ok(())
515526
}
527+
528+
#[tokio::test]
529+
async fn test_datafusion_execution_partitioned_source() -> Result<(), Box<dyn std::error::Error>>
530+
{
531+
let catalog = Arc::new(
532+
MemoryCatalogBuilder::default()
533+
.load(
534+
"memory",
535+
HashMap::from([(
536+
MEMORY_CATALOG_WAREHOUSE.to_string(),
537+
"memory://root".to_string(),
538+
)]),
539+
)
540+
.await?,
541+
);
542+
543+
let namespace = NamespaceIdent::new("test_namespace".to_string());
544+
catalog.create_namespace(&namespace, HashMap::new()).await?;
545+
546+
let schema = Schema::builder()
547+
.with_schema_id(1)
548+
.with_fields(vec![
549+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
550+
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
551+
])
552+
.build()?;
553+
554+
let table_name = "test_table";
555+
let table_creation = TableCreation::builder()
556+
.name(table_name.to_string())
557+
.schema(schema)
558+
.location("memory://root/test_table".to_string())
559+
.properties(HashMap::new())
560+
.build();
561+
let _ = catalog.create_table(&namespace, table_creation).await?;
562+
563+
let arrow_schema = Arc::new(ArrowSchema::new(vec![
564+
Field::new("id", DataType::Int32, false),
565+
Field::new("name", DataType::Utf8, false),
566+
]));
567+
568+
let batches: Vec<RecordBatch> = (1..4)
569+
.map(|idx| {
570+
RecordBatch::try_new(arrow_schema.clone(), vec![
571+
Arc::new(Int32Array::from(vec![idx])) as ArrayRef,
572+
Arc::new(StringArray::from(vec![format!("Name{idx}")])) as ArrayRef,
573+
])
574+
})
575+
.collect::<Result<_, _>>()?;
576+
577+
// Create DataFusion context with specific partition configuration
578+
let mut config = SessionConfig::new();
579+
config = config.set_usize("datafusion.execution.target_partitions", 8);
580+
let ctx = SessionContext::new_with_config(config);
581+
582+
// Create multiple partitions - each batch becomes a separate partition
583+
let partitions: Vec<Vec<RecordBatch>> =
584+
batches.into_iter().map(|batch| vec![batch]).collect();
585+
let source_table = Arc::new(MemTable::try_new(Arc::clone(&arrow_schema), partitions)?);
586+
ctx.register_table("source_table", source_table)?;
587+
588+
let table_ident = TableIdent::from_strs(vec!["test_namespace", "test_table"])?;
589+
let iceberg_table_provider =
590+
IcebergTableProvider::try_new(catalog.clone(), table_ident).await?;
591+
ctx.register_table("iceberg_table", Arc::new(iceberg_table_provider))?;
592+
593+
let insert_plan = ctx
594+
.sql("INSERT INTO iceberg_table SELECT * FROM source_table")
595+
.await?;
596+
597+
let physical_plan = insert_plan.create_physical_plan().await?;
598+
599+
let actual_plan = format!(
600+
"{}",
601+
datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(false)
602+
);
603+
604+
println!("Physical plan:\n{actual_plan}");
605+
606+
let expected_plan = "\
607+
IcebergCommitExec: table=test_namespace.test_table
608+
CoalescePartitionsExec
609+
IcebergWriteExec: table=test_namespace.test_table
610+
DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]";
611+
612+
assert_eq!(
613+
actual_plan.trim(),
614+
expected_plan.trim(),
615+
"Physical plan does not match expected\n\nExpected:\n{}\n\nActual:\n{}",
616+
expected_plan.trim(),
617+
actual_plan.trim()
618+
);
619+
620+
Ok(())
621+
}
516622
}

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,16 @@ impl ExecutionPlan for IcebergWriteExec {
144144
self
145145
}
146146

147+
/// Prevents the introduction of additional `RepartitionExec` and processing input in parallel.
148+
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
149+
vec![false]
150+
}
151+
152+
fn maintains_input_order(&self) -> Vec<bool> {
153+
// Maintains ordering in the sense that the written file will reflect the ordering of the input.
154+
vec![true; self.children().len()]
155+
}
156+
147157
fn properties(&self) -> &PlanProperties {
148158
&self.plan_properties
149159
}

0 commit comments

Comments
 (0)