@@ -685,10 +685,12 @@ impl HydroRoot {
685685 processes : & SparseSecondaryMap < LocationKey , D :: Process > ,
686686 clusters : & SparseSecondaryMap < LocationKey , D :: Cluster > ,
687687 externals : & SparseSecondaryMap < LocationKey , D :: External > ,
688+ env : & mut D :: InstantiateEnv ,
688689 ) where
689690 D : Deploy < ' a > ,
690691 {
691692 let refcell_extra_stmts = RefCell :: new ( extra_stmts) ;
693+ let refcell_env = RefCell :: new ( env) ;
692694 self . transform_bottom_up (
693695 & mut |l| {
694696 if let HydroRoot :: SendExternal {
@@ -887,6 +889,16 @@ impl HydroRoot {
887889 connect_fn : Some ( connect_fn) ,
888890 }
889891 . into ( ) ;
892+ } else if let HydroNode :: EmbeddedInput { ident, metadata } = n {
893+ let element_type = match & metadata. collection_kind {
894+ CollectionKind :: Stream { element_type, .. } => element_type. 0 . as_ref ( ) . clone ( ) ,
895+ _ => panic ! ( "EmbeddedInput must have Stream collection kind" ) ,
896+ } ;
897+ D :: register_embedded_input (
898+ & mut refcell_env. borrow_mut ( ) ,
899+ ident,
900+ & element_type,
901+ ) ;
890902 }
891903 } ,
892904 seen_tees,
@@ -1721,6 +1733,15 @@ pub enum HydroNode {
17211733 input : Box < HydroNode > ,
17221734 metadata : HydroIrMetadata ,
17231735 } ,
1736+
1737+ /// An external input for embedded deployment mode.
1738+ ///
1739+ /// This node compiles to `source_stream(ident)` where `ident` is a parameter
1740+ /// added to the generated function signature.
1741+ EmbeddedInput {
1742+ ident : syn:: Ident ,
1743+ metadata : HydroIrMetadata ,
1744+ } ,
17241745}
17251746
17261747pub type SeenTees = HashMap < * const RefCell < HydroNode > , Rc < RefCell < HydroNode > > > ;
@@ -1776,7 +1797,8 @@ impl HydroNode {
17761797 HydroNode :: Source { .. }
17771798 | HydroNode :: SingletonSource { .. }
17781799 | HydroNode :: CycleSource { .. }
1779- | HydroNode :: ExternalInput { .. } => { }
1800+ | HydroNode :: ExternalInput { .. }
1801+ | HydroNode :: EmbeddedInput { .. } => { }
17801802
17811803 HydroNode :: Tee { inner, .. } => {
17821804 if let Some ( transformed) = seen_tees. get ( & inner. as_ptr ( ) ) {
@@ -2120,6 +2142,10 @@ impl HydroNode {
21202142 input : Box :: new ( input. deep_clone ( seen_tees) ) ,
21212143 metadata : metadata. clone ( ) ,
21222144 } ,
2145+ HydroNode :: EmbeddedInput { ident, metadata } => HydroNode :: EmbeddedInput {
2146+ ident : ident. clone ( ) ,
2147+ metadata : metadata. clone ( ) ,
2148+ } ,
21232149 }
21242150 }
21252151
@@ -3430,6 +3456,33 @@ impl HydroNode {
34303456
34313457 ident_stack. push ( counter_ident) ;
34323458 }
3459+
3460+ HydroNode :: EmbeddedInput { ident, .. } => {
3461+ let source_ident =
3462+ syn:: Ident :: new ( & format ! ( "stream_{}" , * next_stmt_id) , Span :: call_site ( ) ) ;
3463+
3464+ let ident = ident. clone ( ) ;
3465+
3466+ match builders_or_callback {
3467+ BuildersOrCallback :: Builders ( graph_builders) => {
3468+ let builder = graph_builders. get_dfir_mut ( & out_location) ;
3469+ builder. add_dfir (
3470+ parse_quote ! {
3471+ #source_ident = source_stream( #ident) ;
3472+ } ,
3473+ None ,
3474+ Some ( & next_stmt_id. to_string ( ) ) ,
3475+ ) ;
3476+ }
3477+ BuildersOrCallback :: Callback ( _, node_callback) => {
3478+ node_callback ( node, next_stmt_id) ;
3479+ }
3480+ }
3481+
3482+ * next_stmt_id += 1 ;
3483+
3484+ ident_stack. push ( source_ident) ;
3485+ }
34333486 }
34343487 } ,
34353488 seen_tees,
@@ -3508,6 +3561,7 @@ impl HydroNode {
35083561 transform ( deserialize_fn) ;
35093562 }
35103563 }
3564+ HydroNode :: EmbeddedInput { .. } => { }
35113565 HydroNode :: Counter { duration, .. } => {
35123566 transform ( duration) ;
35133567 }
@@ -3560,6 +3614,7 @@ impl HydroNode {
35603614 HydroNode :: ExternalInput { metadata, .. } => metadata,
35613615 HydroNode :: Network { metadata, .. } => metadata,
35623616 HydroNode :: Counter { metadata, .. } => metadata,
3617+ HydroNode :: EmbeddedInput { metadata, .. } => metadata,
35633618 }
35643619 }
35653620
@@ -3609,6 +3664,7 @@ impl HydroNode {
36093664 HydroNode :: ExternalInput { metadata, .. } => metadata,
36103665 HydroNode :: Network { metadata, .. } => metadata,
36113666 HydroNode :: Counter { metadata, .. } => metadata,
3667+ HydroNode :: EmbeddedInput { metadata, .. } => metadata,
36123668 }
36133669 }
36143670
@@ -3621,6 +3677,7 @@ impl HydroNode {
36213677 | HydroNode :: SingletonSource { .. }
36223678 | HydroNode :: ExternalInput { .. }
36233679 | HydroNode :: CycleSource { .. }
3680+ | HydroNode :: EmbeddedInput { .. }
36243681 | HydroNode :: Tee { .. } => {
36253682 // Tee should find its input in separate special ways
36263683 vec ! [ ]
@@ -3752,6 +3809,7 @@ impl HydroNode {
37523809 HydroNode :: Counter { tag, duration, .. } => {
37533810 format ! ( "Counter({:?}, {:?})" , tag, duration)
37543811 }
3812+ HydroNode :: EmbeddedInput { ident, .. } => format ! ( "EmbeddedInput({})" , ident) ,
37553813 }
37563814 }
37573815}
0 commit comments