diff --git a/.config/nextest.toml b/.config/nextest.toml index e1a9635e182d..beb447367ebe 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -40,3 +40,7 @@ test-group = 'hydro-trybuild-group' [[profile.default.overrides]] filter = 'package(hydro_test_template)' test-group = 'hydro-trybuild-group' + +[[profile.default.overrides]] +filter = 'package(hydro_test_embedded) & kind(example)' +test-group = 'hydro-trybuild-group' diff --git a/.vscode/settings.json b/.vscode/settings.json index 8bf8d84dbe97..6852037226e4 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,7 +11,6 @@ "rust-analyzer.rustfmt.extraArgs": [ "+nightly" ], - "rust-analyzer.cargo.features": "all", "editor.semanticTokenColorCustomizations": { "enabled": true, "rules": { diff --git a/Cargo.lock b/Cargo.lock index 76d1a39f74f5..6f8a25871bb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2781,6 +2781,18 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "hydro_test_embedded" +version = "0.0.0" +dependencies = [ + "dfir_rs", + "hydro_lang", + "hydro_test", + "prettyplease", + "stageleft", + "tokio", +] + [[package]] name = "hydro_test_template" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index 8df9bf271b9c..d79554e1181e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "hydro_lang", "hydro_std", "hydro_test", + "hydro_test_embedded", "hydro_test_template", "include_mdtests", "lattices_macro", diff --git a/hydro_lang/src/compile/builder.rs b/hydro_lang/src/compile/builder.rs index deee7107c900..33f1b297e1d4 100644 --- a/hydro_lang/src/compile/builder.rs +++ b/hydro_lang/src/compile/builder.rs @@ -231,7 +231,7 @@ impl<'a> FlowBuilder<'a> { self.with_default_optimize().with_remaining_clusters(spec) } - pub fn compile>(self) -> CompiledFlow<'a> { + pub fn compile>(self) -> CompiledFlow<'a> { self.with_default_optimize::().compile() } diff --git a/hydro_lang/src/compile/built.rs b/hydro_lang/src/compile/built.rs index 2313103ef236..a778b5bb1858 100644 --- a/hydro_lang/src/compile/built.rs +++ b/hydro_lang/src/compile/built.rs @@ -316,7 +316,7 @@ impl<'a> BuiltFlow<'a> { self.into_deploy().with_remaining_clusters(spec) } - pub fn compile>(self) -> CompiledFlow<'a> { + pub fn compile>(self) -> CompiledFlow<'a> { self.into_deploy::().compile() } diff --git a/hydro_lang/src/compile/deploy.rs b/hydro_lang/src/compile/deploy.rs index 3a139defaa98..10cd215df3d9 100644 --- a/hydro_lang/src/compile/deploy.rs +++ b/hydro_lang/src/compile/deploy.rs @@ -262,14 +262,17 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) including networking. /// /// (This does not compile the DFIR itself, instead use [`Self::deploy`] to compile & deploy the DFIR). - pub fn compile(mut self) -> CompiledFlow<'a> { - self.compile_internal() + pub fn compile(mut self) -> CompiledFlow<'a> + where + D: Deploy<'a, InstantiateEnv = ()>, + { + self.compile_internal(&mut ()) } /// Same as [`Self::compile`] but does not invalidate `self`, for internal use. /// /// Empties `self.sidecars` and modifies `self.ir`, leaving `self` in a partial state. - fn compile_internal(&mut self) -> CompiledFlow<'a> { + pub(super) fn compile_internal(&mut self, env: &mut D::InstantiateEnv) -> CompiledFlow<'a> { let mut seen_tees: HashMap<_, _> = HashMap::new(); let mut extra_stmts = SparseSecondaryMap::new(); for leaf in self.ir.iter_mut() { @@ -279,6 +282,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { &self.processes, &self.clusters, &self.externals, + env, ); } @@ -348,7 +352,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { mut extra_stmts, mut sidecars, _phantom, - } = self.compile_internal(); + } = self.compile_internal(env); let mut compiled = dfir; self.cluster_id_stmts(&mut extra_stmts); diff --git a/hydro_lang/src/compile/deploy_provider.rs b/hydro_lang/src/compile/deploy_provider.rs index c2a200c0a66d..27c879668212 100644 --- a/hydro_lang/src/compile/deploy_provider.rs +++ b/hydro_lang/src/compile/deploy_provider.rs @@ -118,6 +118,18 @@ pub trait Deploy<'a> { fn cluster_membership_stream( location_id: &LocationId, ) -> impl QuotedWithContext<'a, Box + Unpin>, ()>; + + /// Registers an embedded input for the given ident and element type. + /// + /// Only meaningful for the embedded deployment backend. The default + /// implementation panics. + fn register_embedded_input( + _env: &mut Self::InstantiateEnv, + _ident: &syn::Ident, + _element_type: &syn::Type, + ) { + panic!("register_embedded_input is only supported by EmbeddedDeploy"); + } } pub trait ProcessSpec<'a, D> diff --git a/hydro_lang/src/compile/embedded.rs b/hydro_lang/src/compile/embedded.rs new file mode 100644 index 000000000000..7dae75240d30 --- /dev/null +++ b/hydro_lang/src/compile/embedded.rs @@ -0,0 +1,419 @@ +//! "Embedded" deployment backend for Hydro. +//! +//! Instead of compiling each location into a standalone binary, this backend generates +//! a Rust source file containing one function per location. Each function returns a +//! `dfir_rs::scheduled::graph::Dfir` that can be manually driven by the caller. +//! +//! This is useful when you want full control over where and how the projected DFIR +//! code runs (e.g. embedding it into an existing application). +//! +//! # Limitations +//! +//! Networking is **not** supported. All `Deploy` networking trait methods will panic +//! if called. Only pure local computations (with data embedded in the Hydro program) +//! are supported. + +use std::future::Future; +use std::io::Error; +use std::pin::Pin; + +use bytes::{Bytes, BytesMut}; +use dfir_lang::diagnostic::Diagnostics; +use dfir_lang::graph::DfirGraph; +use futures::{Sink, Stream}; +use proc_macro2::Span; +use quote::quote; +use serde::Serialize; +use serde::de::DeserializeOwned; +use stageleft::{QuotedWithContext, q}; + +use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort}; +use crate::compile::builder::ExternalPortId; +use crate::location::dynamic::LocationId; +use crate::location::member_id::TaglessMemberId; +use crate::location::{LocationKey, MembershipEvent, NetworkHint}; + +/// Marker type for the embedded deployment backend. +/// +/// All networking methods panic — this backend only supports pure local computation. +pub enum EmbeddedDeploy {} + +/// A trivial node type for embedded deployment. Stores a user-provided function name. +#[derive(Clone)] +pub struct EmbeddedNode { + /// The function name to use in the generated code for this location. + pub fn_name: String, +} + +impl Node for EmbeddedNode { + type Port = (); + type Meta = (); + type InstantiateEnv = EmbeddedInstantiateEnv; + + fn next_port(&self) -> Self::Port {} + + fn update_meta(&self, _meta: &Self::Meta) {} + + fn instantiate( + &self, + _env: &mut Self::InstantiateEnv, + _meta: &mut Self::Meta, + _graph: DfirGraph, + _extra_stmts: &[syn::Stmt], + _sidecars: &[syn::Expr], + ) { + // No-op: embedded mode doesn't instantiate nodes at deploy time. + } +} + +impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode { + fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) { + panic!("EmbeddedDeploy does not support external ports"); + } + + #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")] + fn as_bytes_bidi( + &self, + _external_port_id: ExternalPortId, + ) -> impl Future< + Output = super::deploy_provider::DynSourceSink, Bytes, Error>, + > + 'a { + async { panic!("EmbeddedDeploy does not support external ports") } + } + + #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")] + fn as_bincode_bidi( + &self, + _external_port_id: ExternalPortId, + ) -> impl Future> + 'a + where + InT: Serialize + 'static, + OutT: DeserializeOwned + 'static, + { + async { panic!("EmbeddedDeploy does not support external ports") } + } + + #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")] + fn as_bincode_sink( + &self, + _external_port_id: ExternalPortId, + ) -> impl Future>>> + 'a + where + T: Serialize + 'static, + { + async { panic!("EmbeddedDeploy does not support external ports") } + } + + #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")] + fn as_bincode_source( + &self, + _external_port_id: ExternalPortId, + ) -> impl Future>>> + 'a + where + T: DeserializeOwned + 'static, + { + async { panic!("EmbeddedDeploy does not support external ports") } + } +} + +impl> ProcessSpec<'_, EmbeddedDeploy> for S { + fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode { + EmbeddedNode { + fn_name: self.into(), + } + } +} + +impl> ClusterSpec<'_, EmbeddedDeploy> for S { + fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode { + EmbeddedNode { + fn_name: self.into(), + } + } +} + +impl> ExternalSpec<'_, EmbeddedDeploy> for S { + fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode { + EmbeddedNode { + fn_name: self.into(), + } + } +} + +/// Collected embedded input registrations. +/// +/// During `compile_network`, each `EmbeddedInput` IR node registers its ident +/// and element type here. `generate_embedded` then uses this to add parameters +/// to the generated functions. +#[derive(Default)] +pub struct EmbeddedInstantiateEnv { + /// (ident name, element type) pairs collected during compilation. + pub inputs: Vec<(syn::Ident, syn::Type)>, +} + +impl<'a> Deploy<'a> for EmbeddedDeploy { + type Meta = (); + type InstantiateEnv = EmbeddedInstantiateEnv; + + type Process = EmbeddedNode; + type Cluster = EmbeddedNode; + type External = EmbeddedNode; + + fn o2o_sink_source( + _p1: &Self::Process, + _p1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + ) -> (syn::Expr, syn::Expr) { + panic!("EmbeddedDeploy does not support networking (o2o)") + } + + fn o2o_connect( + _p1: &Self::Process, + _p1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + ) -> Box { + panic!("EmbeddedDeploy does not support networking (o2o)") + } + + fn o2m_sink_source( + _p1: &Self::Process, + _p1_port: &(), + _c2: &Self::Cluster, + _c2_port: &(), + ) -> (syn::Expr, syn::Expr) { + panic!("EmbeddedDeploy does not support networking (o2m)") + } + + fn o2m_connect( + _p1: &Self::Process, + _p1_port: &(), + _c2: &Self::Cluster, + _c2_port: &(), + ) -> Box { + panic!("EmbeddedDeploy does not support networking (o2m)") + } + + fn m2o_sink_source( + _c1: &Self::Cluster, + _c1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + ) -> (syn::Expr, syn::Expr) { + panic!("EmbeddedDeploy does not support networking (m2o)") + } + + fn m2o_connect( + _c1: &Self::Cluster, + _c1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + ) -> Box { + panic!("EmbeddedDeploy does not support networking (m2o)") + } + + fn m2m_sink_source( + _c1: &Self::Cluster, + _c1_port: &(), + _c2: &Self::Cluster, + _c2_port: &(), + ) -> (syn::Expr, syn::Expr) { + panic!("EmbeddedDeploy does not support networking (m2m)") + } + + fn m2m_connect( + _c1: &Self::Cluster, + _c1_port: &(), + _c2: &Self::Cluster, + _c2_port: &(), + ) -> Box { + panic!("EmbeddedDeploy does not support networking (m2m)") + } + + fn e2o_many_source( + _extra_stmts: &mut Vec, + _p2: &Self::Process, + _p2_port: &(), + _codec_type: &syn::Type, + _shared_handle: String, + ) -> syn::Expr { + panic!("EmbeddedDeploy does not support networking (e2o)") + } + + fn e2o_many_sink(_shared_handle: String) -> syn::Expr { + panic!("EmbeddedDeploy does not support networking (e2o)") + } + + fn e2o_source( + _extra_stmts: &mut Vec, + _p1: &Self::External, + _p1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + _codec_type: &syn::Type, + _shared_handle: String, + ) -> syn::Expr { + panic!("EmbeddedDeploy does not support networking (e2o)") + } + + fn e2o_connect( + _p1: &Self::External, + _p1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + _many: bool, + _server_hint: NetworkHint, + ) -> Box { + panic!("EmbeddedDeploy does not support networking (e2o)") + } + + fn o2e_sink( + _p1: &Self::Process, + _p1_port: &(), + _p2: &Self::External, + _p2_port: &(), + _shared_handle: String, + ) -> syn::Expr { + panic!("EmbeddedDeploy does not support networking (o2e)") + } + + #[expect( + unreachable_code, + reason = "panic before q! which is only for return type" + )] + fn cluster_ids( + _of_cluster: LocationKey, + ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a { + panic!("EmbeddedDeploy does not support cluster IDs"); + q!(unreachable!("EmbeddedDeploy does not support cluster IDs")) + } + + #[expect( + unreachable_code, + reason = "panic before q! which is only for return type" + )] + fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a { + panic!("EmbeddedDeploy does not support cluster self ID"); + q!(unreachable!( + "EmbeddedDeploy does not support cluster self ID" + )) + } + + #[expect( + unreachable_code, + reason = "panic before q! which is only for return type" + )] + fn cluster_membership_stream( + _location_id: &LocationId, + ) -> impl QuotedWithContext<'a, Box + Unpin>, ()> + { + panic!("EmbeddedDeploy does not support cluster membership streams"); + q!(unreachable!( + "EmbeddedDeploy does not support cluster membership streams" + )) + } + + fn register_embedded_input( + env: &mut Self::InstantiateEnv, + ident: &syn::Ident, + element_type: &syn::Type, + ) { + env.inputs.push((ident.clone(), element_type.clone())); + } +} + +impl super::deploy::DeployFlow<'_, EmbeddedDeploy> { + /// Generates a `syn::File` containing one function per location in the flow. + /// + /// Each generated function has the signature: + /// ```ignore + /// pub fn () -> dfir_rs::scheduled::graph::Dfir<'static> + /// ``` + /// where `fn_name` is the `String` passed to `with_process` / `with_cluster`. + /// + /// The returned `Dfir` can be manually executed by the caller. + /// + /// # Arguments + /// + /// * `crate_name` — the name of the crate containing the Hydro program (used for stageleft + /// re-exports). Hyphens will be replaced with underscores. + /// + /// # Usage + /// + /// Typically called from a `build.rs` in a wrapper crate: + /// ```ignore + /// // build.rs + /// let deploy = flow.with_process(&process, "my_fn".to_string()); + /// let code = deploy.generate_embedded("my_hydro_crate"); + /// let out_dir = std::env::var("OUT_DIR").unwrap(); + /// std::fs::write(format!("{out_dir}/embedded.rs"), prettyplease::unparse(&code)).unwrap(); + /// ``` + /// + /// Then in `lib.rs`: + /// ```ignore + /// include!(concat!(env!("OUT_DIR"), "/embedded.rs")); + /// ``` + pub fn generate_embedded(mut self, crate_name: &str) -> syn::File { + let mut env = EmbeddedInstantiateEnv::default(); + let compiled = self.compile_internal(&mut env); + + // Sort inputs by name for deterministic output. + let mut inputs = env.inputs; + inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string())); + + let root = crate::staging_util::get_this_crate(); + let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_")); + + let mut functions: Vec = Vec::new(); + + // Sort location keys for deterministic output. + let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect(); + location_keys.sort(); + + // Build the input parameters for each generated function. + let input_params: Vec = inputs + .iter() + .map(|(ident, element_type)| { + quote! { #ident: impl __root_dfir_rs::futures::Stream + Unpin + 'a } + }) + .collect(); + + for location_key in location_keys { + let graph = &compiled.all_dfir()[location_key]; + + // Get the user-provided function name from the node. + let fn_name = self + .processes + .get(location_key) + .map(|n| &n.fn_name) + .or_else(|| self.clusters.get(location_key).map(|n| &n.fn_name)) + .or_else(|| self.externals.get(location_key).map(|n| &n.fn_name)) + .expect("location key not found in any node map"); + + let fn_ident = syn::Ident::new(fn_name, Span::call_site()); + + let mut diagnostics = Diagnostics::new(); + let dfir_tokens = graph + .as_code("e! { __root_dfir_rs }, true, quote!(), &mut diagnostics) + .expect("DFIR code generation failed with diagnostics."); + + let func: syn::Item = syn::parse_quote! { + #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)] + pub fn #fn_ident<'a>(#(#input_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> { + #dfir_tokens + } + }; + functions.push(func); + } + + syn::parse_quote! { + use #orig_crate_name::__staged::__deps::*; + use #root::prelude::*; + use #root::runtime_support::dfir_rs as __root_dfir_rs; + pub use #orig_crate_name::__staged; + + #( #functions )* + } + } +} diff --git a/hydro_lang/src/compile/ir/mod.rs b/hydro_lang/src/compile/ir/mod.rs index aca7356f6ca8..6efe6d5bad4f 100644 --- a/hydro_lang/src/compile/ir/mod.rs +++ b/hydro_lang/src/compile/ir/mod.rs @@ -685,10 +685,12 @@ impl HydroRoot { processes: &SparseSecondaryMap, clusters: &SparseSecondaryMap, externals: &SparseSecondaryMap, + env: &mut D::InstantiateEnv, ) where D: Deploy<'a>, { let refcell_extra_stmts = RefCell::new(extra_stmts); + let refcell_env = RefCell::new(env); self.transform_bottom_up( &mut |l| { if let HydroRoot::SendExternal { @@ -887,6 +889,16 @@ impl HydroRoot { connect_fn: Some(connect_fn), } .into(); + } else if let HydroNode::EmbeddedInput { ident, metadata } = n { + let element_type = match &metadata.collection_kind { + CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(), + _ => panic!("EmbeddedInput must have Stream collection kind"), + }; + D::register_embedded_input( + &mut refcell_env.borrow_mut(), + ident, + &element_type, + ); } }, seen_tees, @@ -1721,6 +1733,15 @@ pub enum HydroNode { input: Box, metadata: HydroIrMetadata, }, + + /// An external input for embedded deployment mode. + /// + /// This node compiles to `source_stream(ident)` where `ident` is a parameter + /// added to the generated function signature. + EmbeddedInput { + ident: syn::Ident, + metadata: HydroIrMetadata, + }, } pub type SeenTees = HashMap<*const RefCell, Rc>>; @@ -1776,7 +1797,8 @@ impl HydroNode { HydroNode::Source { .. } | HydroNode::SingletonSource { .. } | HydroNode::CycleSource { .. } - | HydroNode::ExternalInput { .. } => {} + | HydroNode::ExternalInput { .. } + | HydroNode::EmbeddedInput { .. } => {} HydroNode::Tee { inner, .. } => { if let Some(transformed) = seen_tees.get(&inner.as_ptr()) { @@ -2120,6 +2142,10 @@ impl HydroNode { input: Box::new(input.deep_clone(seen_tees)), metadata: metadata.clone(), }, + HydroNode::EmbeddedInput { ident, metadata } => HydroNode::EmbeddedInput { + ident: ident.clone(), + metadata: metadata.clone(), + }, } } @@ -3430,6 +3456,33 @@ impl HydroNode { ident_stack.push(counter_ident); } + + HydroNode::EmbeddedInput { ident, .. } => { + let source_ident = + syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site()); + + let ident = ident.clone(); + + match builders_or_callback { + BuildersOrCallback::Builders(graph_builders) => { + let builder = graph_builders.get_dfir_mut(&out_location); + builder.add_dfir( + parse_quote! { + #source_ident = source_stream(#ident); + }, + None, + Some(&next_stmt_id.to_string()), + ); + } + BuildersOrCallback::Callback(_, node_callback) => { + node_callback(node, next_stmt_id); + } + } + + *next_stmt_id += 1; + + ident_stack.push(source_ident); + } } }, seen_tees, @@ -3508,6 +3561,7 @@ impl HydroNode { transform(deserialize_fn); } } + HydroNode::EmbeddedInput { .. } => {} HydroNode::Counter { duration, .. } => { transform(duration); } @@ -3560,6 +3614,7 @@ impl HydroNode { HydroNode::ExternalInput { metadata, .. } => metadata, HydroNode::Network { metadata, .. } => metadata, HydroNode::Counter { metadata, .. } => metadata, + HydroNode::EmbeddedInput { metadata, .. } => metadata, } } @@ -3609,6 +3664,7 @@ impl HydroNode { HydroNode::ExternalInput { metadata, .. } => metadata, HydroNode::Network { metadata, .. } => metadata, HydroNode::Counter { metadata, .. } => metadata, + HydroNode::EmbeddedInput { metadata, .. } => metadata, } } @@ -3621,6 +3677,7 @@ impl HydroNode { | HydroNode::SingletonSource { .. } | HydroNode::ExternalInput { .. } | HydroNode::CycleSource { .. } + | HydroNode::EmbeddedInput { .. } | HydroNode::Tee { .. } => { // Tee should find its input in separate special ways vec![] @@ -3752,6 +3809,7 @@ impl HydroNode { HydroNode::Counter { tag, duration, .. } => { format!("Counter({:?}, {:?})", tag, duration) } + HydroNode::EmbeddedInput { ident, .. } => format!("EmbeddedInput({})", ident), } } } diff --git a/hydro_lang/src/compile/mod.rs b/hydro_lang/src/compile/mod.rs index 155389d0b392..c7bf256ddf0a 100644 --- a/hydro_lang/src/compile/mod.rs +++ b/hydro_lang/src/compile/mod.rs @@ -18,6 +18,10 @@ pub mod compiled; #[expect(missing_docs, reason = "TODO")] pub mod deploy; +#[cfg(feature = "build")] +#[cfg_attr(docsrs, doc(cfg(feature = "build")))] +pub mod embedded; + #[cfg(feature = "build")] #[cfg_attr(docsrs, doc(cfg(feature = "build")))] #[expect(missing_docs, reason = "TODO")] @@ -45,6 +49,6 @@ pub mod trybuild; pub use trybuild::generate::init_test; /// Ident used for the DFIR runtime instance variable name. -#[cfg(feature = "trybuild")] -#[cfg_attr(docsrs, doc(cfg(feature = "trybuild")))] +#[cfg(feature = "build")] +#[cfg_attr(docsrs, doc(cfg(feature = "build")))] pub(crate) const DFIR_IDENT: &str = "flow"; diff --git a/hydro_lang/src/location/mod.rs b/hydro_lang/src/location/mod.rs index de5cfaadb370..9bf1eea59d09 100644 --- a/hydro_lang/src/location/mod.rs +++ b/hydro_lang/src/location/mod.rs @@ -520,6 +520,35 @@ pub trait Location<'a>: dynamic::DynLocation { (SimSender(external.port_id, PhantomData), stream) } + /// Creates an external input stream for embedded deployment mode. + /// + /// The `name` parameter specifies the name of the generated function parameter + /// that will supply data to this stream at runtime. The generated function will + /// accept an `impl Stream + Unpin` argument with this name. + fn embedded_input( + &self, + name: impl Into, + ) -> Stream + where + Self: Sized + NoTick, + { + let ident = syn::Ident::new(&name.into(), Span::call_site()); + + Stream::new( + self.clone(), + HydroNode::EmbeddedInput { + ident, + metadata: self.new_node_metadata(Stream::< + T, + Self, + Unbounded, + TotalOrder, + ExactlyOnce, + >::collection_kind()), + }, + ) + } + /// Establishes a server on this location to receive a bidirectional connection from a single /// client, identified by the given `External` handle. Returns a port handle for the external /// process to connect to, a stream of incoming messages, and a handle to send outgoing diff --git a/hydro_lang/src/sim/flow.rs b/hydro_lang/src/sim/flow.rs index 95742682fdf2..0b369bcd1d82 100644 --- a/hydro_lang/src/sim/flow.rs +++ b/hydro_lang/src/sim/flow.rs @@ -110,6 +110,7 @@ impl<'a> SimFlow<'a> { &self.processes, &self.clusters, &self.externals, + &mut (), ); }); diff --git a/hydro_lang/src/viz/render.rs b/hydro_lang/src/viz/render.rs index efba86280e3f..a7617bbb5dd0 100644 --- a/hydro_lang/src/viz/render.rs +++ b/hydro_lang/src/viz/render.rs @@ -1550,6 +1550,10 @@ impl HydroNode { }, duration, ), + + HydroNode::EmbeddedInput { ident, metadata } => { + build_source_node(structure, metadata, format!("embedded_input({})", ident)) + } } } } diff --git a/hydro_test/Cargo.toml b/hydro_test/Cargo.toml index 3fc511af0a4d..874f6b47e9a8 100644 --- a/hydro_test/Cargo.toml +++ b/hydro_test/Cargo.toml @@ -11,6 +11,7 @@ workspace = true docker = ["hydro_lang/docker_deploy"] ecs = ["hydro_lang/ecs_deploy"] maelstrom = ["hydro_lang/maelstrom"] +stageleft_macro_entrypoint = [] [dependencies] hydro_lang = { path = "../hydro_lang", version = "^0.15.0" } diff --git a/hydro_test/src/local/capitalize.rs b/hydro_test/src/local/capitalize.rs new file mode 100644 index 000000000000..4db077e53b62 --- /dev/null +++ b/hydro_test/src/local/capitalize.rs @@ -0,0 +1,7 @@ +use hydro_lang::prelude::*; + +pub fn capitalize<'a>(input: Stream>) { + input + .map(q!(|s| s.to_uppercase())) + .for_each(q!(|s| println!("{}", s))); +} diff --git a/hydro_test/src/local/mod.rs b/hydro_test/src/local/mod.rs index 7d794e0b728e..6b71b61c378b 100644 --- a/hydro_test/src/local/mod.rs +++ b/hydro_test/src/local/mod.rs @@ -1,3 +1,4 @@ +pub mod capitalize; pub mod chat_app; pub mod count_elems; pub mod futures; diff --git a/hydro_test_embedded/Cargo.toml b/hydro_test_embedded/Cargo.toml new file mode 100644 index 000000000000..a42bdb5ae7a0 --- /dev/null +++ b/hydro_test_embedded/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "hydro_test_embedded" +publish = false +version = "0.0.0" +edition = "2024" + +[lints] +workspace = true + +[features] +default = [] +test_embedded = [ + "dep:hydro_lang", + "dep:hydro_test", + "dep:stageleft", + "dep:prettyplease", +] + +[dependencies] +hydro_lang = { path = "../hydro_lang", version = "^0.15.0", features = ["runtime_support"], optional = true } +hydro_test = { path = "../hydro_test", version = "0.0.0", features = ["stageleft_macro_entrypoint"], optional = true } +stageleft = { workspace = true, optional = true } + +[dev-dependencies] +dfir_rs = { path = "../dfir_rs", version = "^0.15.0", default-features = false } +tokio = { version = "1.29.0", features = ["full"] } + +[build-dependencies] +hydro_lang = { path = "../hydro_lang", version = "^0.15.0", features = ["build"], optional = true } +hydro_test = { path = "../hydro_test", version = "0.0.0", optional = true } +prettyplease = { version = "0.2.0", features = ["verbatim"], optional = true } diff --git a/hydro_test_embedded/build.rs b/hydro_test_embedded/build.rs new file mode 100644 index 000000000000..7af1c030c0c1 --- /dev/null +++ b/hydro_test_embedded/build.rs @@ -0,0 +1,23 @@ +fn main() { + #[cfg(feature = "test_embedded")] + generate_embedded(); +} + +#[cfg(feature = "test_embedded")] +fn generate_embedded() { + use hydro_lang::location::Location; + + println!("cargo::rerun-if-changed=build.rs"); + + let mut flow = hydro_lang::compile::builder::FlowBuilder::new(); + let process = flow.process::<()>(); + hydro_test::local::capitalize::capitalize(process.embedded_input("input")); + + let code = flow + .with_process(&process, "capitalize") + .generate_embedded("hydro_test"); + + let out_dir = std::env::var("OUT_DIR").unwrap(); + let out_path = format!("{out_dir}/embedded.rs"); + std::fs::write(&out_path, prettyplease::unparse(&code)).unwrap(); +} diff --git a/hydro_test_embedded/examples/embedded_capitalize.rs b/hydro_test_embedded/examples/embedded_capitalize.rs new file mode 100644 index 000000000000..5e81c86fd906 --- /dev/null +++ b/hydro_test_embedded/examples/embedded_capitalize.rs @@ -0,0 +1,59 @@ +#[cfg(feature = "test_embedded")] +#[tokio::main] +async fn main() { + let input = dfir_rs::futures::stream::iter(vec![ + "hello".to_string(), + "world".to_string(), + "hydro".to_string(), + ]); + let mut flow = hydro_test_embedded::embedded::capitalize(input); + tokio::task::LocalSet::new() + .run_until(flow.run_available()) + .await; +} + +#[cfg(not(feature = "test_embedded"))] +fn main() { + eprintln!("This example requires the `test_embedded` feature."); + std::process::exit(1); +} + +#[cfg(test)] +mod tests { + use std::process::{Command, Stdio}; + + #[test] + fn test_embedded_capitalize() { + let output = Command::new("cargo") + .args([ + "run", + "--frozen", + "-p", + "hydro_test_embedded", + "--example", + "embedded_capitalize", + "--features", + "test_embedded", + ]) + .stdout(Stdio::piped()) + .output() + .expect("failed to spawn cargo run"); + + let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + output.status.success(), + "example failed with status {}.\nstdout:\n{}", + output.status, + stdout, + ); + + let lines: Vec<&str> = stdout.lines().collect(); + let expected = vec!["HELLO", "WORLD", "HYDRO"]; + assert_eq!( + lines, expected, + "expected capitalized strings, got:\n{}", + stdout + ); + } +} diff --git a/hydro_test_embedded/src/lib.rs b/hydro_test_embedded/src/lib.rs new file mode 100644 index 000000000000..3a8bcc5e858c --- /dev/null +++ b/hydro_test_embedded/src/lib.rs @@ -0,0 +1,10 @@ +#[cfg(feature = "test_embedded")] +#[expect( + clippy::allow_attributes, + clippy::allow_attributes_without_reason, + reason = "generated code" +)] +#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)] +pub mod embedded { + include!(concat!(env!("OUT_DIR"), "/embedded.rs")); +}