
Commit 1de3315

fix: ensure CoalescePartitionsExec is enabled for IcebergCommitExec (#1723)
## Which issue does this PR close?

This PR fixes partial writes similar to the issue reported [here](spiceai/spiceai#7407). Although the code below enforces `CoalescePartitionsExec` (single-input behavior), that node can be removed by the DataFusion optimizer; a unit test was added to demonstrate this behavior.

https://github.com/apache/iceberg-rust/blob/dc349284a4204c1a56af47fb3177ace6f9e899a0/crates/integrations/datafusion/src/table/mod.rs#L196-L210

```rust
let write_plan = Arc::new(IcebergWriteExec::new(
    self.table.clone(),
    input,
    self.schema.clone(),
));

// Merge the outputs of write_plan into one so we can commit all files together
let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));

Ok(Arc::new(IcebergCommitExec::new(
    self.table.clone(),
    catalog,
    coalesce_partitions,
    self.schema.clone(),
)))
```

Example plan (observe no `CoalescePartitionsExec`):

```shell
explain format tree insert into task_history_sink select * from runtime.task_history;

┌───────────────────────────┐
│ IcebergCommitExec │
│ -------------------- │
│ IcebergCommitExec: table: │
│ team_app.task_history │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ IcebergWriteExec │
│ -------------------- │
│ IcebergWriteExec: table: │
│ team_app.task_history │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ ProjectionExec │
│ -------------------- │
│ captured_output: │
│ captured_output │
│ │
│ end_time: │
│ CAST(end_time AS Timestamp│
│ (Microsecond, None)) │
│ │
│ error_message: │
│ error_message │
│ │
│ execution_duration_ms: │
│ execution_duration_ms │
│ │
│ input: input │
│ │
│ labels: │
│ CAST(labels AS Map(Field {│
│ name: "key_value", │
│ data_type: Struct( │
│ [Field { name: "key", │
│ data_type: Utf8, │
│ nullable: false, │
│ dict_id: 0, │
│ dict_is_ordered │
│ : false, metadata: { │
│ "PARQUET:field_id": │
│ "12"} }, Field { name: │
│ "value", data_type: │
│ Utf8, nullable: true │
│ ... │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ RepartitionExec │
│ -------------------- │
│ partition_count(in->out): │
│ 1 -> 14 │
│ │
│ partitioning_scheme: │
│ RoundRobinBatch(14) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ BytesProcessedExec │
│ -------------------- │
│ BytesProcessedExec │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ SchemaCastScanExec │
│ -------------------- │
│ SchemaCastScanExec │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ DataSourceExec │
│ -------------------- │
│ bytes: 88176 │
│ format: memory │
│ rows: 6 │
└───────────────────────────┘
```

## What changes are included in this PR?

The PR adds a `required_input_distribution` override (requiring a single partition) to `IcebergCommitExec`, plus `benefits_from_input_partitioning`/`maintains_input_order` hints on the commit and write nodes, so that DataFusion coalesces input partitions automatically before the commit.
This mirrors the approach of [DataFusion DataSinkExec](https://github.com/apache/datafusion/blob/a7b113c45509aae34595b6a62469b3173cac91bd/datafusion/datasource/src/sink.rs#L187). The new `test_datafusion_execution_partitioned_source` test can be used to observe the behavior before and after the change.

Before:

```rust
Physical plan:
IcebergCommitExec: table=test_namespace.test_table_partitioning
  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=3
    IcebergWriteExec: table=test_namespace.test_table_partitioning
      DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]
```

After:

```rust
IcebergCommitExec: table=test_namespace.test_table
  CoalescePartitionsExec
    IcebergWriteExec: table=test_namespace.test_table
      DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]
```

## Are these changes tested?

Added the `test_datafusion_execution_partitioned_source` unit test; also tested manually.
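For readers unfamiliar with how DataFusion consumes these hints: during physical optimization, the `EnforceDistribution` rule compares each operator's `required_input_distribution` against the actual output partitioning of its children and inserts whatever operators are needed to satisfy it. The sketch below is a rough, simplified illustration of that decision, not DataFusion's actual rule implementation; it only uses the public `Distribution`, `ExecutionPlan`, and `CoalescePartitionsExec` types.

```rust
use std::sync::Arc;

use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{Distribution, ExecutionPlan};

/// Rough illustration (not DataFusion's real optimizer code): if a parent requires a
/// single partition but its child produces several, wrap the child so all partitions
/// are merged before reaching the parent.
fn coalesce_if_required(
    requirement: &Distribution,
    child: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    let child_partitions = child.properties().output_partitioning().partition_count();
    match requirement {
        Distribution::SinglePartition if child_partitions > 1 => {
            // This is the node the optimizer now adds above IcebergWriteExec,
            // because IcebergCommitExec declares Distribution::SinglePartition.
            Arc::new(CoalescePartitionsExec::new(child))
        }
        _ => child,
    }
}
```

Because `IcebergCommitExec` now reports `Distribution::SinglePartition`, the optimizer itself guarantees a single-partition input above `IcebergWriteExec`, instead of relying on a manually inserted `CoalescePartitionsExec` that later optimizer passes may remove.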
1 parent 05d9122 commit 1de3315

2 files changed: +120 −1 lines changed


crates/integrations/datafusion/src/physical_plan/commit.rs

Lines changed: 110 additions & 1 deletion
```diff
@@ -136,6 +136,14 @@ impl ExecutionPlan for IcebergCommitExec {
         vec![&self.input]
     }
 
+    fn required_input_distribution(&self) -> Vec<datafusion::physical_plan::Distribution> {
+        vec![datafusion::physical_plan::Distribution::SinglePartition; self.children().len()]
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -262,14 +270,16 @@ mod tests {
     use std::fmt;
     use std::sync::Arc;
 
-    use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
+    use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt64Array};
     use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use datafusion::datasource::MemTable;
     use datafusion::execution::context::TaskContext;
     use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::execution_plan::Boundedness;
     use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
     use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+    use datafusion::prelude::*;
     use futures::StreamExt;
     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
     use iceberg::spec::{
@@ -280,6 +290,7 @@ mod tests {
 
     use super::*;
     use crate::physical_plan::DATA_FILES_COL_NAME;
+    use crate::table::IcebergTableProvider;
 
     // A mock execution plan that returns record batches with serialized data files
     #[derive(Debug)]
@@ -510,4 +521,102 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_datafusion_execution_partitioned_source() -> Result<(), Box<dyn std::error::Error>>
+    {
+        let catalog = Arc::new(
+            MemoryCatalogBuilder::default()
+                .load(
+                    "memory",
+                    HashMap::from([(
+                        MEMORY_CATALOG_WAREHOUSE.to_string(),
+                        "memory://root".to_string(),
+                    )]),
+                )
+                .await?,
+        );
+
+        let namespace = NamespaceIdent::new("test_namespace".to_string());
+        catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()?;
+
+        let table_name = "test_table";
+        let table_creation = TableCreation::builder()
+            .name(table_name.to_string())
+            .schema(schema)
+            .location("memory://root/test_table".to_string())
+            .properties(HashMap::new())
+            .build();
+        let _ = catalog.create_table(&namespace, table_creation).await?;
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let batches: Vec<RecordBatch> = (1..4)
+            .map(|idx| {
+                RecordBatch::try_new(arrow_schema.clone(), vec![
+                    Arc::new(Int32Array::from(vec![idx])) as ArrayRef,
+                    Arc::new(StringArray::from(vec![format!("Name{idx}")])) as ArrayRef,
+                ])
+            })
+            .collect::<Result<_, _>>()?;
+
+        // Create DataFusion context with specific partition configuration
+        let mut config = SessionConfig::new();
+        config = config.set_usize("datafusion.execution.target_partitions", 8);
+        let ctx = SessionContext::new_with_config(config);
+
+        // Create multiple partitions - each batch becomes a separate partition
+        let partitions: Vec<Vec<RecordBatch>> =
+            batches.into_iter().map(|batch| vec![batch]).collect();
+        let source_table = Arc::new(MemTable::try_new(Arc::clone(&arrow_schema), partitions)?);
+        ctx.register_table("source_table", source_table)?;
+
+        let iceberg_table_provider = IcebergTableProvider::try_new(
+            catalog.clone(),
+            namespace.clone(),
+            table_name.to_string(),
+        )
+        .await?;
+        ctx.register_table("iceberg_table", Arc::new(iceberg_table_provider))?;
+
+        let insert_plan = ctx
+            .sql("INSERT INTO iceberg_table SELECT * FROM source_table")
+            .await?;
+
+        let physical_plan = insert_plan.create_physical_plan().await?;
+
+        let actual_plan = format!(
+            "{}",
+            datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(false)
+        );
+
+        println!("Physical plan:\n{actual_plan}");
+
+        let expected_plan = "\
+IcebergCommitExec: table=test_namespace.test_table
+  CoalescePartitionsExec
+    IcebergWriteExec: table=test_namespace.test_table
+      DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]";
+
+        assert_eq!(
+            actual_plan.trim(),
+            expected_plan.trim(),
+            "Physical plan does not match expected\n\nExpected:\n{}\n\nActual:\n{}",
+            expected_plan.trim(),
+            actual_plan.trim()
+        );
+
+        Ok(())
+    }
 }
```

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 10 additions & 0 deletions
```diff
@@ -144,6 +144,16 @@ impl ExecutionPlan for IcebergWriteExec {
         self
     }
 
+    /// Prevents the introduction of additional `RepartitionExec` and processing input in parallel.
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // Maintains ordering in the sense that the written file will reflect the ordering of the input.
+        vec![true; self.children().len()]
+    }
+
     fn properties(&self) -> &PlanProperties {
         &self.plan_properties
     }
```
