Skip to content

Commit caf8ef5

Browse files
committed
chore(cubestore): Upgrade DF 46: Avoid or mark allowed deprecated MemoryExec and ParquetExec
1 parent f80a8de commit caf8ef5

File tree

13 files changed

+77
-59
lines changed

13 files changed

+77
-59
lines changed

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ mod partition_filter;
55
mod planning;
66
use datafusion::logical_expr::planner::ExprPlanner;
77
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
8+
use datafusion_datasource::memory::MemorySourceConfig;
9+
use datafusion_datasource::source::DataSourceExec;
810
// use datafusion::physical_plan::parquet::MetadataCacheFactory;
911
pub use planning::PlanningMeta;
1012
mod check_memory;
@@ -940,6 +942,17 @@ fn compute_workers(
940942
}
941943
}
942944

945+
/// Creates a [`DataSourceExec`] with a [`MemorySourceConfig`], i.e. the alternative to the
946+
/// deprecated `MemoryExec`. Useful when the [`MemorySourceConfig`] doesn't need sorting
947+
/// information.
948+
pub fn try_make_memory_data_source(
949+
partitions: &[Vec<RecordBatch>],
950+
schema: SchemaRef,
951+
projection: Option<Vec<usize>>,
952+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
953+
Ok(Arc::new(DataSourceExec::new(Arc::new(MemorySourceConfig::try_new(partitions, schema, projection)?))))
954+
}
955+
943956
#[cfg(test)]
944957
pub mod tests {
945958
use super::*;

rust/cubestore/cubestore/src/queryplanner/optimizations/check_memory.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
use crate::queryplanner::check_memory::CheckMemoryExec;
22
use crate::queryplanner::query_executor::ClusterSendExec;
33
use crate::util::memory::MemoryHandler;
4-
use datafusion::datasource::physical_plan::ParquetExec;
54
use datafusion::datasource::source::DataSourceExec;
65
use datafusion::error::DataFusionError;
76
use datafusion::physical_plan::ExecutionPlan;
8-
use datafusion_datasource::memory::MemoryExec;
97
use std::sync::Arc;
108

119
/// Add `CheckMemoryExec` behind some nodes.
10+
#[allow(deprecated)]
1211
pub fn add_check_memory_exec(
1312
p: Arc<dyn ExecutionPlan>,
1413
mem_handler: Arc<dyn MemoryHandler>,
1514
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
15+
use datafusion::datasource::physical_plan::ParquetExec;
16+
use datafusion_datasource::memory::MemoryExec;
17+
1618
let p_any = p.as_any();
17-
// We supposedly don't use ParquetExec, which is deprecated in DF 46, anymore but we keep the check here in case we do.
19+
// We supposedly don't use ParquetExec or MemoryExec, which are deprecated in DF 46 (in favor of
20+
// DataSourceExec), anymore, but we keep the check here in case we do.
1821
if p_any.is::<DataSourceExec>() || p_any.is::<ParquetExec>() || p_any.is::<MemoryExec>() || p_any.is::<ClusterSendExec>() {
1922
let memory_check = Arc::new(CheckMemoryExec::new(p, mem_handler.clone()));
2023
Ok(memory_check)

rust/cubestore/cubestore/src/queryplanner/optimizations/trace_data_loaded.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec};
2-
use datafusion::datasource::physical_plan::{ParquetExec, ParquetSource};
2+
use datafusion::datasource::physical_plan::ParquetSource;
33
use datafusion::error::DataFusionError;
44
use datafusion::physical_plan::ExecutionPlan;
55
use datafusion_datasource::file_scan_config::FileScanConfig;
66
use datafusion_datasource::source::DataSourceExec;
77
use std::sync::Arc;
88

99
/// Add `TraceDataLoadedExec` behind ParquetExec or DataSourceExec (with a file-based, hence Parquet, source) nodes.
10+
#[allow(deprecated)]
1011
pub fn add_trace_data_loaded_exec(
1112
p: Arc<dyn ExecutionPlan>,
1213
data_loaded_size: &Arc<DataLoadedSize>,
1314
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
15+
use datafusion::datasource::physical_plan::ParquetExec;
16+
1417
fn do_wrap(p: Arc<dyn ExecutionPlan>, data_loaded_size: &Arc<DataLoadedSize>) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
1518
Ok(Arc::new(TraceDataLoadedExec::new(p, data_loaded_size.clone())))
1619
}

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ use bigdecimal::ToPrimitive;
44
use datafusion::arrow::datatypes::Schema;
55
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
66
use datafusion::common::DFSchema;
7-
use datafusion::datasource::physical_plan::{ParquetExec, ParquetSource};
7+
use datafusion::datasource::physical_plan::ParquetSource;
88
use datafusion::datasource::{DefaultTableSource, TableProvider};
99
use datafusion::error::DataFusionError;
1010
use datafusion::logical_expr::{
1111
Aggregate, EmptyRelation, Explain, Extension, FetchType, Filter, Join, Limit, LogicalPlan, Projection, Repartition, SkipType, Sort, TableScan, Union, Window
1212
};
1313
use datafusion::physical_expr::{AcrossPartitions, ConstExpr};
14-
use datafusion::physical_optimizer::pruning;
1514
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
1615
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
1716
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -20,7 +19,7 @@ use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
2019
use datafusion::physical_plan::{DefaultDisplay, ExecutionPlan, InputOrderMode, PlanProperties};
2120
use datafusion::prelude::Expr;
2221
use datafusion_datasource::file_scan_config::FileScanConfig;
23-
use datafusion_datasource::memory::MemoryExec;
22+
use datafusion_datasource::memory::MemorySourceConfig;
2423
use datafusion_datasource::source::DataSourceExec;
2524
use itertools::{repeat_n, Itertools};
2625
use std::sync::Arc;
@@ -491,7 +490,11 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
491490
pp_phys_plan_indented(c.as_ref(), indent + 2, o, out);
492491
}
493492

493+
#[allow(deprecated)]
494494
fn pp_instance(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, out: &mut String) {
495+
use datafusion::datasource::physical_plan::ParquetExec;
496+
use datafusion_datasource::memory::MemoryExec;
497+
495498
if indent != 0 {
496499
*out += "\n";
497500
}
@@ -675,7 +678,8 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
675678
);
676679
} else if let Some(dse) = a.downcast_ref::<DataSourceExec>() {
677680
let data_source = dse.data_source();
678-
if let Some(fse) = data_source.as_any().downcast_ref::<FileScanConfig>() {
681+
let data_source_any = data_source.as_any();
682+
if let Some(fse) = data_source_any.downcast_ref::<FileScanConfig>() {
679683
if let Some(p) = fse.file_source().as_any().downcast_ref::<ParquetSource>() {
680684
*out += &format!(
681685
"ParquetScan, files: {}",
@@ -701,6 +705,8 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
701705
} else {
702706
*out += &format!("{}", DefaultDisplay(dse));
703707
}
708+
} else if data_source_any.is::<MemorySourceConfig>() {
709+
*out += "MemoryScan";
704710
} else {
705711
*out += &format!("{}", DefaultDisplay(dse));
706712
}
@@ -712,8 +718,9 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
712718
// *out += "RollingWindowAgg";
713719
} else if let Some(_) = a.downcast_ref::<LastRowByUniqueKeyExec>() {
714720
*out += "LastRowByUniqueKey";
715-
} else if let Some(_) = a.downcast_ref::<MemoryExec>() {
716-
*out += "MemoryScan";
721+
} else if a.is::<MemoryExec>() {
722+
// We don't use MemoryExec any more.
723+
*out += "MemoryExec (ERROR: deprecated)";
717724
} else if let Some(r) = a.downcast_ref::<RepartitionExec>() {
718725
*out += &format!("Repartition, partitioning: {}", r.partitioning());
719726
} else {

rust/cubestore/cubestore/src/queryplanner/providers/query_cache.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::queryplanner::project_schema;
1+
use crate::queryplanner::{project_schema, try_make_memory_data_source};
22
use crate::sql::cache::{sql_result_cache_sizeof, SqlResultCache};
33
use async_trait::async_trait;
44
use datafusion::arrow::array::{Array, Int64Builder, StringBuilder};
@@ -15,7 +15,6 @@ use datafusion::physical_plan::{
1515
DisplayAs, DisplayFormatType, Partitioning, PlanProperties,
1616
};
1717
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
18-
use datafusion_datasource::memory::MemoryExec;
1918
use std::any::Any;
2019
use std::fmt;
2120
use std::fmt::{Debug, Formatter};
@@ -182,7 +181,7 @@ impl ExecutionPlan for InfoSchemaQueryCacheTableExec {
182181

183182
// TODO: Please migrate to real streaming, if we are going to expose query results
184183
let mem_exec =
185-
MemoryExec::try_new(&vec![vec![batch]], self.schema(), self.projection.clone())?;
184+
try_make_memory_data_source(&vec![vec![batch]], self.schema(), self.projection.clone())?;
186185
mem_exec.execute(partition, context)
187186
}
188187
}

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{app_metrics, CubeError};
2525
use async_trait::async_trait;
2626
use datafusion::config::TableParquetOptions;
2727
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
28-
use datafusion_datasource::memory::MemoryExec;
28+
use datafusion_datasource::memory::MemorySourceConfig;
2929
use datafusion_datasource::source::DataSourceExec;
3030
use core::fmt;
3131
use datafusion::arrow::array::{
@@ -102,7 +102,7 @@ use super::udfs::{
102102
registerable_arc_aggregate_udfs,
103103
registerable_arc_scalar_udfs,
104104
};
105-
use super::QueryPlannerImpl;
105+
use super::{try_make_memory_data_source, QueryPlannerImpl};
106106

107107
#[automock]
108108
#[async_trait]
@@ -748,19 +748,14 @@ impl CubeTable {
748748
)));
749749
}
750750
}
751-
Arc::new(
752-
MemoryExec::try_new(
753-
&[record_batches.clone()],
754-
index_schema.clone(),
755-
index_projection_or_none_on_schema_match.clone(),
756-
)?
751+
Arc::new(DataSourceExec::new(Arc::new(MemorySourceConfig::try_new(&[record_batches.clone()], index_schema.clone(), index_projection_or_none_on_schema_match.clone())?
757752
.try_with_sort_information(vec![
758753
LexOrdering::new(lex_ordering_for_index(
759754
self.index_snapshot.index.get_row(),
760755
&index_projection_schema,
761756
)?),
762757
])?,
763-
)
758+
)))
764759
} else {
765760
let remote_path = chunk.get_row().get_full_name(chunk.get_id());
766761
let local_path = self
@@ -1787,11 +1782,11 @@ impl TableProvider for InlineTableProvider {
17871782
// TODO batch_size
17881783
let batches = dataframe_to_batches(self.data.as_ref(), 16384)?;
17891784
let projection = projection.cloned();
1790-
Ok(Arc::new(MemoryExec::try_new(
1785+
Ok(try_make_memory_data_source(
17911786
&vec![batches],
17921787
schema.clone(),
17931788
projection,
1794-
)?))
1789+
)?)
17951790
}
17961791

17971792
fn table_type(&self) -> TableType {

rust/cubestore/cubestore/src/queryplanner/tail_limit.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,12 @@ impl RecordBatchStream for TailLimitStream {
204204

205205
#[cfg(test)]
206206
mod tests {
207+
use crate::queryplanner::try_make_memory_data_source;
208+
207209
use super::*;
208210
use datafusion::arrow::array::Int64Array;
209211
use datafusion::arrow::datatypes::{DataType, Field, Schema};
210212
use datafusion::physical_plan::collect as result_collect;
211-
use datafusion_datasource::memory::MemoryExec;
212213
use itertools::Itertools;
213214

214215
fn ints_schema() -> SchemaRef {
@@ -238,7 +239,7 @@ mod tests {
238239

239240
let schema = ints_schema();
240241
let inp =
241-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
242+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
242243
let r = result_collect(
243244
Arc::new(TailLimitExec::new(inp, 3)),
244245
Arc::new(TaskContext::default()),
@@ -251,7 +252,7 @@ mod tests {
251252
);
252253

253254
let inp =
254-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
255+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
255256
let r = result_collect(
256257
Arc::new(TailLimitExec::new(inp, 4)),
257258
Arc::new(TaskContext::default()),
@@ -264,7 +265,7 @@ mod tests {
264265
);
265266

266267
let inp =
267-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
268+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
268269
let r = result_collect(
269270
Arc::new(TailLimitExec::new(inp, 8)),
270271
Arc::new(TaskContext::default()),
@@ -277,7 +278,7 @@ mod tests {
277278
);
278279

279280
let inp =
280-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
281+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
281282
let r = result_collect(
282283
Arc::new(TailLimitExec::new(inp, 1)),
283284
Arc::new(TaskContext::default()),
@@ -287,7 +288,7 @@ mod tests {
287288
assert_eq!(to_ints(r).into_iter().flatten().collect_vec(), vec![4],);
288289

289290
let inp =
290-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
291+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
291292
let r = result_collect(
292293
Arc::new(TailLimitExec::new(inp, 0)),
293294
Arc::new(TaskContext::default()),
@@ -309,7 +310,7 @@ mod tests {
309310

310311
let schema = ints_schema();
311312
let inp =
312-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
313+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
313314
let r = result_collect(
314315
Arc::new(TailLimitExec::new(inp, 2)),
315316
Arc::new(TaskContext::default()),
@@ -319,7 +320,7 @@ mod tests {
319320
assert_eq!(to_ints(r).into_iter().flatten().collect_vec(), vec![9, 10],);
320321

321322
let inp =
322-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
323+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
323324
let r = result_collect(
324325
Arc::new(TailLimitExec::new(inp, 3)),
325326
Arc::new(TaskContext::default()),
@@ -332,7 +333,7 @@ mod tests {
332333
);
333334

334335
let inp =
335-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
336+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
336337
let r = result_collect(
337338
Arc::new(TailLimitExec::new(inp, 4)),
338339
Arc::new(TaskContext::default()),
@@ -345,7 +346,7 @@ mod tests {
345346
);
346347

347348
let inp =
348-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
349+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
349350
let r = result_collect(
350351
Arc::new(TailLimitExec::new(inp, 5)),
351352
Arc::new(TaskContext::default()),
@@ -358,7 +359,7 @@ mod tests {
358359
);
359360

360361
let inp =
361-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
362+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
362363
let r = result_collect(
363364
Arc::new(TailLimitExec::new(inp, 10)),
364365
Arc::new(TaskContext::default()),
@@ -371,7 +372,7 @@ mod tests {
371372
);
372373

373374
let inp =
374-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
375+
try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
375376
let r = result_collect(
376377
Arc::new(TailLimitExec::new(inp, 100)),
377378
Arc::new(TaskContext::default()),

rust/cubestore/cubestore/src/queryplanner/topk/execute.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::queryplanner::topk::util::{append_value, create_builder};
22
use crate::queryplanner::topk::SortColumn;
3+
use crate::queryplanner::try_make_memory_data_source;
34
use crate::queryplanner::udfs::read_sketch;
45
use datafusion::arrow::array::{ArrayBuilder, ArrayRef, StringBuilder};
56
use datafusion::arrow::compute::{concat_batches, SortOptions};
@@ -23,7 +24,6 @@ use datafusion::physical_plan::{
2324
Partitioning, PhysicalExpr, PlanProperties, SendableRecordBatchStream,
2425
};
2526
use datafusion::scalar::ScalarValue;
26-
use datafusion_datasource::memory::MemoryExec;
2727
use flatbuffers::bitflags::_core::cmp::Ordering;
2828
use futures::{Stream, StreamExt};
2929
use itertools::Itertools;
@@ -640,11 +640,11 @@ impl TopKState<'_> {
640640
let schema = new_batch.schema();
641641
let filter_exec = Arc::new(FilterExec::try_new(
642642
having.clone(),
643-
Arc::new(MemoryExec::try_new(
643+
try_make_memory_data_source(
644644
&vec![vec![new_batch]],
645645
schema.clone(),
646646
None,
647-
)?),
647+
)?,
648648
)?);
649649
let batches_stream =
650650
GlobalLimitExec::new(filter_exec, 0, Some(self.limit - self.result.num_rows()))
@@ -1051,7 +1051,6 @@ mod tests {
10511051
use datafusion::physical_plan::ExecutionPlan;
10521052
use datafusion::physical_planner::create_aggregate_expr_and_maybe_filter;
10531053
use datafusion::prelude::Expr;
1054-
use datafusion_datasource::memory::MemoryExec;
10551054
use futures::StreamExt;
10561055
use itertools::Itertools;
10571056

@@ -1550,7 +1549,7 @@ mod tests {
15501549
inputs: Vec<Vec<RecordBatch>>,
15511550
context: Arc<TaskContext>,
15521551
) -> Result<RecordBatch, DataFusionError> {
1553-
let input = Arc::new(MemoryExec::try_new(&inputs, proto.cluster.schema(), None)?);
1552+
let input = try_make_memory_data_source(&inputs, proto.cluster.schema(), None)?;
15541553
let results = proto
15551554
.with_new_children(vec![input])?
15561555
.execute(0, context)?

0 commit comments

Comments
 (0)