Skip to content

Commit 95ea145

Browse files
committed
remove downcast
1 parent fd95aaf commit 95ea145

File tree

9 files changed

+109
-49
lines changed

9 files changed

+109
-49
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,13 @@ use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMe
6969
/// Implements [`FileOpener`] for a parquet file
7070
pub(super) struct ParquetOpener {
7171
/// Execution partition index
72-
pub partition_index: usize,
72+
pub(crate) partition_index: usize,
7373
/// Projection to apply on top of the table schema (i.e. can reference partition columns).
7474
pub projection: ProjectionExprs,
7575
/// Target number of rows in each output RecordBatch
7676
pub batch_size: usize,
7777
/// Optional limit on the number of rows to read
78-
pub limit: Option<usize>,
78+
pub(crate) limit: Option<usize>,
7979
/// If should keep the output rows in order
8080
pub preserve_order: bool,
8181
/// Optional predicate to apply during the scan

datafusion/datasource/src/file_scan_config.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,9 @@ impl FileScanConfigBuilder {
484484
let file_compression_type =
485485
file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
486486

487+
// If there is an output ordering, we should preserve it.
488+
let preserve_order = preserve_order || !output_ordering.is_empty();
489+
487490
FileScanConfig {
488491
object_store_url,
489492
file_source,
@@ -869,6 +872,18 @@ impl DataSource for FileScanConfig {
869872
}
870873
}
871874
}
875+
876+
fn with_preserve_order(&self, preserve_order: bool) -> Option<Arc<dyn DataSource>> {
877+
if self.preserve_order == preserve_order {
878+
return Some(Arc::new(self.clone()));
879+
}
880+
881+
let new_config = FileScanConfig {
882+
preserve_order,
883+
..self.clone()
884+
};
885+
Some(Arc::new(new_config))
886+
}
872887
}
873888

874889
impl FileScanConfig {

datafusion/datasource/src/source.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ pub trait DataSource: Send + Sync + Debug {
210210
) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
211211
Ok(SortOrderPushdownResult::Unsupported)
212212
}
213+
214+
/// Returns a variant of this `DataSource` that is aware of order-sensitivity.
215+
fn with_preserve_order(&self, _preserve_order: bool) -> Option<Arc<dyn DataSource>> {
216+
None
217+
}
213218
}
214219

215220
/// [`ExecutionPlan`] that reads one or more files
@@ -393,6 +398,18 @@ impl ExecutionPlan for DataSourceExec {
393398
Ok(Arc::new(new_exec) as Arc<dyn ExecutionPlan>)
394399
})
395400
}
401+
402+
fn with_preserve_order(
403+
&self,
404+
preserve_order: bool,
405+
) -> Option<Arc<dyn ExecutionPlan>> {
406+
self.data_source
407+
.with_preserve_order(preserve_order)
408+
.map(|new_data_source| {
409+
Arc::new(self.clone().with_data_source(new_data_source))
410+
as Arc<dyn ExecutionPlan>
411+
})
412+
}
396413
}
397414

398415
impl DataSourceExec {

datafusion/physical-optimizer/src/limit_pushdown.rs

Lines changed: 10 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ use datafusion_common::config::ConfigOptions;
2727
use datafusion_common::error::Result;
2828
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
2929
use datafusion_common::utils::combine_limit;
30-
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31-
use datafusion_datasource::source::DataSourceExec;
3230
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
3331
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
3432
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -52,7 +50,7 @@ pub struct GlobalRequirements {
5250
fetch: Option<usize>,
5351
skip: usize,
5452
satisfied: bool,
55-
order_sensitive: bool,
53+
preserve_order: bool,
5654
}
5755

5856
impl LimitPushdown {
@@ -72,7 +70,7 @@ impl PhysicalOptimizerRule for LimitPushdown {
7270
fetch: None,
7371
skip: 0,
7472
satisfied: false,
75-
order_sensitive: false,
73+
preserve_order: false,
7674
};
7775
pushdown_limits(plan, global_state)
7876
}
@@ -116,7 +114,7 @@ impl LimitExec {
116114
}
117115
}
118116

