Skip to content

Commit 8e46f32

Browse files
committed
WIP: WorkerExec metadata parallelling ClusterSendExec metadata work in progress
1 parent f45b277 commit 8e46f32

File tree

4 files changed

+41
-15
lines changed

4 files changed

+41
-15
lines changed

rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,15 @@ pub fn push_aggregate_to_workers(
5959
} else if let Some(w) = agg.input().as_any().downcast_ref::<WorkerExec>() {
6060
let worker_input = p_partial.clone().with_new_children(vec![w.input.clone()])?;
6161

62+
// TODO upgrade DF: both cs.with_changed_schema and this need required_input_ordering if we have a sorted (inplace) aggregate pair here
63+
6264
// Worker plan, execute partial aggregate inside the worker.
63-
Arc::new(WorkerExec {
64-
input: worker_input,
65-
max_batch_rows: w.max_batch_rows,
66-
limit_and_reverse: w.limit_and_reverse.clone(),
67-
required_input_ordering: None,
68-
})
65+
Arc::new(WorkerExec::new(
66+
worker_input,
67+
w.max_batch_rows,
68+
w.limit_and_reverse.clone(),
69+
/* required_input_ordering */ None,
70+
))
6971
} else {
7072
return Ok(p_final);
7173
};

rust/cubestore/cubestore/src/queryplanner/panic.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,10 @@ impl ExecutionPlan for PanicWorkerExec {
155155
}
156156

157157
pub fn plan_panic_worker() -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
158-
Ok(Arc::new(WorkerExec {
159-
input: Arc::new(PanicWorkerExec::new()),
160-
max_batch_rows: 1,
161-
limit_and_reverse: None,
162-
required_input_ordering: None,
163-
}))
158+
Ok(Arc::new(WorkerExec::new(
159+
Arc::new(PanicWorkerExec::new()),
160+
/* max_batch_rows */ 1,
161+
/* limit_and_reverse */ None,
162+
/* required_input_ordering */ None,
163+
)))
164164
}

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1719,12 +1719,13 @@ impl CubeExtensionPlanner {
17191719
required_input_ordering,
17201720
)?))
17211721
} else {
1722-
Ok(Arc::new(WorkerExec {
1722+
Ok(Arc::new(WorkerExec::new(
17231723
input,
17241724
max_batch_rows,
17251725
limit_and_reverse,
17261726
required_input_ordering,
1727-
}))
1727+
1728+
)))
17281729
}
17291730
}
17301731
}
@@ -1737,6 +1738,27 @@ pub struct WorkerExec {
17371738
pub max_batch_rows: usize,
17381739
pub limit_and_reverse: Option<(usize, bool)>,
17391740
pub required_input_ordering: Option<LexRequirement>,
1741+
properties: PlanProperties,
1742+
}
1743+
1744+
impl WorkerExec {
1745+
pub fn new(
1746+
input: Arc<dyn ExecutionPlan>,
1747+
max_batch_rows: usize,
1748+
limit_and_reverse: Option<(usize, bool)>,
1749+
required_input_ordering: Option<LexRequirement>,
1750+
) -> WorkerExec {
1751+
// TODO upgrade DF: Use partitions_num parameter.
1752+
let partitions_num = 1; // TODO upgrade DF: No.
1753+
let properties = input.properties().clone().with_partitioning(Partitioning::UnknownPartitioning(partitions_num));
1754+
WorkerExec {
1755+
input,
1756+
max_batch_rows,
1757+
limit_and_reverse,
1758+
required_input_ordering,
1759+
properties,
1760+
}
1761+
}
17401762
}
17411763

17421764
impl DisplayAs for WorkerExec {
@@ -1766,6 +1788,7 @@ impl ExecutionPlan for WorkerExec {
17661788
max_batch_rows: self.max_batch_rows,
17671789
limit_and_reverse: self.limit_and_reverse.clone(),
17681790
required_input_ordering: self.required_input_ordering.clone(),
1791+
properties: self.properties.clone(),
17691792
}))
17701793
}
17711794

@@ -1782,6 +1805,7 @@ impl ExecutionPlan for WorkerExec {
17821805
}
17831806

17841807
fn properties(&self) -> &PlanProperties {
1808+
// TODO upgrade DF: No.
17851809
self.input.properties()
17861810
}
17871811

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1284,7 +1284,7 @@ impl ClusterSendExec {
12841284
})
12851285
}
12861286

1287-
fn compute_properties(
1287+
pub fn compute_properties(
12881288
input_properties: &PlanProperties,
12891289
partitions_num: usize,
12901290
) -> PlanProperties {

0 commit comments

Comments
 (0)