Skip to content

Commit 2852665

Browse files
committed
chore(cubestore): Upgrade DF: Address low-hanging warnings
1 parent f7d20f4 commit 2852665

30 files changed

+67
-827
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8329,7 +8329,7 @@ async fn limit_pushdown_group(service: Box<dyn SqlClient>) {
83298329
.await
83308330
.unwrap();
83318331

8332-
let mut res = assert_limit_pushdown(
8332+
let res = assert_limit_pushdown(
83338333
&service,
83348334
"SELECT id, SUM(n) FROM (
83358335
SELECT * FROM foo.pushdown1

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use crate::telemetry::tracing::{TraceIdAndSpanId, TracingHelper};
4545
use crate::CubeError;
4646
use async_trait::async_trait;
4747
use datafusion::arrow::datatypes::SchemaRef;
48-
use datafusion::arrow::error::ArrowError;
4948
use datafusion::arrow::record_batch::RecordBatch;
5049
use datafusion::cube_ext;
5150
use datafusion::error::DataFusionError;

rust/cubestore/cubestore/src/cluster/worker_pool.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -460,15 +460,12 @@ mod tests {
460460
use std::time::Duration;
461461

462462
use async_trait::async_trait;
463-
use datafusion::arrow::datatypes::{DataType, Field, Schema};
464-
use datafusion::dfschema::ToDFSchema;
465463
use futures_timer::Delay;
466464
use serde::{Deserialize, Serialize};
467465
use tokio::runtime::{Builder, Runtime};
468466

469467
use crate::cluster::worker_pool::{worker_main, WorkerPool};
470468
use crate::config::Config;
471-
use crate::queryplanner::serialized_plan::SerializedLogicalPlan;
472469
use crate::util::respawn;
473470
use crate::CubeError;
474471
use datafusion::cube_ext;
@@ -654,22 +651,6 @@ mod tests {
654651
});
655652
}
656653

657-
// TODO upgrade DF
658-
// #[tokio::test]
659-
// async fn serialize_plan() -> Result<(), CubeError> {
660-
// let schema = Schema::new(vec![
661-
// Field::new("c1", DataType::Int64, false),
662-
// Field::new("c2", DataType::Utf8, false),
663-
// ]);
664-
// let plan = SerializedLogicalPlan::EmptyRelation {
665-
// produce_one_row: false,
666-
// schema: schema.to_dfschema_ref()?,
667-
// };
668-
// let bytes = bincode::serialize(&plan)?;
669-
// bincode::deserialize::<SerializedLogicalPlan>(bytes.as_slice())?;
670-
// Ok(())
671-
// }
672-
673654
type TestServicePool = WorkerPool<ServConfigurator, ServProcessor, ServTransport>;
674655

675656
#[derive(Debug)]

rust/cubestore/cubestore/src/queryplanner/check_memory.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use crate::util::memory::MemoryHandler;
22
use async_trait::async_trait;
33
use datafusion::arrow::datatypes::SchemaRef;
4-
use datafusion::arrow::error::Result as ArrowResult;
54
use datafusion::arrow::record_batch::RecordBatch;
65
use datafusion::error::DataFusionError;
76
use datafusion::execution::TaskContext;
87
use datafusion::physical_plan::{
9-
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
8+
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
109
SendableRecordBatchStream,
1110
};
1211
use flatbuffers::bitflags::_core::any::Any;
@@ -33,7 +32,7 @@ impl CheckMemoryExec {
3332
}
3433

3534
impl DisplayAs for CheckMemoryExec {
36-
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
35+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
3736
write!(f, "CheckMemoryExec")
3837
}
3938
}

rust/cubestore/cubestore/src/queryplanner/filter_by_key_range.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@ use crate::table::data::cmp_partition_key;
44
use async_trait::async_trait;
55
use datafusion::arrow::array::ArrayRef;
66
use datafusion::arrow::datatypes::SchemaRef;
7-
use datafusion::arrow::error::ArrowError;
87
use datafusion::arrow::record_batch::RecordBatch;
98
use datafusion::error::DataFusionError;
109
use datafusion::execution::TaskContext;
1110
use datafusion::physical_plan::{
12-
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
11+
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
1312
SendableRecordBatchStream,
1413
};
1514
use futures::StreamExt;
@@ -45,7 +44,7 @@ impl FilterByKeyRangeExec {
4544
}
4645

4746
impl DisplayAs for FilterByKeyRangeExec {
48-
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
47+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
4948
write!(f, "FilterByKeyRangeExec")
5049
}
5150
}

