Skip to content

Commit 5144594

Browse files
committed
chore(cubestore): Upgrade DF: Fully implement remove_unused_tables
Implements for other LogicalPlan cases, expression subqueries, for the essential TableScan base case, and patches up resulting problems with unions by adding a projection with appropriate table reference aliases.
1 parent 783c645 commit 5144594

File tree

3 files changed

+474
-173
lines changed

3 files changed

+474
-173
lines changed

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use datafusion::error::DataFusionError;
4545
use datafusion::error::Result as DFResult;
4646
use datafusion::execution::runtime_env::RuntimeEnv;
4747
use datafusion::execution::{SessionStateBuilder, TaskContext};
48-
use datafusion::logical_expr::{Expr, LogicalPlan};
48+
use datafusion::logical_expr::{Expr, LogicalPlan, TableSource};
4949
use datafusion::physical_expr;
5050
use datafusion::physical_expr::{
5151
expressions, Distribution, EquivalenceProperties, LexRequirement, PhysicalSortExpr,
@@ -1584,13 +1584,21 @@ impl ExecutionPlan for ClusterSendExec {
15841584
let node_name = node_name.to_string();
15851585
if self.use_streaming {
15861586
// A future that yields a stream
1587-
let fut = async move { cluster.run_select_stream(&node_name, plan.to_serialized_plan()?).await };
1587+
let fut = async move {
1588+
cluster
1589+
.run_select_stream(&node_name, plan.to_serialized_plan()?)
1590+
.await
1591+
};
15881592
// Use TryStreamExt::try_flatten to flatten the stream of streams
15891593
let stream = futures::stream::once(fut).try_flatten();
15901594

15911595
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
15921596
} else {
1593-
let record_batches = async move { cluster.run_select(&node_name, plan.to_serialized_plan()?).await };
1597+
let record_batches = async move {
1598+
cluster
1599+
.run_select(&node_name, plan.to_serialized_plan()?)
1600+
.await
1601+
};
15941602
let stream = futures::stream::once(record_batches).flat_map(|r| match r {
15951603
Ok(vec) => stream::iter(vec.into_iter().map(|b| Ok(b)).collect::<Vec<_>>()),
15961604
Err(e) => stream::iter(vec![Err(DataFusionError::Execution(e.to_string()))]),

0 commit comments

Comments
 (0)