Skip to content

Commit 1a1441b

Browse files
committed
feat(hydro_lang): support in-memory inputs for embedded Hydro programs
1 parent e21bca2 commit 1a1441b

File tree

14 files changed

+174
-26
lines changed

14 files changed

+174
-26
lines changed

hydro_lang/src/compile/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ impl<'a> FlowBuilder<'a> {
231231
self.with_default_optimize().with_remaining_clusters(spec)
232232
}
233233

234-
pub fn compile<D: Deploy<'a>>(self) -> CompiledFlow<'a> {
234+
pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
235235
self.with_default_optimize::<D>().compile()
236236
}
237237

hydro_lang/src/compile/built.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ impl<'a> BuiltFlow<'a> {
316316
self.into_deploy().with_remaining_clusters(spec)
317317
}
318318

319-
pub fn compile<D: Deploy<'a>>(self) -> CompiledFlow<'a> {
319+
pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
320320
self.into_deploy::<D>().compile()
321321
}
322322

hydro_lang/src/compile/deploy.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,14 +262,17 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
262262
/// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) including networking.
263263
///
264264
/// (This does not compile the DFIR itself, instead use [`Self::deploy`] to compile & deploy the DFIR).
265-
pub fn compile(mut self) -> CompiledFlow<'a> {
266-
self.compile_internal()
265+
pub fn compile(mut self) -> CompiledFlow<'a>
266+
where
267+
D: Deploy<'a, InstantiateEnv = ()>,
268+
{
269+
self.compile_internal(&mut ())
267270
}
268271

269272
/// Same as [`Self::compile`] but does not invalidate `self`, for internal use.
270273
///
271274
/// Empties `self.sidecars` and modifies `self.ir`, leaving `self` in a partial state.
272-
pub(super) fn compile_internal(&mut self) -> CompiledFlow<'a> {
275+
pub(super) fn compile_internal(&mut self, env: &mut D::InstantiateEnv) -> CompiledFlow<'a> {
273276
let mut seen_tees: HashMap<_, _> = HashMap::new();
274277
let mut extra_stmts = SparseSecondaryMap::new();
275278
for leaf in self.ir.iter_mut() {
@@ -279,6 +282,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
279282
&self.processes,
280283
&self.clusters,
281284
&self.externals,
285+
env,
282286
);
283287
}
284288

@@ -348,7 +352,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
348352
mut extra_stmts,
349353
mut sidecars,
350354
_phantom,
351-
} = self.compile_internal();
355+
} = self.compile_internal(env);
352356

353357
let mut compiled = dfir;
354358
self.cluster_id_stmts(&mut extra_stmts);

hydro_lang/src/compile/deploy_provider.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,18 @@ pub trait Deploy<'a> {
118118
fn cluster_membership_stream(
119119
location_id: &LocationId,
120120
) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
121+
122+
/// Registers an embedded input for the given ident and element type.
123+
///
124+
/// Only meaningful for the embedded deployment backend. The default
125+
/// implementation panics.
126+
fn register_embedded_input(
127+
_env: &mut Self::InstantiateEnv,
128+
_ident: &syn::Ident,
129+
_element_type: &syn::Type,
130+
) {
131+
panic!("register_embedded_input is only supported by EmbeddedDeploy");
132+
}
121133
}
122134

123135
pub trait ProcessSpec<'a, D>

hydro_lang/src/compile/embedded.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub struct EmbeddedNode {
4848
impl Node for EmbeddedNode {
4949
type Port = ();
5050
type Meta = ();
51-
type InstantiateEnv = ();
51+
type InstantiateEnv = EmbeddedInstantiateEnv;
5252

5353
fn next_port(&self) -> Self::Port {}
5454

@@ -140,9 +140,20 @@ impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
140140
}
141141
}
142142

143+
/// Collected embedded input registrations.
144+
///
145+
/// During `compile_network`, each `EmbeddedInput` IR node registers its ident
146+
/// and element type here. `generate_embedded` then uses this to add parameters
147+
/// to the generated functions.
148+
#[derive(Default)]
149+
pub struct EmbeddedInstantiateEnv {
150+
/// (ident name, element type) pairs collected during compilation.
151+
pub inputs: Vec<(syn::Ident, syn::Type)>,
152+
}
153+
143154
impl<'a> Deploy<'a> for EmbeddedDeploy {
144155
type Meta = ();
145-
type InstantiateEnv = ();
156+
type InstantiateEnv = EmbeddedInstantiateEnv;
146157

147158
type Process = EmbeddedNode;
148159
type Cluster = EmbeddedNode;
@@ -302,6 +313,14 @@ impl<'a> Deploy<'a> for EmbeddedDeploy {
302313
"EmbeddedDeploy does not support cluster membership streams"
303314
))
304315
}
316+
317+
fn register_embedded_input(
318+
env: &mut Self::InstantiateEnv,
319+
ident: &syn::Ident,
320+
element_type: &syn::Type,
321+
) {
322+
env.inputs.push((ident.clone(), element_type.clone()));
323+
}
305324
}
306325

