Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 107 additions & 51 deletions lib/query-planner/src/planner/query_plan.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::collections::{HashMap, VecDeque};
use std::collections::VecDeque;

use petgraph::{graph::NodeIndex, visit::EdgeRef};
use petgraph::{
graph::NodeIndex,
visit::{EdgeRef, NodeIndexable},
};

use crate::{
planner::plan_nodes::ConditionNode, state::supergraph_state::SupergraphState,
Expand All @@ -17,58 +20,110 @@ use super::{
/// The in-degree of a step is the number of its prerequisite parent steps
/// that have not yet been processed. A step is "fulfilled" (ready to be processed)
/// when its in-degree becomes zero.
///
/// Internally, values are stored as `in_degree + 1` to allow using 0 as the
/// "unvisited" sentinel. This enables `vec![0; n]` initialization which uses
/// `calloc` for better memory performance (zero-pages from OS, lazy allocation).
pub struct InDegree<'a> {
state: HashMap<NodeIndex, usize>,
state: Box<[u32]>,
fetch_graph: &'a FetchGraph,
}

impl<'a> InDegree<'a> {
/// Sentinel value for unvisited nodes. Visited nodes store `in_degree + 1`, so a stored 0 never represents a real in-degree.
const UNVISITED: u32 = 0;

pub fn new(fetch_graph: &'a FetchGraph) -> Result<Self, QueryPlanError> {
let mut state: HashMap<NodeIndex, usize> = HashMap::new();
let root_index = fetch_graph.root_index.ok_or(QueryPlanError::NoRoot)?;
// Zero-init is optimized by the allocator (calloc), avoiding memset overhead
let mut state: Vec<u32> = vec![0; fetch_graph.graph.node_bound()];
let mut overflow_error: Option<QueryPlanError> = None;

// For all steps, the initial in-degree is set to the total number of their parents
fetch_graph.bfs(root_index, |step_index, _| {
state.insert(
*step_index,
fetch_graph
.parents_of(*step_index)
// skip roots
.filter(|edge| edge.source() != root_index)
.count(),
);
let in_degree_usize = fetch_graph
.parents_of(*step_index)
// skip roots
.filter(|edge| edge.source() != root_index)
.count();

// Store as in_degree + 1 to reserve 0 as the "unvisited" sentinel
let Some(stored_value) = u32::try_from(in_degree_usize)
.ok()
.and_then(|v| v.checked_add(1))
else {
overflow_error = Some(QueryPlanError::Internal(format!(
"In-degree overflow for step {}: {in_degree_usize}",
step_index.index(),
)));
return true; // stop traversal early
};

state[step_index.index()] = stored_value;
false // keep traversing (only the overflow branch above stops early)
});

Ok(Self { state, fetch_graph })
if let Some(err) = overflow_error {
return Err(err);
}

Ok(Self {
state: state.into_boxed_slice(),
fetch_graph,
})
}

/// Marks a `FetchStep` as processed. This involves decrementing the in-degree
/// of all its direct children as one of their parent dependencies
/// has been met.
pub fn mark_as_processed(&mut self, index: NodeIndex) {
pub fn mark_as_processed(&mut self, index: NodeIndex) -> Result<(), QueryPlanError> {
for edge in self.fetch_graph.children_of(index) {
let child_index = edge.target();
let current = self.state.get(&child_index);

if let Some(in_degree) = current {
if *in_degree == 0 {
panic!("In-degree was 0");
}
let entry = self.state.get_mut(child_index.index()).ok_or_else(|| {
QueryPlanError::Internal(format!(
"In-degree vector missing entry for child step {}",
child_index.index()
))
})?;

if *entry == Self::UNVISITED {
return Err(QueryPlanError::Internal(format!(
"Attempt to decrease in-degree of a non-existing step {}",
child_index.index()
)));
}

self.state.insert(child_index, in_degree - 1);
} else {
panic!("Attempt to decrease an in-degree of a non-existing step");
// stored_value = in_degree + 1, so stored_value == 1 means in_degree == 0
if *entry == 1 {
return Err(QueryPlanError::Internal(format!(
"In-degree was 0 for step {}",
child_index.index()
)));
}

*entry -= 1;
}
Ok(())
}

/// Checks if a `FetchStep` is "fulfilled," meaning all its parent dependencies have been met.
pub fn is_fulfilled(&self, child_index: NodeIndex) -> bool {
self.state
.get(&child_index)
.expect("In-degree record missing")
== &0
pub fn is_fulfilled(&self, child_index: NodeIndex) -> Result<bool, QueryPlanError> {
let v = *self.state.get(child_index.index()).ok_or_else(|| {
QueryPlanError::Internal(format!(
"In-degree vector missing entry for step {}",
child_index.index()
))
})?;

if v == Self::UNVISITED {
return Err(QueryPlanError::Internal(format!(
"In-degree record missing for step {}",
child_index.index()
)));
}

// stored_value = in_degree + 1, so fulfilled when stored_value == 1
Ok(v == 1)
}
}

Expand All @@ -88,7 +143,7 @@ pub fn build_query_plan_from_fetch_graph(
// Their in-degrees (calculated in InDegree::new) should be 0.
for edge in fetch_graph.children_of(root_index) {
let child_index = edge.target();
if in_degrees.is_fulfilled(child_index) {
if in_degrees.is_fulfilled(child_index)? {
queue.push_back(child_index);
} else {
return Err(QueryPlanError::Internal(format!(
Expand All @@ -114,7 +169,7 @@ pub fn build_query_plan_from_fetch_graph(
let step_data = fetch_graph.get_step_data(step_index)?;
current_wave_nodes.push(PlanNode::from_fetch_step(step_data, supergraph));
planned_nodes_count += 1;
in_degrees.mark_as_processed(step_index);
in_degrees.mark_as_processed(step_index)?;

for child_edge in fetch_graph.children_of(step_index) {
cancellation_token.bail_if_cancelled()?;
Expand All @@ -125,7 +180,7 @@ pub fn build_query_plan_from_fetch_graph(
)));
}

if in_degrees.is_fulfilled(child_index) {
if in_degrees.is_fulfilled(child_index)? {
queue.push_back(child_index);
}
}
Expand All @@ -137,7 +192,7 @@ pub fn build_query_plan_from_fetch_graph(
)));
} else if current_wave_nodes.len() == 1 {
overall_plan_sequence.push(current_wave_nodes.into_iter().next().ok_or(
QueryPlanError::Internal(String::from("Was was expected to be of length 1")),
QueryPlanError::Internal(String::from("Wave was expected to be of length 1")),
)?);
} else {
overall_plan_sequence.push(PlanNode::Parallel(ParallelNode {
Expand Down Expand Up @@ -167,11 +222,9 @@ pub fn build_query_plan_from_fetch_graph(

let overall_plan_sequence = optimize_plan_sequence(overall_plan_sequence);

let root_node = match overall_plan_sequence.len() == 1 {
true => overall_plan_sequence.into_iter().next().unwrap(),
false => PlanNode::Sequence(SequenceNode {
nodes: overall_plan_sequence,
}),
let root_node = match <[_; 1]>::try_from(overall_plan_sequence) {
Ok([single]) => single,
Err(nodes) => PlanNode::Sequence(SequenceNode { nodes }),
};

Ok(QueryPlan {
Expand Down Expand Up @@ -200,12 +253,7 @@ fn are_conditions_compatible(c1: &ConditionNode, c2: &ConditionNode) -> bool {
false
}

fn merge_two_condition_nodes(a: PlanNode, b: PlanNode) -> PlanNode {
let (mut a, mut b) = match (a, b) {
(PlanNode::Condition(c1), PlanNode::Condition(c2)) => (c1, c2),
_ => panic!("Can only merge two ConditionNodes"),
};

fn merge_two_condition_nodes(mut a: ConditionNode, mut b: ConditionNode) -> PlanNode {
let is_if = a.if_clause.is_some();

let mut inner_nodes: Vec<PlanNode> = a
Expand Down Expand Up @@ -249,15 +297,23 @@ fn merge_two_condition_nodes(a: PlanNode, b: PlanNode) -> PlanNode {

fn optimize_plan_sequence(nodes: Vec<PlanNode>) -> Vec<PlanNode> {
nodes.into_iter().fold(Vec::new(), |mut acc, current_node| {
match (acc.last_mut(), &current_node) {
// Check if the last node and the current node
// have compatible conditions
(Some(PlanNode::Condition(last_cond)), PlanNode::Condition(current_cond))
if are_conditions_compatible(last_cond, current_cond) =>
match (&acc[..], &current_node) {
// Check if the last node and the current node have compatible conditions
([.., PlanNode::Condition(last_ref)], PlanNode::Condition(current_ref))
if are_conditions_compatible(last_ref, current_ref) =>
{
let last_node = acc.pop().unwrap();
let merged_node = merge_two_condition_nodes(last_node, current_node);
acc.push(merged_node);
// Pop the last element - we know it exists and is a Condition from the pattern
let Some(PlanNode::Condition(last_owned)) = acc.pop() else {
unreachable!(
"The slice pattern guarantees the last element is a ConditionNode."
);
};
let PlanNode::Condition(current_owned) = current_node else {
unreachable!(
"The match pattern guarantees the current node is a ConditionNode."
);
};
acc.push(merge_two_condition_nodes(last_owned, current_owned));
}
_ => {
acc.push(current_node);
Expand Down
4 changes: 2 additions & 2 deletions lib/query-planner/src/planner/walker/best_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub struct BestPathTracker<'graph> {
graph: &'graph Graph,
/// A map from subgraph name to the best path and its cost.
/// BTreeMap instead of HashMap to keep the iteration order of keys deterministic (sorted by key).
subgraph_to_best_paths: BTreeMap<String, (Vec<OperationPath>, u64)>,
subgraph_to_best_paths: BTreeMap<&'graph str, (Vec<OperationPath>, u64)>,
}

pub fn find_best_paths(paths: Vec<OperationPath>) -> Vec<OperationPath> {
Expand Down Expand Up @@ -45,7 +45,7 @@ impl<'graph> BestPathTracker<'graph> {
.graph_id()
.expect("Graph ID not found in node");

match self.subgraph_to_best_paths.entry(tail_graph_id.to_string()) {
match self.subgraph_to_best_paths.entry(tail_graph_id) {
Entry::Occupied(mut entry) => {
let (existing_paths, existing_cost) = entry.get_mut();

Expand Down
Loading