Skip to content

Commit 01e8ae1

Browse files
authored
[steel thread] Add query plan and eval using DAG model (#202)
* [steel thread] query plan and eval using DAG model Description of changes Adds the required changes for enabling query evaluation using a DAG model. What is our definition of DAG? We are only interested in DAGs that can be used as execution plans, which leads to the following definition. A DAG is a directed, cycle-free graph G = (V, E) with a denoted root node v0 ∈ V such that all v ∈ V \{v0} are reachable from v0. Note that this is the definition of trees without the condition |E| = |V | − 1. Hence, all trees are DAGs. Reference: https://link.springer.com/article/10.1007/s00450-009-0061-0 In addition - The steel thread includes only `Scan` and `Project` operators, and passes the test for planning and evaluation of the following query: ```SQL SELECT a AS b FROM data ``` - The code is _not_ clean, maybe not idiomatic at some instances, and lacks documentation. I'll iterate through the code with a consequent PR when adding new operators. Algorithm We're loosely using a push model with DAG as called out in the following: https://link.springer.com/article/10.1007/s00450-009-0061-0 (Section 10.3.5. Pushing) 1. Create a DAG logical plan `LogicalPlan` with each node as a logical operator (for now manual until we integrate with AST -> PLAN). 1. Create an evaluation plan `EvalPlan` with the same structure as logical plan. 1. [Toposort](https://en.wikipedia.org/wiki/Topological_sorting) the `EvalPlan` to ensure all dependencies of a node get evaluated before the node itself. 1. For each operator in `EvalPlan`, eval the node. 1. Store the result in the node. 1. Push the output to each consumer—for now, for final result, we use a `Sink` node that is considered as the node that accumulates output; we need to see if there are better alternatives. Finally, we're not performing any optimization at this stage (and we won't for extended amount of time).
1 parent 1cea16b commit 01e8ae1

File tree

9 files changed

+363
-51
lines changed

9 files changed

+363
-51
lines changed

partiql-eval/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ edition.workspace = true
2222
[dependencies]
2323
partiql-logical = { path = "../partiql-logical" }
2424
partiql-value = { path = "../partiql-value" }
25+
petgraph = "0.6.*"
2526
ordered-float = "3.*"
2627
itertools = "0.10.*"
2728
unicase = "2.*"

partiql-eval/benches/bench_eval.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
use std::cell::RefCell;
2+
use std::rc::Rc;
3+
use std::time::Duration;
4+
15
use criterion::{black_box, criterion_group, criterion_main, Criterion};
26

37
use partiql_eval::env::basic::MapBindings;
@@ -8,9 +12,6 @@ use partiql_eval::eval::{
812
use partiql_value::{
913
partiql_bag, partiql_list, partiql_tuple, Bag, BindingsName, List, Tuple, Value,
1014
};
11-
use std::cell::RefCell;
12-
use std::rc::Rc;
13-
use std::time::Duration;
1415

1516
fn data() -> MapBindings<Value> {
1617
let hr = partiql_tuple![(

partiql-eval/src/env.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::fmt::Debug;
2-
31
use partiql_value::{BindingsName, Tuple, Value};
42
use unicase::UniCase;
53

partiql-eval/src/eval.rs

Lines changed: 203 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,138 @@
1-
use crate::env::basic::MapBindings;
2-
use crate::env::Bindings;
3-
use partiql_value::Value::{Boolean, Missing, Null};
4-
use partiql_value::{Bag, BindingsName, Tuple, Value};
1+
use itertools::Itertools;
52
use std::cell::RefCell;
63
use std::collections::{HashMap, HashSet};
74
use std::fmt::Debug;
85
use std::rc::Rc;
96

7+
use petgraph::algo::toposort;
8+
use petgraph::prelude::StableGraph;
9+
use petgraph::{Directed, Incoming, Outgoing};
10+
11+
use partiql_value::Value::{Boolean, Missing, Null};
12+
use partiql_value::{partiql_bag, Bag, BindingsName, Tuple, Value};
13+
14+
use crate::env::basic::MapBindings;
15+
use crate::env::Bindings;
16+
17+
#[derive(Debug)]
18+
pub struct EvalPlan(pub StableGraph<Box<dyn DagEvaluable>, (), Directed>);
19+
20+
impl Default for EvalPlan {
21+
fn default() -> Self {
22+
Self::new()
23+
}
24+
}
25+
26+
impl EvalPlan {
27+
fn new() -> Self {
28+
EvalPlan(StableGraph::<Box<dyn DagEvaluable>, (), Directed>::new())
29+
}
30+
}
31+
32+
pub type EvalResult = Result<Evaluated, EvalErr>;
33+
34+
pub struct Evaluated {
35+
pub result: Option<Value>,
36+
}
37+
38+
pub struct EvalErr {
39+
pub errors: Vec<EvaluationError>,
40+
}
41+
42+
pub enum EvaluationError {
43+
InvalidEvaluationPlan(String),
44+
}
45+
46+
#[derive(Debug)]
47+
pub struct Scan {
48+
pub expr: Box<dyn EvalExpr>,
49+
pub as_key: String,
50+
pub output: Option<Value>,
51+
}
52+
53+
impl Scan {
54+
pub fn new(expr: Box<dyn EvalExpr>, as_key: &str) -> Self {
55+
Scan {
56+
expr,
57+
as_key: as_key.to_string(),
58+
output: None,
59+
}
60+
}
61+
}
62+
63+
impl DagEvaluable for Scan {
64+
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Option<Value> {
65+
let mut value = partiql_bag!();
66+
let v = self.expr.evaluate(&Tuple(HashMap::new()), ctx);
67+
for t in v.into_iter() {
68+
let out = Tuple(HashMap::from([(self.as_key.clone(), t)]));
69+
value.push(Value::Tuple(Box::new(out)));
70+
}
71+
Some(Value::Bag(Box::new(value)))
72+
}
73+
fn update_input(&mut self, _input: &Value) {
74+
todo!("update_input for Scan")
75+
}
76+
}
77+
78+
#[derive(Debug)]
79+
pub struct Project {
80+
pub exprs: HashMap<String, Box<dyn EvalExpr>>,
81+
pub input: Option<Value>,
82+
pub output: Option<Value>,
83+
}
84+
85+
impl Project {
86+
pub fn new(exprs: HashMap<String, Box<dyn EvalExpr>>) -> Self {
87+
Project {
88+
exprs,
89+
input: None,
90+
output: None,
91+
}
92+
}
93+
}
94+
95+
impl DagEvaluable for Project {
96+
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Option<Value> {
97+
let input_value = self
98+
.input
99+
.as_ref()
100+
.expect("Error in retrieving input value")
101+
.clone();
102+
let mut value = partiql_bag![];
103+
for v in input_value.into_iter() {
104+
let out = v.coerce_to_tuple();
105+
106+
let proj: HashMap<String, Value> = self
107+
.exprs
108+
.iter()
109+
.map(|(alias, expr)| (alias.to_string(), expr.evaluate(&out, ctx)))
110+
.collect();
111+
value.push(Value::Tuple(Box::new(Tuple(proj))));
112+
}
113+
114+
Some(Value::Bag(Box::new(value)))
115+
}
116+
fn update_input(&mut self, input: &Value) {
117+
self.input = Some(input.clone());
118+
}
119+
}
120+
121+
#[derive(Debug)]
122+
pub struct Sink {
123+
pub input: Option<Value>,
124+
pub output: Option<Value>,
125+
}
126+
127+
impl DagEvaluable for Sink {
128+
fn evaluate(&mut self, _ctx: &dyn EvalContext) -> Option<Value> {
129+
self.input.clone()
130+
}
131+
fn update_input(&mut self, input: &Value) {
132+
self.input = Some(input.clone());
133+
}
134+
}
135+
10136
pub trait EvalContext {
11137
fn bindings(&self) -> &dyn Bindings<Value>;
12138
}
@@ -44,10 +170,76 @@ impl Evaluator {
44170
}
45171
}
46172

173+
pub struct DagEvaluator {
174+
ctx: Box<dyn EvalContext>,
175+
}
176+
177+
impl DagEvaluator {
178+
pub fn new(bindings: MapBindings<Value>) -> Self {
179+
let ctx: Box<dyn EvalContext> = Box::new(BasicContext { bindings });
180+
DagEvaluator { ctx }
181+
}
182+
183+
pub fn execute_dag(&mut self, plan: EvalPlan) -> Result<Evaluated, EvalErr> {
184+
let mut graph = plan.0;
185+
// We are only interested in DAGs that can be used as execution plans, which leads to the
186+
// following definition.
187+
// A DAG is a directed, cycle-free graph G = (V, E) with a denoted root node v0 ∈ V such
188+
// that all v ∈ V \{v0} are reachable from v0. Note that this is the definition of trees
189+
// without the condition |E| = |V | − 1. Hence, all trees are DAGs.
190+
// Reference: https://link.springer.com/article/10.1007/s00450-009-0061-0
191+
match graph.externals(Incoming).exactly_one() {
192+
Ok(_) => {
193+
let sorted_ops = toposort(&graph, None);
194+
match sorted_ops {
195+
Ok(ops) => {
196+
let mut result = None;
197+
for idx in ops.into_iter() {
198+
let src = graph
199+
.node_weight_mut(idx)
200+
.expect("Error in retrieving node");
201+
result = src.evaluate(&*self.ctx);
202+
203+
let mut ne = graph.neighbors_directed(idx, Outgoing).detach();
204+
while let Some(n) = ne.next_node(&graph) {
205+
let dst =
206+
graph.node_weight_mut(n).expect("Error in retrieving node");
207+
dst.update_input(
208+
&result.clone().expect("Error in retrieving source value"),
209+
);
210+
}
211+
}
212+
let evaluated = Evaluated { result };
213+
Ok(evaluated)
214+
}
215+
Err(e) => Err(EvalErr {
216+
errors: vec![EvaluationError::InvalidEvaluationPlan(format!(
217+
"Malformed evaluation plan detected: {:?}",
218+
e
219+
))],
220+
}),
221+
}
222+
}
223+
Err(e) => Err(EvalErr {
224+
errors: vec![EvaluationError::InvalidEvaluationPlan(format!(
225+
"Malformed evaluation plan detected: {:?}",
226+
e
227+
))],
228+
}),
229+
}
230+
}
231+
}
232+
47233
pub trait Evaluable: Debug {
48234
fn evaluate(&mut self, ctx: &dyn EvalContext);
49235
}
50236

237+
// TODO rename to `Evaluable` when moved to DAG model completely
238+
pub trait DagEvaluable: Debug {
239+
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Option<Value>;
240+
fn update_input(&mut self, input: &Value);
241+
}
242+
51243
pub trait TupleSink: Debug {
52244
fn push_tuple(&mut self, bindings: Tuple, ctx: &dyn EvalContext);
53245
}
@@ -222,6 +414,13 @@ impl EvalFrom {
222414
}
223415
}
224416

417+
impl Evaluable for EvalFrom {
418+
fn evaluate(&mut self, ctx: &dyn EvalContext) {
419+
let empty = Tuple(HashMap::new());
420+
self.push_tuple(empty, ctx);
421+
}
422+
}
423+
225424
impl TupleSink for EvalFrom {
226425
#[inline]
227426
fn push_tuple(&mut self, bindings: Tuple, ctx: &dyn EvalContext) {
@@ -239,13 +438,6 @@ impl ValueSink for EvalFrom {
239438
}
240439
}
241440

242-
impl Evaluable for EvalFrom {
243-
fn evaluate(&mut self, ctx: &dyn EvalContext) {
244-
let empty = Tuple(HashMap::new());
245-
self.push_tuple(empty, ctx);
246-
}
247-
}
248-
249441
#[derive(Debug)]
250442
pub struct EvalFromAt {
251443
expr: Box<dyn EvalExpr>,

0 commit comments

Comments
 (0)