Skip to content

Commit a4b686c

Browse files
committed
fix
1 parent 1ae6efa commit a4b686c

File tree

9 files changed

+641
-41
lines changed

9 files changed

+641
-41
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,11 +495,11 @@ impl FileOpener for ParquetOpener {
495495

496496
// If reverse scanning is enabled, reverse the prepared plan
497497
if reverse_row_groups {
498-
info!("reversing parquet file scan for file {}", file_name);
498+
info!("reversing parquet file scan for file {file_name}");
499499
prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
500500
}
501501

502-
info!("parquet file scan for file {}", file_name);
502+
info!("parquet file scan for file {file_name}");
503503

504504
// Apply the prepared plan to the builder
505505
builder = prepared_plan.apply_to_builder(builder);

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,8 @@ use datafusion_physical_plan::SortOrderPushdownResult;
5555

5656
#[cfg(feature = "parquet_encryption")]
5757
use datafusion_execution::parquet_encryption::EncryptionFactory;
58-
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
58+
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
5959
use itertools::Itertools;
60-
use log::info;
6160
use object_store::ObjectStore;
6261
#[cfg(feature = "parquet_encryption")]
6362
use parquet::encryption::decrypt::FileDecryptionProperties;
@@ -381,7 +380,6 @@ impl ParquetSource {
381380
self
382381
}
383382

384-
385383
/// Return the value described in [`Self::with_pushdown_filters`]
386384
pub(crate) fn pushdown_filters(&self) -> bool {
387385
self.table_parquet_options.global.pushdown_filters

datafusion/datasource/src/file_scan_config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ use datafusion_physical_plan::execution_plan::SchedulingType;
6666
use datafusion_physical_plan::projection::{
6767
all_alias_free_columns, new_projections_for_columns,
6868
};
69-
use log::{debug, info, warn};
69+
use log::{debug, warn};
7070
use std::borrow::Cow;
7171
use std::collections::HashMap;
7272
use std::marker::PhantomData;

datafusion/datasource/src/source.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,28 @@ use std::fmt;
2222
use std::fmt::{Debug, Formatter};
2323
use std::sync::Arc;
2424

25+
use crate::file_scan_config::FileScanConfig;
26+
use datafusion_common::config::ConfigOptions;
27+
use datafusion_common::{Constraints, Result, Statistics};
28+
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
29+
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
30+
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
2531
use datafusion_physical_plan::execution_plan::{
2632
Boundedness, EmissionType, SchedulingType,
2733
};
34+
use datafusion_physical_plan::filter_pushdown::{
35+
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
36+
};
2837
use datafusion_physical_plan::metrics::SplitMetrics;
2938
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
3039
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
3140
use datafusion_physical_plan::stream::BatchSplitStream;
41+
use datafusion_physical_plan::SortOrderPushdownResult;
3242
use datafusion_physical_plan::{
3343
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
3444
};
3545
use itertools::Itertools;
3646
use log::info;
37-
use crate::file_scan_config::FileScanConfig;
38-
use datafusion_common::config::ConfigOptions;
39-
use datafusion_common::{Constraints, Result, Statistics};
40-
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
41-
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
42-
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
43-
use datafusion_physical_plan::filter_pushdown::{
44-
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
45-
};
46-
use datafusion_physical_plan::SortOrderPushdownResult;
4747

4848
/// A source of data, typically a list of files or memory
4949
///
@@ -396,7 +396,7 @@ impl ExecutionPlan for DataSourceExec {
396396
&self,
397397
order: &[PhysicalSortExpr],
398398
) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
399-
info!("trying to pushdown sort in DataSource: {:?}", order);
399+
info!("trying to pushdown sort in DataSource: {order:?}");
400400
// Delegate to the data source and wrap result with DataSourceExec
401401
self.data_source
402402
.try_pushdown_sort(order)?

datafusion/physical-optimizer/src/pushdown_sort.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ use datafusion_physical_plan::sorts::sort::SortExec;
5757
use datafusion_physical_plan::ExecutionPlan;
5858
use datafusion_physical_plan::SortOrderPushdownResult;
5959
use std::sync::Arc;
60-
use log::info;
6160

6261
/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources.
6362
///
@@ -92,8 +91,6 @@ impl PhysicalOptimizerRule for PushdownSort {
9291
let sort_input = Arc::clone(sort_exec.input());
9392
let required_ordering = sort_exec.expr();
9493

95-
info!("trying to pushdown sort: {:?}", required_ordering);
96-
9794
// Try to push the sort requirement down through the plan tree
9895
// Each node type defines its own pushdown behavior via try_pushdown_sort()
9996
match sort_input.try_pushdown_sort(required_ordering)? {
@@ -102,7 +99,6 @@ impl PhysicalOptimizerRule for PushdownSort {
10299
Ok(Transformed::yes(inner))
103100
}
104101
SortOrderPushdownResult::Inexact { inner } => {
105-
info!("inexact pushdown sort : {:?}", required_ordering);
106102
// Data source is optimized for the ordering but not perfectly sorted
107103
// Keep the Sort operator but use the optimized input
108104
// Benefits: TopK queries can terminate early, better cache locality
@@ -115,7 +111,6 @@ impl PhysicalOptimizerRule for PushdownSort {
115111
)))
116112
}
117113
SortOrderPushdownResult::Unsupported => {
118-
info!("unsupported pushdown sort : {:?}", required_ordering);
119114
// Cannot optimize for this ordering - no change
120115
Ok(Transformed::no(plan))
121116
}

datafusion/physical-plan/src/coop.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,17 +79,20 @@ use crate::filter_pushdown::{
7979
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
8080
FilterPushdownPropagation,
8181
};
82-
use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, SendableRecordBatchStream, SortOrderPushdownResult};
82+
use crate::{
83+
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
84+
SendableRecordBatchStream, SortOrderPushdownResult,
85+
};
8386
use arrow::record_batch::RecordBatch;
8487
use arrow_schema::Schema;
8588
use datafusion_common::{internal_err, Result, Statistics};
8689
use datafusion_execution::TaskContext;
8790

8891
use crate::execution_plan::SchedulingType;
8992
use crate::stream::RecordBatchStreamAdapter;
93+
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
9094
use futures::{Stream, StreamExt};
9195
use log::info;
92-
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
9396

9497
/// A stream that passes record batches through unchanged while cooperating with the Tokio runtime.
9598
/// It consumes cooperative scheduling budget for each returned [`RecordBatch`],
@@ -313,13 +316,14 @@ impl ExecutionPlan for CooperativeExec {
313316
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
314317
}
315318

316-
fn try_pushdown_sort(&self, order: &[PhysicalSortExpr]) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
317-
info!("CooperativeExec trying to pushdown sort: {:?}", order);
318-
self.input
319-
.try_pushdown_sort(order)?
320-
.try_map(|inner| {
321-
Ok(Arc::new(CooperativeExec::new(inner)) as Arc<dyn ExecutionPlan>)
322-
})
319+
fn try_pushdown_sort(
320+
&self,
321+
order: &[PhysicalSortExpr],
322+
) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
323+
info!("CooperativeExec trying to pushdown sort: {order:?}");
324+
self.input.try_pushdown_sort(order)?.try_map(|inner| {
325+
Ok(Arc::new(CooperativeExec::new(inner)) as Arc<dyn ExecutionPlan>)
326+
})
323327
}
324328
}
325329