119-
fn order_sensitive(&self) -> bool {
117+
fn preserve_order(&self) -> bool {
120118
match self {
121119
Self::Global(global) => global.required_ordering().is_some(),
122120
Self::Local(local) => local.required_ordering().is_some(),
@@ -156,7 +154,7 @@ pub fn pushdown_limit_helper(
156154
);
157155
global_state.skip = skip;
158156
global_state.fetch = fetch;
159-
global_state.order_sensitive = limit_exec.order_sensitive();
157+
global_state.preserve_order = limit_exec.preserve_order();
160158

161159
// Now the global state has the most recent information, we can remove
162160
// the `LimitExec` plan. We will decide later if we should add it again
@@ -253,21 +251,19 @@ pub fn pushdown_limit_helper(
253251
let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch);
254252
if global_state.satisfied {
255253
if let Some(plan_with_fetch) = maybe_fetchable {
256-
let plan_with_preserve_order = ensure_preserve_order_if_needed(
257-
plan_with_fetch,
258-
global_state.order_sensitive,
259-
);
254+
let plan_with_preserve_order = plan_with_fetch
255+
.with_preserve_order(global_state.preserve_order)
256+
.unwrap_or(plan_with_fetch);
260257
Ok((Transformed::yes(plan_with_preserve_order), global_state))
261258
} else {
262259
Ok((Transformed::no(pushdown_plan), global_state))
263260
}
264261
} else {
265262
global_state.satisfied = true;
266263
pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
267-
let plan_with_preserve_order = ensure_preserve_order_if_needed(
268-
plan_with_fetch,
269-
global_state.order_sensitive,
270-
);
264+
let plan_with_preserve_order = plan_with_fetch
265+
.with_preserve_order(global_state.preserve_order)
266+
.unwrap_or(plan_with_fetch);
271267

272268
if global_skip > 0 {
273269
add_global_limit(
@@ -362,37 +358,4 @@ fn add_global_limit(
362358
Arc::new(GlobalLimitExec::new(pushdown_plan, skip, fetch)) as _
363359
}
364360

365-
/// Helper function to handle DataSourceExec preserve_order setting
366-
fn ensure_preserve_order_if_needed(
367-
plan: Arc<dyn ExecutionPlan>,
368-
order_sensitive: bool,
369-
) -> Arc<dyn ExecutionPlan> {
370-
if !order_sensitive {
371-
return plan;
372-
}
373-
374-
let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
375-
return plan;
376-
};
377-
378-
let Some(file_scan_config) = data_source_exec
379-
.data_source()
380-
.as_any()
381-
.downcast_ref::<FileScanConfig>()
382-
else {
383-
return plan;
384-
};
385-
386-
if file_scan_config.preserve_order {
387-
return plan;
388-
}
389-
390-
let new_config = FileScanConfigBuilder::from(file_scan_config.clone())
391-
.with_preserve_order(true)
392-
.build();
393-
394-
let new_data_source_exec = DataSourceExec::new(Arc::new(new_config));
395-
Arc::new(new_data_source_exec) as Arc<dyn ExecutionPlan>
396-
}
397-
398361
// See tests in datafusion/core/tests/physical_optimizer

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,19 @@ impl ExecutionPlan for CoalescePartitionsExec {
278278
}))
279279
}
280280

281+
fn with_preserve_order(
282+
&self,
283+
preserve_order: bool,
284+
) -> Option<Arc<dyn ExecutionPlan>> {
285+
self.input
286+
.with_preserve_order(preserve_order)
287+
.and_then(|new_input| {
288+
Arc::new(self.clone())
289+
.with_new_children(vec![new_input])
290+
.ok()
291+
})
292+
}
293+
281294
fn gather_filters_for_pushdown(
282295
&self,
283296
_phase: FilterPushdownPhase,

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
708708
) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
709709
Ok(SortOrderPushdownResult::Unsupported)
710710
}
711+
712+
/// Returns a variant of this `ExecutionPlan` that is aware of order-sensitivity.
713+
///
714+
/// This is used to signal to data sources that the output ordering must be
715+
/// preserved, even if it might be more efficient to ignore it (e.g. by
716+
/// skipping some row groups in Parquet).
717+
///
718+
fn with_preserve_order(
719+
&self,
720+
_preserve_order: bool,
721+
) -> Option<Arc<dyn ExecutionPlan>> {
722+
None
723+
}
711724
}
712725

713726
/// [`ExecutionPlan`] Invariant Level

datafusion/physical-plan/src/filter.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,19 @@ impl ExecutionPlan for FilterExec {
615615
fetch,
616616
}))
617617
}
618+
619+
fn with_preserve_order(
620+
&self,
621+
preserve_order: bool,
622+
) -> Option<Arc<dyn ExecutionPlan>> {
623+
self.input
624+
.with_preserve_order(preserve_order)
625+
.and_then(|new_input| {
626+
Arc::new(self.clone())
627+
.with_new_children(vec![new_input])
628+
.ok()
629+
})
630+
}
618631
}
619632

620633
impl EmbeddedProjection for FilterExec {

datafusion/physical-plan/src/projection.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,19 @@ impl ExecutionPlan for ProjectionExec {
427427
}
428428
}
429429
}
430+
431+
fn with_preserve_order(
432+
&self,
433+
preserve_order: bool,
434+
) -> Option<Arc<dyn ExecutionPlan>> {
435+
self.input
436+
.with_preserve_order(preserve_order)
437+
.and_then(|new_input| {
438+
Arc::new(self.clone())
439+
.with_new_children(vec![new_input])
440+
.ok()
441+
})
442+
}
430443
}
431444

432445
impl ProjectionStream {

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,19 @@ impl ExecutionPlan for SortPreservingMergeExec {
245245
}))
246246
}
247247

248+
fn with_preserve_order(
249+
&self,
250+
preserve_order: bool,
251+
) -> Option<Arc<dyn ExecutionPlan>> {
252+
self.input
253+
.with_preserve_order(preserve_order)
254+
.and_then(|new_input| {
255+
Arc::new(self.clone())
256+
.with_new_children(vec![new_input])
257+
.ok()
258+
})
259+
}
260+
248261
fn required_input_distribution(&self) -> Vec<Distribution> {
249262
vec![Distribution::UnspecifiedDistribution]
250263
}

0 commit comments

Comments
 (0)