rust/cubestore/cubestore/src/queryplanner/flatten_union.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use datafusion::common::tree_node::Transformed;
22
use datafusion::common::DFSchema;
33
use datafusion::error::DataFusionError;
4-
use datafusion::execution::context::ExecutionProps;
54
use datafusion::logical_expr::{LogicalPlan, Union};
65
use datafusion::optimizer::optimizer::OptimizerRule;
7-
use datafusion::optimizer::{utils, OptimizerConfig};
8-
use std::fmt::{Debug, Formatter};
6+
use datafusion::optimizer::OptimizerConfig;
7+
use std::fmt::Debug;
98
use std::sync::Arc;
109

1110
#[derive(Debug)]

rust/cubestore/cubestore/src/queryplanner/merge_sort.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use async_trait::async_trait;
22
use datafusion::arrow::array::{
3-
build_compare, make_comparator, ArrayRef, BooleanArray, DynComparator, RecordBatch,
3+
make_comparator, ArrayRef, BooleanArray, DynComparator, RecordBatch,
44
};
55
use datafusion::arrow::compute::{filter_record_batch, SortOptions};
66
use datafusion::arrow::datatypes::SchemaRef;
7-
use datafusion::arrow::error::ArrowError;
87
use datafusion::error::DataFusionError;
98
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
109
use datafusion::physical_expr::expressions::Column;
@@ -56,7 +55,7 @@ impl LastRowByUniqueKeyExec {
5655
}
5756

