diff --git a/Cargo.toml b/Cargo.toml index e5099b8..1093de7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ bimap = { version = "0.6.3", optional = true } sigmf = { version = "0.1.0", path = "crates/sigmf" } async-fs = "2.1.2" serde = "1.0.204" +zenoh = { version = "1.2.1", optional = true } [dev-dependencies] criterion = { version = "0.5.1", features = ["html_reports"] } @@ -42,6 +43,7 @@ default = [] crossbeam = ["dep:crossbeam-channel"] async-channel = ["dep:async-channel"] cw = ["dep:bimap"] +zenoh = ["dep:zenoh"] [[bench]] name = "crossbeam_sink" diff --git a/examples/zenoh/Cargo.toml b/examples/zenoh/Cargo.toml new file mode 100644 index 0000000..ca6ff1e --- /dev/null +++ b/examples/zenoh/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "zenoh" +version = "0.1.0" +edition = "2021" + +[workspace] + +[dependencies] +anyhow = "1.0" +fsdr-blocks = { path = "../..", features = ["zenoh"] } +futuresdr = { git = "https://github.com/FutureSDR/FutureSDR", branch = "main", version = "0.0.37"} + +[[bin]] +name = "zenoh-receiver" +path = "zenoh_receiver.rs" + +[[bin]] +name = "zenoh-sender" +path = "zenoh_sender.rs" diff --git a/examples/zenoh/README.md b/examples/zenoh/README.md new file mode 100644 index 0000000..ad332f3 --- /dev/null +++ b/examples/zenoh/README.md @@ -0,0 +1,24 @@ +# Zenoh Blocks Example + +## Overview +This example demonstrates how to use the `PubSink` and `SubSource` blocks. The `PubSink` declares a Zenoh publisher that publishes samples to the given key expression. The `SubSource` declares a Zenoh subscriber that subscribes to samples from the given key expression. + +## Build +To build the example follow these steps: + +- Open a terminal +- Run `cargo build` + +## Run +To run the example follow these steps: + +- Open a terminal +- Run `cargo run --bin zenoh-receiver` +- Open another terminal +- Run `cargo run --bin zenoh-sender` +- Terminate both programs after an arbitrary amount of time +- The output file will be located at `/tmp/zenoh-log.bin` + +## References +- [FutureSDR](https://www.futuresdr.org/) +- [Zenoh](https://zenoh.io/) \ No newline at end of file diff --git a/examples/zenoh/zenoh_receiver.rs b/examples/zenoh/zenoh_receiver.rs new file mode 100644 index 0000000..aa66769 --- /dev/null +++ b/examples/zenoh/zenoh_receiver.rs @@ -0,0 +1,18 @@ +use anyhow::Result; +use fsdr_blocks::zenoh::SubSourceBuilder; +use futuresdr::blocks::FileSink; +use futuresdr::runtime::Flowgraph; +use futuresdr::runtime::Runtime; + +fn main() -> Result<()> { + let mut flowgraph = Flowgraph::new(); + + let sub_source = flowgraph.add_block(SubSourceBuilder::::new().build())?; + let file_sink = flowgraph.add_block(FileSink::::new("/tmp/zenoh-log.bin"))?; + + flowgraph.connect_stream(sub_source, "out", file_sink, "in")?; + + Runtime::new().run(flowgraph)?; + + Ok(()) +} diff --git a/examples/zenoh/zenoh_sender.rs b/examples/zenoh/zenoh_sender.rs new file mode 100644 index 0000000..b277371 --- /dev/null +++ b/examples/zenoh/zenoh_sender.rs @@ -0,0 +1,24 @@ +use anyhow::Result; +use fsdr_blocks::zenoh::PubSinkBuilder; +use futuresdr::blocks::Head; +use futuresdr::blocks::NullSource; +use futuresdr::blocks::Throttle; +use futuresdr::runtime::Flowgraph; +use futuresdr::runtime::Runtime; + +fn main() -> Result<()> { + let mut flowgraph = Flowgraph::new(); + + let null_source = flowgraph.add_block(NullSource::::new())?; + let head = flowgraph.add_block(Head::::new(1_000_000))?; + let throttle = flowgraph.add_block(Throttle::::new(100e3))?; + let pub_sink = flowgraph.add_block(PubSinkBuilder::::new().build())?; + + flowgraph.connect_stream(null_source, "out", head, "in")?; + flowgraph.connect_stream(head, "out", throttle, "in")?; + flowgraph.connect_stream(throttle, "out", pub_sink, "in")?; + + Runtime::new().run(flowgraph)?; + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index afe00d6..ce955a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,9 @@ pub mod async_channel; #[cfg(feature = "cw")] pub mod cw; +#[cfg(feature = "zenoh")] +pub mod zenoh; + pub mod agc; pub mod math; pub mod sigmf; diff --git a/src/zenoh/mod.rs b/src/zenoh/mod.rs new file mode 100644 index 0000000..e3fcf8e --- /dev/null +++ b/src/zenoh/mod.rs @@ -0,0 +1,8 @@ +//! ## [Zenoh](https://zenoh.io/) Blocks +mod pub_sink; +pub use pub_sink::PubSink; +pub use pub_sink::PubSinkBuilder; + +mod sub_source; +pub use sub_source::SubSource; +pub use sub_source::SubSourceBuilder; diff --git a/src/zenoh/pub_sink.rs b/src/zenoh/pub_sink.rs new file mode 100644 index 0000000..71124b6 --- /dev/null +++ b/src/zenoh/pub_sink.rs @@ -0,0 +1,144 @@ +use futuresdr::runtime::BlockMeta; +use futuresdr::runtime::BlockMetaBuilder; +use futuresdr::runtime::Kernel; +use futuresdr::runtime::MessageIo; +use futuresdr::runtime::MessageIoBuilder; +use futuresdr::runtime::Result; +use futuresdr::runtime::StreamIo; +use futuresdr::runtime::StreamIoBuilder; +use futuresdr::runtime::TypedBlock; +use futuresdr::runtime::WorkIo; + +/// Push samples into [Zenoh](https://zenoh.io/) socket. +pub struct PubSink { + config: zenoh::Config, + key_expression: String, + session: Option, + publisher: Option>, + _type: std::marker::PhantomData, + min_item: usize, +} + +impl PubSink { + /// Create PubSink + pub fn new( + config: zenoh::Config, + key_expression: impl Into, + min_item: usize, + ) -> TypedBlock { + TypedBlock::new( + BlockMetaBuilder::new("PubSink").blocking().build(), + StreamIoBuilder::new().add_input::("in").build(), + MessageIoBuilder::new().build(), + PubSink { + config: config, + key_expression: key_expression.into(), + session: None, + publisher: None, + _type: std::marker::PhantomData::, + min_item, + }, + ) + } +} + +#[doc(hidden)] +#[async_trait] +impl Kernel for PubSink { + async fn work( + &mut self, + work_io: &mut WorkIo, + stream_io: &mut StreamIo, + _message_io: &mut MessageIo, + _meta: &mut BlockMeta, + ) -> Result<()> { + let input = stream_io.input(0).slice_unchecked::(); + + let item_size = std::mem::size_of::(); + let item_count = input.len() / item_size; + + if item_count > 0 && item_count > self.min_item { + let input = &input[..item_count * item_size]; + + self.publisher.as_mut().unwrap().put(input).await.unwrap(); + + stream_io.input(0).consume(item_count); + } + + if stream_io.input(0).finished() { + work_io.finished = true; + } + + Ok(()) + } + + async fn init( + &mut self, + _stream_io: &mut StreamIo, + _message_io: &mut MessageIo, + _meta: &mut BlockMeta, + ) -> Result<()> { + let session = zenoh::open(self.config.clone()).await.unwrap(); + let publisher = session + .declare_publisher(self.key_expression.clone()) + .await + .unwrap(); + + self.session = Some(session); + self.publisher = Some(publisher); + + Ok(()) + } +} + +/// Build a Zenoh [PubSink]. +pub struct PubSinkBuilder { + config: zenoh::Config, + key_expression: String, + _type: std::marker::PhantomData, + /// Minimum number of items per send + min_item: usize, +} + +impl PubSinkBuilder { + /// Create PubSink builder + pub fn new() -> PubSinkBuilder { + PubSinkBuilder { + config: zenoh::Config::default(), + key_expression: "future-sdr/*".into(), + _type: std::marker::PhantomData, + min_item: 1, + } + } + + /// Zenoh configuration + #[must_use] + pub fn config(mut self, config: zenoh::Config) -> PubSinkBuilder { + self.config = config; + self + } + + /// Zenoh key expression + #[must_use] + pub fn key_expression(mut self, key_expression: &str) -> PubSinkBuilder { + self.key_expression = key_expression.to_string(); + self + } + + /// Set minimum number of items in send buffer + pub fn min_item_per_send(mut self, min_item: usize) -> PubSinkBuilder { + self.min_item = min_item; + self + } + + /// Build PubSink + pub fn build(self) -> TypedBlock> { + PubSink::::new(self.config, self.key_expression, self.min_item) + } +} + +impl Default for PubSinkBuilder { + fn default() -> Self { + Self::new() + } +} diff --git a/src/zenoh/sub_source.rs b/src/zenoh/sub_source.rs new file mode 100644 index 0000000..56322a1 --- /dev/null +++ b/src/zenoh/sub_source.rs @@ -0,0 +1,126 @@ +use futuresdr::runtime::BlockMeta; +use futuresdr::runtime::BlockMetaBuilder; +use futuresdr::runtime::Kernel; +use futuresdr::runtime::MessageIo; +use futuresdr::runtime::MessageIoBuilder; +use futuresdr::runtime::Result; +use futuresdr::runtime::StreamIo; +use futuresdr::runtime::StreamIoBuilder; +use futuresdr::runtime::TypedBlock; +use futuresdr::runtime::WorkIo; + +/// Read samples from [Zenoh](https://zenoh.io/) socket. +pub struct SubSource { + config: zenoh::Config, + key_expression: String, + session: Option, + subscriber: Option< + zenoh::pubsub::Subscriber>, + >, + _type: std::marker::PhantomData, +} + +impl SubSource { + /// Create SubSource block + pub fn new(config: zenoh::Config, key_expression: impl Into) -> TypedBlock { + TypedBlock::new( + BlockMetaBuilder::new("SubSource").blocking().build(), + StreamIoBuilder::new().add_output::("out").build(), + MessageIoBuilder::new().build(), + SubSource { + config: config, + key_expression: key_expression.into(), + session: None, + subscriber: None, + _type: std::marker::PhantomData::, + }, + ) + } +} + +#[doc(hidden)] +#[async_trait] +impl Kernel for SubSource { + async fn work( + &mut self, + _work_io: &mut WorkIo, + stream_io: &mut StreamIo, + _message_io: &mut MessageIo, + _meta: &mut BlockMeta, + ) -> Result<()> { + let output = stream_io.output(0).slice_unchecked::(); + + if let Ok(sample) = self.subscriber.as_mut().unwrap().recv_async().await { + let payload = sample.payload().to_bytes(); + + let output_len = std::cmp::min(output.len(), payload.as_ref().len()); + + output[..output_len].copy_from_slice(&payload.as_ref()[..output_len]); + + stream_io.output(0).produce(output_len); + } + + Ok(()) + } + + async fn init( + &mut self, + _stream_io: &mut StreamIo, + _message_io: &mut MessageIo, + _meta: &mut BlockMeta, + ) -> Result<()> { + let session = zenoh::open(self.config.clone()).await.unwrap(); + let subscriber = session + .declare_subscriber(self.key_expression.clone()) + .await + .unwrap(); + + self.session = Some(session); + self.subscriber = Some(subscriber); + + Ok(()) + } +} + +/// Build a Zenoh [SubSource]. +pub struct SubSourceBuilder { + config: zenoh::Config, + key_expression: String, + _type: std::marker::PhantomData, +} + +impl SubSourceBuilder { + /// Create SubSource builder + pub fn new() -> SubSourceBuilder { + SubSourceBuilder { + config: zenoh::Config::default(), + key_expression: "future-sdr/*".into(), + _type: std::marker::PhantomData, + } + } + + /// Zenoh configuration + #[must_use] + pub fn config(mut self, config: zenoh::Config) -> SubSourceBuilder { + self.config = config; + self + } + + /// Zenoh key expression + #[must_use] + pub fn key_expression(mut self, key_expression: &str) -> SubSourceBuilder { + self.key_expression = key_expression.to_string(); + self + } + + /// Build SubSource + pub fn build(self) -> TypedBlock> { + SubSource::::new(self.config, self.key_expression) + } +} + +impl Default for SubSourceBuilder { + fn default() -> Self { + Self::new() + } +}