Skip to content

Commit 6dfff86

Browse files
committed
chore(cubestore): Upgrade DF: Avoid or mark allowed deprecated MemoryExec and ParquetExec
1 parent cc97085 commit 6dfff86

File tree

13 files changed

+86
-74
lines changed

13 files changed

+86
-74
lines changed

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ mod partition_filter;
55
mod planning;
66
use datafusion::logical_expr::planner::ExprPlanner;
77
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
8+
use datafusion_datasource::memory::MemorySourceConfig;
9+
use datafusion_datasource::source::DataSourceExec;
810
// use datafusion::physical_plan::parquet::MetadataCacheFactory;
911
pub use planning::PlanningMeta;
1012
mod check_memory;
@@ -951,6 +953,19 @@ fn compute_workers(
951953
}
952954
}
953955

956+
/// Creates a [`DataSourceExec`] with a [`MemorySourceConfig`], i.e. the alternative to the
957+
/// deprecated `MemoryExec`. Useful when the [`MemorySourceConfig`] doesn't need sorting
958+
/// information.
959+
pub fn try_make_memory_data_source(
960+
partitions: &[Vec<RecordBatch>],
961+
schema: SchemaRef,
962+
projection: Option<Vec<usize>>,
963+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
964+
Ok(Arc::new(DataSourceExec::new(Arc::new(
965+
MemorySourceConfig::try_new(partitions, schema, projection)?,
966+
))))
967+
}
968+
954969
#[cfg(test)]
955970
pub mod tests {
956971
use super::*;

rust/cubestore/cubestore/src/queryplanner/optimizations/check_memory.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
use crate::queryplanner::check_memory::CheckMemoryExec;
22
use crate::queryplanner::query_executor::ClusterSendExec;
33
use crate::util::memory::MemoryHandler;
4-
use datafusion::datasource::physical_plan::ParquetExec;
54
use datafusion::datasource::source::DataSourceExec;
65
use datafusion::error::DataFusionError;
76
use datafusion::physical_plan::ExecutionPlan;
8-
use datafusion_datasource::memory::MemoryExec;
97
use std::sync::Arc;
108

119
/// Add `CheckMemoryExec` behind some nodes.
10+
#[allow(deprecated)]
1211
pub fn add_check_memory_exec(
1312
p: Arc<dyn ExecutionPlan>,
1413
mem_handler: Arc<dyn MemoryHandler>,
1514
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
15+
use datafusion::datasource::physical_plan::ParquetExec;
16+
use datafusion_datasource::memory::MemoryExec;
17+
1618
let p_any = p.as_any();
17-
// We supposedly don't use ParquetExec, which is deprecated in DF 46, anymore but we keep the check here in case we do.
19+
// We believe we no longer use ParquetExec or MemoryExec, which are deprecated in DF 46 (in
// favor of DataSourceExec), but we keep the check here in case we still do.
1821
if p_any.is::<DataSourceExec>()
1922
|| p_any.is::<ParquetExec>()
2023
|| p_any.is::<MemoryExec>()

rust/cubestore/cubestore/src/queryplanner/optimizations/trace_data_loaded.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec};
2-
use datafusion::datasource::physical_plan::{ParquetExec, ParquetSource};
2+
use datafusion::datasource::physical_plan::ParquetSource;
33
use datafusion::error::DataFusionError;
44
use datafusion::physical_plan::ExecutionPlan;
55
use datafusion_datasource::file_scan_config::FileScanConfig;
66
use datafusion_datasource::source::DataSourceExec;
77
use std::sync::Arc;
88

99
/// Add `TraceDataLoadedExec` behind ParquetExec nodes, or DataSourceExec nodes whose source is a file scan (and hence Parquet).
10+
#[allow(deprecated)]
1011
pub fn add_trace_data_loaded_exec(
1112
p: Arc<dyn ExecutionPlan>,
1213
data_loaded_size: &Arc<DataLoadedSize>,
1314
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
15+
use datafusion::datasource::physical_plan::ParquetExec;
16+
1417
fn do_wrap(
1518
p: Arc<dyn ExecutionPlan>,
1619
data_loaded_size: &Arc<DataLoadedSize>,

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@ use bigdecimal::ToPrimitive;
44
use datafusion::arrow::datatypes::Schema;
55
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
66
use datafusion::common::DFSchema;
7-
use datafusion::datasource::physical_plan::{ParquetExec, ParquetSource};
7+
use datafusion::datasource::physical_plan::ParquetSource;
88
use datafusion::datasource::{DefaultTableSource, TableProvider};
99
use datafusion::error::DataFusionError;
1010
use datafusion::logical_expr::{
1111
Aggregate, EmptyRelation, Explain, Extension, FetchType, Filter, Join, Limit, LogicalPlan,
1212
Projection, Repartition, SkipType, Sort, TableScan, Union, Window,
1313
};
1414
use datafusion::physical_expr::{AcrossPartitions, ConstExpr};
15-
use datafusion::physical_optimizer::pruning;
1615
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
1716
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
1817
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -21,7 +20,7 @@ use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
2120
use datafusion::physical_plan::{DefaultDisplay, ExecutionPlan, InputOrderMode, PlanProperties};
2221
use datafusion::prelude::Expr;
2322
use datafusion_datasource::file_scan_config::FileScanConfig;
24-
use datafusion_datasource::memory::MemoryExec;
23+
use datafusion_datasource::memory::MemorySourceConfig;
2524
use datafusion_datasource::source::DataSourceExec;
2625
use itertools::{repeat_n, Itertools};
2726
use std::sync::Arc;
@@ -509,7 +508,11 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
509508
pp_phys_plan_indented(c.as_ref(), indent + 2, o, out);
510509
}
511510

511+
#[allow(deprecated)]
512512
fn pp_instance(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, out: &mut String) {
513+
use datafusion::datasource::physical_plan::ParquetExec;
514+
use datafusion_datasource::memory::MemoryExec;
515+
513516
if indent != 0 {
514517
*out += "\n";
515518
}
@@ -693,7 +696,8 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
693696
);
694697
} else if let Some(dse) = a.downcast_ref::<DataSourceExec>() {
695698
let data_source = dse.data_source();
696-
if let Some(fse) = data_source.as_any().downcast_ref::<FileScanConfig>() {
699+
let data_source_any = data_source.as_any();
700+
if let Some(fse) = data_source_any.downcast_ref::<FileScanConfig>() {
697701
if let Some(p) = fse.file_source().as_any().downcast_ref::<ParquetSource>() {
698702
*out += &format!(
699703
"ParquetScan, files: {}",
@@ -723,6 +727,8 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
723727
} else {
724728
*out += &format!("{}", DefaultDisplay(dse));
725729
}
730+
} else if data_source_any.is::<MemorySourceConfig>() {
731+
*out += "MemoryScan";
726732
} else {
727733
*out += &format!("{}", DefaultDisplay(dse));
728734
}
@@ -734,8 +740,9 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
734740
// *out += "RollingWindowAgg";
735741
} else if let Some(_) = a.downcast_ref::<LastRowByUniqueKeyExec>() {
736742
*out += "LastRowByUniqueKey";
737-
} else if let Some(_) = a.downcast_ref::<MemoryExec>() {
738-
*out += "MemoryScan";
743+
} else if a.is::<MemoryExec>() {
744+
// We don't use MemoryExec any more.
745+
*out += "MemoryExec (ERROR: deprecated)";
739746
} else if let Some(r) = a.downcast_ref::<RepartitionExec>() {
740747
*out += &format!("Repartition, partitioning: {}", r.partitioning());
741748
} else {

rust/cubestore/cubestore/src/queryplanner/providers/query_cache.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::queryplanner::project_schema;
1+
use crate::queryplanner::{project_schema, try_make_memory_data_source};
22
use crate::sql::cache::{sql_result_cache_sizeof, SqlResultCache};
33
use async_trait::async_trait;
44
use datafusion::arrow::array::{Array, Int64Builder, StringBuilder};
@@ -13,7 +13,6 @@ use datafusion::physical_expr::EquivalenceProperties;
1313
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
1414
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, Partitioning, PlanProperties};
1515
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
16-
use datafusion_datasource::memory::MemoryExec;
1716
use std::any::Any;
1817
use std::fmt;
1918
use std::fmt::{Debug, Formatter};
@@ -179,8 +178,11 @@ impl ExecutionPlan for InfoSchemaQueryCacheTableExec {
179178
let batch = RecordBatch::try_new(get_schema(), data.to_vec())?;
180179

181180
// TODO: Please migrate to real streaming, if we are going to expose query results
182-
let mem_exec =
183-
MemoryExec::try_new(&vec![vec![batch]], self.schema(), self.projection.clone())?;
181+
let mem_exec = try_make_memory_data_source(
182+
&vec![vec![batch]],
183+
self.schema(),
184+
self.projection.clone(),
185+
)?;
184186
mem_exec.execute(partition, context)
185187
}
186188
}

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ use datafusion::physical_plan::{
8080
};
8181
use datafusion::prelude::{and, SessionConfig, SessionContext};
8282
use datafusion_datasource::memory::MemoryExec;
83+
use datafusion_datasource::memory::MemorySourceConfig;
8384
use datafusion_datasource::source::DataSourceExec;
8485
use futures_util::{stream, StreamExt, TryStreamExt};
8586
use itertools::Itertools;
@@ -98,7 +99,7 @@ use tracing::{instrument, Instrument};
9899

99100
use super::serialized_plan::PreSerializedPlan;
100101
use super::udfs::{registerable_arc_aggregate_udfs, registerable_arc_scalar_udfs};
101-
use super::QueryPlannerImpl;
102+
use super::{try_make_memory_data_source, QueryPlannerImpl};
102103

103104
#[automock]
104105
#[async_trait]
@@ -765,8 +766,8 @@ impl CubeTable {
765766
)));
766767
}
767768
}
768-
Arc::new(
769-
MemoryExec::try_new(
769+
Arc::new(DataSourceExec::new(Arc::new(
770+
MemorySourceConfig::try_new(
770771
&[record_batches.clone()],
771772
index_schema.clone(),
772773
index_projection_or_none_on_schema_match.clone(),
@@ -777,7 +778,7 @@ impl CubeTable {
777778
&index_projection_schema,
778779
)?),
779780
])?,
780-
)
781+
)))
781782
} else {
782783
let remote_path = chunk.get_row().get_full_name(chunk.get_id());
783784
let local_path = self
@@ -1810,11 +1811,11 @@ impl TableProvider for InlineTableProvider {
18101811
// TODO batch_size
18111812
let batches = dataframe_to_batches(self.data.as_ref(), 16384)?;
18121813
let projection = projection.cloned();
1813-
Ok(Arc::new(MemoryExec::try_new(
1814+
Ok(try_make_memory_data_source(
18141815
&vec![batches],
18151816
schema.clone(),
18161817
projection,
1817-
)?))
1818+
)?)
18181819
}
18191820

18201821
fn table_type(&self) -> TableType {

rust/cubestore/cubestore/src/queryplanner/tail_limit.rs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,12 @@ impl RecordBatchStream for TailLimitStream {
204204

205205
#[cfg(test)]
206206
mod tests {
207+
use crate::queryplanner::try_make_memory_data_source;
208+
207209
use super::*;
208210
use datafusion::arrow::array::Int64Array;
209211
use datafusion::arrow::datatypes::{DataType, Field, Schema};
210212
use datafusion::physical_plan::collect as result_collect;
211-
use datafusion_datasource::memory::MemoryExec;
212213
use itertools::Itertools;
213214

214215
fn ints_schema() -> SchemaRef {
@@ -237,8 +238,7 @@ mod tests {
237238
let input = vec![ints(vec![1, 2, 3, 4])];
238239

239240
let schema = ints_schema();
240-
let inp =
241-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
241+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
242242
let r = result_collect(
243243
Arc::new(TailLimitExec::new(inp, 3)),
244244
Arc::new(TaskContext::default()),
@@ -250,8 +250,7 @@ mod tests {
250250
vec![2, 3, 4],
251251
);
252252

253-
let inp =
254-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
253+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
255254
let r = result_collect(
256255
Arc::new(TailLimitExec::new(inp, 4)),
257256
Arc::new(TaskContext::default()),
@@ -263,8 +262,7 @@ mod tests {
263262
vec![1, 2, 3, 4],
264263
);
265264

266-
let inp =
267-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
265+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
268266
let r = result_collect(
269267
Arc::new(TailLimitExec::new(inp, 8)),
270268
Arc::new(TaskContext::default()),
@@ -276,8 +274,7 @@ mod tests {
276274
vec![1, 2, 3, 4],
277275
);
278276

279-
let inp =
280-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
277+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
281278
let r = result_collect(
282279
Arc::new(TailLimitExec::new(inp, 1)),
283280
Arc::new(TaskContext::default()),
@@ -286,8 +283,7 @@ mod tests {
286283
.unwrap();
287284
assert_eq!(to_ints(r).into_iter().flatten().collect_vec(), vec![4],);
288285

289-
let inp =
290-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
286+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
291287
let r = result_collect(
292288
Arc::new(TailLimitExec::new(inp, 0)),
293289
Arc::new(TaskContext::default()),
@@ -308,8 +304,7 @@ mod tests {
308304
];
309305

310306
let schema = ints_schema();
311-
let inp =
312-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
307+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
313308
let r = result_collect(
314309
Arc::new(TailLimitExec::new(inp, 2)),
315310
Arc::new(TaskContext::default()),
@@ -318,8 +313,7 @@ mod tests {
318313
.unwrap();
319314
assert_eq!(to_ints(r).into_iter().flatten().collect_vec(), vec![9, 10],);
320315

321-
let inp =
322-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
316+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
323317
let r = result_collect(
324318
Arc::new(TailLimitExec::new(inp, 3)),
325319
Arc::new(TaskContext::default()),
@@ -331,8 +325,7 @@ mod tests {
331325
vec![8, 9, 10],
332326
);
333327

334-
let inp =
335-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
328+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
336329
let r = result_collect(
337330
Arc::new(TailLimitExec::new(inp, 4)),
338331
Arc::new(TaskContext::default()),
@@ -344,8 +337,7 @@ mod tests {
344337
vec![7, 8, 9, 10],
345338
);
346339

347-
let inp =
348-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
340+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
349341
let r = result_collect(
350342
Arc::new(TailLimitExec::new(inp, 5)),
351343
Arc::new(TaskContext::default()),
@@ -357,8 +349,7 @@ mod tests {
357349
vec![6, 7, 8, 9, 10],
358350
);
359351

360-
let inp =
361-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
352+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
362353
let r = result_collect(
363354
Arc::new(TailLimitExec::new(inp, 10)),
364355
Arc::new(TaskContext::default()),
@@ -370,8 +361,7 @@ mod tests {
370361
vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
371362
);
372363

373-
let inp =
374-
Arc::new(MemoryExec::try_new(&vec![input.clone()], schema.clone(), None).unwrap());
364+
let inp = try_make_memory_data_source(&vec![input.clone()], schema.clone(), None).unwrap();
375365
let r = result_collect(
376366
Arc::new(TailLimitExec::new(inp, 100)),
377367
Arc::new(TaskContext::default()),

rust/cubestore/cubestore/src/queryplanner/topk/execute.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::queryplanner::topk::util::{append_value, create_builder};
22
use crate::queryplanner::topk::SortColumn;
3+
use crate::queryplanner::try_make_memory_data_source;
34
use crate::queryplanner::udfs::read_sketch;
45
use datafusion::arrow::array::{ArrayBuilder, ArrayRef, StringBuilder};
56
use datafusion::arrow::compute::{concat_batches, SortOptions};
@@ -23,7 +24,6 @@ use datafusion::physical_plan::{
2324
PhysicalExpr, PlanProperties, SendableRecordBatchStream,
2425
};
2526
use datafusion::scalar::ScalarValue;
26-
use datafusion_datasource::memory::MemoryExec;
2727
use flatbuffers::bitflags::_core::cmp::Ordering;
2828
use futures::{Stream, StreamExt};
2929
use itertools::Itertools;
@@ -640,11 +640,7 @@ impl TopKState<'_> {
640640
let schema = new_batch.schema();
641641
let filter_exec = Arc::new(FilterExec::try_new(
642642
having.clone(),
643-
Arc::new(MemoryExec::try_new(
644-
&vec![vec![new_batch]],
645-
schema.clone(),
646-
None,
647-
)?),
643+
try_make_memory_data_source(&vec![vec![new_batch]], schema.clone(), None)?,
648644
)?);
649645
let batches_stream =
650646
GlobalLimitExec::new(filter_exec, 0, Some(self.limit - self.result.num_rows()))
@@ -1051,7 +1047,6 @@ mod tests {
10511047
use datafusion::physical_plan::ExecutionPlan;
10521048
use datafusion::physical_planner::create_aggregate_expr_and_maybe_filter;
10531049
use datafusion::prelude::Expr;
1054-
use datafusion_datasource::memory::MemoryExec;
10551050
use futures::StreamExt;
10561051
use itertools::Itertools;
10571052

@@ -1550,7 +1545,7 @@ mod tests {
15501545
inputs: Vec<Vec<RecordBatch>>,
15511546
context: Arc<TaskContext>,
15521547
) -> Result<RecordBatch, DataFusionError> {
1553-
let input = Arc::new(MemoryExec::try_new(&inputs, proto.cluster.schema(), None)?);
1548+
let input = try_make_memory_data_source(&inputs, proto.cluster.schema(), None)?;
15541549
let results = proto
15551550
.with_new_children(vec![input])?
15561551
.execute(0, context)?

0 commit comments

Comments
 (0)