Commit 0b869a6

coalesce partitions

1 parent: b28f15b

3 files changed: 67 additions, 76 deletions

crates/integrations/datafusion/src/physical_plan/commit.rs
Lines changed: 56 additions & 64 deletions
@@ -29,7 +29,7 @@ use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_stream_partitioned,
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
 };
 use futures::StreamExt;
 use iceberg::Catalog;
@@ -42,10 +42,11 @@ use crate::to_datafusion_error;
 
 /// IcebergCommitExec is responsible for collecting results from multiple IcebergWriteExec
 /// instances and using Transaction::fast_append to commit the data files written.
+#[derive(Debug)]
 pub(crate) struct IcebergCommitExec {
     table: Table,
     catalog: Arc<dyn Catalog>,
-    write_plan: Arc<dyn ExecutionPlan>,
+    input: Arc<dyn ExecutionPlan>,
     schema: ArrowSchemaRef,
     count_schema: ArrowSchemaRef,
     plan_properties: PlanProperties,
@@ -55,15 +56,15 @@ impl IcebergCommitExec {
     pub fn new(
         table: Table,
         catalog: Arc<dyn Catalog>,
-        write_plan: Arc<dyn ExecutionPlan>,
+        input: Arc<dyn ExecutionPlan>,
         schema: ArrowSchemaRef,
     ) -> Self {
         let plan_properties = Self::compute_properties(schema.clone());
 
         Self {
             table,
             catalog,
-            write_plan,
+            input,
             schema,
             count_schema: Self::make_count_schema(),
             plan_properties,
@@ -99,12 +100,6 @@ impl IcebergCommitExec {
     }
 }
 
-impl Debug for IcebergCommitExec {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "IcebergCommitExec")
-    }
-}
-
 impl DisplayAs for IcebergCommitExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
         match t {
@@ -140,18 +135,18 @@ impl ExecutionPlan for IcebergCommitExec {
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-        vec![&self.write_plan]
+        vec![&self.input]
     }
 
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         if children.len() != 1 {
-            return Err(DataFusionError::Internal(
-                "IcebergCommitExec expects exactly one child, but provided {children.len()}"
-                    .to_string(),
-            ));
+            return Err(DataFusionError::Internal(format!(
+                "IcebergCommitExec expects exactly one child, but provided {}",
+                children.len()
+            )));
         }
 
         Ok(Arc::new(IcebergCommitExec::new(
@@ -176,7 +171,7 @@ impl ExecutionPlan for IcebergCommitExec {
         }
 
         let table = self.table.clone();
-        let input_plan = self.write_plan.clone();
+        let input_plan = self.input.clone();
         let count_schema = Arc::clone(&self.count_schema);
 
         // todo revisit this
@@ -191,54 +186,51 @@ impl ExecutionPlan for IcebergCommitExec {
         let mut data_files: Vec<DataFile> = Vec::new();
         let mut total_record_count: u64 = 0;
 
-        // Execute and collect results from all partitions of the input plan
-        let batches = execute_stream_partitioned(input_plan, context)?;
-
-        // Collect all data files from this partition's stream
-        for mut batch_stream in batches {
-            while let Some(batch_result) = batch_stream.next().await {
-                let batch = batch_result?;
-
-                let files_array = batch
-                    .column_by_name(DATA_FILES_COL_NAME)
-                    .ok_or_else(|| {
-                        DataFusionError::Internal(
-                            "Expected 'data_files' column in input batch".to_string(),
-                        )
-                    })?
-                    .as_any()
-                    .downcast_ref::<StringArray>()
-                    .ok_or_else(|| {
-                        DataFusionError::Internal(
-                            "Expected 'data_files' column to be StringArray".to_string(),
-                        )
-                    })?;
-
-                // todo remove log
-                println!("files_array to deserialize: {:?}", files_array);
-
-                // Deserialize all data files from the StringArray
-                let batch_files: Vec<DataFile> = files_array
-                    .into_iter()
-                    .flatten()
-                    .map(|f| -> DFResult<DataFile> {
-                        // Parse JSON to DataFileSerde and convert to DataFile
-                        deserialize_data_file_from_json(
-                            f,
-                            spec_id,
-                            &partition_type,
-                            &current_schema,
-                        )
-                        .map_err(to_datafusion_error)
-                    })
-                    .collect::<datafusion::common::Result<_>>()?;
-
-                // add record_counts from the current batch to total record count
-                total_record_count += batch_files.iter().map(|f| f.record_count()).sum::<u64>();
-
-                // Add all deserialized files to our collection
-                data_files.extend(batch_files);
-            }
+        // Execute and collect results from the input coalesced plan
+        let mut batch_stream = input_plan.execute(0, context)?;
+
+        while let Some(batch_result) = batch_stream.next().await {
+            let batch = batch_result?;
+
+            let files_array = batch
+                .column_by_name(DATA_FILES_COL_NAME)
+                .ok_or_else(|| {
+                    DataFusionError::Internal(
+                        "Expected 'data_files' column in input batch".to_string(),
+                    )
+                })?
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .ok_or_else(|| {
+                    DataFusionError::Internal(
+                        "Expected 'data_files' column to be StringArray".to_string(),
+                    )
+                })?;
+
+            // todo remove log
+            println!("files_array to deserialize: {:?}", files_array);
+
+            // Deserialize all data files from the StringArray
+            let batch_files: Vec<DataFile> = files_array
+                .into_iter()
+                .flatten()
+                .map(|f| -> DFResult<DataFile> {
+                    // Parse JSON to DataFileSerde and convert to DataFile
+                    deserialize_data_file_from_json(
+                        f,
+                        spec_id,
+                        &partition_type,
+                        &current_schema,
+                    )
+                    .map_err(to_datafusion_error)
+                })
+                .collect::<datafusion::common::Result<_>>()?;
+
+            // add record_counts from the current batch to total record count
+            total_record_count += batch_files.iter().map(|f| f.record_count()).sum::<u64>();
+
+            // Add all deserialized files to our collection
+            data_files.extend(batch_files);
        }
 
         // If no data files were collected, return an empty result

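Why this is safe: the commit node now drains a single stream via input_plan.execute(0, context) instead of looping over execute_stream_partitioned, which only works because its child is wrapped in CoalescePartitionsExec before the plan is handed over (see table/mod.rs below). The following is a minimal, standalone sketch of that invariant. It is not part of this commit, and it assumes a DataFusion version that still ships datafusion::physical_plan::memory::MemoryExec (newer releases replace it with a DataSourceExec-based source) plus tokio with the macros and rt features:

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::{common, ExecutionPlan, ExecutionPlanProperties};

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    // A two-partition input, standing in for a parallel IcebergWriteExec.
    let input: Arc<dyn ExecutionPlan> =
        Arc::new(MemoryExec::try_new(&[vec![batch.clone()], vec![batch]], schema, None)?);
    assert_eq!(input.output_partitioning().partition_count(), 2);

    // CoalescePartitionsExec merges all input partitions into one output
    // partition, so partition 0 is guaranteed to carry every row.
    let coalesced: Arc<dyn ExecutionPlan> = Arc::new(CoalescePartitionsExec::new(input));
    assert_eq!(coalesced.output_partitioning().partition_count(), 1);

    let ctx = Arc::new(TaskContext::default());
    let batches = common::collect(coalesced.execute(0, ctx)?).await?;
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(rows, 6); // rows from both partitions arrive on the single stream
    Ok(())
}

Without the coalesce step, execute(0) would see only the first partition's data files and the commit would silently drop the rest.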
crates/integrations/datafusion/src/physical_plan/write.rs
Lines changed: 7 additions & 11 deletions
@@ -37,8 +37,8 @@ use datafusion::physical_plan::{
 use futures::StreamExt;
 use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::spec::{
-    DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT,
-    PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, serialize_data_file_to_json,
+    DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT,
+    serialize_data_file_to_json,
 };
 use iceberg::table::Table;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
@@ -54,6 +54,7 @@ use uuid::Uuid;
 use crate::physical_plan::DATA_FILES_COL_NAME;
 use crate::to_datafusion_error;
 
+#[derive(Debug)]
 pub(crate) struct IcebergWriteExec {
     table: Table,
     input: Arc<dyn ExecutionPlan>,
@@ -104,12 +105,6 @@ impl IcebergWriteExec {
     }
 }
 
-impl Debug for IcebergWriteExec {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "IcebergWriteExec")
-    }
-}
-
 impl DisplayAs for IcebergWriteExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
         match t {
@@ -153,9 +148,10 @@ impl ExecutionPlan for IcebergWriteExec {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         if children.len() != 1 {
-            return Err(DataFusionError::Internal(
-                "IcebergWriteExec expects exactly one child, but provided {} ".to_string(),
-            ));
+            return Err(DataFusionError::Internal(format!(
+                "IcebergWriteExec expects exactly one child, but provided {}",
+                children.len()
+            )));
         }
 
         Ok(Arc::new(Self::new(

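Beyond the #[derive(Debug)] swap, this file fixes the same latent bug as commit.rs: "{}" inside a plain string literal passed to .to_string() is never interpolated, so the old error message literally ended in "{}" and the actual child count was never reported. Only format! fills the placeholder. A tiny standalone illustration, not from the commit:

fn main() {
    let children_len = 3;

    // Old pattern: "{}" in an ordinary string literal is not a format placeholder.
    let old = "IcebergWriteExec expects exactly one child, but provided {} ".to_string();

    // New pattern: format! interpolates the runtime value into the message.
    let new = format!(
        "IcebergWriteExec expects exactly one child, but provided {}",
        children_len
    );

    assert_eq!(old, "IcebergWriteExec expects exactly one child, but provided {} ");
    assert_eq!(new, "IcebergWriteExec expects exactly one child, but provided 3");
}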
crates/integrations/datafusion/src/table/mod.rs
Lines changed: 4 additions & 1 deletion
@@ -30,6 +30,7 @@ use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::dml::InsertOp;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::inspect::MetadataTableType;
 use iceberg::table::Table;
@@ -198,10 +199,12 @@ impl TableProvider for IcebergTableProvider {
             self.schema.clone(),
         ));
 
+        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));
+
         Ok(Arc::new(IcebergCommitExec::new(
             self.table.clone(),
             catalog,
-            write_plan,
+            coalesce_partitions,
             self.schema.clone(),
         )))
     }
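Taken together, the insert path now builds a three-level physical plan. Below is a hedged sketch of the wiring: the helper coalesce_for_commit is hypothetical (the commit inlines this in insert_into), and the tree in the comment approximates what DataFusion's EXPLAIN would show rather than quoting it:

use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;

// Approximate resulting plan shape, top-down:
//   IcebergCommitExec          -- executes once, reading partition 0
//     CoalescePartitionsExec   -- N partitions in, 1 partition out
//       IcebergWriteExec       -- N partitions, writing data files in parallel
//
// Hypothetical helper mirroring the insert_into change: whatever the write
// plan's partitioning, the commit node always sees a single-partition child.
fn coalesce_for_commit(write_plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    Arc::new(CoalescePartitionsExec::new(write_plan))
}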
