Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 129 additions & 4 deletions hydro_lang/src/compile/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,96 @@ impl FlowStateInner {
}

pub fn push_root(&mut self, root: HydroRoot) {
self.roots
.as_mut()
.expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
.push(root);
if self.roots.is_none() {
panic!("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.");
}

// Validate candidate root for synchronous forward-ref cycles before appending.
self.validate_candidate_root(&root);

let roots_vec = self.roots.as_mut().unwrap();
roots_vec.push(root);
}

fn validate_candidate_root(&self, root: &HydroRoot) {
use crate::compile::ir::HydroNode;

if let HydroRoot::CycleSink { ident: sink_ident, input, .. } = root {
use std::collections::HashSet;
let target = sink_ident.clone();
let mut seen: HashSet<usize> = HashSet::new();
let mut stack: Vec<*const HydroNode> = vec![&**input as *const HydroNode];

while let Some(ptr) = stack.pop() {
if !seen.insert(ptr as usize) { continue; }
let node: &HydroNode = unsafe { &*ptr };

if matches!(node, HydroNode::DeferTick { .. }) { return; }
if let HydroNode::CycleSource { ident: src_ident, .. } = node {
if src_ident == &target {
panic!("Synchronous cycle detected for forward_ref '{}'. A forward_ref was completed with a collection that depends synchronously on the forward reference. This is not allowed.", target);
}
continue;
}
if let HydroNode::Tee { inner, .. } = node {
stack.push(&*inner.0.borrow() as *const _);
continue;
}

if matches!(node, HydroNode::Source { .. } | HydroNode::Placeholder | HydroNode::ExternalInput { .. }) { continue; }

match node {
HydroNode::Chain { first, second, .. }
| HydroNode::ChainFirst { first, second, .. } => {
stack.push(&**first as *const _);
stack.push(&**second as *const _);
}

HydroNode::CrossSingleton { left, right, .. }
| HydroNode::CrossProduct { left, right, .. }
| HydroNode::Join { left, right, .. }
| HydroNode::Difference { pos: left, neg: right, .. }
| HydroNode::AntiJoin { pos: left, neg: right, .. } => {
stack.push(&**left as *const _);
stack.push(&**right as *const _);
}

HydroNode::ReduceKeyedWatermark { input, watermark, .. } => {
stack.push(&**input as *const _);
stack.push(&**watermark as *const _);
}

HydroNode::Cast { inner, .. }
| HydroNode::ObserveNonDet { inner, .. }
| HydroNode::Persist { inner, .. }
| HydroNode::BeginAtomic { inner, .. }
| HydroNode::EndAtomic { inner, .. }
| HydroNode::Batch { inner, .. }
| HydroNode::YieldConcat { inner, .. }
| HydroNode::ResolveFutures { input: inner, .. }
| HydroNode::ResolveFuturesOrdered { input: inner, .. }
| HydroNode::Map { input: inner, .. }
| HydroNode::FlatMap { input: inner, .. }
| HydroNode::Filter { input: inner, .. }
| HydroNode::FilterMap { input: inner, .. }
| HydroNode::Enumerate { input: inner, .. }
| HydroNode::Inspect { input: inner, .. }
| HydroNode::Unique { input: inner, .. }
| HydroNode::Sort { input: inner, .. }
| HydroNode::Fold { input: inner, .. }
| HydroNode::FoldKeyed { input: inner, .. }
| HydroNode::Scan { input: inner, .. }
| HydroNode::Reduce { input: inner, .. }
| HydroNode::ReduceKeyed { input: inner, .. }
| HydroNode::Network { input: inner, .. }
| HydroNode::Counter { input: inner, .. } => {
stack.push(&**inner as *const _);
}

_ => {}
}
}
}
}
}

Expand Down Expand Up @@ -283,3 +369,42 @@ impl<'a> RewriteIrFlowBuilder<'a> {
self.builder
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::compile::ir::{HydroNode, HydroRoot};

#[test]
#[should_panic(expected = "Synchronous cycle detected")]
fn rejects_synchronous_cycle_in_forward_ref() {
let mut flow = FlowStateInner {
roots: Some(vec![]),
next_external_out: 0,
cycle_counts: 0,
next_clock_id: 0,
};
let cycle_id = 0;

let source = HydroNode::CycleSource {
ident: cycle_id,
location: None,
};

// Synchronous dependency on the cycle source.
let mapped = HydroNode::Map {
input: Box::new(source),
f: "identity".into(),
};

// Complete the cycle synchronously.
let root = HydroRoot::CycleSink {
ident: cycle_id,
input: Box::new(mapped),
location: None,
};

// Expected failure.
flow.push_root(root);
}
}