307326
impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
@@ -336,7 +355,12 @@ impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
336355
/// include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
337356
/// ```
338357
pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
339-
let compiled = self.compile_internal();
358+
let mut env = EmbeddedInstantiateEnv::default();
359+
let compiled = self.compile_internal(&mut env);
360+
361+
// Sort inputs by name for deterministic output.
362+
let mut inputs = env.inputs;
363+
inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
340364

341365
let root = crate::staging_util::get_this_crate();
342366
let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
@@ -347,6 +371,14 @@ impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
347371
let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
348372
location_keys.sort();
349373

374+
// Build the input parameters for each generated function.
375+
let input_params: Vec<proc_macro2::TokenStream> = inputs
376+
.iter()
377+
.map(|(ident, element_type)| {
378+
quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
379+
})
380+
.collect();
381+
350382
for location_key in location_keys {
351383
let graph = &compiled.all_dfir()[location_key];
352384

@@ -368,7 +400,7 @@ impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
368400

369401
let func: syn::Item = syn::parse_quote! {
370402
#[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
371-
pub fn #fn_ident<'a>() -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
403+
pub fn #fn_ident<'a>(#(#input_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
372404
#dfir_tokens
373405
}
374406
};

hydro_lang/src/compile/ir/mod.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -685,10 +685,12 @@ impl HydroRoot {
685685
processes: &SparseSecondaryMap<LocationKey, D::Process>,
686686
clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
687687
externals: &SparseSecondaryMap<LocationKey, D::External>,
688+
env: &mut D::InstantiateEnv,
688689
) where
689690
D: Deploy<'a>,
690691
{
691692
let refcell_extra_stmts = RefCell::new(extra_stmts);
693+
let refcell_env = RefCell::new(env);
692694
self.transform_bottom_up(
693695
&mut |l| {
694696
if let HydroRoot::SendExternal {
@@ -887,6 +889,16 @@ impl HydroRoot {
887889
connect_fn: Some(connect_fn),
888890
}
889891
.into();
892+
} else if let HydroNode::EmbeddedInput { ident, metadata } = n {
893+
let element_type = match &metadata.collection_kind {
894+
CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
895+
_ => panic!("EmbeddedInput must have Stream collection kind"),
896+
};
897+
D::register_embedded_input(
898+
&mut refcell_env.borrow_mut(),
899+
ident,
900+
&element_type,
901+
);
890902
}
891903
},
892904
seen_tees,
@@ -1721,6 +1733,15 @@ pub enum HydroNode {
17211733
input: Box<HydroNode>,
17221734
metadata: HydroIrMetadata,
17231735
},
1736+
1737+
/// An external input for embedded deployment mode.
1738+
///
1739+
/// This node compiles to `source_stream(ident)` where `ident` is a parameter
1740+
/// added to the generated function signature.
1741+
EmbeddedInput {
1742+
ident: syn::Ident,
1743+
metadata: HydroIrMetadata,
1744+
},
17241745
}
17251746

17261747
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
@@ -1776,7 +1797,8 @@ impl HydroNode {
17761797
HydroNode::Source { .. }
17771798
| HydroNode::SingletonSource { .. }
17781799
| HydroNode::CycleSource { .. }
1779-
| HydroNode::ExternalInput { .. } => {}
1800+
| HydroNode::ExternalInput { .. }
1801+
| HydroNode::EmbeddedInput { .. } => {}
17801802

17811803
HydroNode::Tee { inner, .. } => {
17821804
if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
@@ -2120,6 +2142,10 @@ impl HydroNode {
21202142
input: Box::new(input.deep_clone(seen_tees)),
21212143
metadata: metadata.clone(),
21222144
},
2145+
HydroNode::EmbeddedInput { ident, metadata } => HydroNode::EmbeddedInput {
2146+
ident: ident.clone(),
2147+
metadata: metadata.clone(),
2148+
},
21232149
}
21242150
}
21252151

@@ -3430,6 +3456,33 @@ impl HydroNode {
34303456

34313457
ident_stack.push(counter_ident);
34323458
}
3459+
3460+
HydroNode::EmbeddedInput { ident, .. } => {
3461+
let source_ident =
3462+
syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3463+
3464+
let ident = ident.clone();
3465+
3466+
match builders_or_callback {
3467+
BuildersOrCallback::Builders(graph_builders) => {
3468+
let builder = graph_builders.get_dfir_mut(&out_location);
3469+
builder.add_dfir(
3470+
parse_quote! {
3471+
#source_ident = source_stream(#ident);
3472+
},
3473+
None,
3474+
Some(&next_stmt_id.to_string()),
3475+
);
3476+
}
3477+
BuildersOrCallback::Callback(_, node_callback) => {
3478+
node_callback(node, next_stmt_id);
3479+
}
3480+
}
3481+
3482+
*next_stmt_id += 1;
3483+
3484+
ident_stack.push(source_ident);
3485+
}
34333486
}
34343487
},
34353488
seen_tees,
@@ -3508,6 +3561,7 @@ impl HydroNode {
35083561
transform(deserialize_fn);
35093562
}
35103563
}
3564+
HydroNode::EmbeddedInput { .. } => {}
35113565
HydroNode::Counter { duration, .. } => {
35123566
transform(duration);
35133567
}
@@ -3560,6 +3614,7 @@ impl HydroNode {
35603614
HydroNode::ExternalInput { metadata, .. } => metadata,
35613615
HydroNode::Network { metadata, .. } => metadata,
35623616
HydroNode::Counter { metadata, .. } => metadata,
3617+
HydroNode::EmbeddedInput { metadata, .. } => metadata,
35633618
}
35643619
}
35653620

