
Commit 7db9432

implement children and with_new_children for write node, fix fmt
1 parent 92588f5 commit 7db9432

2 files changed: +25 -10 lines changed

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 16 additions & 6 deletions

@@ -27,7 +27,7 @@ use datafusion::arrow::datatypes::{
 use datafusion::common::Result as DFResult;
 use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
-use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
@@ -61,7 +61,7 @@ pub(crate) struct IcebergWriteExec {

 impl IcebergWriteExec {
     pub fn new(table: Table, input: Arc<dyn ExecutionPlan>, schema: ArrowSchemaRef) -> Self {
-        let plan_properties = Self::compute_properties(&input, schema.clone());
+        let plan_properties = Self::compute_properties(&input, schema);

         Self {
             table,
@@ -77,7 +77,7 @@ impl IcebergWriteExec {
     ) -> PlanProperties {
         PlanProperties::new(
             EquivalenceProperties::new(schema),
-            input.output_partitioning().clone(),
+            Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()),
             input.pipeline_behavior(),
             input.boundedness(),
         )
@@ -147,14 +147,24 @@ impl ExecutionPlan for IcebergWriteExec {
     }

     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-        vec![]
+        vec![&self.input]
     }

     fn with_new_children(
         self: Arc<Self>,
-        _children: Vec<Arc<dyn ExecutionPlan>>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        Ok(self)
+        if children.len() != 1 {
+            return Err(DataFusionError::Internal(
+                "IcebergWriteExec expects exactly one child".to_string(),
+            ));
+        }
+
+        Ok(Arc::new(Self::new(
+            self.table.clone(),
+            Arc::clone(&children[0]),
+            self.schema(),
+        )))
     }

     fn execute(
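
A note, not part of the commit: children() and with_new_children() are what let DataFusion's planner and optimizer passes see and rebuild this node's subtree, so returning an empty child list meant the write node's input was skipped by any plan rewrite. The sketch below is a minimal illustration of such a rewrite, assuming only the datafusion crate and the ExecutionPlan API shown in the diff (children() returning Vec<&Arc<dyn ExecutionPlan>>, with_new_children taking self: Arc<Self>); the helper name rewrite_plan is hypothetical.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical pass: rebuild a physical plan bottom-up. A node that reports
// no children is never descended into, which is why IcebergWriteExec's input
// was invisible to rewrites like this before the change above.
fn rewrite_plan(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    // Clone the child Arcs so the borrow taken by children() ends here.
    let children: Vec<Arc<dyn ExecutionPlan>> =
        plan.children().into_iter().map(Arc::clone).collect();
    if children.is_empty() {
        return Ok(plan);
    }
    // Recurse into each child, then hand the rebuilt children back to the
    // node; with_new_children() is expected to receive exactly as many
    // children as children() reported, which the new length check enforces.
    let new_children = children
        .into_iter()
        .map(rewrite_plan)
        .collect::<Result<Vec<_>>>()?;
    plan.with_new_children(new_children)
}

The compute_properties change follows the same theme: the node now advertises Partitioning::UnknownPartitioning with the input's partition count, presumably because the input's own partitioning (for example a hash partitioning) is expressed against the input's schema, which does not match the write node's output schema.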

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 9 additions & 4 deletions

@@ -162,8 +162,7 @@ impl TableProvider for IcebergTableProvider {
     fn supports_filters_pushdown(
         &self,
         filters: &[&Expr],
-    ) -> std::result::Result<Vec<TableProviderFilterPushDown>, DataFusionError>
-    {
+    ) -> std::result::Result<Vec<TableProviderFilterPushDown>, DataFusionError> {
         // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down
         Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
     }
@@ -174,10 +173,16 @@ impl TableProvider for IcebergTableProvider {
         input: Arc<dyn ExecutionPlan>,
         _insert_op: InsertOp,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        if !self.table.metadata().default_partition_spec().is_unpartitioned() {
+        if !self
+            .table
+            .metadata()
+            .default_partition_spec()
+            .is_unpartitioned()
+        {
             // TODO add insert into support for partitioned tables
             return Err(DataFusionError::NotImplemented(
-                "IcebergTableProvider::insert_into does not support partitioned tables yet".to_string()
+                "IcebergTableProvider::insert_into does not support partitioned tables yet"
+                    .to_string(),
             ));
         }
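
For reference, the reformatted guard can be read on its own; the sketch below restates it as a free function using only the calls visible in the diff (Table::metadata, default_partition_spec, is_unpartitioned, DataFusionError::NotImplemented). It is illustrative, not part of the commit, and the function name ensure_unpartitioned and the iceberg::table::Table import path are assumptions.

use datafusion::common::Result as DFResult;
use datafusion::error::DataFusionError;
use iceberg::table::Table;

// Hypothetical helper mirroring the insert_into guard: writes are only
// planned for unpartitioned Iceberg tables; partitioned tables surface a
// NotImplemented error until partitioned writes are supported.
fn ensure_unpartitioned(table: &Table) -> DFResult<()> {
    if !table
        .metadata()
        .default_partition_spec()
        .is_unpartitioned()
    {
        return Err(DataFusionError::NotImplemented(
            "insert_into does not support partitioned tables yet".to_string(),
        ));
    }
    Ok(())
}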
