From e21bca2519a42aec127893e6067038d5299626e1 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Fri, 6 Feb 2026 15:48:45 -0800 Subject: [PATCH 1/2] feat(hydro_lang): preliminary support for embedding Hydro programs in other Rust code When we got rid of the original macro-entrypoint approach to building Hydro programs (in favor of trybuild + Hydro Deploy), we also lost the option to incrementally adopt Hydro in existing Rust codebases by manually instantiating the DFIR projections for each location. This is the first step towards restoring this incremental adoption path by using build scripts as a mechanism to pull generated DFIR projections into a regular Rust crate. For now, we do not support any external inputs or outputs or inter-location networking. In follow-up PRs, we will start with external in/out support and later support networking operators (which require more design work). --- .config/nextest.toml | 4 + .vscode/settings.json | 1 - Cargo.lock | 12 + Cargo.toml | 1 + hydro_lang/src/compile/deploy.rs | 2 +- hydro_lang/src/compile/embedded.rs | 387 ++++++++++++++++++ hydro_lang/src/compile/mod.rs | 8 +- hydro_test/Cargo.toml | 1 + hydro_test/src/local/first_ten.rs | 7 + hydro_test/src/local/mod.rs | 1 + hydro_test_embedded/Cargo.toml | 31 ++ hydro_test_embedded/build.rs | 21 + .../examples/embedded_first_ten.rs | 54 +++ hydro_test_embedded/src/lib.rs | 10 + 14 files changed, 536 insertions(+), 4 deletions(-) create mode 100644 hydro_lang/src/compile/embedded.rs create mode 100644 hydro_test/src/local/first_ten.rs create mode 100644 hydro_test_embedded/Cargo.toml create mode 100644 hydro_test_embedded/build.rs create mode 100644 hydro_test_embedded/examples/embedded_first_ten.rs create mode 100644 hydro_test_embedded/src/lib.rs diff --git a/.config/nextest.toml b/.config/nextest.toml index e1a9635e182d..beb447367ebe 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -40,3 +40,7 @@ test-group = 'hydro-trybuild-group' [[profile.default.overrides]] 
filter = 'package(hydro_test_template)' test-group = 'hydro-trybuild-group' + +[[profile.default.overrides]] +filter = 'package(hydro_test_embedded) & kind(example)' +test-group = 'hydro-trybuild-group' diff --git a/.vscode/settings.json b/.vscode/settings.json index 8bf8d84dbe97..6852037226e4 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,7 +11,6 @@ "rust-analyzer.rustfmt.extraArgs": [ "+nightly" ], - "rust-analyzer.cargo.features": "all", "editor.semanticTokenColorCustomizations": { "enabled": true, "rules": { diff --git a/Cargo.lock b/Cargo.lock index 76d1a39f74f5..6f8a25871bb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2781,6 +2781,18 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "hydro_test_embedded" +version = "0.0.0" +dependencies = [ + "dfir_rs", + "hydro_lang", + "hydro_test", + "prettyplease", + "stageleft", + "tokio", +] + [[package]] name = "hydro_test_template" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index 8df9bf271b9c..d79554e1181e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "hydro_lang", "hydro_std", "hydro_test", + "hydro_test_embedded", "hydro_test_template", "include_mdtests", "lattices_macro", diff --git a/hydro_lang/src/compile/deploy.rs b/hydro_lang/src/compile/deploy.rs index 3a139defaa98..d6b3dca1e9c0 100644 --- a/hydro_lang/src/compile/deploy.rs +++ b/hydro_lang/src/compile/deploy.rs @@ -269,7 +269,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { /// Same as [`Self::compile`] but does not invalidate `self`, for internal use. /// /// Empties `self.sidecars` and modifies `self.ir`, leaving `self` in a partial state. 
- fn compile_internal(&mut self) -> CompiledFlow<'a> { + pub(super) fn compile_internal(&mut self) -> CompiledFlow<'a> { let mut seen_tees: HashMap<_, _> = HashMap::new(); let mut extra_stmts = SparseSecondaryMap::new(); for leaf in self.ir.iter_mut() { diff --git a/hydro_lang/src/compile/embedded.rs b/hydro_lang/src/compile/embedded.rs new file mode 100644 index 000000000000..5a9ea61557a4 --- /dev/null +++ b/hydro_lang/src/compile/embedded.rs @@ -0,0 +1,387 @@ +//! "Embedded" deployment backend for Hydro. +//! +//! Instead of compiling each location into a standalone binary, this backend generates +//! a Rust source file containing one function per location. Each function returns a +//! `dfir_rs::scheduled::graph::Dfir` that can be manually driven by the caller. +//! +//! This is useful when you want full control over where and how the projected DFIR +//! code runs (e.g. embedding it into an existing application). +//! +//! # Limitations +//! +//! Networking is **not** supported. All `Deploy` networking trait methods will panic +//! if called. Only pure local computations (with data embedded in the Hydro program) +//! are supported. + +use std::future::Future; +use std::io::Error; +use std::pin::Pin; + +use bytes::{Bytes, BytesMut}; +use dfir_lang::diagnostic::Diagnostics; +use dfir_lang::graph::DfirGraph; +use futures::{Sink, Stream}; +use proc_macro2::Span; +use quote::quote; +use serde::Serialize; +use serde::de::DeserializeOwned; +use stageleft::{QuotedWithContext, q}; + +use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort}; +use crate::compile::builder::ExternalPortId; +use crate::location::dynamic::LocationId; +use crate::location::member_id::TaglessMemberId; +use crate::location::{LocationKey, MembershipEvent, NetworkHint}; + +/// Marker type for the embedded deployment backend. +/// +/// All networking methods panic — this backend only supports pure local computation. 
+pub enum EmbeddedDeploy {} + +/// A trivial node type for embedded deployment. Stores a user-provided function name. +#[derive(Clone)] +pub struct EmbeddedNode { + /// The function name to use in the generated code for this location. + pub fn_name: String, +} + +impl Node for EmbeddedNode { + type Port = (); + type Meta = (); + type InstantiateEnv = (); + + fn next_port(&self) -> Self::Port {} + + fn update_meta(&self, _meta: &Self::Meta) {} + + fn instantiate( + &self, + _env: &mut Self::InstantiateEnv, + _meta: &mut Self::Meta, + _graph: DfirGraph, + _extra_stmts: &[syn::Stmt], + _sidecars: &[syn::Expr], + ) { + // No-op: embedded mode doesn't instantiate nodes at deploy time. + } +} + +impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode { + fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) { + panic!("EmbeddedDeploy does not support external ports"); + } + + #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")] + fn as_bytes_bidi( + &self, + _external_port_id: ExternalPortId, + ) -> impl Future< + Output = super::deploy_provider::DynSourceSink, Bytes, Error>, + > + 'a { + async { panic!("EmbeddedDeploy does not support external ports") } + } + + #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")] + fn as_bincode_bidi( + &self, + _external_port_id: ExternalPortId, + ) -> impl Future> + 'a + where + InT: Serialize + 'static, + OutT: DeserializeOwned + 'static, + { + async { panic!("EmbeddedDeploy does not support external ports") } + } + + #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")] + fn as_bincode_sink( + &self, + _external_port_id: ExternalPortId, + ) -> impl Future>>> + 'a + where + T: Serialize + 'static, + { + async { panic!("EmbeddedDeploy does not support external ports") } + } + + #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")] + fn as_bincode_source( + &self, + _external_port_id: ExternalPortId, + 
) -> impl Future>>> + 'a + where + T: DeserializeOwned + 'static, + { + async { panic!("EmbeddedDeploy does not support external ports") } + } +} + +impl> ProcessSpec<'_, EmbeddedDeploy> for S { + fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode { + EmbeddedNode { + fn_name: self.into(), + } + } +} + +impl> ClusterSpec<'_, EmbeddedDeploy> for S { + fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode { + EmbeddedNode { + fn_name: self.into(), + } + } +} + +impl> ExternalSpec<'_, EmbeddedDeploy> for S { + fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode { + EmbeddedNode { + fn_name: self.into(), + } + } +} + +impl<'a> Deploy<'a> for EmbeddedDeploy { + type Meta = (); + type InstantiateEnv = (); + + type Process = EmbeddedNode; + type Cluster = EmbeddedNode; + type External = EmbeddedNode; + + fn o2o_sink_source( + _p1: &Self::Process, + _p1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + ) -> (syn::Expr, syn::Expr) { + panic!("EmbeddedDeploy does not support networking (o2o)") + } + + fn o2o_connect( + _p1: &Self::Process, + _p1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + ) -> Box { + panic!("EmbeddedDeploy does not support networking (o2o)") + } + + fn o2m_sink_source( + _p1: &Self::Process, + _p1_port: &(), + _c2: &Self::Cluster, + _c2_port: &(), + ) -> (syn::Expr, syn::Expr) { + panic!("EmbeddedDeploy does not support networking (o2m)") + } + + fn o2m_connect( + _p1: &Self::Process, + _p1_port: &(), + _c2: &Self::Cluster, + _c2_port: &(), + ) -> Box { + panic!("EmbeddedDeploy does not support networking (o2m)") + } + + fn m2o_sink_source( + _c1: &Self::Cluster, + _c1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + ) -> (syn::Expr, syn::Expr) { + panic!("EmbeddedDeploy does not support networking (m2o)") + } + + fn m2o_connect( + _c1: &Self::Cluster, + _c1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + ) -> Box { + panic!("EmbeddedDeploy does not support 
networking (m2o)") + } + + fn m2m_sink_source( + _c1: &Self::Cluster, + _c1_port: &(), + _c2: &Self::Cluster, + _c2_port: &(), + ) -> (syn::Expr, syn::Expr) { + panic!("EmbeddedDeploy does not support networking (m2m)") + } + + fn m2m_connect( + _c1: &Self::Cluster, + _c1_port: &(), + _c2: &Self::Cluster, + _c2_port: &(), + ) -> Box { + panic!("EmbeddedDeploy does not support networking (m2m)") + } + + fn e2o_many_source( + _extra_stmts: &mut Vec, + _p2: &Self::Process, + _p2_port: &(), + _codec_type: &syn::Type, + _shared_handle: String, + ) -> syn::Expr { + panic!("EmbeddedDeploy does not support networking (e2o)") + } + + fn e2o_many_sink(_shared_handle: String) -> syn::Expr { + panic!("EmbeddedDeploy does not support networking (e2o)") + } + + fn e2o_source( + _extra_stmts: &mut Vec, + _p1: &Self::External, + _p1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + _codec_type: &syn::Type, + _shared_handle: String, + ) -> syn::Expr { + panic!("EmbeddedDeploy does not support networking (e2o)") + } + + fn e2o_connect( + _p1: &Self::External, + _p1_port: &(), + _p2: &Self::Process, + _p2_port: &(), + _many: bool, + _server_hint: NetworkHint, + ) -> Box { + panic!("EmbeddedDeploy does not support networking (e2o)") + } + + fn o2e_sink( + _p1: &Self::Process, + _p1_port: &(), + _p2: &Self::External, + _p2_port: &(), + _shared_handle: String, + ) -> syn::Expr { + panic!("EmbeddedDeploy does not support networking (o2e)") + } + + #[expect( + unreachable_code, + reason = "panic before q! which is only for return type" + )] + fn cluster_ids( + _of_cluster: LocationKey, + ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a { + panic!("EmbeddedDeploy does not support cluster IDs"); + q!(unreachable!("EmbeddedDeploy does not support cluster IDs")) + } + + #[expect( + unreachable_code, + reason = "panic before q! 
which is only for return type" + )] + fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a { + panic!("EmbeddedDeploy does not support cluster self ID"); + q!(unreachable!( + "EmbeddedDeploy does not support cluster self ID" + )) + } + + #[expect( + unreachable_code, + reason = "panic before q! which is only for return type" + )] + fn cluster_membership_stream( + _location_id: &LocationId, + ) -> impl QuotedWithContext<'a, Box + Unpin>, ()> + { + panic!("EmbeddedDeploy does not support cluster membership streams"); + q!(unreachable!( + "EmbeddedDeploy does not support cluster membership streams" + )) + } +} + +impl super::deploy::DeployFlow<'_, EmbeddedDeploy> { + /// Generates a `syn::File` containing one function per location in the flow. + /// + /// Each generated function has the signature: + /// ```ignore + /// pub fn () -> dfir_rs::scheduled::graph::Dfir<'static> + /// ``` + /// where `fn_name` is the `String` passed to `with_process` / `with_cluster`. + /// + /// The returned `Dfir` can be manually executed by the caller. + /// + /// # Arguments + /// + /// * `crate_name` — the name of the crate containing the Hydro program (used for stageleft + /// re-exports). Hyphens will be replaced with underscores. 
+ /// + /// # Usage + /// + /// Typically called from a `build.rs` in a wrapper crate: + /// ```ignore + /// // build.rs + /// let deploy = flow.with_process(&process, "my_fn".to_string()); + /// let code = deploy.generate_embedded("my_hydro_crate"); + /// let out_dir = std::env::var("OUT_DIR").unwrap(); + /// std::fs::write(format!("{out_dir}/embedded.rs"), prettyplease::unparse(&code)).unwrap(); + /// ``` + /// + /// Then in `lib.rs`: + /// ```ignore + /// include!(concat!(env!("OUT_DIR"), "/embedded.rs")); + /// ``` + pub fn generate_embedded(mut self, crate_name: &str) -> syn::File { + let compiled = self.compile_internal(); + + let root = crate::staging_util::get_this_crate(); + let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_")); + + let mut functions: Vec = Vec::new(); + + // Sort location keys for deterministic output. + let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect(); + location_keys.sort(); + + for location_key in location_keys { + let graph = &compiled.all_dfir()[location_key]; + + // Get the user-provided function name from the node. + let fn_name = self + .processes + .get(location_key) + .map(|n| &n.fn_name) + .or_else(|| self.clusters.get(location_key).map(|n| &n.fn_name)) + .or_else(|| self.externals.get(location_key).map(|n| &n.fn_name)) + .expect("location key not found in any node map"); + + let fn_ident = syn::Ident::new(fn_name, Span::call_site()); + + let mut diagnostics = Diagnostics::new(); + let dfir_tokens = graph + .as_code("e! { __root_dfir_rs }, true, quote!(), &mut diagnostics) + .expect("DFIR code generation failed with diagnostics."); + + let func: syn::Item = syn::parse_quote! { + #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)] + pub fn #fn_ident<'a>() -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> { + #dfir_tokens + } + }; + functions.push(func); + } + + syn::parse_quote! 
{ + use #orig_crate_name::__staged::__deps::*; + use #root::prelude::*; + use #root::runtime_support::dfir_rs as __root_dfir_rs; + pub use #orig_crate_name::__staged; + + #( #functions )* + } + } +} diff --git a/hydro_lang/src/compile/mod.rs b/hydro_lang/src/compile/mod.rs index 155389d0b392..c7bf256ddf0a 100644 --- a/hydro_lang/src/compile/mod.rs +++ b/hydro_lang/src/compile/mod.rs @@ -18,6 +18,10 @@ pub mod compiled; #[expect(missing_docs, reason = "TODO")] pub mod deploy; +#[cfg(feature = "build")] +#[cfg_attr(docsrs, doc(cfg(feature = "build")))] +pub mod embedded; + #[cfg(feature = "build")] #[cfg_attr(docsrs, doc(cfg(feature = "build")))] #[expect(missing_docs, reason = "TODO")] @@ -45,6 +49,6 @@ pub mod trybuild; pub use trybuild::generate::init_test; /// Ident used for the DFIR runtime instance variable name. -#[cfg(feature = "trybuild")] -#[cfg_attr(docsrs, doc(cfg(feature = "trybuild")))] +#[cfg(feature = "build")] +#[cfg_attr(docsrs, doc(cfg(feature = "build")))] pub(crate) const DFIR_IDENT: &str = "flow"; diff --git a/hydro_test/Cargo.toml b/hydro_test/Cargo.toml index 3fc511af0a4d..874f6b47e9a8 100644 --- a/hydro_test/Cargo.toml +++ b/hydro_test/Cargo.toml @@ -11,6 +11,7 @@ workspace = true docker = ["hydro_lang/docker_deploy"] ecs = ["hydro_lang/ecs_deploy"] maelstrom = ["hydro_lang/maelstrom"] +stageleft_macro_entrypoint = [] [dependencies] hydro_lang = { path = "../hydro_lang", version = "^0.15.0" } diff --git a/hydro_test/src/local/first_ten.rs b/hydro_test/src/local/first_ten.rs new file mode 100644 index 000000000000..d8be0fc0afb1 --- /dev/null +++ b/hydro_test/src/local/first_ten.rs @@ -0,0 +1,7 @@ +use hydro_lang::prelude::*; + +pub fn first_ten<'a>(process: &Process<'a, ()>) { + process + .source_iter(q!(0..10)) + .for_each(q!(|n| println!("{}", n))); +} diff --git a/hydro_test/src/local/mod.rs b/hydro_test/src/local/mod.rs index 7d794e0b728e..c00393ca756d 100644 --- a/hydro_test/src/local/mod.rs +++ b/hydro_test/src/local/mod.rs @@ -1,4 +1,5 
@@ pub mod chat_app; pub mod count_elems; +pub mod first_ten; pub mod futures; pub mod graph_reachability; diff --git a/hydro_test_embedded/Cargo.toml b/hydro_test_embedded/Cargo.toml new file mode 100644 index 000000000000..a42bdb5ae7a0 --- /dev/null +++ b/hydro_test_embedded/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "hydro_test_embedded" +publish = false +version = "0.0.0" +edition = "2024" + +[lints] +workspace = true + +[features] +default = [] +test_embedded = [ + "dep:hydro_lang", + "dep:hydro_test", + "dep:stageleft", + "dep:prettyplease", +] + +[dependencies] +hydro_lang = { path = "../hydro_lang", version = "^0.15.0", features = ["runtime_support"], optional = true } +hydro_test = { path = "../hydro_test", version = "0.0.0", features = ["stageleft_macro_entrypoint"], optional = true } +stageleft = { workspace = true, optional = true } + +[dev-dependencies] +dfir_rs = { path = "../dfir_rs", version = "^0.15.0", default-features = false } +tokio = { version = "1.29.0", features = ["full"] } + +[build-dependencies] +hydro_lang = { path = "../hydro_lang", version = "^0.15.0", features = ["build"], optional = true } +hydro_test = { path = "../hydro_test", version = "0.0.0", optional = true } +prettyplease = { version = "0.2.0", features = ["verbatim"], optional = true } diff --git a/hydro_test_embedded/build.rs b/hydro_test_embedded/build.rs new file mode 100644 index 000000000000..46d949637daf --- /dev/null +++ b/hydro_test_embedded/build.rs @@ -0,0 +1,21 @@ +fn main() { + #[cfg(feature = "test_embedded")] + generate_embedded(); +} + +#[cfg(feature = "test_embedded")] +fn generate_embedded() { + println!("cargo::rerun-if-changed=build.rs"); + + let mut flow = hydro_lang::compile::builder::FlowBuilder::new(); + let process = flow.process::<()>(); + hydro_test::local::first_ten::first_ten(&process); + + let code = flow + .with_process(&process, "first_ten") + .generate_embedded("hydro_test"); + + let out_dir = std::env::var("OUT_DIR").unwrap(); + let 
out_path = format!("{out_dir}/embedded.rs"); + std::fs::write(&out_path, prettyplease::unparse(&code)).unwrap(); +} diff --git a/hydro_test_embedded/examples/embedded_first_ten.rs b/hydro_test_embedded/examples/embedded_first_ten.rs new file mode 100644 index 000000000000..a9485a3ab5c3 --- /dev/null +++ b/hydro_test_embedded/examples/embedded_first_ten.rs @@ -0,0 +1,54 @@ +#[cfg(feature = "test_embedded")] +#[tokio::main] +async fn main() { + let mut flow = hydro_test_embedded::embedded::first_ten(); + tokio::task::LocalSet::new() + .run_until(flow.run_available()) + .await; +} + +#[cfg(not(feature = "test_embedded"))] +fn main() { + eprintln!("This example requires the `test_embedded` feature."); + std::process::exit(1); +} + +#[cfg(test)] +mod tests { + use std::process::{Command, Stdio}; + + #[test] + fn test_embedded_first_ten() { + let output = Command::new("cargo") + .args([ + "run", + "--frozen", + "-p", + "hydro_test_embedded", + "--example", + "embedded_first_ten", + "--features", + "test_embedded", + ]) + .stdout(Stdio::piped()) + .output() + .expect("failed to spawn cargo run"); + + let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + output.status.success(), + "example failed with status {}.\nstdout:\n{}", + output.status, + stdout, + ); + + let lines: Vec<&str> = stdout.lines().collect(); + let expected: Vec = (0..10).map(|i| i.to_string()).collect(); + assert_eq!( + lines, expected, + "expected first 10 numbers, got:\n{}", + stdout + ); + } +} diff --git a/hydro_test_embedded/src/lib.rs b/hydro_test_embedded/src/lib.rs new file mode 100644 index 000000000000..3a8bcc5e858c --- /dev/null +++ b/hydro_test_embedded/src/lib.rs @@ -0,0 +1,10 @@ +#[cfg(feature = "test_embedded")] +#[expect( + clippy::allow_attributes, + clippy::allow_attributes_without_reason, + reason = "generated code" +)] +#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)] +pub mod embedded { + include!(concat!(env!("OUT_DIR"), "/embedded.rs")); 
+} From a9e408c53865a5fb3ec418c8b2da8cdbbf24b12d Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Fri, 6 Feb 2026 16:49:03 -0800 Subject: [PATCH 2/2] feat(hydro_lang): support in-memory inputs for embedded Hydro programs --- hydro_lang/src/compile/builder.rs | 2 +- hydro_lang/src/compile/built.rs | 2 +- hydro_lang/src/compile/deploy.rs | 12 ++-- hydro_lang/src/compile/deploy_provider.rs | 12 ++++ hydro_lang/src/compile/embedded.rs | 40 +++++++++++-- hydro_lang/src/compile/ir/mod.rs | 60 ++++++++++++++++++- hydro_lang/src/location/mod.rs | 29 +++++++++ hydro_lang/src/sim/flow.rs | 1 + hydro_lang/src/viz/render.rs | 4 ++ hydro_test/src/local/capitalize.rs | 7 +++ hydro_test/src/local/first_ten.rs | 7 --- hydro_test/src/local/mod.rs | 2 +- hydro_test_embedded/build.rs | 6 +- ...ed_first_ten.rs => embedded_capitalize.rs} | 15 +++-- 14 files changed, 173 insertions(+), 26 deletions(-) create mode 100644 hydro_test/src/local/capitalize.rs delete mode 100644 hydro_test/src/local/first_ten.rs rename hydro_test_embedded/examples/{embedded_first_ten.rs => embedded_capitalize.rs} (74%) diff --git a/hydro_lang/src/compile/builder.rs b/hydro_lang/src/compile/builder.rs index deee7107c900..33f1b297e1d4 100644 --- a/hydro_lang/src/compile/builder.rs +++ b/hydro_lang/src/compile/builder.rs @@ -231,7 +231,7 @@ impl<'a> FlowBuilder<'a> { self.with_default_optimize().with_remaining_clusters(spec) } - pub fn compile>(self) -> CompiledFlow<'a> { + pub fn compile>(self) -> CompiledFlow<'a> { self.with_default_optimize::().compile() } diff --git a/hydro_lang/src/compile/built.rs b/hydro_lang/src/compile/built.rs index 2313103ef236..a778b5bb1858 100644 --- a/hydro_lang/src/compile/built.rs +++ b/hydro_lang/src/compile/built.rs @@ -316,7 +316,7 @@ impl<'a> BuiltFlow<'a> { self.into_deploy().with_remaining_clusters(spec) } - pub fn compile>(self) -> CompiledFlow<'a> { + pub fn compile>(self) -> CompiledFlow<'a> { self.into_deploy::().compile() } diff --git 
a/hydro_lang/src/compile/deploy.rs b/hydro_lang/src/compile/deploy.rs index d6b3dca1e9c0..10cd215df3d9 100644 --- a/hydro_lang/src/compile/deploy.rs +++ b/hydro_lang/src/compile/deploy.rs @@ -262,14 +262,17 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) including networking. /// /// (This does not compile the DFIR itself, instead use [`Self::deploy`] to compile & deploy the DFIR). - pub fn compile(mut self) -> CompiledFlow<'a> { - self.compile_internal() + pub fn compile(mut self) -> CompiledFlow<'a> + where + D: Deploy<'a, InstantiateEnv = ()>, + { + self.compile_internal(&mut ()) } /// Same as [`Self::compile`] but does not invalidate `self`, for internal use. /// /// Empties `self.sidecars` and modifies `self.ir`, leaving `self` in a partial state. - pub(super) fn compile_internal(&mut self) -> CompiledFlow<'a> { + pub(super) fn compile_internal(&mut self, env: &mut D::InstantiateEnv) -> CompiledFlow<'a> { let mut seen_tees: HashMap<_, _> = HashMap::new(); let mut extra_stmts = SparseSecondaryMap::new(); for leaf in self.ir.iter_mut() { @@ -279,6 +282,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { &self.processes, &self.clusters, &self.externals, + env, ); } @@ -348,7 +352,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { mut extra_stmts, mut sidecars, _phantom, - } = self.compile_internal(); + } = self.compile_internal(env); let mut compiled = dfir; self.cluster_id_stmts(&mut extra_stmts); diff --git a/hydro_lang/src/compile/deploy_provider.rs b/hydro_lang/src/compile/deploy_provider.rs index c2a200c0a66d..27c879668212 100644 --- a/hydro_lang/src/compile/deploy_provider.rs +++ b/hydro_lang/src/compile/deploy_provider.rs @@ -118,6 +118,18 @@ pub trait Deploy<'a> { fn cluster_membership_stream( location_id: &LocationId, ) -> impl QuotedWithContext<'a, Box + Unpin>, ()>; + + /// Registers an embedded input for the given ident and element type. 
+ /// + /// Only meaningful for the embedded deployment backend. The default + /// implementation panics. + fn register_embedded_input( + _env: &mut Self::InstantiateEnv, + _ident: &syn::Ident, + _element_type: &syn::Type, + ) { + panic!("register_embedded_input is only supported by EmbeddedDeploy"); + } } pub trait ProcessSpec<'a, D> diff --git a/hydro_lang/src/compile/embedded.rs b/hydro_lang/src/compile/embedded.rs index 5a9ea61557a4..7dae75240d30 100644 --- a/hydro_lang/src/compile/embedded.rs +++ b/hydro_lang/src/compile/embedded.rs @@ -48,7 +48,7 @@ pub struct EmbeddedNode { impl Node for EmbeddedNode { type Port = (); type Meta = (); - type InstantiateEnv = (); + type InstantiateEnv = EmbeddedInstantiateEnv; fn next_port(&self) -> Self::Port {} @@ -140,9 +140,20 @@ impl> ExternalSpec<'_, EmbeddedDeploy> for S { } } +/// Collected embedded input registrations. +/// +/// During `compile_network`, each `EmbeddedInput` IR node registers its ident +/// and element type here. `generate_embedded` then uses this to add parameters +/// to the generated functions. +#[derive(Default)] +pub struct EmbeddedInstantiateEnv { + /// (ident name, element type) pairs collected during compilation. 
+ pub inputs: Vec<(syn::Ident, syn::Type)>, +} + impl<'a> Deploy<'a> for EmbeddedDeploy { type Meta = (); - type InstantiateEnv = (); + type InstantiateEnv = EmbeddedInstantiateEnv; type Process = EmbeddedNode; type Cluster = EmbeddedNode; @@ -302,6 +313,14 @@ impl<'a> Deploy<'a> for EmbeddedDeploy { "EmbeddedDeploy does not support cluster membership streams" )) } + + fn register_embedded_input( + env: &mut Self::InstantiateEnv, + ident: &syn::Ident, + element_type: &syn::Type, + ) { + env.inputs.push((ident.clone(), element_type.clone())); + } } impl super::deploy::DeployFlow<'_, EmbeddedDeploy> { @@ -336,7 +355,12 @@ impl super::deploy::DeployFlow<'_, EmbeddedDeploy> { /// include!(concat!(env!("OUT_DIR"), "/embedded.rs")); /// ``` pub fn generate_embedded(mut self, crate_name: &str) -> syn::File { - let compiled = self.compile_internal(); + let mut env = EmbeddedInstantiateEnv::default(); + let compiled = self.compile_internal(&mut env); + + // Sort inputs by name for deterministic output. + let mut inputs = env.inputs; + inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string())); let root = crate::staging_util::get_this_crate(); let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_")); @@ -347,6 +371,14 @@ impl super::deploy::DeployFlow<'_, EmbeddedDeploy> { let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect(); location_keys.sort(); + // Build the input parameters for each generated function. + let input_params: Vec = inputs + .iter() + .map(|(ident, element_type)| { + quote! { #ident: impl __root_dfir_rs::futures::Stream + Unpin + 'a } + }) + .collect(); + for location_key in location_keys { let graph = &compiled.all_dfir()[location_key]; @@ -368,7 +400,7 @@ impl super::deploy::DeployFlow<'_, EmbeddedDeploy> { let func: syn::Item = syn::parse_quote! 
{ #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)] - pub fn #fn_ident<'a>() -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> { + pub fn #fn_ident<'a>(#(#input_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> { #dfir_tokens } }; diff --git a/hydro_lang/src/compile/ir/mod.rs b/hydro_lang/src/compile/ir/mod.rs index aca7356f6ca8..6efe6d5bad4f 100644 --- a/hydro_lang/src/compile/ir/mod.rs +++ b/hydro_lang/src/compile/ir/mod.rs @@ -685,10 +685,12 @@ impl HydroRoot { processes: &SparseSecondaryMap, clusters: &SparseSecondaryMap, externals: &SparseSecondaryMap, + env: &mut D::InstantiateEnv, ) where D: Deploy<'a>, { let refcell_extra_stmts = RefCell::new(extra_stmts); + let refcell_env = RefCell::new(env); self.transform_bottom_up( &mut |l| { if let HydroRoot::SendExternal { @@ -887,6 +889,16 @@ impl HydroRoot { connect_fn: Some(connect_fn), } .into(); + } else if let HydroNode::EmbeddedInput { ident, metadata } = n { + let element_type = match &metadata.collection_kind { + CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(), + _ => panic!("EmbeddedInput must have Stream collection kind"), + }; + D::register_embedded_input( + &mut refcell_env.borrow_mut(), + ident, + &element_type, + ); } }, seen_tees, @@ -1721,6 +1733,15 @@ pub enum HydroNode { input: Box, metadata: HydroIrMetadata, }, + + /// An external input for embedded deployment mode. + /// + /// This node compiles to `source_stream(ident)` where `ident` is a parameter + /// added to the generated function signature. + EmbeddedInput { + ident: syn::Ident, + metadata: HydroIrMetadata, + }, } pub type SeenTees = HashMap<*const RefCell, Rc>>; @@ -1776,7 +1797,8 @@ impl HydroNode { HydroNode::Source { .. } | HydroNode::SingletonSource { .. } | HydroNode::CycleSource { .. } - | HydroNode::ExternalInput { .. } => {} + | HydroNode::ExternalInput { .. } + | HydroNode::EmbeddedInput { .. } => {} HydroNode::Tee { inner, .. 
} => { if let Some(transformed) = seen_tees.get(&inner.as_ptr()) { @@ -2120,6 +2142,10 @@ impl HydroNode { input: Box::new(input.deep_clone(seen_tees)), metadata: metadata.clone(), }, + HydroNode::EmbeddedInput { ident, metadata } => HydroNode::EmbeddedInput { + ident: ident.clone(), + metadata: metadata.clone(), + }, } } @@ -3430,6 +3456,33 @@ impl HydroNode { ident_stack.push(counter_ident); } + + HydroNode::EmbeddedInput { ident, .. } => { + let source_ident = + syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site()); + + let ident = ident.clone(); + + match builders_or_callback { + BuildersOrCallback::Builders(graph_builders) => { + let builder = graph_builders.get_dfir_mut(&out_location); + builder.add_dfir( + parse_quote! { + #source_ident = source_stream(#ident); + }, + None, + Some(&next_stmt_id.to_string()), + ); + } + BuildersOrCallback::Callback(_, node_callback) => { + node_callback(node, next_stmt_id); + } + } + + *next_stmt_id += 1; + + ident_stack.push(source_ident); + } } }, seen_tees, @@ -3508,6 +3561,7 @@ impl HydroNode { transform(deserialize_fn); } } + HydroNode::EmbeddedInput { .. } => {} HydroNode::Counter { duration, .. } => { transform(duration); } @@ -3560,6 +3614,7 @@ impl HydroNode { HydroNode::ExternalInput { metadata, .. } => metadata, HydroNode::Network { metadata, .. } => metadata, HydroNode::Counter { metadata, .. } => metadata, + HydroNode::EmbeddedInput { metadata, .. } => metadata, } } @@ -3609,6 +3664,7 @@ impl HydroNode { HydroNode::ExternalInput { metadata, .. } => metadata, HydroNode::Network { metadata, .. } => metadata, HydroNode::Counter { metadata, .. } => metadata, + HydroNode::EmbeddedInput { metadata, .. } => metadata, } } @@ -3621,6 +3677,7 @@ impl HydroNode { | HydroNode::SingletonSource { .. } | HydroNode::ExternalInput { .. } | HydroNode::CycleSource { .. } + | HydroNode::EmbeddedInput { .. } | HydroNode::Tee { .. 
} => { // Tee should find its input in separate special ways vec![] @@ -3752,6 +3809,7 @@ impl HydroNode { HydroNode::Counter { tag, duration, .. } => { format!("Counter({:?}, {:?})", tag, duration) } + HydroNode::EmbeddedInput { ident, .. } => format!("EmbeddedInput({})", ident), } } } diff --git a/hydro_lang/src/location/mod.rs b/hydro_lang/src/location/mod.rs index de5cfaadb370..9bf1eea59d09 100644 --- a/hydro_lang/src/location/mod.rs +++ b/hydro_lang/src/location/mod.rs @@ -520,6 +520,35 @@ pub trait Location<'a>: dynamic::DynLocation { (SimSender(external.port_id, PhantomData), stream) } + /// Creates an external input stream for embedded deployment mode. + /// + /// The `name` parameter specifies the name of the generated function parameter + /// that will supply data to this stream at runtime. The generated function will + /// accept an `impl Stream + Unpin` argument with this name. + fn embedded_input( + &self, + name: impl Into, + ) -> Stream + where + Self: Sized + NoTick, + { + let ident = syn::Ident::new(&name.into(), Span::call_site()); + + Stream::new( + self.clone(), + HydroNode::EmbeddedInput { + ident, + metadata: self.new_node_metadata(Stream::< + T, + Self, + Unbounded, + TotalOrder, + ExactlyOnce, + >::collection_kind()), + }, + ) + } + /// Establishes a server on this location to receive a bidirectional connection from a single /// client, identified by the given `External` handle. 
Returns a port handle for the external /// process to connect to, a stream of incoming messages, and a handle to send outgoing diff --git a/hydro_lang/src/sim/flow.rs b/hydro_lang/src/sim/flow.rs index 95742682fdf2..0b369bcd1d82 100644 --- a/hydro_lang/src/sim/flow.rs +++ b/hydro_lang/src/sim/flow.rs @@ -110,6 +110,7 @@ impl<'a> SimFlow<'a> { &self.processes, &self.clusters, &self.externals, + &mut (), ); }); diff --git a/hydro_lang/src/viz/render.rs b/hydro_lang/src/viz/render.rs index efba86280e3f..a7617bbb5dd0 100644 --- a/hydro_lang/src/viz/render.rs +++ b/hydro_lang/src/viz/render.rs @@ -1550,6 +1550,10 @@ impl HydroNode { }, duration, ), + + HydroNode::EmbeddedInput { ident, metadata } => { + build_source_node(structure, metadata, format!("embedded_input({})", ident)) + } } } } diff --git a/hydro_test/src/local/capitalize.rs b/hydro_test/src/local/capitalize.rs new file mode 100644 index 000000000000..4db077e53b62 --- /dev/null +++ b/hydro_test/src/local/capitalize.rs @@ -0,0 +1,7 @@ +use hydro_lang::prelude::*; + +pub fn capitalize<'a>(input: Stream>) { + input + .map(q!(|s| s.to_uppercase())) + .for_each(q!(|s| println!("{}", s))); +} diff --git a/hydro_test/src/local/first_ten.rs b/hydro_test/src/local/first_ten.rs deleted file mode 100644 index d8be0fc0afb1..000000000000 --- a/hydro_test/src/local/first_ten.rs +++ /dev/null @@ -1,7 +0,0 @@ -use hydro_lang::prelude::*; - -pub fn first_ten<'a>(process: &Process<'a, ()>) { - process - .source_iter(q!(0..10)) - .for_each(q!(|n| println!("{}", n))); -} diff --git a/hydro_test/src/local/mod.rs b/hydro_test/src/local/mod.rs index c00393ca756d..6b71b61c378b 100644 --- a/hydro_test/src/local/mod.rs +++ b/hydro_test/src/local/mod.rs @@ -1,5 +1,5 @@ +pub mod capitalize; pub mod chat_app; pub mod count_elems; -pub mod first_ten; pub mod futures; pub mod graph_reachability; diff --git a/hydro_test_embedded/build.rs b/hydro_test_embedded/build.rs index 46d949637daf..7af1c030c0c1 100644 --- 
a/hydro_test_embedded/build.rs +++ b/hydro_test_embedded/build.rs @@ -5,14 +5,16 @@ fn main() { #[cfg(feature = "test_embedded")] fn generate_embedded() { + use hydro_lang::location::Location; + println!("cargo::rerun-if-changed=build.rs"); let mut flow = hydro_lang::compile::builder::FlowBuilder::new(); let process = flow.process::<()>(); - hydro_test::local::first_ten::first_ten(&process); + hydro_test::local::capitalize::capitalize(process.embedded_input("input")); let code = flow - .with_process(&process, "first_ten") + .with_process(&process, "capitalize") .generate_embedded("hydro_test"); let out_dir = std::env::var("OUT_DIR").unwrap(); diff --git a/hydro_test_embedded/examples/embedded_first_ten.rs b/hydro_test_embedded/examples/embedded_capitalize.rs similarity index 74% rename from hydro_test_embedded/examples/embedded_first_ten.rs rename to hydro_test_embedded/examples/embedded_capitalize.rs index a9485a3ab5c3..5e81c86fd906 100644 --- a/hydro_test_embedded/examples/embedded_first_ten.rs +++ b/hydro_test_embedded/examples/embedded_capitalize.rs @@ -1,7 +1,12 @@ #[cfg(feature = "test_embedded")] #[tokio::main] async fn main() { - let mut flow = hydro_test_embedded::embedded::first_ten(); + let input = dfir_rs::futures::stream::iter(vec![ + "hello".to_string(), + "world".to_string(), + "hydro".to_string(), + ]); + let mut flow = hydro_test_embedded::embedded::capitalize(input); tokio::task::LocalSet::new() .run_until(flow.run_available()) .await; @@ -18,7 +23,7 @@ mod tests { use std::process::{Command, Stdio}; #[test] - fn test_embedded_first_ten() { + fn test_embedded_capitalize() { let output = Command::new("cargo") .args([ "run", @@ -26,7 +31,7 @@ mod tests { "-p", "hydro_test_embedded", "--example", - "embedded_first_ten", + "embedded_capitalize", "--features", "test_embedded", ]) @@ -44,10 +49,10 @@ mod tests { ); let lines: Vec<&str> = stdout.lines().collect(); - let expected: Vec = (0..10).map(|i| i.to_string()).collect(); + let expected = 
vec!["HELLO", "WORLD", "HYDRO"]; assert_eq!( lines, expected, - "expected first 10 numbers, got:\n{}", + "expected capitalized strings, got:\n{}", stdout ); }