Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ bimap = { version = "0.6.3", optional = true }
sigmf = { version = "0.1.0", path = "crates/sigmf" }
async-fs = "2.1.2"
serde = "1.0.204"
zenoh = { version = "1.2.1", optional = true }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }
Expand All @@ -42,6 +43,7 @@ default = []
crossbeam = ["dep:crossbeam-channel"]
async-channel = ["dep:async-channel"]
cw = ["dep:bimap"]
zenoh = ["dep:zenoh"]

[[bench]]
name = "crossbeam_sink"
Expand Down
19 changes: 19 additions & 0 deletions examples/zenoh/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "zenoh"
version = "0.1.0"
edition = "2021"

[workspace]

[dependencies]
anyhow = "1.0"
fsdr-blocks = { path = "../..", features = ["zenoh"] }
futuresdr = { git = "https://github.com/FutureSDR/FutureSDR", branch = "main", version = "0.0.37"}

[[bin]]
name = "zenoh-receiver"
path = "zenoh_receiver.rs"

[[bin]]
name = "zenoh-sender"
path = "zenoh_sender.rs"
24 changes: 24 additions & 0 deletions examples/zenoh/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Zenoh Blocks Example

## Overview
This example demonstrates how to use the `PubSink` and `SubSource` blocks. The `PubSink` declares a Zenoh publisher that publishes samples to the given key expression. The `SubSource` declares a Zenoh subscriber that subscribes to samples from the given key expression.

## Build
To build the example follow these steps:

- Open a terminal
- Run `cargo build`

## Run
To run the example follow these steps:

- Open a terminal
- Run `cargo run --bin zenoh-receiver`
- Open another terminal
- Run `cargo run --bin zenoh-sender`
- Terminate both programs after an arbitrary amount of time
- The output file will be located at `/tmp/zenoh-log.bin`

## References
- [FutureSDR](https://www.futuresdr.org/)
- [Zenoh](https://zenoh.io/)
18 changes: 18 additions & 0 deletions examples/zenoh/zenoh_receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use anyhow::Result;
use fsdr_blocks::zenoh::SubSourceBuilder;
use futuresdr::blocks::FileSink;
use futuresdr::runtime::Flowgraph;
use futuresdr::runtime::Runtime;

fn main() -> Result<()> {
let mut flowgraph = Flowgraph::new();

let sub_source = flowgraph.add_block(SubSourceBuilder::<u8>::new().build())?;
let file_sink = flowgraph.add_block(FileSink::<u8>::new("/tmp/zenoh-log.bin"))?;

flowgraph.connect_stream(sub_source, "out", file_sink, "in")?;

Runtime::new().run(flowgraph)?;

Ok(())
}
24 changes: 24 additions & 0 deletions examples/zenoh/zenoh_sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use anyhow::Result;
use fsdr_blocks::zenoh::PubSinkBuilder;
use futuresdr::blocks::Head;
use futuresdr::blocks::NullSource;
use futuresdr::blocks::Throttle;
use futuresdr::runtime::Flowgraph;
use futuresdr::runtime::Runtime;

fn main() -> Result<()> {
let mut flowgraph = Flowgraph::new();

let null_source = flowgraph.add_block(NullSource::<u8>::new())?;
let head = flowgraph.add_block(Head::<u8>::new(1_000_000))?;
let throttle = flowgraph.add_block(Throttle::<u8>::new(100e3))?;
let pub_sink = flowgraph.add_block(PubSinkBuilder::<u8>::new().build())?;

flowgraph.connect_stream(null_source, "out", head, "in")?;
flowgraph.connect_stream(head, "out", throttle, "in")?;
flowgraph.connect_stream(throttle, "out", pub_sink, "in")?;

Runtime::new().run(flowgraph)?;

Ok(())
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub mod async_channel;
#[cfg(feature = "cw")]
pub mod cw;

#[cfg(feature = "zenoh")]
pub mod zenoh;

pub mod agc;
pub mod math;
pub mod sigmf;
Expand Down
8 changes: 8 additions & 0 deletions src/zenoh/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! ## [Zenoh](https://zenoh.io/) Blocks
mod pub_sink;
pub use pub_sink::PubSink;
pub use pub_sink::PubSinkBuilder;

mod sub_source;
pub use sub_source::SubSource;
pub use sub_source::SubSourceBuilder;
143 changes: 143 additions & 0 deletions src/zenoh/pub_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
use futuresdr::runtime::MessageIo;
use futuresdr::runtime::MessageIoBuilder;
use futuresdr::runtime::Result;
use futuresdr::runtime::StreamIo;
use futuresdr::runtime::StreamIoBuilder;
use futuresdr::runtime::TypedBlock;
use futuresdr::runtime::WorkIo;

/// Push samples into [Zenoh](https://zenoh.io/) socket.
pub struct PubSink<T: Send + 'static> {
config: zenoh::Config,
key_expression: String,
session: Option<zenoh::Session>,
publisher: Option<zenoh::pubsub::Publisher<'static>>,
_type: std::marker::PhantomData<T>,
min_item: usize,
}

impl<T: Send + 'static> PubSink<T> {
/// Create PubSink
pub fn new(
config: zenoh::Config,
key_expression: impl Into<String>,
min_item: usize,
) -> TypedBlock<Self> {
TypedBlock::new(
BlockMetaBuilder::new("PubSink").blocking().build(),
StreamIoBuilder::new().add_input::<T>("in").build(),
MessageIoBuilder::new().build(),
PubSink {
config: config,
key_expression: key_expression.into(),
session: None,
publisher: None,
_type: std::marker::PhantomData::<T>,
min_item,
},
)
}
}

#[doc(hidden)]
#[async_trait]
impl<T: Send + 'static> Kernel for PubSink<T> {
async fn work(
&mut self,
work_io: &mut WorkIo,
stream_io: &mut StreamIo,
_message_io: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let input = stream_io.input(0).slice::<T>();

let input_len = input.len();

if input_len > 0 && input_len > self.min_item {
let input = stream_io.input(0).slice_unchecked::<u8>();

self.publisher.as_mut().unwrap().put(input).await.unwrap();

stream_io.input(0).consume(input_len);
}

if stream_io.input(0).finished() {
work_io.finished = true;
}

Ok(())
}

async fn init(
&mut self,
_stream_io: &mut StreamIo,
_message_io: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let session = zenoh::open(self.config.clone()).await.unwrap();
let publisher = session
.declare_publisher(self.key_expression.clone())
.await
.unwrap();

self.session = Some(session);
self.publisher = Some(publisher);

Ok(())
}
}