datafusion/physical-plan/src/filter.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use itertools::Itertools;
2424

2525
use super::{
2626
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
27-
RecordBatchStream, SendableRecordBatchStream, Statistics,
27+
RecordBatchStream, SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
2828
};
2929
use crate::common::can_project;
3030
use crate::execution_plan::CardinalityEffect;
@@ -63,6 +63,7 @@ use datafusion_physical_expr::{
6363
};
6464

6565
use datafusion_physical_expr_common::physical_expr::fmt_sql;
66+
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
6667
use futures::stream::{Stream, StreamExt};
6768
use log::trace;
6869

@@ -570,6 +571,25 @@ impl ExecutionPlan for FilterExec {
570571
updated_node,
571572
})
572573
}
574+
fn try_pushdown_sort(
575+
&self,
576+
order: &[PhysicalSortExpr],
577+
) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
578+
let child = self.input();
579+
match child.try_pushdown_sort(order)? {
580+
SortOrderPushdownResult::Exact { inner } => {
581+
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
582+
Ok(SortOrderPushdownResult::Exact { inner: new_exec })
583+
}
584+
SortOrderPushdownResult::Inexact { inner } => {
585+
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
586+
Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
587+
}
588+
SortOrderPushdownResult::Unsupported => {
589+
Ok(SortOrderPushdownResult::Unsupported)
590+
}
591+
}
592+
}
573593
}
574594

575595
impl EmbeddedProjection for FilterExec {

datafusion/physical-plan/src/projection.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
2323
use super::expressions::{Column, Literal};
2424
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25-
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, SortOrderPushdownResult, Statistics};
25+
use super::{
26+
DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27+
SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
28+
};
2629
use crate::execution_plan::CardinalityEffect;
2730
use crate::filter_pushdown::{
2831
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
@@ -47,7 +50,9 @@ use datafusion_execution::TaskContext;
4750
use datafusion_physical_expr::equivalence::ProjectionMapping;
4851
use datafusion_physical_expr::utils::collect_columns;
4952
use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef};
50-
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement, PhysicalSortExpr};
53+
use datafusion_physical_expr_common::sort_expr::{
54+
LexOrdering, LexRequirement, PhysicalSortExpr,
55+
};
5156
// Re-exported from datafusion-physical-expr for backwards compatibility
5257
// We recommend updating your imports to use datafusion-physical-expr directly
5358
pub use datafusion_physical_expr::projection::{
@@ -344,19 +349,18 @@ impl ExecutionPlan for ProjectionExec {
344349
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
345350
}
346351

347-
fn try_pushdown_sort(&self, order: &[PhysicalSortExpr]) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
352+
fn try_pushdown_sort(
353+
&self,
354+
order: &[PhysicalSortExpr],
355+
) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
348356
let child = self.input();
349-
350-
println!("try pushdown_sort in ProjectionExec: {:?}", order);
351357
match child.try_pushdown_sort(order)? {
352358
SortOrderPushdownResult::Exact { inner } => {
353-
let new_exec = Arc::new(self.clone())
354-
.with_new_children(vec![inner])?;
359+
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
355360
Ok(SortOrderPushdownResult::Exact { inner: new_exec })
356361
}
357362
SortOrderPushdownResult::Inexact { inner } => {
358-
let new_exec = Arc::new(self.clone())
359-
.with_new_children(vec![inner])?;
363+
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
360364
Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
361365
}
362366
SortOrderPushdownResult::Unsupported => {

0 commit comments

Comments
 (0)