Skip to content

Commit 88afe82

Browse files
committed
fix partitioning, and fmt ofc
1 parent 77b349b commit 88afe82

File tree

1 file changed

+13
-14
lines changed
  • crates/integrations/datafusion/src/physical_plan

1 file changed

+13
-14
lines changed

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ use datafusion::arrow::datatypes::{
2727
use datafusion::common::Result as DFResult;
2828
use datafusion::error::DataFusionError;
2929
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
30-
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
31-
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30+
use datafusion::physical_expr::EquivalenceProperties;
3231
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
3332
use datafusion::physical_plan::{
34-
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_input_stream,
33+
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
34+
execute_input_stream,
3535
};
3636
use futures::StreamExt;
3737
use iceberg::arrow::schema_to_arrow_schema;
@@ -41,10 +41,10 @@ use iceberg::spec::{
4141
};
4242
use iceberg::table::Table;
4343
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
44+
use iceberg::writer::file_writer::ParquetWriterBuilder;
4445
use iceberg::writer::file_writer::location_generator::{
4546
DefaultFileNameGenerator, DefaultLocationGenerator,
4647
};
47-
use iceberg::writer::file_writer::ParquetWriterBuilder;
4848
use iceberg::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
4949
use iceberg::{Error, ErrorKind};
5050
use parquet::file::properties::WriterProperties;
@@ -61,7 +61,7 @@ pub(crate) struct IcebergWriteExec {
6161

6262
impl IcebergWriteExec {
6363
pub fn new(table: Table, input: Arc<dyn ExecutionPlan>, schema: ArrowSchemaRef) -> Self {
64-
let plan_properties = Self::compute_properties(schema.clone());
64+
let plan_properties = Self::compute_properties(&input, schema.clone());
6565

6666
Self {
6767
table,
@@ -71,16 +71,15 @@ impl IcebergWriteExec {
7171
}
7272
}
7373

74-
/// todo: Copied from scan.rs
75-
fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
76-
// TODO:
77-
// This is more or less a placeholder, to be replaced
78-
// once we support output-partitioning
74+
fn compute_properties(
75+
input: &Arc<dyn ExecutionPlan>,
76+
schema: ArrowSchemaRef,
77+
) -> PlanProperties {
7978
PlanProperties::new(
8079
EquivalenceProperties::new(schema),
81-
Partitioning::UnknownPartitioning(1),
82-
EmissionType::Incremental,
83-
Boundedness::Bounded,
80+
input.output_partitioning().clone(),
81+
input.pipeline_behavior(),
82+
input.boundedness(),
8483
)
8584
}
8685

@@ -101,8 +100,8 @@ impl IcebergWriteExec {
101100
fn make_result_schema() -> ArrowSchemaRef {
102101
// Define a schema.
103102
Arc::new(ArrowSchema::new(vec![
104-
Field::new("data_files", DataType::Utf8, false),
105103
Field::new("count", DataType::UInt64, false),
104+
Field::new("data_files", DataType::Utf8, false),
106105
]))
107106
}
108107
}

0 commit comments

Comments
 (0)