Skip to content

Commit 69fe87a

Browse files
committed
Improve IcebergCommitExec to correctly specify properties schema (#22)
1 parent 15e2fc9 commit 69fe87a

File tree

2 files changed

+122
-3
lines changed

2 files changed

+122
-3
lines changed

crates/integrations/datafusion/src/physical_plan/commit.rs

Lines changed: 112 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,17 @@ impl IcebergCommitExec {
5757
input: Arc<dyn ExecutionPlan>,
5858
schema: ArrowSchemaRef,
5959
) -> Self {
60-
let plan_properties = Self::compute_properties(schema.clone());
60+
61+
let count_schema = Self::make_count_schema();
62+
63+
let plan_properties = Self::compute_properties(Arc::clone(&count_schema));
6164

6265
Self {
6366
table,
6467
catalog,
6568
input,
6669
schema,
67-
count_schema: Self::make_count_schema(),
70+
count_schema,
6871
plan_properties,
6972
}
7073
}
@@ -136,6 +139,14 @@ impl ExecutionPlan for IcebergCommitExec {
136139
vec![&self.input]
137140
}
138141

142+
fn required_input_distribution(&self) -> Vec<datafusion::physical_plan::Distribution> {
143+
vec![datafusion::physical_plan::Distribution::SinglePartition; self.children().len()]
144+
}
145+
146+
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
147+
vec![false]
148+
}
149+
139150
fn with_new_children(
140151
self: Arc<Self>,
141152
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -262,14 +273,16 @@ mod tests {
262273
use std::fmt;
263274
use std::sync::Arc;
264275

265-
use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
276+
use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt64Array};
266277
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
278+
use datafusion::datasource::MemTable;
267279
use datafusion::execution::context::TaskContext;
268280
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
269281
use datafusion::physical_plan::common::collect;
270282
use datafusion::physical_plan::execution_plan::Boundedness;
271283
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
272284
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
285+
use datafusion::prelude::*;
273286
use futures::StreamExt;
274287
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
275288
use iceberg::spec::{
@@ -280,6 +293,7 @@ mod tests {
280293

281294
use super::*;
282295
use crate::physical_plan::DATA_FILES_COL_NAME;
296+
use crate::table::IcebergTableProvider;
283297

284298
// A mock execution plan that returns record batches with serialized data files
285299
#[derive(Debug)]
@@ -510,4 +524,99 @@ mod tests {
510524

511525
Ok(())
512526
}
527+
528+
#[tokio::test]
529+
async fn test_datafusion_execution_partitioned_source() -> Result<(), Box<dyn std::error::Error>>
530+
{
531+
let catalog = Arc::new(
532+
MemoryCatalogBuilder::default()
533+
.load(
534+
"memory",
535+
HashMap::from([(
536+
MEMORY_CATALOG_WAREHOUSE.to_string(),
537+
"memory://root".to_string(),
538+
)]),
539+
)
540+
.await?,
541+
);
542+
543+
let namespace = NamespaceIdent::new("test_namespace".to_string());
544+
catalog.create_namespace(&namespace, HashMap::new()).await?;
545+
546+
let schema = Schema::builder()
547+
.with_schema_id(1)
548+
.with_fields(vec![
549+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
550+
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
551+
])
552+
.build()?;
553+
554+
let table_name = "test_table";
555+
let table_creation = TableCreation::builder()
556+
.name(table_name.to_string())
557+
.schema(schema)
558+
.location("memory://root/test_table".to_string())
559+
.properties(HashMap::new())
560+
.build();
561+
let _ = catalog.create_table(&namespace, table_creation).await?;
562+
563+
let arrow_schema = Arc::new(ArrowSchema::new(vec![
564+
Field::new("id", DataType::Int32, false),
565+
Field::new("name", DataType::Utf8, false),
566+
]));
567+
568+
let batches: Vec<RecordBatch> = (1..4)
569+
.map(|idx| {
570+
RecordBatch::try_new(arrow_schema.clone(), vec![
571+
Arc::new(Int32Array::from(vec![idx])) as ArrayRef,
572+
Arc::new(StringArray::from(vec![format!("Name{idx}")])) as ArrayRef,
573+
])
574+
})
575+
.collect::<Result<_, _>>()?;
576+
577+
// Create DataFusion context with specific partition configuration
578+
let mut config = SessionConfig::new();
579+
config = config.set_usize("datafusion.execution.target_partitions", 8);
580+
let ctx = SessionContext::new_with_config(config);
581+
582+
// Create multiple partitions - each batch becomes a separate partition
583+
let partitions: Vec<Vec<RecordBatch>> =
584+
batches.into_iter().map(|batch| vec![batch]).collect();
585+
let source_table = Arc::new(MemTable::try_new(Arc::clone(&arrow_schema), partitions)?);
586+
ctx.register_table("source_table", source_table)?;
587+
588+
let table_ident = TableIdent::from_strs(vec!["test_namespace", "test_table"])?;
589+
let iceberg_table_provider =
590+
IcebergTableProvider::try_new(catalog.clone(), table_ident).await?;
591+
ctx.register_table("iceberg_table", Arc::new(iceberg_table_provider))?;
592+
593+
let insert_plan = ctx
594+
.sql("INSERT INTO iceberg_table SELECT * FROM source_table")
595+
.await?;
596+
597+
let physical_plan = insert_plan.create_physical_plan().await?;
598+
599+
let actual_plan = format!(
600+
"{}",
601+
datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(false)
602+
);
603+
604+
println!("Physical plan:\n{actual_plan}");
605+
606+
let expected_plan = "\
607+
IcebergCommitExec: table=test_namespace.test_table
608+
CoalescePartitionsExec
609+
IcebergWriteExec: table=test_namespace.test_table
610+
DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]";
611+
612+
assert_eq!(
613+
actual_plan.trim(),
614+
expected_plan.trim(),
615+
"Physical plan does not match expected\n\nExpected:\n{}\n\nActual:\n{}",
616+
expected_plan.trim(),
617+
actual_plan.trim()
618+
);
619+
620+
Ok(())
621+
}
513622
}

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,16 @@ impl ExecutionPlan for IcebergWriteExec {
144144
self
145145
}
146146

147+
/// Prevents the introduction of additional `RepartitionExec` and processing input in parallel.
148+
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
149+
vec![false]
150+
}
151+
152+
fn maintains_input_order(&self) -> Vec<bool> {
153+
// Maintains ordering in the sense that the written file will reflect the ordering of the input.
154+
vec![true; self.children().len()]
155+
}
156+
147157
fn properties(&self) -> &PlanProperties {
148158
&self.plan_properties
149159
}

0 commit comments

Comments
 (0)