@@ -3609,6 +3664,7 @@ impl HydroNode {
36093664
HydroNode::ExternalInput { metadata, .. } => metadata,
36103665
HydroNode::Network { metadata, .. } => metadata,
36113666
HydroNode::Counter { metadata, .. } => metadata,
3667+
HydroNode::EmbeddedInput { metadata, .. } => metadata,
36123668
}
36133669
}
36143670

@@ -3621,6 +3677,7 @@ impl HydroNode {
36213677
| HydroNode::SingletonSource { .. }
36223678
| HydroNode::ExternalInput { .. }
36233679
| HydroNode::CycleSource { .. }
3680+
| HydroNode::EmbeddedInput { .. }
36243681
| HydroNode::Tee { .. } => {
36253682
// Tee should find its input in separate special ways
36263683
vec![]
@@ -3752,6 +3809,7 @@ impl HydroNode {
37523809
HydroNode::Counter { tag, duration, .. } => {
37533810
format!("Counter({:?}, {:?})", tag, duration)
37543811
}
3812+
HydroNode::EmbeddedInput { ident, .. } => format!("EmbeddedInput({})", ident),
37553813
}
37563814
}
37573815
}

hydro_lang/src/location/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,35 @@ pub trait Location<'a>: dynamic::DynLocation {
520520
(SimSender(external.port_id, PhantomData), stream)
521521
}
522522

523+
/// Creates an external input stream for embedded deployment mode.
524+
///
525+
/// The `name` parameter specifies the name of the generated function parameter
526+
/// that will supply data to this stream at runtime. The generated function will
527+
/// accept an `impl Stream<Item = T> + Unpin` argument with this name.
528+
fn embedded_input<T>(
529+
&self,
530+
name: impl Into<String>,
531+
) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
532+
where
533+
Self: Sized + NoTick,
534+
{
535+
let ident = syn::Ident::new(&name.into(), Span::call_site());
536+
537+
Stream::new(
538+
self.clone(),
539+
HydroNode::EmbeddedInput {
540+
ident,
541+
metadata: self.new_node_metadata(Stream::<
542+
T,
543+
Self,
544+
Unbounded,
545+
TotalOrder,
546+
ExactlyOnce,
547+
>::collection_kind()),
548+
},
549+
)
550+
}
551+
523552
/// Establishes a server on this location to receive a bidirectional connection from a single
524553
/// client, identified by the given `External` handle. Returns a port handle for the external
525554
/// process to connect to, a stream of incoming messages, and a handle to send outgoing

hydro_lang/src/sim/flow.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ impl<'a> SimFlow<'a> {
110110
&self.processes,
111111
&self.clusters,
112112
&self.externals,
113+
&mut (),
113114
);
114115
});
115116

hydro_lang/src/viz/render.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1550,6 +1550,10 @@ impl HydroNode {
15501550
},
15511551
duration,
15521552
),
1553+
1554+
HydroNode::EmbeddedInput { ident, metadata } => {
1555+
build_source_node(structure, metadata, format!("embedded_input({})", ident))
1556+
}
15531557
}
15541558
}
15551559
}

hydro_test/src/local/capitalize.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
use hydro_lang::prelude::*;
2+
3+
pub fn capitalize<'a>(input: Stream<String, Process<'a, ()>>) {
4+
input
5+
.map(q!(|s| s.to_uppercase()))
6+
.for_each(q!(|s| println!("{}", s)));
7+
}

0 commit comments

Comments
 (0)