
Commit 9b2c212

WIP: Cargo fmt topk changes
1 parent: 96308be

16 files changed: 496 additions, 227 deletions

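The sections below are the rendered per-file diffs. Per the commit message, these are `cargo fmt` cleanups of the topk-related changes: long function calls, struct literals, and method chains get wrapped onto one item per line with trailing commas, and a few `use` statements are re-split or re-ordered. As a rough illustration of the wrapping style only, using hypothetical names (`execute_plan` and its parameters are not part of the CubeStore API):

// Illustration only: hypothetical function and parameters, not the CubeStore API.
fn execute_plan(
    plan_name: &str,
    worker_partition_count: usize,
    remote_to_local_names: Vec<(String, String)>,
    max_batch_rows: usize,
) -> usize {
    // Stand-in for real work: combine the inputs into a single number.
    plan_name.len() + worker_partition_count + remote_to_local_names.len() + max_batch_rows
}

fn main() {
    // Written on a single line, this call would exceed rustfmt's default 100-column
    // width, so `cargo fmt` breaks it into one argument per line with a trailing
    // comma -- the same rewrite applied to `execute_worker_plan(...)` and
    // `send_or_process_locally(...)` in this commit.
    let result = execute_plan(
        "select_worker_plan",
        1,
        vec![("remote/part.parquet".to_string(), "/tmp/part.parquet".to_string())],
        4096,
    );
    println!("result = {}", result);
}

Running `cargo fmt` from the workspace root applies such rewrites in place; `cargo fmt --check` reports files that still need formatting without modifying them.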

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 33 additions & 8 deletions
@@ -230,7 +230,9 @@ impl WorkerPlanningParams
     }
     pub fn panic_worker_params() -> WorkerPlanningParams {
         // TODO: Maybe avoid needing to call this function.
-        WorkerPlanningParams { worker_partition_count: 1 }
+        WorkerPlanningParams {
+            worker_partition_count: 1,
+        }
     }
 }
 
@@ -344,7 +346,12 @@ impl WorkerProcessing for WorkerProcessor
         let res = services
             .query_executor
             .clone()
-            .execute_worker_plan(plan_node_to_send, worker_planning_params, remote_to_local_names, result)
+            .execute_worker_plan(
+                plan_node_to_send,
+                worker_planning_params,
+                remote_to_local_names,
+                result,
+            )
             .await;
         debug!(
             "Running select in worker completed ({:?})",
@@ -495,7 +502,10 @@ impl Cluster for ClusterImpl
         worker_planning_params: WorkerPlanningParams,
     ) -> Result<Vec<RecordBatch>, CubeError> {
         let response = self
-            .send_or_process_locally(node_name, NetworkMessage::Select(plan_node, worker_planning_params))
+            .send_or_process_locally(
+                node_name,
+                NetworkMessage::Select(plan_node, worker_planning_params),
+            )
             .await?;
         match response {
             NetworkMessage::SelectResult(r) => {
@@ -512,7 +522,10 @@ impl Cluster for ClusterImpl
         worker_planning_params: WorkerPlanningParams,
     ) -> Result<String, CubeError> {
         let response = self
-            .send_or_process_locally(node_name, NetworkMessage::ExplainAnalyze(plan, worker_planning_params))
+            .send_or_process_locally(
+                node_name,
+                NetworkMessage::ExplainAnalyze(plan, worker_planning_params),
+            )
             .await?;
         match response {
             NetworkMessage::ExplainAnalyzeResult(r) => r,
@@ -708,7 +721,9 @@ impl Cluster for ClusterImpl
                 NetworkMessage::SelectResult(res)
             }
             NetworkMessage::ExplainAnalyze(plan, planning_params) => {
-                let res = self.run_local_explain_analyze_worker(plan, planning_params).await;
+                let res = self
+                    .run_local_explain_analyze_worker(plan, planning_params)
+                    .await;
                 NetworkMessage::ExplainAnalyzeResult(res)
             }
             NetworkMessage::WarmupDownload(remote_path, expected_file_size) => {
@@ -1253,7 +1268,9 @@ impl ClusterImpl
             table_id: None,
             trace_obj: plan_node.trace_obj(),
         };
-        let res = self.run_local_select_worker_impl(plan_node, worker_planning_params).await;
+        let res = self
+            .run_local_select_worker_impl(plan_node, worker_planning_params)
+            .await;
         match res {
             Ok((schema, records, data_loaded_size)) => {
                 self.process_rate_limiter
@@ -1406,7 +1423,12 @@ impl ClusterImpl
 
         let res = self
             .query_executor
-            .pp_worker_plan(plan_node, worker_planning_params, remote_to_local_names, chunk_id_to_record_batches)
+            .pp_worker_plan(
+                plan_node,
+                worker_planning_params,
+                remote_to_local_names,
+                chunk_id_to_record_batches,
+            )
             .await;
 
         res
@@ -1530,7 +1552,10 @@ impl ClusterImpl
     async fn start_stream_on_worker(self: Arc<Self>, m: NetworkMessage) -> Box<dyn MessageStream> {
         match m {
            NetworkMessage::SelectStart(p, worker_planning_params) => {
-                let (schema, results) = match self.run_local_select_worker(p, worker_planning_params).await {
+                let (schema, results) = match self
+                    .run_local_select_worker(p, worker_planning_params)
+                    .await
+                {
                     Err(e) => return Box::new(QueryStream::new_error(e)),
                     Ok(x) => x,
                 };

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -2106,7 +2106,7 @@ impl Config
                 i.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
                     .await
                     .cache_factory()
-                    .clone()
+                    .clone(),
                 )
             })
             .await;

rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs

Lines changed: 44 additions & 26 deletions
@@ -29,7 +29,10 @@ pub fn push_aggregate_to_workers(
     let p_final_agg: &AggregateExec;
     let p_partial: &Arc<dyn ExecutionPlan>;
     if let Some(a) = p_final.as_any().downcast_ref::<AggregateExec>() {
-        if matches!(a.mode(), AggregateMode::Final | AggregateMode::FinalPartitioned) {
+        if matches!(
+            a.mode(),
+            AggregateMode::Final | AggregateMode::FinalPartitioned
+        ) {
             p_final_agg = a;
             p_partial = a.input();
         } else {
@@ -49,33 +52,45 @@ pub fn push_aggregate_to_workers(
         return Ok(p_final);
     }
 
-    let p_final_input: Arc<dyn ExecutionPlan> = if let Some(cs) = agg.input().as_any().downcast_ref::<ClusterSendExec>() {
-        let clustersend_input = p_partial.clone()
-            .with_new_children(vec![cs.input_for_optimizations.clone()])?;
+    let p_final_input: Arc<dyn ExecutionPlan> =
+        if let Some(cs) = agg.input().as_any().downcast_ref::<ClusterSendExec>() {
+            let clustersend_input = p_partial
+                .clone()
+                .with_new_children(vec![cs.input_for_optimizations.clone()])?;
 
-        // Note that required_input_ordering is applicable when p_final_agg has a Sorted input mode.
+            // Note that required_input_ordering is applicable when p_final_agg has a Sorted input mode.
 
-        // Router plan, replace partial aggregate with cluster send.
-        Arc::new(
-            cs.with_changed_schema(
-                clustersend_input,
-                p_final_agg.required_input_ordering().into_iter().next().unwrap(),
-            ),
-        )
-    } else if let Some(w) = agg.input().as_any().downcast_ref::<WorkerExec>() {
-        let worker_input = p_partial.clone().with_new_children(vec![w.input.clone()])?;
+            // Router plan, replace partial aggregate with cluster send.
+            Arc::new(
+                cs.with_changed_schema(
+                    clustersend_input,
+                    p_final_agg
+                        .required_input_ordering()
+                        .into_iter()
+                        .next()
+                        .unwrap(),
+                ),
+            )
+        } else if let Some(w) = agg.input().as_any().downcast_ref::<WorkerExec>() {
+            let worker_input = p_partial.clone().with_new_children(vec![w.input.clone()])?;
 
-        // Worker plan, execute partial aggregate inside the worker.
-        Arc::new(WorkerExec::new(
-            worker_input,
-            w.max_batch_rows,
-            w.limit_and_reverse.clone(),
-            p_final_agg.required_input_ordering().into_iter().next().unwrap(),
-            WorkerPlanningParams { worker_partition_count: w.properties().output_partitioning().partition_count() },
-        ))
-    } else {
-        return Ok(p_final);
-    };
+            // Worker plan, execute partial aggregate inside the worker.
+            Arc::new(WorkerExec::new(
+                worker_input,
+                w.max_batch_rows,
+                w.limit_and_reverse.clone(),
+                p_final_agg
+                    .required_input_ordering()
+                    .into_iter()
+                    .next()
+                    .unwrap(),
+                WorkerPlanningParams {
+                    worker_partition_count: w.properties().output_partitioning().partition_count(),
+                },
+            ))
+        } else {
+            return Ok(p_final);
+        };
 
     // We change AggregateMode::FinalPartitioned to AggregateMode::Final, because the ClusterSend
     // node ends up creating an incompatible partitioning for FinalPartitioned. Some other ideas,
@@ -157,7 +172,10 @@ pub fn ensure_partition_merge_with_acceptable_parent(
     let mut new_children = Vec::new();
 
     for p in parent.children() {
-        new_children.push(ensure_partition_merge_helper(p.clone(), &mut any_new_children)?);
+        new_children.push(ensure_partition_merge_helper(
+            p.clone(),
+            &mut any_new_children,
+        )?);
     }
     if any_new_children {
         parent.with_new_children(new_children)

rust/cubestore/cubestore/src/queryplanner/panic.rs

Lines changed: 3 additions & 2 deletions
@@ -161,13 +161,14 @@ pub fn plan_panic_worker() -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         /* max_batch_rows */ 1,
         /* limit_and_reverse */ None,
         /* required_input_ordering */ None,
-
         // worker_partition_count is generally set to 1 for panic worker messages
         // (SystemCommand::PanicWorker). What is important is that router and worker nodes have the
         // same plan properties so that DF optimizations run identically -- router node is creating
         // a WorkerExec for some reason. (Also, it's important that DF optimizations run identically
         // when it comes to aggregates pushed down through ClusterSend and the like -- it's actually
         // NOT important for panic worker planning.)
-        WorkerPlanningParams { worker_partition_count: 1 },
+        WorkerPlanningParams {
+            worker_partition_count: 1,
+        },
     )))
 }

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 29 additions & 10 deletions
@@ -51,11 +51,11 @@ use crate::queryplanner::rolling::RollingWindowAggregateSerialized;
 use crate::queryplanner::serialized_plan::{
     IndexSnapshot, InlineSnapshot, PartitionSnapshot, SerializedPlan,
 };
+use crate::queryplanner::topk::plan_topk;
 use crate::queryplanner::topk::ClusterAggregateTopK;
 use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
 use crate::table::{cmp_same_types, Row};
 use crate::CubeError;
-use crate::queryplanner::topk::plan_topk;
 use datafusion::common;
 use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion::common::DFSchemaRef;
@@ -64,7 +64,9 @@ use datafusion::execution::{SessionState, TaskContext};
 use datafusion::logical_expr::expr::Alias;
 use datafusion::logical_expr::utils::expr_to_columns;
 use datafusion::logical_expr::{
-    expr, logical_plan, Aggregate, BinaryExpr, Expr, Extension, Filter, Join, Limit, LogicalPlan, Operator, Projection, Sort, SortExpr, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode
+    expr, logical_plan, Aggregate, BinaryExpr, Expr, Extension, Filter, Join, Limit, LogicalPlan,
+    Operator, Projection, Sort, SortExpr, SubqueryAlias, TableScan, Union, Unnest,
+    UserDefinedLogicalNode,
 };
 use datafusion::physical_expr::{Distribution, LexRequirement};
 use datafusion::physical_plan::repartition::RepartitionExec;
@@ -1711,7 +1713,10 @@ impl CubeExtensionPlanner
         Ok(Arc::new(ClusterSendExec::new(
             c.clone(),
             if let Some(logical_plan_to_send) = logical_plan_to_send {
-                Arc::new(self.serialized_plan.replace_logical_plan(logical_plan_to_send.clone())?)
+                Arc::new(
+                    self.serialized_plan
+                        .replace_logical_plan(logical_plan_to_send.clone())?,
+                )
             } else {
                 self.serialized_plan.clone()
             },
@@ -1752,7 +1757,13 @@ impl WorkerExec
         required_input_ordering: Option<LexRequirement>,
         worker_planning_params: WorkerPlanningParams,
     ) -> WorkerExec {
-        let properties = input.properties().clone().with_partitioning(Partitioning::UnknownPartitioning(worker_planning_params.worker_partition_count));
+        let properties =
+            input
+                .properties()
+                .clone()
+                .with_partitioning(Partitioning::UnknownPartitioning(
+                    worker_planning_params.worker_partition_count,
+                ));
         WorkerExec {
             input,
             max_batch_rows,
@@ -2643,19 +2654,27 @@ pub mod tests
     }
 
     fn udf_names(&self) -> Vec<String> {
-        self.session_state.scalar_functions().keys().cloned().collect()
+        self.session_state
+            .scalar_functions()
+            .keys()
+            .cloned()
+            .collect()
     }
 
     fn udaf_names(&self) -> Vec<String> {
-        self.session_state.aggregate_functions().keys().cloned().collect()
+        self.session_state
+            .aggregate_functions()
+            .keys()
+            .cloned()
+            .collect()
     }
 
     fn udwf_names(&self) -> Vec<String> {
         self.session_state
-                .window_functions()
-                .keys()
-                .cloned()
-                .collect()
+            .window_functions()
+            .keys()
+            .cloned()
+            .collect()
     }
 }
 

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 1 addition & 1 deletion
@@ -31,8 +31,8 @@ use crate::queryplanner::query_executor::{
 use crate::queryplanner::rolling::RollingWindowAggregate;
 use crate::queryplanner::serialized_plan::{IndexSnapshot, RowRange};
 use crate::queryplanner::tail_limit::TailLimitExec;
-use crate::queryplanner::topk::{AggregateTopKExec, ClusterAggregateTopK};
 use crate::queryplanner::topk::SortColumn;
+use crate::queryplanner::topk::{AggregateTopKExec, ClusterAggregateTopK};
 use crate::queryplanner::trace_data_loaded::TraceDataLoadedExec;
 use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
 use crate::streaming::topic_table_provider::TopicTableProvider;

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 27 additions & 7 deletions
@@ -1,4 +1,6 @@
-use crate::cluster::{pick_worker_by_ids, pick_worker_by_partitions, Cluster, WorkerPlanningParams};
+use crate::cluster::{
+    pick_worker_by_ids, pick_worker_by_partitions, Cluster, WorkerPlanningParams,
+};
 use crate::config::injection::DIService;
 use crate::config::ConfigObj;
 use crate::metastore::multi_index::MultiPartition;
@@ -75,7 +77,8 @@ use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    collect, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties, Partitioning, PhysicalExpr, PlanProperties, SendableRecordBatchStream
+    collect, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+    Partitioning, PhysicalExpr, PlanProperties, SendableRecordBatchStream,
 };
 use datafusion::prelude::{and, SessionConfig, SessionContext};
 use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
@@ -310,7 +313,8 @@ impl QueryExecutor for QueryExecutorImpl
         )?;
         let pre_serialized_plan = Arc::new(pre_serialized_plan);
         let ctx = self.router_context(cluster.clone(), pre_serialized_plan.clone())?;
-        let router_plan = ctx.clone()
+        let router_plan = ctx
+            .clone()
             .state()
             .create_physical_plan(pre_serialized_plan.logical_plan())
             .await?;
@@ -337,7 +341,11 @@
             self.parquet_metadata_cache.cache().clone(),
         )?;
         let pre_serialized_plan = Arc::new(pre_serialized_plan);
-        let ctx = self.worker_context(pre_serialized_plan.clone(), worker_planning_params, data_loaded_size)?;
+        let ctx = self.worker_context(
+            pre_serialized_plan.clone(),
+            worker_planning_params,
+            data_loaded_size,
+        )?;
         let plan_ctx = ctx.clone();
         Ok((
             plan_ctx
@@ -1524,7 +1532,11 @@ impl ClusterSendExec
         r
     }
 
-    pub fn with_changed_schema(&self, input_for_optimizations: Arc<dyn ExecutionPlan>, new_required_input_ordering: Option<LexRequirement>) -> Self {
+    pub fn with_changed_schema(
+        &self,
+        input_for_optimizations: Arc<dyn ExecutionPlan>,
+        new_required_input_ordering: Option<LexRequirement>,
+    ) -> Self {
         ClusterSendExec {
             properties: Self::compute_properties(
                 input_for_optimizations.properties(),
@@ -1623,7 +1635,11 @@ impl ExecutionPlan for ClusterSendExec
             // A future that yields a stream
             let fut = async move {
                 cluster
-                    .run_select_stream(&node_name, plan.to_serialized_plan()?, worker_planning_params)
+                    .run_select_stream(
+                        &node_name,
+                        plan.to_serialized_plan()?,
+                        worker_planning_params,
+                    )
                     .await
             };
             // Use TryStreamExt::try_flatten to flatten the stream of streams
@@ -1633,7 +1649,11 @@
         } else {
             let record_batches = async move {
                 cluster
-                    .run_select(&node_name, plan.to_serialized_plan()?, worker_planning_params)
+                    .run_select(
+                        &node_name,
+                        plan.to_serialized_plan()?,
+                        worker_planning_params,
+                    )
                     .await
             };
             let stream = futures::stream::once(record_batches).flat_map(|r| match r {
