Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ bimap = { version = "0.6.3", optional = true }
sigmf = { version = "0.1.0", path = "crates/sigmf" }
async-fs = "2.1.2"
serde = "1.0.204"
zenoh = { version = "1.2.1", optional = true }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }
Expand All @@ -42,6 +43,7 @@ default = []
crossbeam = ["dep:crossbeam-channel"]
async-channel = ["dep:async-channel"]
cw = ["dep:bimap"]
zenoh = ["dep:zenoh"]

[[bench]]
name = "crossbeam_sink"
Expand Down
19 changes: 19 additions & 0 deletions examples/zenoh/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "zenoh"
version = "0.1.0"
edition = "2021"

[workspace]

[dependencies]
anyhow = "1.0"
fsdr-blocks = { path = "../..", features = ["zenoh"] }
futuresdr = { git = "https://github.com/FutureSDR/FutureSDR", branch = "main", version = "0.0.37"}

[[bin]]
name = "zenoh-receiver"
path = "zenoh_receiver.rs"

[[bin]]
name = "zenoh-sender"
path = "zenoh_sender.rs"
24 changes: 24 additions & 0 deletions examples/zenoh/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Zenoh Blocks Example

## Overview
This example demonstrates how to use the `PubSink` and `SubSource` blocks. The `PubSink` declares a Zenoh publisher that publishes samples to the given key expression. The `SubSource` declares a Zenoh subscriber that subscribes to samples from the given key expression.

## Build
To build the example follow these steps:

- Open a terminal
- Run `cargo build`

## Run
To run the example follow these steps:

- Open a terminal
- Run `cargo run --bin zenoh-receiver`
- Open another terminal
- Run `cargo run --bin zenoh-sender`
- Terminate both programs after an arbitrary amount of time
- The output file will be located at `/tmp/zenoh-log.bin`

## References
- [FutureSDR](https://www.futuresdr.org/)
- [Zenoh](https://zenoh.io/)
18 changes: 18 additions & 0 deletions examples/zenoh/zenoh_receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use anyhow::Result;
use fsdr_blocks::zenoh::SubSourceBuilder;
use futuresdr::blocks::FileSink;
use futuresdr::runtime::Flowgraph;
use futuresdr::runtime::Runtime;

fn main() -> Result<()> {
let mut flowgraph = Flowgraph::new();

let sub_source = flowgraph.add_block(SubSourceBuilder::<u8>::new().build())?;
let file_sink = flowgraph.add_block(FileSink::<u8>::new("/tmp/zenoh-log.bin"))?;

flowgraph.connect_stream(sub_source, "out", file_sink, "in")?;

Runtime::new().run(flowgraph)?;

Ok(())
}
24 changes: 24 additions & 0 deletions examples/zenoh/zenoh_sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use anyhow::Result;
use fsdr_blocks::zenoh::PubSinkBuilder;
use futuresdr::blocks::Head;
use futuresdr::blocks::NullSource;
use futuresdr::blocks::Throttle;
use futuresdr::runtime::Flowgraph;
use futuresdr::runtime::Runtime;

fn main() -> Result<()> {
let mut flowgraph = Flowgraph::new();

let null_source = flowgraph.add_block(NullSource::<u8>::new())?;
let head = flowgraph.add_block(Head::<u8>::new(1_000_000))?;
let throttle = flowgraph.add_block(Throttle::<u8>::new(100e3))?;
let pub_sink = flowgraph.add_block(PubSinkBuilder::<u8>::new().build())?;

flowgraph.connect_stream(null_source, "out", head, "in")?;
flowgraph.connect_stream(head, "out", throttle, "in")?;
flowgraph.connect_stream(throttle, "out", pub_sink, "in")?;

Runtime::new().run(flowgraph)?;

Ok(())
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub mod async_channel;
#[cfg(feature = "cw")]
pub mod cw;

#[cfg(feature = "zenoh")]
pub mod zenoh;

pub mod agc;
pub mod math;
pub mod sigmf;
Expand Down
8 changes: 8 additions & 0 deletions src/zenoh/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! ## [Zenoh](https://zenoh.io/) Blocks
mod pub_sink;
pub use pub_sink::PubSink;
pub use pub_sink::PubSinkBuilder;

mod sub_source;
pub use sub_source::SubSource;
pub use sub_source::SubSourceBuilder;
144 changes: 144 additions & 0 deletions src/zenoh/pub_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
use futuresdr::runtime::MessageIo;
use futuresdr::runtime::MessageIoBuilder;
use futuresdr::runtime::Result;
use futuresdr::runtime::StreamIo;
use futuresdr::runtime::StreamIoBuilder;
use futuresdr::runtime::TypedBlock;
use futuresdr::runtime::WorkIo;

/// Push samples into [Zenoh](https://zenoh.io/) socket.
pub struct PubSink<T: Send + 'static> {
config: zenoh::Config,
key_expression: String,
session: Option<zenoh::Session>,
publisher: Option<zenoh::pubsub::Publisher<'static>>,
_type: std::marker::PhantomData<T>,
min_item: usize,
}

impl<T: Send + 'static> PubSink<T> {
/// Create PubSink
pub fn new(
config: zenoh::Config,
key_expression: impl Into<String>,
min_item: usize,
) -> TypedBlock<Self> {
TypedBlock::new(
BlockMetaBuilder::new("PubSink").blocking().build(),
StreamIoBuilder::new().add_input::<T>("in").build(),
MessageIoBuilder::new().build(),
PubSink {
config: config,
key_expression: key_expression.into(),
session: None,
publisher: None,
_type: std::marker::PhantomData::<T>,
min_item,
},
)
}
}

#[doc(hidden)]
#[async_trait]
impl<T: Send + 'static> Kernel for PubSink<T> {
async fn work(
&mut self,
work_io: &mut WorkIo,
stream_io: &mut StreamIo,
_message_io: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let input = stream_io.input(0).slice_unchecked::<u8>();

let item_size = std::mem::size_of::<T>();
let item_count = input.len() / item_size;

if item_count > 0 && item_count > self.min_item {
let input = &input[..item_count * item_size];

self.publisher.as_mut().unwrap().put(input).await.unwrap();

stream_io.input(0).consume(item_count);
}

if stream_io.input(0).finished() {
work_io.finished = true;
}

Ok(())
}

async fn init(
&mut self,
_stream_io: &mut StreamIo,
_message_io: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let session = zenoh::open(self.config.clone()).await.unwrap();
let publisher = session
.declare_publisher(self.key_expression.clone())
.await
.unwrap();

self.session = Some(session);
self.publisher = Some(publisher);

Ok(())
}
}