5857
impl DisplayAs for LastRowByUniqueKeyExec {
59-
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
58+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
6059
write!(f, "LastRowByUniqueKeyExec")
6160
}
6261
}

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub mod trace_data_loaded;
1919
use rewrite_inlist_literals::RewriteInListLiterals;
2020
use serialized_plan::PreSerializedPlan;
2121
pub use topk::MIN_TOPK_STREAM_ROWS;
22-
use udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_scalar_udfs};
22+
use udfs::{registerable_aggregate_udfs, registerable_scalar_udfs};
2323
mod filter_by_key_range;
2424
mod flatten_union;
2525
pub mod info_schema;
@@ -38,7 +38,6 @@ use crate::config::ConfigObj;
3838
use crate::metastore::multi_index::MultiPartition;
3939
use crate::metastore::table::{Table, TablePath};
4040
use crate::metastore::{IdRow, MetaStore};
41-
use crate::queryplanner::flatten_union::FlattenUnion;
4241
use crate::queryplanner::info_schema::{
4342
ColumnsInfoSchemaTableDef, RocksDBPropertiesTableDef, SchemataInfoSchemaTableDef,
4443
SystemCacheTableDef, SystemChunksTableDef, SystemIndexesTableDef, SystemJobsTableDef,
@@ -53,13 +52,11 @@ use crate::queryplanner::query_executor::{
5352
batches_to_dataframe, ClusterSendExec, InlineTableProvider,
5453
};
5554
use crate::queryplanner::serialized_plan::SerializedPlan;
56-
use crate::queryplanner::topk::{ClusterAggregateTopKUpper, ClusterAggregateTopKLower};
57-
// use crate::queryplanner::udfs::aggregate_udf_by_kind;
58-
use crate::queryplanner::udfs::{scalar_udf_by_kind, CubeAggregateUDFKind, CubeScalarUDFKind};
55+
use crate::queryplanner::topk::ClusterAggregateTopKLower;
5956

6057
use crate::queryplanner::metadata_cache::MetadataCacheFactory;
6158
use crate::queryplanner::optimizations::rolling_optimizer::RollingOptimizerRule;
62-
use crate::queryplanner::pretty_printers::{pp_plan, pp_plan_ext, PPOptions};
59+
use crate::queryplanner::pretty_printers::{pp_plan_ext, PPOptions};
6360
use crate::sql::cache::SqlResultCache;
6461
use crate::sql::InlineTables;
6562
use crate::store::DataFrame;
@@ -74,17 +71,14 @@ use datafusion::catalog::Session;
7471
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
7572
use datafusion::common::{plan_datafusion_err, TableReference};
7673
use datafusion::config::ConfigOptions;
77-
use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
78-
use datafusion::datasource::{provider_as_source, DefaultTableSource, TableType};
74+
use datafusion::datasource::{provider_as_source, TableType};
7975
use datafusion::error::DataFusionError;
8076
use datafusion::execution::{SessionState, TaskContext};
8177
use datafusion::logical_expr::{
8278
AggregateUDF, Expr, Extension, LogicalPlan, ScalarUDF, TableProviderFilterPushDown,
8379
TableSource, WindowUDF,
8480
};
8581
use datafusion::physical_expr::EquivalenceProperties;
86-
// TODO upgrade DF
87-
// use datafusion::physical_plan::memory::MemoryExec;
8882
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
8983
use datafusion::physical_plan::{
9084
collect, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
@@ -94,8 +88,6 @@ use datafusion::prelude::{SessionConfig, SessionContext};
9488
use datafusion::sql::parser::Statement;
9589
use datafusion::sql::planner::{ContextProvider, SqlToRel};
9690
use datafusion::{cube_ext, datasource::TableProvider};
97-
use futures::TryStreamExt;
98-
use futures_util::TryFutureExt;
9991
use log::{debug, trace};
10092
use mockall::automock;
10193
use serde_derive::{Deserialize, Serialize};
@@ -808,7 +800,7 @@ impl fmt::Debug for InfoSchemaTableExec {
808800
}
809801

810802
impl DisplayAs for InfoSchemaTableExec {
811-
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
803+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
812804
write!(f, "InfoSchemaTableExec")
813805
}
814806
}

rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,8 @@ use crate::queryplanner::optimizations::distributed_partial_aggregate::{
1212
use std::fmt::{Debug, Formatter};
1313
// use crate::queryplanner::optimizations::prefer_inplace_aggregates::try_switch_to_inplace_aggregates;
1414
use super::serialized_plan::PreSerializedPlan;
15-
use crate::queryplanner::optimizations::prefer_inplace_aggregates::try_regroup_columns;
1615
use crate::queryplanner::planning::CubeExtensionPlanner;
17-
use crate::queryplanner::pretty_printers::{pp_phys_plan, pp_plan};
1816
use crate::queryplanner::rolling::RollingWindowPlanner;
19-
use crate::queryplanner::serialized_plan::SerializedPlan;
2017
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
2118
use crate::util::memory::MemoryHandler;
2219
use async_trait::async_trait;
@@ -129,7 +126,7 @@ impl PhysicalOptimizerRule for PreOptimizeRule {
129126
fn optimize(
130127
&self,
131128
plan: Arc<dyn ExecutionPlan>,
132-
config: &ConfigOptions,
129+
_config: &ConfigOptions,
133130
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
134131
pre_optimize_physical_plan(
135132
plan,

rust/cubestore/cubestore/src/queryplanner/optimizations/prefer_inplace_aggregates.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use crate::queryplanner::planning::WorkerExec;
22
use crate::queryplanner::query_executor::ClusterSendExec;
3-
use datafusion::arrow::compute::SortOptions;
43
use datafusion::error::DataFusionError;
5-
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
4+
use datafusion::physical_expr::LexOrdering;
65
use datafusion::physical_plan::aggregates::AggregateExec;
7-
use datafusion::physical_plan::expressions::Column;
86
use datafusion::physical_plan::filter::FilterExec;
97
use datafusion::physical_plan::projection::ProjectionExec;
108
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;

0 commit comments

Comments
 (0)