Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Rust bindings for the high-performance [eCAL](https://github.com/eclipse-ecal/ec
### Publisher

```rust
use std::sync::Arc;
use std::time::Duration;
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_string::StringMessage;
Expand All @@ -31,7 +32,7 @@ fn main() {
let publisher = TypedPublisher::<StringMessage>::new("hello").unwrap();

// prepare the message to send
let message = StringMessage("Hello from Rust!".to_string());
let message = StringMessage(Arc::from("Hello from Rust!"));

// publish until eCAL shuts down
while Ecal::ok() {
Expand Down
2 changes: 1 addition & 1 deletion docs/src/examples/binary.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut counter = 0u8;
while Ecal::ok() {
let buf = vec![counter; 1024];
pub_.send(&BytesMessage(buf));
pub_.send(&BytesMessage(Arc::from(buf)));

counter = counter.wrapping_add(1);
std::thread::sleep(std::time::Duration::from_millis(500));
Expand Down
3 changes: 2 additions & 1 deletion docs/src/examples/protobuf.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Publisher

```rust
use std::sync::Arc;
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_protobuf::{ProtobufMessage, IsProtobufType};

Expand All @@ -20,7 +21,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

while Ecal::ok() {
let person = Person { id: 1, name: "Alice".into(), ..Default::default() };
pub_.send(&ProtobufMessage(person));
pub_.send(&ProtobufMessage(Arc::from(person)));

std::thread::sleep(std::time::Duration::from_millis(500));
}
Expand Down
2 changes: 1 addition & 1 deletion docs/src/examples/string.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let publisher = TypedPublisher::<StringMessage>::new("hello")?;

while Ecal::ok() {
let msg = StringMessage(format!("Hello from Rust"));
let msg = StringMessage(Arc::from("Hello from Rust"));
publisher.send(&msg);

std::thread::sleep(std::time::Duration::from_millis(500));
Expand Down
7 changes: 4 additions & 3 deletions rustecal-pubsub/src/typed_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::publisher::Publisher;
use rustecal_core::types::DataTypeInfo;
use crate::types::TopicId;
use std::sync::Arc;
use std::marker::PhantomData;

/// Trait for types that can be published via [`TypedPublisher`].
Expand All @@ -18,7 +19,7 @@ pub trait PublisherMessage {
fn datatype() -> DataTypeInfo;

/// Serializes the message into a byte buffer for transmission.
fn to_bytes(&self) -> Vec<u8>;
fn to_bytes(&self) -> Arc<[u8]>;
}

/// Type-safe, high-level wrapper around an eCAL publisher for messages of type `T`.
Expand All @@ -33,8 +34,8 @@ pub trait PublisherMessage {
/// use rustecal::TypedPublisher;
/// use rustecal_types_string::StringMessage;
///
/// let pub_ = TypedPublisher::<StringMessage>::new("example").unwrap();
/// pub_.send(&StringMessage("Hello World!".into()));
/// let pub_ = TypedPublisher::<StringMessage>::new("hello").unwrap();
/// pub_.send(&StringMessage(Arc::from("Hello World!")));
/// ```
pub struct TypedPublisher<T: PublisherMessage> {
publisher: Publisher,
Expand Down
6 changes: 4 additions & 2 deletions rustecal-pubsub/src/typed_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use rustecal_core::types::DataTypeInfo;
use crate::types::TopicId;
use rustecal_sys::{eCAL_SDataTypeInformation, eCAL_SReceiveCallbackData, eCAL_STopicId};
use std::ffi::{c_void, CStr};
use std::sync::Arc;
use std::marker::PhantomData;
use std::slice;

Expand All @@ -14,7 +15,7 @@ pub trait SubscriberMessage: Sized {
fn datatype() -> DataTypeInfo;

/// Constructs an instance of the message type from a byte slice.
fn from_bytes(bytes: &[u8]) -> Option<Self>;
fn from_bytes(bytes: Arc<[u8]>) -> Option<Self>;
}

/// Represents a received message with associated metadata.
Expand Down Expand Up @@ -190,8 +191,9 @@ extern "C" fn trampoline<T: SubscriberMessage>(
}

let msg_slice = slice::from_raw_parts((*data).buffer as *const u8, (*data).buffer_size);
let msg_arc = Arc::from(msg_slice);

if let Some(decoded) = T::from_bytes(msg_slice) {
if let Some(decoded) = T::from_bytes(msg_arc) {
let cb_wrapper = &*(user_data as *const CallbackWrapper<T>);

let topic_name = CStr::from_ptr((*topic_id).topic_name).to_string_lossy().into_owned();
Expand Down
4 changes: 3 additions & 1 deletion rustecal-samples/pubsub/blob_send/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_bytes::BytesMessage;

Expand All @@ -15,7 +16,8 @@ fn main() {
let buffer = vec![counter; 1024];
counter = counter.wrapping_add(1);

publisher.send(&BytesMessage(buffer));
let wrapped = BytesMessage(Arc::from(buffer));
publisher.send(&wrapped);

println!("Sent buffer filled with {}", counter);

Expand Down
9 changes: 5 additions & 4 deletions rustecal-samples/pubsub/hello_send/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_string::StringMessage;

Expand All @@ -12,12 +13,12 @@ fn main() {
while Ecal::ok() {
cnt += 1;
let msg = format!("HELLO WORLD FROM RUST ({})", cnt);
let wrapped = StringMessage(msg);

let wrapped = StringMessage(Arc::from(msg));
publisher.send(&wrapped);

println!("Sent: {}", wrapped.0);

std::thread::sleep(std::time::Duration::from_millis(500));
}

Expand Down
9 changes: 5 additions & 4 deletions rustecal-samples/pubsub/person_send/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_protobuf::{ProtobufMessage, IsProtobufType};

Expand Down Expand Up @@ -33,10 +34,6 @@ fn main() {
}),
};

// Wrap the person struct in ProtobufMessage
let wrapped = ProtobufMessage(person.clone());
publisher.send(&wrapped);

println!("person id : {}", person.id);
println!("person name : {}", person.name);
println!("person stype : {}", person.stype);
Expand All @@ -45,6 +42,10 @@ fn main() {
println!("house.rooms : {}", person.house.as_ref().map_or(0, |h| h.rooms));
println!();

// Wrap the person struct in ProtobufMessage
let wrapped = ProtobufMessage(Arc::from(person));
publisher.send(&wrapped);

std::thread::sleep(std::time::Duration::from_millis(500));
}

Expand Down
23 changes: 11 additions & 12 deletions rustecal-types-bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,21 @@
//!
//! ## Example
//! ```rust
//! use std::sync::Arc;
//! use rustecal_types_bytes::BytesMessage;
//! let msg = BytesMessage(vec![1, 2, 3, 4]);
//! let msg = BytesMessage(Arc::from([1, 2, 3, 4]));
//! ```

use std::sync::Arc;
use rustecal_core::types::DataTypeInfo;
use rustecal_pubsub::typed_publisher::PublisherMessage;
use rustecal_pubsub::typed_subscriber::SubscriberMessage;

/// A wrapper for raw binary data transmitted via eCAL.
/// A wrapper for raw binary messages used with typed eCAL pub/sub.
///
/// This message type is ideal for non-structured byte payloads such as images,
/// serialized custom formats, or arbitrary buffers.
///
/// Implements both [`PublisherMessage`] and [`SubscriberMessage`] to support
/// bidirectional pub/sub use.
pub struct BytesMessage(pub Vec<u8>);
/// This type allows sending and receiving raw binary payloads through the
/// `TypedPublisher` and `TypedSubscriber` APIs.
pub struct BytesMessage(pub Arc<[u8]>);

impl SubscriberMessage for BytesMessage {
/// Returns metadata describing the message encoding and type.
Expand All @@ -34,8 +33,8 @@ impl SubscriberMessage for BytesMessage {
}

/// Creates a `BytesMessage` from a raw byte slice.
fn from_bytes(bytes: &[u8]) -> Option<Self> {
Some(BytesMessage(bytes.to_vec()))
fn from_bytes(bytes: Arc<[u8]>) -> Option<Self> {
Some(BytesMessage(Arc::from(bytes)))
}
}

Expand All @@ -45,8 +44,8 @@ impl PublisherMessage for BytesMessage {
<BytesMessage as SubscriberMessage>::datatype()
}

/// Converts the internal byte vector into a byte slice for sending.
fn to_bytes(&self) -> Vec<u8> {
/// Returns the internal binary data as an Arc<[u8]> for zero-copy transmission.
fn to_bytes(&self) -> Arc<[u8]> {
self.0.clone()
}
}
28 changes: 10 additions & 18 deletions rustecal-types-protobuf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
//! ## Example
//! ```rust
//! use rustecal_types_protobuf::ProtobufMessage;
//! let msg = ProtobufMessage(MyProstType { ... });
//! let msg = ProtobufMessage(Arc::new(my_proto::MyMessage::default()));
//! ```

use std::sync::Arc;
use prost::Message;
use rustecal_core::types::DataTypeInfo;
use rustecal_pubsub::typed_publisher::PublisherMessage;
Expand All @@ -24,21 +25,12 @@ use rustecal_pubsub::typed_subscriber::SubscriberMessage;
/// to ensure users are aware of what's being exposed to eCAL.
pub trait IsProtobufType {}

/// Wrapper around a `prost::Message` type to enable typed publishing and subscription.
/// A wrapper for protobuf messages used with typed eCAL pub/sub.
///
/// This is the type that should be used with `TypedPublisher` and `TypedSubscriber`
/// for Protobuf messages.
///
/// ```rust
/// use rustecal_types_protobuf::{ProtobufMessage, IsProtobufType};
/// use my_proto::MyMessage;
///
/// impl IsProtobufType for MyMessage {}
///
/// let wrapped = ProtobufMessage(MyMessage::default());
/// ```
/// This type allows sending and receiving protobuf messages through the
/// `TypedPublisher` and `TypedSubscriber` APIs.
#[derive(Debug, Clone)]
pub struct ProtobufMessage<T>(pub T);
pub struct ProtobufMessage<T>(pub Arc<T>);

impl<T> SubscriberMessage for ProtobufMessage<T>
where
Expand All @@ -63,8 +55,8 @@ where
/// # Returns
/// - `Some(ProtobufMessage<T>)` on success
/// - `None` if decoding fails
fn from_bytes(bytes: &[u8]) -> Option<Self> {
T::decode(bytes).ok().map(ProtobufMessage)
fn from_bytes(bytes: Arc<[u8]>) -> Option<Self> {
T::decode(bytes.as_ref()).ok().map(|msg| ProtobufMessage(Arc::new(msg)))
}
}

Expand All @@ -81,11 +73,11 @@ where
///
/// # Panics
/// Will panic if `prost::Message::encode` fails (should never panic for valid messages).
fn to_bytes(&self) -> Vec<u8> {
fn to_bytes(&self) -> Arc<[u8]> {
let mut buf = Vec::with_capacity(self.0.encoded_len());
self.0
.encode(&mut buf)
.expect("Failed to encode protobuf message");
buf
Arc::from(buf)
}
}
20 changes: 12 additions & 8 deletions rustecal-types-string/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@
//!
//! ## Example
//! ```rust
//! use std::sync::Arc;
//! use rustecal_types_string::StringMessage;
//! let msg = StringMessage("Hello World".into());
//! let msg = StringMessage(Arc::from("Hello World"));
//! ```

use std::str;
use std::sync::Arc;
use rustecal_core::types::DataTypeInfo;
use rustecal_pubsub::typed_publisher::PublisherMessage;
use rustecal_pubsub::typed_subscriber::SubscriberMessage;
use std::str;

/// A wrapper for UTF-8 string messages used with typed eCAL pub/sub.
///
/// This type allows sending and receiving strings through the
/// `TypedPublisher<StringMessage>` and `TypedSubscriber<StringMessage>` APIs.
pub struct StringMessage(pub String);
/// `TypedPublisher` and `TypedSubscriber` APIs.
pub struct StringMessage(pub Arc<str>);

impl SubscriberMessage for StringMessage {
/// Returns metadata describing this message type (`utf-8` encoded string).
Expand All @@ -30,8 +32,10 @@ impl SubscriberMessage for StringMessage {
}

/// Attempts to decode a UTF-8 string from a byte buffer.
fn from_bytes(bytes: &[u8]) -> Option<Self> {
str::from_utf8(bytes).ok().map(|s| StringMessage(s.to_string()))
fn from_bytes(bytes: Arc<[u8]>) -> Option<Self> {
str::from_utf8(bytes.as_ref())
.ok()
.map(|s| StringMessage(Arc::<str>::from(s)))
}
}

Expand All @@ -42,7 +46,7 @@ impl PublisherMessage for StringMessage {
}

/// Serializes the string into a byte buffer.
fn to_bytes(&self) -> Vec<u8> {
self.0.as_bytes().to_vec()
fn to_bytes(&self) -> Arc<[u8]> {
Arc::from(self.0.as_bytes())
}
}