
Commit 437b5c3

chore(cubestore): Upgrade DF 46: Pass customizer more completely and avoid ParquetExec
1 parent 8928842 commit 437b5c3

File tree

9 files changed: +111 -71 lines changed

rust/cubestore/Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default.

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -264,6 +264,7 @@ impl QueryPlannerImpl {
     }
 
     pub fn make_execution_context() -> SessionContext {
+        // TODO upgrade DF: Remove this -- use metadata_cache_factory.make_session_config()
         Self::execution_context_helper(SessionConfig::new())
     }
 

rust/cubestore/cubestore/src/queryplanner/optimizations/check_memory.rs

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ pub fn add_check_memory_exec(
     mem_handler: Arc<dyn MemoryHandler>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let p_any = p.as_any();
-    // TODO upgrade DF: Do we use ParquetExec? Or just DataSourceExec? It's fine to have both here.
+    // We supposedly no longer use ParquetExec, which is deprecated in DF 46, but we keep the check here in case we do.
     if p_any.is::<DataSourceExec>() || p_any.is::<ParquetExec>() || p_any.is::<MemoryExec>() || p_any.is::<ClusterSendExec>() {
         let memory_check = Arc::new(CheckMemoryExec::new(p, mem_handler.clone()));
         Ok(memory_check)

rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -169,7 +169,7 @@ fn finalize_physical_plan(
     let p = rewrite_physical_plan(p, &mut |p| add_check_memory_exec(p, memory_handler.clone()))?;
     let p = if let Some(data_loaded_size) = data_loaded_size {
         rewrite_physical_plan(p, &mut |p| {
-            add_trace_data_loaded_exec(p, data_loaded_size.clone())
+            add_trace_data_loaded_exec(p, &data_loaded_size)
         })?
     } else {
         p
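
For context, rewrite_physical_plan drives hooks like add_check_memory_exec and add_trace_data_loaded_exec over every node of the physical plan. Below is a minimal sketch of the same bottom-up rewrite idea using DataFusion's TreeNode API; the helper name wrap_scans and its exact shape are illustrative, not CubeStore's actual implementation:

use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_datasource::source::DataSourceExec;

// Walk the plan bottom-up and wrap every DataSourceExec node with `wrap`.
fn wrap_scans(
    plan: Arc<dyn ExecutionPlan>,
    mut wrap: impl FnMut(Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    plan.transform_up(|node| {
        if node.as_any().is::<DataSourceExec>() {
            // Report the node as rewritten so the traversal records a change.
            Ok(Transformed::yes(wrap(node)?))
        } else {
            Ok(Transformed::no(node))
        }
    })
    .map(|t| t.data)
}
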
Lines changed: 18 additions & 7 deletions
@@ -1,19 +1,30 @@
 use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec};
-use datafusion::datasource::physical_plan::ParquetExec;
+use datafusion::datasource::physical_plan::{ParquetExec, ParquetSource};
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::source::DataSourceExec;
 use std::sync::Arc;
 
-/// Add `TraceDataLoadedExec` behind ParquetExec nodes.
+/// Add `TraceDataLoadedExec` behind ParquetExec or DataSourceExec (with a Parquet file source) nodes.
 pub fn add_trace_data_loaded_exec(
     p: Arc<dyn ExecutionPlan>,
-    data_loaded_size: Arc<DataLoadedSize>,
+    data_loaded_size: &Arc<DataLoadedSize>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+    fn do_wrap(p: Arc<dyn ExecutionPlan>, data_loaded_size: &Arc<DataLoadedSize>) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        Ok(Arc::new(TraceDataLoadedExec::new(p, data_loaded_size.clone())))
+    }
+
     let p_any = p.as_any();
     if p_any.is::<ParquetExec>() {
-        let trace_data_loaded = Arc::new(TraceDataLoadedExec::new(p, data_loaded_size.clone()));
-        Ok(trace_data_loaded)
-    } else {
-        Ok(p)
+        // ParquetExec is deprecated in DF 46 and we don't use it; we shouldn't hit this case, but we keep it just in case.
+        return do_wrap(p, data_loaded_size);
+    } else if let Some(dse) = p_any.downcast_ref::<DataSourceExec>() {
+        if let Some(file_scan) = dse.data_source().as_any().downcast_ref::<FileScanConfig>() {
+            if file_scan.file_source().as_any().is::<ParquetSource>() {
+                return do_wrap(p, data_loaded_size);
+            }
+        }
     }
+    Ok(p)
 }
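
In DF 46 a Parquet scan is no longer a dedicated ExecutionPlan node: it is a DataSourceExec wrapping a FileScanConfig whose file source is a ParquetSource. Here is a standalone predicate using the same downcast chain as the function above (the name is_parquet_scan is ours):

use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;

// True iff `p` is a DataSourceExec whose file scan reads Parquet files.
fn is_parquet_scan(p: &dyn ExecutionPlan) -> bool {
    p.as_any()
        .downcast_ref::<DataSourceExec>()
        .and_then(|dse| {
            // DataSourceExec wraps a DataSource; file scans use FileScanConfig,
            // which in turn carries a format-specific FileSource.
            dse.data_source()
                .as_any()
                .downcast_ref::<FileScanConfig>()
                .map(|fsc| fsc.file_source().as_any().is::<ParquetSource>())
        })
        .unwrap_or(false)
}
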

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 21 additions & 3 deletions
@@ -4,7 +4,7 @@ use bigdecimal::ToPrimitive;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion::common::DFSchema;
-use datafusion::datasource::physical_plan::ParquetExec;
+use datafusion::datasource::physical_plan::{ParquetExec, ParquetSource};
 use datafusion::datasource::{DefaultTableSource, TableProvider};
 use datafusion::error::DataFusionError;
 use datafusion::logical_expr::{
@@ -15,9 +15,11 @@ use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
-use datafusion::physical_plan::{ExecutionPlan, InputOrderMode, PlanProperties};
+use datafusion::physical_plan::{DefaultDisplay, DisplayAs, DisplayFormatType, ExecutionPlan, InputOrderMode, PlanProperties};
 use datafusion::prelude::Expr;
+use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::memory::MemoryExec;
+use datafusion_datasource::source::DataSourceExec;
 use itertools::{repeat_n, Itertools};
 use std::sync::Arc;

@@ -656,15 +658,31 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
     } else if let Some(_) = a.downcast_ref::<FilterByKeyRangeExec>() {
         *out += "FilterByKeyRange";
     } else if let Some(p) = a.downcast_ref::<ParquetExec>() {
+        // We don't use ParquetExec any more.
         *out += &format!(
-            "ParquetScan, files: {}",
+            "ParquetExec (ERROR: deprecated), files: {}",
             p.base_config()
                 .file_groups
                 .iter()
                 .flatten()
                 .map(|p| p.object_meta.location.to_string())
                 .join(",")
         );
+    } else if let Some(dse) = a.downcast_ref::<DataSourceExec>() {
+        let data_source = dse.data_source();
+        if let Some(fse) = data_source.as_any().downcast_ref::<FileScanConfig>() {
+            if let Some(p) = fse.file_source().as_any().downcast_ref::<ParquetSource>() {
+                *out += &format!(
+                    "ParquetScan, files: {}",
+                    fse.file_groups.iter().flatten().map(|p| p.object_meta.location.to_string()).join(","),
+                );
+            } else {
+                *out += &format!("{}", DefaultDisplay(dse));
+            }
+        } else {
+            *out += &format!("{}", DefaultDisplay(dse));
+        }
+
     // TODO upgrade DF
     // } else if let Some(_) = a.downcast_ref::<SkipExec>() {
     //     *out += "SkipRows";
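
The DefaultDisplay fallback above reuses DataFusion's own rendering for data sources the pretty printer does not recognize. For comparison, a whole plan can be rendered with the stock displayable helper; a small sketch (print_plan is our name):

use std::sync::Arc;

use datafusion::physical_plan::{displayable, ExecutionPlan};

// Render a plan one node per line, indented by depth, e.g.
// "DataSourceExec: file_groups={...}, projection=[...]".
fn print_plan(plan: &Arc<dyn ExecutionPlan>) {
    println!("{}", displayable(plan.as_ref()).indent(false));
}
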

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 33 additions & 27 deletions
@@ -17,14 +17,16 @@ use crate::queryplanner::serialized_plan::{IndexSnapshot, RowFilter, RowRange, S
 use crate::queryplanner::trace_data_loaded::DataLoadedSize;
 use crate::store::DataFrame;
 use crate::table::data::rows_to_columns;
-use crate::table::parquet::{parquet_source, CubestoreParquetMetadataCache};
+use crate::table::parquet::CubestoreParquetMetadataCache;
 use crate::table::{Row, TableValue, TimestampValue};
 use crate::telemetry::suboptimal_query_plan_event;
 use crate::util::memory::MemoryHandler;
 use crate::{app_metrics, CubeError};
 use async_trait::async_trait;
+use datafusion::config::TableParquetOptions;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_datasource::memory::MemoryExec;
+use datafusion_datasource::source::DataSourceExec;
 use core::fmt;
 use datafusion::arrow::array::{
     make_array, Array, ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Float64Array,
@@ -40,9 +42,9 @@ use datafusion::catalog::Session;
 use datafusion::common::ToDFSchema;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
+use datafusion::datasource::physical_plan::parquet::get_reader_options_customizer;
 use datafusion::datasource::physical_plan::{
-    FileScanConfig, ParquetExec, ParquetFileReaderFactory, ParquetSource,
+    FileScanConfig, ParquetFileReaderFactory, ParquetSource,
 };
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::DataFusionError;
@@ -401,7 +403,7 @@ impl QueryExecutorImpl {
         serialized_plan: Arc<PreSerializedPlan>,
     ) -> Result<Arc<SessionContext>, CubeError> {
         let runtime = Arc::new(RuntimeEnv::default());
-        let config = Self::session_config();
+        let config = self.session_config();
         let session_state = SessionStateBuilder::new()
             .with_config(config)
             .with_runtime_env(runtime)
@@ -455,7 +457,7 @@ impl QueryExecutorImpl {
         data_loaded_size: Option<Arc<DataLoadedSize>>,
     ) -> Result<Arc<SessionContext>, CubeError> {
         let runtime = Arc::new(RuntimeEnv::default());
-        let config = Self::session_config();
+        let config = self.session_config();
         let session_state = SessionStateBuilder::new()
             .with_config(config)
             .with_runtime_env(runtime)
@@ -474,8 +476,8 @@ impl QueryExecutorImpl {
         Ok(Arc::new(ctx))
     }
 
-    fn session_config() -> SessionConfig {
-        let mut config = SessionConfig::new()
+    fn session_config(&self) -> SessionConfig {
+        let mut config = self.metadata_cache_factory.make_session_config()
             .with_batch_size(4096)
             // TODO upgrade DF if less than 2 then there will be no MergeJoin. Decide on repartitioning.
             .with_target_partitions(2)
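
Routing session_config through &self is what lets it start from metadata_cache_factory.make_session_config() instead of a bare SessionConfig::new(), so the fork's reader-options customizer reaches every session (the "pass customizer more completely" of the commit title). A sketch of that indirection follows; the SessionConfigSource trait and DefaultSource type are illustrative stand-ins, not CubeStore's definitions:

use datafusion::prelude::SessionConfig;

// Hypothetical stand-in for the metadata_cache_factory role.
trait SessionConfigSource {
    fn make_session_config(&self) -> SessionConfig;
}

struct DefaultSource;

impl SessionConfigSource for DefaultSource {
    fn make_session_config(&self) -> SessionConfig {
        // A real factory would attach its reader-options customization here
        // before handing the config out.
        SessionConfig::new()
    }
}

fn session_config(source: &dyn SessionConfigSource) -> SessionConfig {
    source
        .make_session_config()
        .with_batch_size(4096)
        .with_target_partitions(2)
}
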
@@ -693,8 +695,16 @@ impl CubeTable {
             .get(remote_path.as_str())
             .expect(format!("Missing remote path {}", remote_path).as_str());
 
+        let parquet_source = ParquetSource::new(TableParquetOptions::default(), get_reader_options_customizer(state.config()))
+            .with_parquet_file_reader_factory(self.parquet_metadata_cache.clone());
+        let parquet_source = if let Some(phys_pred) = &physical_predicate {
+            parquet_source.with_predicate(index_schema.clone(), phys_pred.clone())
+        } else {
+            parquet_source
+        };
+
         let file_scan =
-            FileScanConfig::new(ObjectStoreUrl::local_filesystem(), index_schema.clone(), parquet_source())
+            FileScanConfig::new(ObjectStoreUrl::local_filesystem(), index_schema.clone(), Arc::new(parquet_source))
                 .with_file(PartitionedFile::from_path(local_path.to_string())?)
                 .with_projection(index_projection_or_none_on_schema_match.clone())
                 .with_output_ordering(vec![LexOrdering::new((0..key_len)
@@ -710,16 +720,11 @@ impl CubeTable {
                     ))
                 })
                 .collect::<Result<Vec<_>, _>>()?)]);
-        let parquet_exec_builder = ParquetExecBuilder::new(file_scan)
-            .with_parquet_file_reader_factory(self.parquet_metadata_cache.clone());
-        let parquet_exec_builder = if let Some(phys_pred) = &physical_predicate {
-            parquet_exec_builder.with_predicate(phys_pred.clone())
-        } else {
-            parquet_exec_builder
-        };
-        let parquet_exec = parquet_exec_builder.build();
 
-        let arc: Arc<dyn ExecutionPlan> = Arc::new(parquet_exec);
+
+        let data_source_exec = DataSourceExec::new(Arc::new(file_scan));
+
+        let arc: Arc<dyn ExecutionPlan> = Arc::new(data_source_exec);
         let arc = FilterByKeyRangeExec::issue_filters(arc, filter.clone(), key_len);
         partition_execs.push(arc);
     }
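
The replacement pattern, stripped of CubeStore specifics, is: build a ParquetSource, hand it to a FileScanConfig, and wrap that in a DataSourceExec. A self-contained sketch against upstream DF 46 follows; it uses ParquetSource::default() because the two-argument ParquetSource::new(...) with get_reader_options_customizer comes from Cube's DataFusion fork, not upstream:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_datasource::source::DataSourceExec;

// Build a physical scan over one local Parquet file with a fixed schema.
fn parquet_scan(path: &str) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let file_scan = FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        schema,
        Arc::new(ParquetSource::default()),
    )
    .with_file(PartitionedFile::from_path(path.to_string())?);
    Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan))))
}
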
@@ -763,7 +768,15 @@ impl CubeTable {
             .get(&remote_path)
             .expect(format!("Missing remote path {}", remote_path).as_str());
 
-        let file_scan = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), index_schema.clone(), parquet_source())
+        let parquet_source = ParquetSource::new(TableParquetOptions::default(), get_reader_options_customizer(state.config()))
+            .with_parquet_file_reader_factory(self.parquet_metadata_cache.clone());
+        let parquet_source = if let Some(phys_pred) = &physical_predicate {
+            parquet_source.with_predicate(index_schema.clone(), phys_pred.clone())
+        } else {
+            parquet_source
+        };
+
+        let file_scan = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), index_schema.clone(), Arc::new(parquet_source))
             .with_file(PartitionedFile::from_path(local_path.to_string())?)
             .with_projection(index_projection_or_none_on_schema_match.clone())
             .with_output_ordering(vec![LexOrdering::new((0..key_len).map(|i| -> Result<_, DataFusionError> { Ok(PhysicalSortExpr::new(
@@ -773,16 +786,9 @@ impl CubeTable {
                 SortOptions::default(),
             ))}).collect::<Result<Vec<_>, _>>()?)])
         ;
-        let parquet_exec_builder = ParquetExecBuilder::new(file_scan)
-            .with_parquet_file_reader_factory(self.parquet_metadata_cache.clone());
-        let parquet_exec_builder = if let Some(phys_pred) = &physical_predicate {
-            parquet_exec_builder.with_predicate(phys_pred.clone())
-        } else {
-            parquet_exec_builder
-        };
-        let parquet_exec = parquet_exec_builder.build();
 
-        Arc::new(parquet_exec)
+        let data_source_exec = DataSourceExec::new(Arc::new(file_scan));
+        Arc::new(data_source_exec)
     };
 
     let node = FilterByKeyRangeExec::issue_filters(node, filter.clone(), key_len);
