Skip to content

Commit 392ad1a

Browse files
committed
fix fmt, input boundedness
1 parent 22d14bf commit 392ad1a

File tree

4 files changed

+8
-4
lines changed

4 files changed

+8
-4
lines changed

crates/iceberg/src/arrow/nan_val_cnt_visitor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ impl NanValueCountVisitor {
159159
let arrow_arr_partner_accessor = ArrowArrayAccessor {};
160160

161161
let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef;
162+
// todo remove these log lines
162163
println!("----StructArray from record stream: {:?}", struct_arr);
163164
println!("----Schema.as_struct from table: {:?}", schema.as_struct());
164165
visit_struct_with_partner(

crates/iceberg/src/arrow/value.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,7 @@ impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
457457
)
458458
})?;
459459

460+
// todo remove unneeded log lines
460461
println!(
461462
"!!!Accessor struct array from struct partner: {:?}",
462463
struct_array

crates/integrations/datafusion/src/physical_plan/commit.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl IcebergCommitExec {
7474
PlanProperties::new(
7575
EquivalenceProperties::new(schema),
7676
Partitioning::UnknownPartitioning(1),
77-
EmissionType::Incremental,
77+
EmissionType::Final,
7878
Boundedness::Bounded,
7979
)
8080
}
@@ -148,7 +148,8 @@ impl ExecutionPlan for IcebergCommitExec {
148148
) -> DFResult<Arc<dyn ExecutionPlan>> {
149149
if children.len() != 1 {
150150
return Err(DataFusionError::Internal(
151-
"IcebergCommitExec expects exactly one child, but provided {children.len()}".to_string(),
151+
"IcebergCommitExec expects exactly one child, but provided {children.len()}"
152+
.to_string(),
152153
));
153154
}
154155

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use datafusion::common::Result as DFResult;
2828
use datafusion::error::DataFusionError;
2929
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
3030
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
31+
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
3132
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
3233
use datafusion::physical_plan::{
3334
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
@@ -78,8 +79,8 @@ impl IcebergWriteExec {
7879
PlanProperties::new(
7980
EquivalenceProperties::new(schema),
8081
Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()),
81-
input.pipeline_behavior(),
82-
input.boundedness(),
82+
EmissionType::Final,
83+
Boundedness::Bounded,
8384
)
8485
}
8586

0 commit comments

Comments
 (0)