Skip to content

Commit 8599593

Browse files
authored
Add plan flows for JOIN. Add 'self managed' graph nodes. (#385)
1 parent 024620a commit 8599593

File tree

3 files changed

+48
-21
lines changed

3 files changed

+48
-21
lines changed

partiql-eval/src/eval/evaluable.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,22 @@ macro_rules! take_input {
2727
};
2828
}
2929

30+
/// Whether an [`Evaluable`] takes input from the plan graph or manages its own iteration.
31+
pub enum EvalType {
32+
SelfManaged,
33+
GraphManaged,
34+
}
35+
3036
/// `Evaluable` represents each evaluation operator in the evaluation plan as an evaluable entity.
3137
pub trait Evaluable: Debug {
3238
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value;
3339
fn update_input(&mut self, input: Value, branch_num: u8);
3440
fn get_vars(&self) -> Option<&[String]> {
3541
None
3642
}
43+
fn eval_type(&self) -> EvalType {
44+
EvalType::GraphManaged
45+
}
3746
}
3847

3948
/// Represents an evaluation `Scan` operator; `Scan` operator scans the given bindings from its
@@ -305,6 +314,10 @@ impl Evaluable for EvalJoin {
305314
fn update_input(&mut self, input: Value, _branch_num: u8) {
306315
self.input = Some(input);
307316
}
317+
318+
fn eval_type(&self) -> EvalType {
319+
EvalType::SelfManaged
320+
}
308321
}
309322

310323
/// An SQL aggregation function call that has been rewritten to be evaluated with the `GROUP BY`

partiql-eval/src/eval/mod.rs

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use petgraph::graph::NodeIndex;
1818
use crate::error::{EvalErr, EvaluationError};
1919
use petgraph::visit::EdgeRef;
2020

21-
use crate::eval::evaluable::Evaluable;
21+
use crate::eval::evaluable::{EvalType, Evaluable};
2222

2323
pub mod evaluable;
2424
pub mod expr;
@@ -77,32 +77,44 @@ impl EvalPlan {
7777

7878
let mut result = None;
7979
for idx in ops.into_iter() {
80-
result = Some(self.get_node(idx)?.evaluate(&*ctx));
81-
82-
// return on first evaluation error
83-
if ctx.has_errors() {
84-
return Err(EvalErr {
85-
errors: ctx.errors(),
86-
});
87-
}
88-
8980
let destinations: Vec<(usize, (u8, NodeIndex))> = self
9081
.plan_graph()
9182
.edges_directed(idx, Outgoing)
9283
.map(|e| (*e.weight(), e.target()))
9384
.enumerate()
9485
.collect_vec();
95-
let branches = destinations.len();
96-
for (i, (branch_num, dst_id)) in destinations {
97-
let res = if i == branches - 1 {
98-
result.take()
99-
} else {
100-
result.clone()
101-
};
102-
103-
let res =
104-
res.ok_or_else(|| err_illegal_state("Error in retrieving source value"))?;
105-
self.get_node(dst_id)?.update_input(res, branch_num);
86+
87+
// Some evaluables (i.e., `JOIN`) manage their own inputs
88+
let graph_managed = destinations.is_empty()
89+
|| destinations.iter().any(|(_, (_, dest_idx))| {
90+
matches!(
91+
self.get_node(*dest_idx).map(|d| d.eval_type()),
92+
Ok(EvalType::GraphManaged)
93+
)
94+
});
95+
if graph_managed {
96+
let src = self.get_node(idx)?;
97+
result = Some(src.evaluate(&*ctx));
98+
99+
// return on first evaluation error
100+
if ctx.has_errors() {
101+
return Err(EvalErr {
102+
errors: ctx.errors(),
103+
});
104+
}
105+
106+
let num_destinations = destinations.len();
107+
for (i, (branch_num, dst_id)) in destinations {
108+
let res = if i == num_destinations - 1 {
109+
result.take()
110+
} else {
111+
result.clone()
112+
};
113+
114+
let res =
115+
res.ok_or_else(|| err_illegal_state("Error in retrieving source value"))?;
116+
self.get_node(dst_id)?.update_input(res, branch_num);
117+
}
106118
}
107119
}
108120

partiql-logical-planner/src/lower.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1345,6 +1345,8 @@ impl<'a, 'ast> Visitor<'ast> for AstToLogical<'a> {
13451345
right,
13461346
});
13471347
let join = self.plan.add_operator(join);
1348+
self.plan.add_flow_with_branch_num(lid, join, 0);
1349+
self.plan.add_flow_with_branch_num(rid, join, 1);
13481350
self.push_bexpr(join);
13491351
Traverse::Continue
13501352
}

0 commit comments

Comments
 (0)