Skip to content

Commit d97273c

Browse files
committed
chore(cubestore): Upgrade DF 46: Pass customizer more completely and avoid ParquetExec
1 parent 8928842 commit d97273c

File tree

9 files changed

+134
-94
lines changed

9 files changed

+134
-94
lines changed

rust/cubestore/Cargo.lock

Lines changed: 26 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ impl QueryPlannerImpl {
264264
}
265265

266266
pub fn make_execution_context() -> SessionContext {
267+
// TODO upgrade DF: Remove this -- use metadata_cache_factory.make_session_config()
267268
Self::execution_context_helper(SessionConfig::new())
268269
}
269270

rust/cubestore/cubestore/src/queryplanner/optimizations/check_memory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub fn add_check_memory_exec(
1414
mem_handler: Arc<dyn MemoryHandler>,
1515
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
1616
let p_any = p.as_any();
17-
// TODO upgrade DF: Do we use ParquetExec? Or just DataSourceExec? It's fine to have both here.
17+
// We believe ParquetExec (deprecated in DF 46) is no longer produced by our plans, but we keep this check in case it still appears.
1818
if p_any.is::<DataSourceExec>() || p_any.is::<ParquetExec>() || p_any.is::<MemoryExec>() || p_any.is::<ClusterSendExec>() {
1919
let memory_check = Arc::new(CheckMemoryExec::new(p, mem_handler.clone()));
2020
Ok(memory_check)

rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ fn finalize_physical_plan(
169169
let p = rewrite_physical_plan(p, &mut |p| add_check_memory_exec(p, memory_handler.clone()))?;
170170
let p = if let Some(data_loaded_size) = data_loaded_size {
171171
rewrite_physical_plan(p, &mut |p| {
172-
add_trace_data_loaded_exec(p, data_loaded_size.clone())
172+
add_trace_data_loaded_exec(p, &data_loaded_size)
173173
})?
174174
} else {
175175
p
Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,30 @@
11
use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec};
2-
use datafusion::datasource::physical_plan::ParquetExec;
2+
use datafusion::datasource::physical_plan::{ParquetExec, ParquetSource};
33
use datafusion::error::DataFusionError;
44
use datafusion::physical_plan::ExecutionPlan;
5+
use datafusion_datasource::file_scan_config::FileScanConfig;
6+
use datafusion_datasource::source::DataSourceExec;
57
use std::sync::Arc;
68

7-
/// Add `TraceDataLoadedExec` behind ParquetExec nodes.
9+
/// Add `TraceDataLoadedExec` behind `ParquetExec` nodes, or behind `DataSourceExec` nodes whose file scan config uses a Parquet file source.
810
pub fn add_trace_data_loaded_exec(
911
p: Arc<dyn ExecutionPlan>,
10-
data_loaded_size: Arc<DataLoadedSize>,
12+
data_loaded_size: &Arc<DataLoadedSize>,
1113
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
14+
fn do_wrap(p: Arc<dyn ExecutionPlan>, data_loaded_size: &Arc<DataLoadedSize>) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
15+
Ok(Arc::new(TraceDataLoadedExec::new(p, data_loaded_size.clone())))
16+
}
17+
1218
let p_any = p.as_any();
1319
if p_any.is::<ParquetExec>() {
14-
let trace_data_loaded = Arc::new(TraceDataLoadedExec::new(p, data_loaded_size.clone()));
15-
Ok(trace_data_loaded)
16-
} else {
17-
Ok(p)
20+
// ParquetExec is deprecated in DF 46 and should no longer appear in our plans; we keep this branch as a safeguard.
21+
return do_wrap(p, data_loaded_size);
22+
} else if let Some(dse) = p_any.downcast_ref::<DataSourceExec>() {
23+
if let Some(file_scan) = dse.data_source().as_any().downcast_ref::<FileScanConfig>() {
24+
if file_scan.file_source().as_any().is::<ParquetSource>() {
25+
return do_wrap(p, data_loaded_size);
26+
}
27+
}
1828
}
29+
Ok(p)
1930
}

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bigdecimal::ToPrimitive;
44
use datafusion::arrow::datatypes::Schema;
55
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
66
use datafusion::common::DFSchema;
7-
use datafusion::datasource::physical_plan::ParquetExec;
7+
use datafusion::datasource::physical_plan::{ParquetExec, ParquetSource};
88
use datafusion::datasource::{DefaultTableSource, TableProvider};
99
use datafusion::error::DataFusionError;
1010
use datafusion::logical_expr::{
@@ -15,9 +15,11 @@ use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
1515
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
1616
use datafusion::physical_plan::filter::FilterExec;
1717
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
18-
use datafusion::physical_plan::{ExecutionPlan, InputOrderMode, PlanProperties};
18+
use datafusion::physical_plan::{DefaultDisplay, DisplayAs, DisplayFormatType, ExecutionPlan, InputOrderMode, PlanProperties};
1919
use datafusion::prelude::Expr;
20+
use datafusion_datasource::file_scan_config::FileScanConfig;
2021
use datafusion_datasource::memory::MemoryExec;
22+
use datafusion_datasource::source::DataSourceExec;
2123
use itertools::{repeat_n, Itertools};
2224
use std::sync::Arc;
2325

@@ -656,15 +658,31 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
656658
} else if let Some(_) = a.downcast_ref::<FilterByKeyRangeExec>() {
657659
*out += "FilterByKeyRange";
658660
} else if let Some(p) = a.downcast_ref::<ParquetExec>() {
661+
// We don't use ParquetExec any more.
659662
*out += &format!(
660-
"ParquetScan, files: {}",
663+
"ParquetExec (ERROR: deprecated), files: {}",
661664
p.base_config()
662665
.file_groups
663666
.iter()
664667
.flatten()
665668
.map(|p| p.object_meta.location.to_string())
666669
.join(",")
667670
);
671+
} else if let Some(dse) = a.downcast_ref::<DataSourceExec>() {
672+
let data_source = dse.data_source();
673+
if let Some(fse) = data_source.as_any().downcast_ref::<FileScanConfig>() {
674+
if let Some(p) = fse.file_source().as_any().downcast_ref::<ParquetSource>() {
675+
*out += &format!(
676+
"ParquetScan, files: {}",
677+
fse.file_groups.iter().flatten().map(|p| p.object_meta.location.to_string()).join(","),
678+
);
679+
} else {
680+
*out += &format!("{}", DefaultDisplay(dse));
681+
}
682+
} else {
683+
*out += &format!("{}", DefaultDisplay(dse));
684+
}
685+
668686
// TODO upgrade DF
669687
// } else if let Some(_) = a.downcast_ref::<SkipExec>() {
670688
// *out += "SkipRows";

0 commit comments

Comments
 (0)