/// Build a Zenoh [PubSink].
pub struct PubSinkBuilder<T: Send + 'static> {
config: zenoh::Config,
key_expression: String,
_type: std::marker::PhantomData<T>,
/// Minimum number of items per send
min_item: usize,
}

impl<T: Send + 'static> PubSinkBuilder<T> {
/// Create PubSink builder
pub fn new() -> PubSinkBuilder<T> {
PubSinkBuilder {
config: zenoh::Config::default(),
key_expression: "future-sdr/*".into(),
_type: std::marker::PhantomData,
min_item: 1,
}
}

/// Zenoh configuration
#[must_use]
pub fn config(mut self, config: zenoh::Config) -> PubSinkBuilder<T> {
self.config = config;
self
}

/// Zenoh key expression
#[must_use]
pub fn key_expression(mut self, key_expression: &str) -> PubSinkBuilder<T> {
self.key_expression = key_expression.to_string();
self
}

/// Set minimum number of items in send buffer
pub fn min_item_per_send(mut self, min_item: usize) -> PubSinkBuilder<T> {
self.min_item = min_item;
self
}

/// Build PubSink
pub fn build(self) -> TypedBlock<PubSink<T>> {
PubSink::<T>::new(self.config, self.key_expression, self.min_item)
}
}

impl<T: Send + 'static> Default for PubSinkBuilder<T> {
fn default() -> Self {
Self::new()
}
}
126 changes: 126 additions & 0 deletions src/zenoh/sub_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
use futuresdr::runtime::MessageIo;
use futuresdr::runtime::MessageIoBuilder;
use futuresdr::runtime::Result;
use futuresdr::runtime::StreamIo;
use futuresdr::runtime::StreamIoBuilder;
use futuresdr::runtime::TypedBlock;
use futuresdr::runtime::WorkIo;

/// Read samples from [Zenoh](https://zenoh.io/) socket.
pub struct SubSource<T: Send + 'static> {
config: zenoh::Config,
key_expression: String,
session: Option<zenoh::Session>,
subscriber: Option<
zenoh::pubsub::Subscriber<zenoh::handlers::FifoChannelHandler<zenoh::sample::Sample>>,
>,
_type: std::marker::PhantomData<T>,
}

impl<T: Send + 'static> SubSource<T> {
/// Create SubSource block
pub fn new(config: zenoh::Config, key_expression: impl Into<String>) -> TypedBlock<Self> {
TypedBlock::new(
BlockMetaBuilder::new("SubSource").blocking().build(),
StreamIoBuilder::new().add_output::<T>("out").build(),
MessageIoBuilder::new().build(),
SubSource {
config: config,
key_expression: key_expression.into(),
session: None,
subscriber: None,
_type: std::marker::PhantomData::<T>,
},
)
}
}

#[doc(hidden)]
#[async_trait]
impl<T: Send + 'static> Kernel for SubSource<T> {
async fn work(
&mut self,
_work_io: &mut WorkIo,
stream_io: &mut StreamIo,
_message_io: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let output = stream_io.output(0).slice_unchecked::<u8>();

if let Ok(sample) = self.subscriber.as_mut().unwrap().recv_async().await {
let payload = sample.payload().to_bytes();

let output_len = std::cmp::min(output.len(), payload.as_ref().len());

output[..output_len].copy_from_slice(&payload.as_ref()[..output_len]);

stream_io.output(0).produce(output_len);
}

Ok(())
}

async fn init(
&mut self,
_stream_io: &mut StreamIo,
_message_io: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let session = zenoh::open(self.config.clone()).await.unwrap();
let subscriber = session
.declare_subscriber(self.key_expression.clone())
.await
.unwrap();

self.session = Some(session);
self.subscriber = Some(subscriber);

Ok(())
}
}

/// Build a Zenoh [SubSource].
pub struct SubSourceBuilder<T: Send + 'static> {
config: zenoh::Config,
key_expression: String,
_type: std::marker::PhantomData<T>,
}

impl<T: Send + 'static> SubSourceBuilder<T> {
/// Create SubSource builder
pub fn new() -> SubSourceBuilder<T> {
SubSourceBuilder {
config: zenoh::Config::default(),
key_expression: "future-sdr/*".into(),
_type: std::marker::PhantomData,
}
}

/// Zenoh configuration
#[must_use]
pub fn config(mut self, config: zenoh::Config) -> SubSourceBuilder<T> {
self.config = config;
self
}

/// Zenoh key expression
#[must_use]
pub fn key_expression(mut self, key_expression: &str) -> SubSourceBuilder<T> {
self.key_expression = key_expression.to_string();
self
}

/// Build SubSource
pub fn build(self) -> TypedBlock<SubSource<T>> {
SubSource::<T>::new(self.config, self.key_expression)
}
}

impl<T: Send + 'static> Default for SubSourceBuilder<T> {
fn default() -> Self {
Self::new()
}
}