/// Build a Zenoh [PubSink].
pub struct PubSinkBuilder<T: Send + 'static> {
config: zenoh::Config,
key_expression: String,
_type: std::marker::PhantomData<T>,
/// Minimum number of items per send
min_item: usize,
}

impl<T: Send + 'static> PubSinkBuilder<T> {
/// Create PubSink builder
pub fn new() -> PubSinkBuilder<T> {
PubSinkBuilder {
config: zenoh::Config::default(),
key_expression: "future-sdr/*".into(),
_type: std::marker::PhantomData,
min_item: 1,
}
}

/// Zenoh configuration
#[must_use]
pub fn config(mut self, config: zenoh::Config) -> PubSinkBuilder<T> {
self.config = config;
self
}

/// Zenoh key expression
#[must_use]
pub fn key_expression(mut self, key_expression: &str) -> PubSinkBuilder<T> {
self.key_expression = key_expression.to_string();
self
}

/// Set minimum number of items in send buffer
pub fn min_item_per_send(mut self, min_item: usize) -> PubSinkBuilder<T> {
self.min_item = min_item;
self
}

/// Build PubSink
pub fn build(self) -> TypedBlock<PubSink<T>> {
PubSink::<T>::new(self.config, self.key_expression, self.min_item)
}
}

impl<T: Send + 'static> Default for PubSinkBuilder<T> {
fn default() -> Self {
Self::new()
}
}
126 changes: 126 additions & 0 deletions src/zenoh/sub_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
use futuresdr::runtime::MessageIo;
use futuresdr::runtime::MessageIoBuilder;
use futuresdr::runtime::Result;
use futuresdr::runtime::StreamIo;
use futuresdr::runtime::StreamIoBuilder;
use futuresdr::runtime::TypedBlock;
use futuresdr::runtime::WorkIo;

/// Read samples from [Zenoh](https://zenoh.io/) socket.
pub struct SubSource<T: Send + 'static> {
config: zenoh::Config,
key_expression: String,
session: Option<zenoh::Session>,
subscriber: Option<
zenoh::pubsub::Subscriber<zenoh::handlers::FifoChannelHandler<zenoh::sample::Sample>>,
>,
_type: std::marker::PhantomData<T>,
}

impl<T: Send + 'static> SubSource<T> {
/// Create SubSource block
pub fn new(config: zenoh::Config, key_expression: impl Into<String>) -> TypedBlock<Self> {
TypedBlock::new(
BlockMetaBuilder::new("SubSource").blocking().build(),
StreamIoBuilder::new().add_output::<T>("out").build(),
MessageIoBuilder::new().build(),
SubSource {
config: config,
key_expression: key_expression.into(),
session: None,
subscriber: None,
_type: std::marker::PhantomData::<T>,
},
)
}
}

#[doc(hidden)]
#[async_trait]
impl<T: Send + 'static> Kernel for SubSource<T> {
async fn work(
&mut self,
_work_io: &mut WorkIo,
stream_io: &mut StreamIo,
_message_io: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let output = stream_io.output(0).slice_unchecked::<u8>();

if let Ok(sample) = self.subscriber.as_mut().unwrap().recv_async().await {
let payload = sample.payload().to_bytes();

let output_len = std::cmp::min(output.len(), payload.as_ref().len());

output[..output_len].copy_from_slice(&payload.as_ref()[..output_len]);

stream_io.output(0).produce(output_len);
}

Ok(())
}

async fn init(
&mut self,
_stream_io: &mut StreamIo,
_message_io: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let session = zenoh::open(self.config.clone()).await.unwrap();
let subscriber = session
.declare_subscriber(self.key_expression.clone())
.await
.unwrap();

self.session = Some(session);
self.subscriber = Some(subscriber);

Ok(())
}
}

/// Build a Zenoh [SubSource].
pub struct SubSourceBuilder<T: Send + 'static> {
config: zenoh::Config,
key_expression: String,
_type: std::marker::PhantomData<T>,
}

impl<T: Send + 'static> SubSourceBuilder<T> {
/// Create SubSource builder
pub fn new() -> SubSourceBuilder<T> {
SubSourceBuilder {
config: zenoh::Config::default(),
key_expression: "future-sdr/*".into(),
_type: std::marker::PhantomData,
}
}

/// Zenoh configuration
#[must_use]
pub fn config(mut self, config: zenoh::Config) -> SubSourceBuilder<T> {
self.config = config;
self
}

/// Zenoh key expression
#[must_use]
pub fn key_expression(mut self, key_expression: &str) -> SubSourceBuilder<T> {
self.key_expression = key_expression.to_string();
self
}

/// Build SubSource
pub fn build(self) -> TypedBlock<SubSource<T>> {
SubSource::<T>::new(self.config, self.key_expression)
}
}

impl<T: Send + 'static> Default for SubSourceBuilder<T> {
fn default() -> Self {
Self::new()
}
}