Commit f6e5ebe

fix: prevent DataFusion RepartitionExec insertion into IcebergCommitExec
Ensures DataFusion automatically coalesces input partitions before the commit, fixing partitioning issues (partial writes) in Iceberg write operations.
1 parent cdb2321 commit f6e5ebe
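
Note: the mechanism behind this fix is DataFusion's distribution enforcement. When an operator reports `Distribution::SinglePartition` for an input via `required_input_distribution`, the planner merges that input's partitions (typically by inserting a `CoalescePartitionsExec`) before the operator runs. The sketch below is a minimal, standalone illustration of that enforcement pass, not code from this commit; the table name `t` and the use of a global `count(*)` (whose final aggregate also requires a single partition) are illustrative assumptions.

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

    // Three single-batch partitions, mirroring the test added in this commit.
    let partitions: Vec<Vec<RecordBatch>> = (1..4)
        .map(|i| -> datafusion::error::Result<Vec<RecordBatch>> {
            let batch = RecordBatch::try_new(schema.clone(), vec![
                Arc::new(Int32Array::from(vec![i])) as ArrayRef,
            ])?;
            Ok(vec![batch])
        })
        .collect::<datafusion::error::Result<_>>()?;

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(MemTable::try_new(schema, partitions)?))?;

    // The final aggregate requires SinglePartition input, so the plan printed
    // here shows the partitions being merged before it -- the same enforcement
    // that now places CoalescePartitionsExec under IcebergCommitExec.
    let plan = ctx.sql("SELECT count(*) FROM t").await?.create_physical_plan().await?;
    println!("{}", displayable(plan.as_ref()).indent(false));
    Ok(())
}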

File tree

2 files changed (+136, -3)


crates/integrations/datafusion/src/physical_plan/commit.rs

Lines changed: 107 additions & 1 deletion
@@ -139,6 +139,14 @@ impl ExecutionPlan for IcebergCommitExec {
         vec![&self.input]
     }
 
+    fn required_input_distribution(&self) -> Vec<datafusion::physical_plan::Distribution> {
+        vec![datafusion::physical_plan::Distribution::SinglePartition; self.children().len()]
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -265,14 +273,16 @@ mod tests {
     use std::fmt;
     use std::sync::Arc;
 
-    use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
+    use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt64Array};
     use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use datafusion::datasource::MemTable;
     use datafusion::execution::context::TaskContext;
     use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::execution_plan::Boundedness;
     use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
     use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+    use datafusion::prelude::*;
     use futures::StreamExt;
     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
     use iceberg::spec::{
@@ -283,6 +293,7 @@ mod tests {
 
     use super::*;
     use crate::physical_plan::DATA_FILES_COL_NAME;
+    use crate::table::IcebergTableProvider;
 
     // A mock execution plan that returns record batches with serialized data files
     #[derive(Debug)]
@@ -513,4 +524,99 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_datafusion_execution_partitioned_source() -> Result<(), Box<dyn std::error::Error>>
+    {
+        let catalog = Arc::new(
+            MemoryCatalogBuilder::default()
+                .load(
+                    "memory",
+                    HashMap::from([(
+                        MEMORY_CATALOG_WAREHOUSE.to_string(),
+                        "memory://root".to_string(),
+                    )]),
+                )
+                .await?,
+        );
+
+        let namespace = NamespaceIdent::new("test_namespace".to_string());
+        catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()?;
+
+        let table_name = "test_table";
+        let table_creation = TableCreation::builder()
+            .name(table_name.to_string())
+            .schema(schema)
+            .location("memory://root/test_table".to_string())
+            .properties(HashMap::new())
+            .build();
+        let _ = catalog.create_table(&namespace, table_creation).await?;
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let batches: Vec<RecordBatch> = (1..4)
+            .map(|idx| {
+                RecordBatch::try_new(arrow_schema.clone(), vec![
+                    Arc::new(Int32Array::from(vec![idx])) as ArrayRef,
+                    Arc::new(StringArray::from(vec![format!("Name{idx}")])) as ArrayRef,
+                ])
+            })
+            .collect::<Result<_, _>>()?;
+
+        // Create DataFusion context with specific partition configuration
+        let mut config = SessionConfig::new();
+        config = config.set_usize("datafusion.execution.target_partitions", 8);
+        let ctx = SessionContext::new_with_config(config);
+
+        // Create multiple partitions - each batch becomes a separate partition
+        let partitions: Vec<Vec<RecordBatch>> =
+            batches.into_iter().map(|batch| vec![batch]).collect();
+        let source_table = Arc::new(MemTable::try_new(Arc::clone(&arrow_schema), partitions)?);
+        ctx.register_table("source_table", source_table)?;
+
+        let table_ident = TableIdent::from_strs(vec!["test_namespace", "test_table"])?;
+        let iceberg_table_provider =
+            IcebergTableProvider::try_new(catalog.clone(), table_ident).await?;
+        ctx.register_table("iceberg_table", Arc::new(iceberg_table_provider))?;
+
+        let insert_plan = ctx
+            .sql("INSERT INTO iceberg_table SELECT * FROM source_table")
+            .await?;
+
+        let physical_plan = insert_plan.create_physical_plan().await?;
+
+        let actual_plan = format!(
+            "{}",
+            datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(false)
+        );
+
+        println!("Physical plan:\n{actual_plan}");
+
+        let expected_plan = "\
+IcebergCommitExec: table=test_namespace.test_table
+  CoalescePartitionsExec
+    IcebergWriteExec: table=test_namespace.test_table
+      DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]";
+
+        assert_eq!(
+            actual_plan.trim(),
+            expected_plan.trim(),
+            "Physical plan does not match expected\n\nExpected:\n{}\n\nActual:\n{}",
+            expected_plan.trim(),
+            actual_plan.trim()
+        );
+
+        Ok(())
+    }
 }

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 29 additions & 2 deletions
@@ -144,6 +144,18 @@ impl ExecutionPlan for IcebergWriteExec {
         self
     }
 
+    /// Prevents the optimizer from introducing an additional `RepartitionExec` and
+    /// processing the input in parallel, guaranteeing that the input is consumed as a
+    /// single stream and that the order of the data is preserved.
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // Maintains ordering in the sense that the written file will reflect
+        // the ordering of the input.
+        vec![true; self.children().len()]
+    }
+
     fn properties(&self) -> &PlanProperties {
         &self.plan_properties
     }
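
A hedged aside on verifying the two overrides above: since `benefits_from_input_partitioning` returning `false` is meant to keep the optimizer from injecting a `RepartitionExec` above this node, a test can render the physical plan and assert the operator is absent. The helper below is an illustrative sketch (the function name is ours, not from this commit); the new test in commit.rs expresses the same idea by comparing the full rendered plan string.

use std::sync::Arc;

use datafusion::physical_plan::{displayable, ExecutionPlan};

// Sketch: fail if the optimizer injected a RepartitionExec anywhere in `plan`.
fn assert_no_repartition(plan: &Arc<dyn ExecutionPlan>) {
    let rendered = format!("{}", displayable(plan.as_ref()).indent(false));
    assert!(
        !rendered.contains("RepartitionExec"),
        "unexpected RepartitionExec in plan:\n{rendered}"
    );
}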
@@ -284,12 +296,23 @@ impl ExecutionPlan for IcebergWriteExec {
                 .await
                 .map_err(to_datafusion_error)?;
             let mut input_stream = data;
+            let mut total_rows = 0;
+            let mut batch_count = 0;
 
             while let Some(batch) = input_stream.next().await {
-                writer.write(batch?).await.map_err(to_datafusion_error)?;
+                batch_count += 1;
+                let batch = batch?;
+                let rows = batch.num_rows();
+                total_rows += rows;
+                println!("Batch {}: {} rows", batch_count, rows);
+                println!("Batch schema: {:?}", batch.schema());
+                writer.write(batch).await.map_err(to_datafusion_error)?;
             }
+
+            println!("Total: {} batches, {} rows", batch_count, total_rows);
 
             let data_files = writer.close().await.map_err(to_datafusion_error)?;
+            println!("Wrote {} data files", data_files.len());
 
             // Convert builders to data files and then to JSON strings
             let data_files_strs: Vec<String> = data_files
@@ -300,6 +323,8 @@ impl ExecutionPlan for IcebergWriteExec {
                 })
                 .collect::<DFResult<Vec<String>>>()?;
 
+            println!("Serialized data files: {:?}", data_files_strs);
+
             Self::make_result_batch(data_files_strs)
         })
         .boxed();
@@ -617,4 +642,6 @@ mod tests {
 
         Ok(())
     }
-}
+
+
+}
