diff --git a/README.md b/README.md index 4e30d68..5a14eb3 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,6 @@ This crate will fail to compile if the native libraries are not found. ### Publisher ```rust -use std::sync::Arc; use std::time::Duration; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_string::StringMessage; @@ -44,11 +43,11 @@ fn main() { let publisher = TypedPublisher::::new("hello").unwrap(); // prepare the message to send - let message = StringMessage { data: Arc::from("Hello from Rust") }; + let message = StringMessage { data: "Hello from Rust".into() }; // publish until eCAL shuts down while Ecal::ok() { - publisher.send(&message); + publisher.send(&message, Timestamp::Auto); std::thread::sleep(Duration::from_millis(500)); } diff --git a/docs/src/api/publisher.md b/docs/src/api/publisher.md index d4da7c6..1dcb60a 100644 --- a/docs/src/api/publisher.md +++ b/docs/src/api/publisher.md @@ -5,12 +5,11 @@ The `Publisher` allows you to publish messages of type `T` on a topic. ## Example ```rust -use std::sync::Arc; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_string::StringMessage; let publisher = TypedPublisher::::new("hello").unwrap(); -let message = StringMessage { data: Arc::from("Hello from Rust") }; -publisher.send(&message); +let message = StringMessage { data: "Hello from Rust".into() } +publisher.send(&message, Timestamp::Auto); ``` diff --git a/docs/src/examples/binary.md b/docs/src/examples/binary.md index 3219092..d3cfac7 100644 --- a/docs/src/examples/binary.md +++ b/docs/src/examples/binary.md @@ -3,7 +3,6 @@ ## Publisher ```rust -use std::sync::Arc; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_bytes::BytesMessage; @@ -17,8 +16,8 @@ fn main() -> Result<(), Box> { let buf = vec![counter; 1024]; counter = counter.wrapping_add(1); - let message = BytesMessage { data: Arc::from(buf) }; - publisher.send(&message); + let message = BytesMessage { data: buf.into() }; + publisher.send(&message, Timestamp::Auto); std::thread::sleep(std::time::Duration::from_millis(500)); } diff --git a/docs/src/examples/json.md b/docs/src/examples/json.md index 7ee0421..0dfc071 100644 --- a/docs/src/examples/json.md +++ b/docs/src/examples/json.md @@ -3,7 +3,6 @@ ## Publisher ```rust -use std::sync::Arc; use serde::{Serialize, Deserialize}; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_serde::JsonMessage; @@ -21,7 +20,7 @@ fn main() -> Result<(), Box> { while Ecal::ok() { let payload = MyData { msg: "Hello from Rust".into() }; let message = JsonMessage::new(payload); - publisher.send(&message); + publisher.send(&message, Timestamp::Auto); std::thread::sleep(std::time::Duration::from_millis(500)); } @@ -34,7 +33,6 @@ fn main() -> Result<(), Box> { ## Subscriber ```rust -use std::sync::Arc; use serde::{Serialize, Deserialize}; use rustecal::{Ecal, EcalComponents, TypedSubscriber}; use rustecal_types_serde::JsonMessage; diff --git a/docs/src/examples/protobuf.md b/docs/src/examples/protobuf.md index 7e4e774..177454b 100644 --- a/docs/src/examples/protobuf.md +++ b/docs/src/examples/protobuf.md @@ -23,7 +23,7 @@ fn main() -> Result<(), Box> { let person = Person { id: 1, name: "Alice".into(), ..Default::default() }; let message = ProtobufMessage { data : Arc::from(person) }; - publisher.send(&message); + publisher.send(&message, Timestamp::Auto); std::thread::sleep(std::time::Duration::from_millis(500)); } diff --git a/docs/src/examples/string.md b/docs/src/examples/string.md index 884476b..9783cb6 100644 --- a/docs/src/examples/string.md +++ b/docs/src/examples/string.md @@ -3,7 +3,6 @@ ## Publisher ```rust -use std::sync::Arc; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_string::StringMessage; @@ -13,8 +12,8 @@ fn main() -> Result<(), Box> { let publisher = TypedPublisher::::new("hello")?; while Ecal::ok() { - let message = StringMessage { data: Arc::from("Hello from Rust") }; - publisher.send(&message); + let message = StringMessage { data: "Hello from Rust".into() }; + publisher.send(&message, Timestamp::Auto); std::thread::sleep(std::time::Duration::from_millis(500)); } diff --git a/rustecal-pubsub/README.md b/rustecal-pubsub/README.md index 9cf9ac3..88fdff2 100644 --- a/rustecal-pubsub/README.md +++ b/rustecal-pubsub/README.md @@ -28,7 +28,6 @@ rustecal-pubsub = "0.1" ### Typed Publisher Example ```rust -use std::sync::Arc; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_string::StringMessage; @@ -38,8 +37,8 @@ fn main() -> Result<(), Box> { let publisher = TypedPublisher::::new("hello")?; while Ecal::ok() { - let message = StringMessage { data: Arc::from("Hello from Rust") }; - publisher.send(&message); + let message = StringMessage { data: "Hello from Rust".into() }; + publisher.send(&message, Timestamp::Auto); std::thread::sleep(std::time::Duration::from_millis(500)); } diff --git a/rustecal-pubsub/src/lib.rs b/rustecal-pubsub/src/lib.rs index 3c5ad27..c996c44 100644 --- a/rustecal-pubsub/src/lib.rs +++ b/rustecal-pubsub/src/lib.rs @@ -22,6 +22,7 @@ pub mod publisher; pub mod subscriber; pub mod typed_publisher; pub mod typed_subscriber; +pub mod payload_writer; // Public API pub use publisher::Publisher; @@ -30,3 +31,4 @@ pub use typed_publisher::TypedPublisher; pub use typed_publisher::PublisherMessage; pub use typed_subscriber::TypedSubscriber; pub use typed_subscriber::SubscriberMessage; +pub use payload_writer::PayloadWriter; \ No newline at end of file diff --git a/rustecal-pubsub/src/payload_writer.rs b/rustecal-pubsub/src/payload_writer.rs new file mode 100644 index 0000000..83883d0 --- /dev/null +++ b/rustecal-pubsub/src/payload_writer.rs @@ -0,0 +1,68 @@ +// payload_writer.rs +// +// Defines the `PayloadWriter` trait for zero-copy shared-memory payload writes, +// plus thread-local storage and C-style callback functions to integrate +// with eCAL's `SendPayloadWriter` API, using mutable references rather than owning values. + +use std::cell::RefCell; +use std::os::raw::{c_void, c_int}; + +/// A zero‐copy payload writer: you fill the shared‐memory buffer in place. +pub trait PayloadWriter { + /// Called once when the memory is first allocated (or resized). + /// Return `true` on success. + fn write_full(&mut self, buf: &mut [u8]) -> bool; + + /// Called on subsequent sends to modify only parts of the buffer. + /// By default this falls back to `write_full`. + fn write_modified(&mut self, buf: &mut [u8]) -> bool { + self.write_full(buf) + } + + /// Must return the exact number of bytes you’ll write. + fn get_size(&self) -> usize; +} + +// Thread-local slot for the currently active writer reference during a send call +thread_local! { + /// Holds a raw pointer to the active PayloadWriter while eCAL invokes callbacks + pub(crate) static CURRENT_WRITER: RefCell> = RefCell::new(None); +} + +/// C callback: perform a full write into the shared-memory buffer +pub(crate) unsafe extern "C" fn write_full_cb(buffer: *mut c_void, size: usize) -> c_int { + CURRENT_WRITER.with(|cell| { + if let Some(writer_ptr) = *cell.borrow() { + let writer: &mut dyn PayloadWriter = &mut *writer_ptr; + let buf = std::slice::from_raw_parts_mut(buffer as *mut u8, size); + if writer.write_full(buf) { 0 } else { -1 } + } else { + -1 + } + }) +} + +/// C callback: perform a partial modification of the shared-memory buffer +pub(crate) unsafe extern "C" fn write_mod_cb(buffer: *mut c_void, size: usize) -> c_int { + CURRENT_WRITER.with(|cell| { + if let Some(writer_ptr) = *cell.borrow() { + let writer: &mut dyn PayloadWriter = &mut *writer_ptr; + let buf = std::slice::from_raw_parts_mut(buffer as *mut u8, size); + if writer.write_modified(buf) { 0 } else { -1 } + } else { + -1 + } + }) +} + +/// C callback: return the size of the payload buffer needed +pub(crate) unsafe extern "C" fn get_size_cb() -> usize { + CURRENT_WRITER.with(|cell| { + if let Some(writer_ptr) = *cell.borrow() { + let writer: &mut dyn PayloadWriter = &mut *writer_ptr; + writer.get_size() + } else { + 0 + } + }) +} diff --git a/rustecal-pubsub/src/publisher.rs b/rustecal-pubsub/src/publisher.rs index 3feff67..1539f39 100644 --- a/rustecal-pubsub/src/publisher.rs +++ b/rustecal-pubsub/src/publisher.rs @@ -1,9 +1,18 @@ use rustecal_sys::*; use rustecal_core::types::DataTypeInfo; use crate::types::TopicId; +use crate::payload_writer::{PayloadWriter, CURRENT_WRITER, write_full_cb, write_mod_cb, get_size_cb}; use std::ffi::{CStr, CString}; use std::ptr; +/// When to assign a timestamp to an outgoing message. +pub enum Timestamp { + /// Let eCAL assign its internal send timestamp. + Auto, + /// Use this custom timestamp (microseconds since epoch). + Custom(i64), +} + /// A safe and ergonomic wrapper around the eCAL C publisher API. /// /// This struct provides a high-level interface for sending serialized messages to @@ -66,43 +75,81 @@ impl Publisher { /// # Arguments /// /// * `data` - A byte buffer containing the serialized message payload. + /// * `timestamp` - When to timestamp the message. /// /// # Returns /// - /// `1` on success, `0` on failure. - pub fn send(&self, data: &[u8]) -> i32 { - unsafe { + /// `true` on success, `false` on failure. + pub fn send(&self, data: &[u8], timestamp: Timestamp) -> bool { + let ts_ptr = match timestamp { + Timestamp::Auto => ptr::null(), + Timestamp::Custom(t) => &t as *const i64 as *const _, + }; + let ret = unsafe { eCAL_Publisher_Send( self.handle, data.as_ptr() as *const _, data.len(), - ptr::null(), + ts_ptr, ) - } + }; + // eCAL returns 0 on success + ret == 0 } - /// Sends a serialized message with a custom timestamp. + /// Sends a zero-copy payload using a [`PayloadWriter`]. /// /// # Arguments /// - /// * `data` - A byte buffer containing the message. - /// * `timestamp` - Timestamp in microseconds (use `-1` to let eCAL determine the time). + /// * `writer` - A mutable reference to a `PayloadWriter` implementation. + /// * `timestamp` - When to timestamp the message. /// /// # Returns /// - /// `1` on success, `0` on failure. - pub fn send_with_timestamp(&self, data: &[u8], timestamp: i64) -> i32 { - unsafe { - eCAL_Publisher_Send( + /// `true` on success, `false` on failure. + pub fn send_payload_writer( + &self, + writer: &mut W, + timestamp: Timestamp, + ) -> bool { + // stash the writer pointer in TLS + let ptr = writer as *mut W as *mut dyn PayloadWriter; + CURRENT_WRITER.with(|cell| { + *cell.borrow_mut() = Some(ptr); + }); + + // build the C payload writer struct + let c_writer = eCAL_PayloadWriter { + WriteFull: Some(write_full_cb), + WriteModified: Some(write_mod_cb), + GetSize: Some(get_size_cb), + }; + + // prepare timestamp pointer + let ts_ptr = match timestamp { + Timestamp::Auto => ptr::null(), + Timestamp::Custom(t) => &t as *const i64 as *const _, + }; + + // call into the FFI + let result = unsafe { + eCAL_Publisher_SendPayloadWriter( self.handle, - data.as_ptr() as *const _, - data.len(), - ×tamp as *const _ as *const _, + &c_writer as *const _, + ts_ptr, ) - } - } + }; - /// Returns the number of currently connected subscribers. + // clear the slot + CURRENT_WRITER.with(|cell| { + cell.borrow_mut().take(); + }); + + // eCAL returns 0 on success + result == 0 + } + + /// Retrieves the number of currently connected subscribers. pub fn get_subscriber_count(&self) -> usize { unsafe { eCAL_Publisher_GetSubscriberCount(self.handle) } } diff --git a/rustecal-pubsub/src/subscriber.rs b/rustecal-pubsub/src/subscriber.rs index cdd1417..ff4f198 100644 --- a/rustecal-pubsub/src/subscriber.rs +++ b/rustecal-pubsub/src/subscriber.rs @@ -96,7 +96,7 @@ impl Subscriber { self.handle } - /// Returns the number of currently connected publishers. + /// Retrieves the number of currently connected publishers. pub fn get_publisher_count(&self) -> usize { unsafe { eCAL_Subscriber_GetPublisherCount(self.handle) } } diff --git a/rustecal-pubsub/src/typed_publisher.rs b/rustecal-pubsub/src/typed_publisher.rs index 2d66255..1d2e450 100644 --- a/rustecal-pubsub/src/typed_publisher.rs +++ b/rustecal-pubsub/src/typed_publisher.rs @@ -1,88 +1,92 @@ -use crate::publisher::Publisher; +use crate::{publisher::{Publisher, Timestamp}, payload_writer::PayloadWriter, types::TopicId}; use rustecal_core::types::DataTypeInfo; -use crate::types::TopicId; -use std::sync::Arc; -use std::marker::PhantomData; +use std::{marker::PhantomData, sync::Arc}; -/// Trait for types that can be published via [`TypedPublisher`]. +/// A trait for message types that can be published via [`TypedPublisher`]. /// -/// Implement this trait for any message type `T` that should be serialized and sent -/// through eCAL's typed publisher API. -/// -/// # Required Methods -/// -/// - [`datatype()`]: Returns metadata describing the encoding, type name, -/// and optional descriptor (e.g., Protobuf schema). -/// - [`to_bytes()`]: Serializes the message into a binary buffer. +/// Implement this trait for any type `T` that needs to be serialized +/// and sent through eCAL's typed publisher API. pub trait PublisherMessage { - /// Returns topic metadata for this message type. + /// Returns metadata (encoding, type name, descriptor) for this message type. fn datatype() -> DataTypeInfo; - /// Serializes the message into a byte buffer for transmission. + /// Serializes the message into a shared, reference-counted byte buffer. fn to_bytes(&self) -> Arc<[u8]>; } -/// Type-safe, high-level wrapper around an eCAL publisher for messages of type `T`. +/// A type-safe, high-level wrapper over an eCAL publisher for messages of type `T`. /// -/// This struct wraps an untyped [`Publisher`] and ensures that only compatible messages -/// are published. It automatically serializes values of type `T` using the -/// [`PublisherMessage`] trait implementation. +/// Wraps an untyped [`Publisher`] and enforces that only compatible messages +/// (implementing [`PublisherMessage`]) are published. /// -/// # Example +/// # Examples /// /// ```no_run /// use rustecal::TypedPublisher; /// use rustecal_types_string::StringMessage; /// -/// let pub_ = TypedPublisher::::new("hello").unwrap(); -/// pub_.send(&StringMessage(Arc::from("Hello World!"))); +/// let pub_ = TypedPublisher::::new("hello topic").unwrap(); +/// pub_.send(&StringMessage{data: "Hello!".into()}, Timestamp::Auto); /// ``` pub struct TypedPublisher { publisher: Publisher, - _phantom: PhantomData, + _phantom: PhantomData, } impl TypedPublisher { - /// Creates a new typed publisher for the specified topic. + /// Creates a new typed publisher for the given topic. /// /// # Arguments /// - /// * `topic_name` - The topic name to publish to. + /// * `topic_name` — The topic name to publish to. /// /// # Errors /// - /// Returns a `String` if the underlying eCAL publisher could not be created. + /// Returns an `Err(String)` if the underlying eCAL publisher could not be created. pub fn new(topic_name: &str) -> Result { - let datatype = T::datatype(); + let datatype = T::datatype(); let publisher = Publisher::new(topic_name, datatype)?; - Ok(Self { - publisher, - _phantom: PhantomData, - }) + Ok(Self { publisher, _phantom: PhantomData }) } /// Sends a message of type `T` to all connected subscribers. /// - /// The message is serialized using [`PublisherMessage::to_bytes()`]. + /// Serializes the message via [`PublisherMessage::to_bytes()`], and + /// specifies when to timestamp (auto or custom). /// /// # Arguments /// - /// * `message` - The typed message to send. - pub fn send(&self, message: &T) { + /// * `message` — The typed message to send. + /// * `timestamp` — When to timestamp the message. + /// + /// # Returns + /// + /// `true` on success, `false` on failure. + pub fn send(&self, message: &T, timestamp: Timestamp) -> bool { let bytes = message.to_bytes(); - self.publisher.send(&bytes); + self.publisher.send(&bytes, timestamp) } - /// Sends a message of type `T` with a custom timestamp (in microseconds). + /// Performs a zero-copy send using a [`PayloadWriter`]. + /// + /// Bypasses an intermediate buffer for types (like `BytesMessage`) + /// that implement `PayloadWriter`. /// /// # Arguments /// - /// * `message` - The message to send. - /// * `timestamp` - Custom timestamp to associate with the message. - pub fn send_with_timestamp(&self, message: &T, timestamp: i64) { - let bytes = message.to_bytes(); - self.publisher.send_with_timestamp(&bytes, timestamp); + /// * `writer` — A mutable reference to a `PayloadWriter`. + /// * `timestamp` — When to timestamp the message. + /// + /// # Returns + /// + /// `true` on success, `false` on failure. + pub fn send_payload_writer( + &self, + writer: &mut W, + timestamp: Timestamp, + ) -> bool { + self.publisher.send_payload_writer(writer, timestamp) } /// Returns the number of currently connected subscribers. @@ -95,12 +99,17 @@ impl TypedPublisher { self.publisher.get_topic_name() } - /// Returns the topic ID as seen by the eCAL system. + /// Returns the topic ID assigned by eCAL. pub fn get_topic_id(&self) -> Option { self.publisher.get_topic_id() } - /// Returns the declared message type metadata. + /// Returns the declared data type metadata for this topic. + /// + /// Includes: + /// - `encoding` (e.g. `"proto"`, `"string"`, `"raw"`) + /// - `type_name` (e.g. Protobuf type or Rust type) + /// - `descriptor` (optional descriptor bytes, e.g. protobuf schema) pub fn get_data_type_information(&self) -> Option { self.publisher.get_data_type_information() } diff --git a/rustecal-pubsub/src/typed_subscriber.rs b/rustecal-pubsub/src/typed_subscriber.rs index 8d79bf4..26edd82 100644 --- a/rustecal-pubsub/src/typed_subscriber.rs +++ b/rustecal-pubsub/src/typed_subscriber.rs @@ -7,39 +7,44 @@ use std::sync::Arc; use std::marker::PhantomData; use std::slice; -/// Trait that must be implemented for any type used with [`TypedSubscriber`]. +/// A trait for message types that can be deserialized by [`TypedSubscriber`]. /// -/// Provides metadata and deserialization logic for a specific message type. +/// Implement this trait for any type `T` that needs to be reconstructed +/// from raw bytes plus metadata in a typed subscriber. pub trait SubscriberMessage: Sized { - /// Returns the metadata that describes this message type (encoding, name, optional descriptor). + /// Returns metadata (encoding, type name, descriptor) for this message type. fn datatype() -> DataTypeInfo; - /// Constructs an instance of the message type from a byte buffer and the accompanying DataTypeInfo. + /// Deserializes a message instance from a byte buffer and its metadata. + /// + /// # Arguments + /// + /// * `bytes` — A shared byte buffer containing the payload. + /// * `data_type_info` — The corresponding `DataTypeInfo` describing the payload format. + /// + /// # Returns + /// + /// `Some(T)` on success, or `None` on failure. fn from_bytes(bytes: Arc<[u8]>, data_type_info: &DataTypeInfo) -> Option; } -/// Represents a received message with associated metadata. +/// A received message, with payload and metadata. pub struct Received { - /// The decoded message of type `T`. - pub payload: T, - - /// The name of the topic this message was received on. + /// The deserialized payload of type `T`. + pub payload: T, + /// The topic name this message was received on. pub topic_name: String, - - /// The declared encoding format (e.g. "proto", "string", "raw", "json"). - pub encoding: String, - - /// The declared type name of the message. - pub type_name: String, - - /// The send timestamp provided by the publisher (microseconds since epoch). - pub timestamp: i64, - - /// The logical clock value at which the message was sent. - pub clock: i64, + /// The declared encoding format (e.g. "proto", "raw"). + pub encoding: String, + /// The declared type name for the message. + pub type_name: String, + /// The publisher's send timestamp (microseconds since epoch). + pub timestamp: i64, + /// The publisher's logical clock at send time. + pub clock: i64, } -/// Internal trampoline wrapper that stores a type-erased callback for dispatching typed messages. +/// Wrapper to store a boxed callback for `Received` struct CallbackWrapper { callback: Box) + Send + Sync>, } @@ -59,17 +64,19 @@ impl CallbackWrapper { } } -/// A high-level, type-safe subscriber for a specific message type `T`. +/// A type-safe, high-level subscriber for messages of type `T`. +/// +/// Wraps a lower-level [`Subscriber`] and provides automatic deserialization +/// plus typed callbacks. /// -/// Wraps the lower-level [`Subscriber`] to provide automatic deserialization and typed callbacks. +/// # Examples /// -/// # Example /// ```no_run /// use rustecal::TypedSubscriber; /// use rustecal_types_string::StringMessage; /// -/// let mut sub = TypedSubscriber::::new("hello").unwrap(); -/// sub.set_callback(|msg| println!("Received: {}", msg.msg.0)); +/// let mut sub = TypedSubscriber::::new("topic").unwrap(); +/// sub.set_callback(|msg| println!("Got: {}", msg.payload.0)); /// ``` pub struct TypedSubscriber { subscriber: Subscriber, @@ -105,8 +112,6 @@ impl TypedSubscriber { /// Registers a user callback that receives a deserialized message with metadata. /// - /// This replaces any previously set callback and transfers ownership of the closure. - /// /// # Arguments /// /// * `callback` - A closure accepting a [`Received`] message. @@ -131,10 +136,7 @@ impl TypedSubscriber { } } - /// Returns the number of currently connected publishers to this topic. - /// - /// This can be used for diagnostics or to implement optional behavior based - /// on whether any publisher is present. + /// Returns the number of currently connected publishers. pub fn get_publisher_count(&self) -> usize { self.subscriber.get_publisher_count() } @@ -146,9 +148,7 @@ impl TypedSubscriber { self.subscriber.get_topic_name() } - /// Returns the unique topic ID used internally by eCAL. - /// - /// This can be useful for introspection or advanced matching logic across nodes. + /// Returns the topic ID assigned by eCAL. pub fn get_topic_id(&self) -> Option { self.subscriber.get_topic_id() } diff --git a/rustecal-samples/benchmarks/performance_send/Cargo.toml b/rustecal-samples/benchmarks/performance_send/Cargo.toml index 83c0e21..39054ef 100644 --- a/rustecal-samples/benchmarks/performance_send/Cargo.toml +++ b/rustecal-samples/benchmarks/performance_send/Cargo.toml @@ -5,4 +5,5 @@ edition = "2021" [dependencies] rustecal = { path = "../../../rustecal", features = ["pubsub"] } +rustecal-pubsub = { path = "../../../rustecal-pubsub" } rustecal-types-bytes = { path = "../../../rustecal-types-bytes" } diff --git a/rustecal-samples/benchmarks/performance_send/src/binary_payload_writer.rs b/rustecal-samples/benchmarks/performance_send/src/binary_payload_writer.rs new file mode 100644 index 0000000..d66cc52 --- /dev/null +++ b/rustecal-samples/benchmarks/performance_send/src/binary_payload_writer.rs @@ -0,0 +1,72 @@ +//! Defines a simple `BinaryPayload` writer for zero-copy benchmarks. +//! +//! This payload writer mirrors the C++ `CBinaryPayload` example: +//! - `write_full` fills the entire buffer with a constant byte (0x2A). +//! - `write_modified` updates a single byte per invocation to simulate changing data. +//! - `get_size` reports the exact buffer size needed. + +use rustecal_pubsub::PayloadWriter; + +/// A direct-write binary payload that writes into shared memory without copying. +pub struct BinaryPayload { + /// Total size, in bytes, of the payload to allocate and write. + size: usize, + /// A simple counter used to vary which byte is modified each call. + clock: u32, +} + +impl BinaryPayload { + /// Create a new `BinaryPayload` of the given size. + /// + /// # Arguments + /// + /// * `size` – The number of bytes to allocate and write into. + /// + /// # Returns + /// + /// A fresh `BinaryPayload` instance with its internal clock reset. + pub fn new(size: usize) -> Self { + BinaryPayload { size, clock: 0 } + } +} + +impl PayloadWriter for BinaryPayload { + /// Fill the entire buffer with the constant byte `0x2A`. + /// + /// This is called by eCAL when the shared-memory region is first allocated + /// or its size changes. Returning `false` indicates an allocation error. + fn write_full(&mut self, buf: &mut [u8]) -> bool { + if buf.len() < self.size { + // Buffer too small: cannot satisfy payload size + return false; + } + // Fast-path fill: every byte set to 42 (0x2A) + buf[..self.size].fill(42); + true + } + + /// Modify a single byte in the existing buffer to simulate an update. + /// + /// This is called after the first full write when zero-copy mode is enabled. + /// It only changes one byte per call for maximum performance. + fn write_modified(&mut self, buf: &mut [u8]) -> bool { + if buf.len() < self.size { + // Buffer too small: cannot satisfy payload size + return false; + } + // Compute an index that cycles through the first 1024 bytes, then wraps + let idx = ((self.clock as usize) % 1024) % self.size; + // Overwrite that byte with ASCII digit '0'..'9' + buf[idx] = b'0' + (self.clock % 10) as u8; + // Advance the clock for next iteration + self.clock = self.clock.wrapping_add(1); + true + } + + /// Report the exact payload size that this writer will produce. + /// + /// eCAL uses this to allocate the shared-memory region. + fn get_size(&self) -> usize { + self.size + } +} diff --git a/rustecal-samples/benchmarks/performance_send/src/main.rs b/rustecal-samples/benchmarks/performance_send/src/main.rs index af0b7e7..8f4911d 100644 --- a/rustecal-samples/benchmarks/performance_send/src/main.rs +++ b/rustecal-samples/benchmarks/performance_send/src/main.rs @@ -1,29 +1,30 @@ -//! A performance benchmark publisher in Rust, modeled on the eCAL C++ sample. +//! A performance benchmark publisher in Rust, using the typed `BytesMessage` publisher +//! with zero-copy payload support. //! -//! This will send messages of the given size in a tight loop, logging -//! throughput every second. +//! Sends messages of the given size in a tight loop, logging throughput every second. -use std::{env, sync::Arc, time::{Duration, Instant}}; +use std::{env, time::{Duration, Instant}}; +use std::thread::sleep; use rustecal::{Ecal, EcalComponents, Configuration, TypedPublisher}; use rustecal_types_bytes::BytesMessage; +mod binary_payload_writer; +use binary_payload_writer::BinaryPayload; +use rustecal_pubsub::publisher::Timestamp; + // performance settings -const ZERO_COPY: bool = true; -const BUFFER_COUNT: u32 = 1; -const ACKNOWLEDGE_TIMEOUT_MS: i32 = 50; -const PAYLOAD_SIZE_DEFAULT: usize = 8 * 1024 * 1024; +const ZERO_COPY: bool = true; +const BUFFER_COUNT: u32 = 1; +const ACKNOWLEDGE_TIMEOUT_MS: i32 = 50; +const PAYLOAD_SIZE_DEFAULT: usize = 8 * 1024 * 1024; fn main() -> Result<(), Box> { // parse payload size from CLI (or use default) - let args: Vec = env::args().collect(); - let mut payload_size = if args.len() > 1 { - args[1].parse::().unwrap_or(PAYLOAD_SIZE_DEFAULT) - } else { - PAYLOAD_SIZE_DEFAULT - }; - if payload_size == 0 { - payload_size = 1; - } + let payload_size = env::args() + .nth(1) + .and_then(|s| s.parse().ok()) + .filter(|&n| n > 0) + .unwrap_or(PAYLOAD_SIZE_DEFAULT); // log performance settings println!("Zero copy mode : {}", ZERO_COPY); @@ -32,77 +33,73 @@ fn main() -> Result<(), Box> { println!("Payload size : {} bytes", payload_size); println!(); - // prepare and tweak eCAL Configuration + // configure eCAL let mut cfg = Configuration::new()?; cfg.publisher.layer.shm.zero_copy_mode = ZERO_COPY as i32; cfg.publisher.layer.shm.memfile_buffer_count = BUFFER_COUNT; cfg.publisher.layer.shm.acknowledge_timeout_ms = ACKNOWLEDGE_TIMEOUT_MS as u32; - // initialize eCAL with custom config - Ecal::initialize(Some("performance send rust"), EcalComponents::DEFAULT, Some(&cfg)) - .expect("eCAL initialization failed"); + // initialize eCAL + Ecal::initialize( + Some("performance send rust"), + EcalComponents::DEFAULT, + Some(&cfg), + )?; + + // create a typed publisher for raw bytes + let typed_pub: TypedPublisher = + TypedPublisher::new("Performance")?; - // create payload buffer and publisher - let payload_vec: Vec = vec![0u8; payload_size]; - let mut payload: Arc<[u8]> = Arc::from(payload_vec); - let publisher: TypedPublisher = TypedPublisher::new("Performance")?; + // prepare our zero-copy payload writer + let mut payload = BinaryPayload::new(payload_size); - // benchmark loop + // counters and timer let mut msgs_sent = 0u64; let mut bytes_sent = 0u64; let mut iterations = 0u64; let mut last_log = Instant::now(); - // wait for at least one subscriber to be ready - while publisher.get_subscriber_count() == 0 { - println!("Waiting for performance receive to start ..."); - std::thread::sleep(Duration::from_millis(1000)); + // wait for subscriber + while typed_pub.get_subscriber_count() == 0 { + println!("Waiting for receiver …"); + sleep(Duration::from_secs(1)); } println!(); // send loop while Ecal::ok() { - // modify the payload for each message - { - let buf: &mut [u8] = Arc::make_mut(&mut payload); - let chr = (msgs_sent % 9 + 48) as u8; - buf[0..16].fill(chr); - } - - let wrapped = BytesMessage { data: payload.clone() }; - publisher.send(&wrapped); + // zero-copy send via PayloadWriter + typed_pub.send_payload_writer(&mut payload, Timestamp::Auto); msgs_sent += 1; bytes_sent += payload_size as u64; iterations += 1; - // every second, print statistics - if iterations % 2000 == 0 { - let elapsed = last_log.elapsed(); - if elapsed >= Duration::from_secs(1) { - let secs = elapsed.as_secs_f64(); - let kbyte_s = (bytes_sent as f64 / 1024.0) / secs; - let mbyte_s = kbyte_s / 1024.0; - let gbyte_s = mbyte_s / 1024.0; - let msg_s = (msgs_sent as f64) / secs; - let latency_us = (secs * 1e6) / (msgs_sent as f64); - - println!("Payload size (kB) : {:.0}", payload_size / 1024); - println!("Throughput (kB/s) : {:.0}", kbyte_s); - println!("Throughput (MB/s) : {:.2}", mbyte_s); - println!("Throughput (GB/s) : {:.2}", gbyte_s); - println!("Messages (1/s) : {:.0}", msg_s); - println!("Latency (µs) : {:.2}", latency_us); - println!(); - - msgs_sent = 0; - bytes_sent = 0; - last_log = Instant::now(); - } + // every ~2000 msgs, log if 1s has passed + if iterations % 2000 == 0 && last_log.elapsed() >= Duration::from_secs(1) { + let secs = last_log.elapsed().as_secs_f64(); + let kbyte_s = (bytes_sent as f64 / 1024.0) / secs; + let mbyte_s = kbyte_s / 1024.0; + let gbyte_s = mbyte_s / 1024.0; + let msg_s = (msgs_sent as f64) / secs; + let latency_us = (secs * 1e6) / (msgs_sent as f64); + + println!("Payload size (kB) : {}", payload_size / 1024); + println!("Throughput (kB/s) : {:.0}", kbyte_s); + println!("Throughput (MB/s) : {:.2}", mbyte_s); + println!("Throughput (GB/s) : {:.2}", gbyte_s); + println!("Messages (1/s) : {:.0}", msg_s); + println!("Latency (µs) : {:.2}", latency_us); + println!(); + + // reset counters and timer + msgs_sent = 0; + bytes_sent = 0; + last_log = Instant::now(); } } - // clean up and finalize eCAL + // finalize eCAL Ecal::finalize(); Ok(()) } diff --git a/rustecal-samples/pubsub/blob_send/src/main.rs b/rustecal-samples/pubsub/blob_send/src/main.rs index 6a83d12..156fc2c 100644 --- a/rustecal-samples/pubsub/blob_send/src/main.rs +++ b/rustecal-samples/pubsub/blob_send/src/main.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use rustecal::{Ecal, EcalComponents, TypedPublisher}; +use rustecal::pubsub::publisher::Timestamp; use rustecal_types_bytes::BytesMessage; fn main() -> Result<(), Box> { @@ -15,8 +15,8 @@ fn main() -> Result<(), Box> { let buffer = vec![counter; 1024]; counter = counter.wrapping_add(1); - let wrapped = BytesMessage { data: Arc::from(buffer) }; - publisher.send(&wrapped); + let wrapped = BytesMessage { data: buffer.into() }; + publisher.send(&wrapped, Timestamp::Auto); println!("Sent buffer filled with {}", counter); diff --git a/rustecal-samples/pubsub/hello_send/src/main.rs b/rustecal-samples/pubsub/hello_send/src/main.rs index 816d956..1762546 100644 --- a/rustecal-samples/pubsub/hello_send/src/main.rs +++ b/rustecal-samples/pubsub/hello_send/src/main.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use rustecal::{Ecal, EcalComponents, TypedPublisher}; +use rustecal::pubsub::publisher::Timestamp; use rustecal_types_string::StringMessage; fn main() -> Result<(), Box> { @@ -14,8 +14,8 @@ fn main() -> Result<(), Box> { count += 1; let msg = format!("HELLO WORLD FROM RUST ({})", count); - let wrapped = StringMessage{ data: Arc::::from(msg) }; - publisher.send(&wrapped); + let wrapped = StringMessage{ data: msg.into()}; + publisher.send(&wrapped, Timestamp::Auto); println!("Sent: {}", wrapped.data); diff --git a/rustecal-samples/pubsub/person_send/src/main.rs b/rustecal-samples/pubsub/person_send/src/main.rs index f6017cb..f643ad3 100644 --- a/rustecal-samples/pubsub/person_send/src/main.rs +++ b/rustecal-samples/pubsub/person_send/src/main.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_protobuf::{ProtobufMessage, IsProtobufType}; @@ -7,6 +6,8 @@ mod animal { include!(concat!(env!("OUT_DIR"), "/pb.animal.rs")); } mod environment { include!(concat!(env!("OUT_DIR"), "/pb.environment.rs")); } use people::Person; +use rustecal::pubsub::publisher::Timestamp; + impl IsProtobufType for Person {} fn main() -> Result<(), Box> { @@ -43,8 +44,8 @@ fn main() -> Result<(), Box> { println!(); // wrap the person struct in ProtobufMessage - let wrapped = ProtobufMessage { data: Arc::from(person) }; - publisher.send(&wrapped); + let wrapped = ProtobufMessage { data: person.into() }; + publisher.send(&wrapped, Timestamp::Auto); std::thread::sleep(std::time::Duration::from_millis(500)); } diff --git a/rustecal-samples/pubsub/serde_send/src/main.rs b/rustecal-samples/pubsub/serde_send/src/main.rs index a5e5faf..9bcce40 100644 --- a/rustecal-samples/pubsub/serde_send/src/main.rs +++ b/rustecal-samples/pubsub/serde_send/src/main.rs @@ -1,4 +1,5 @@ use rustecal::{Ecal, EcalComponents, TypedPublisher}; +use rustecal::pubsub::publisher::Timestamp; use rustecal_types_serde::JsonMessage; #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] @@ -26,7 +27,7 @@ fn main() -> Result<(), Box> { let wrapped = JsonMessage::new(payload.clone()); // send over eCAL pub/sub - publisher.send(&wrapped); + publisher.send(&wrapped, Timestamp::Auto); println!( "Sent: message = {}, count = {}", wrapped.data.message, wrapped.data.count diff --git a/rustecal-types-bytes/README.md b/rustecal-types-bytes/README.md index bdebf2d..93dd15f 100644 --- a/rustecal-types-bytes/README.md +++ b/rustecal-types-bytes/README.md @@ -23,7 +23,6 @@ rustecal-types-bytes = "0.1" ### Publisher Example ```rust -use std::sync::Arc; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_bytes::BytesMessage; @@ -37,8 +36,8 @@ fn main() -> Result<(), Box> { let buf = vec![counter; 1024]; counter = counter.wrapping_add(1); - let message = BytesMessage { data: Arc::from(buf) }; - publisher.send(&message); + let message = BytesMessage { data: buf.into() }; + publisher.send(&message, Timestamp::Auto); std::thread::sleep(std::time::Duration::from_millis(500)); } diff --git a/rustecal-types-protobuf/README.md b/rustecal-types-protobuf/README.md index e33d34d..ea21fe6 100644 --- a/rustecal-types-protobuf/README.md +++ b/rustecal-types-protobuf/README.md @@ -24,7 +24,6 @@ rustecal-types-protobuf = "0.1" ### Publisher Example ```rust -use std::sync::Arc; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_protobuf::{ProtobufMessage, IsProtobufType}; @@ -43,8 +42,8 @@ fn main() -> Result<(), Box> { while Ecal::ok() { let person = Person { id: 1, name: "Alice".into(), ..Default::default() }; - let message = ProtobufMessage { data : Arc::from(person) }; - publisher.send(&message); + let message = ProtobufMessage { data : person.into() }; + publisher.send(&message, Timestamp::Auto); std::thread::sleep(std::time::Duration::from_millis(500)); } diff --git a/rustecal-types-serde/README.md b/rustecal-types-serde/README.md index b68beb6..4dfca57 100644 --- a/rustecal-types-serde/README.md +++ b/rustecal-types-serde/README.md @@ -23,7 +23,6 @@ rustecal-types-serde = "0.1" ### Publisher Example (JSON) ```rust -use std::sync::Arc; use serde::{Serialize, Deserialize}; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_serde::JsonMessage; @@ -41,7 +40,7 @@ fn main() -> Result<(), Box> { while Ecal::ok() { let payload = MyData { msg: "Hello from Rust".into() }; let message = JsonMessage::new(payload); - publisher.send(&message); + publisher.send(&message, Timestamp::Auto); std::thread::sleep(std::time::Duration::from_millis(500)); } @@ -54,7 +53,6 @@ fn main() -> Result<(), Box> { ### Subscriber Example (JSON) ```rust -use std::sync::Arc; use serde::{Serialize, Deserialize}; use rustecal::{Ecal, EcalComponents, TypedSubscriber}; use rustecal_types_serde::JsonMessage; diff --git a/rustecal-types-string/README.md b/rustecal-types-string/README.md index 5624c7d..2851cff 100644 --- a/rustecal-types-string/README.md +++ b/rustecal-types-string/README.md @@ -23,7 +23,6 @@ rustecal-types-string = "0.1" ### Publisher Example ```rust -use std::sync::Arc; use rustecal::{Ecal, EcalComponents, TypedPublisher}; use rustecal_types_string::StringMessage; @@ -33,8 +32,8 @@ fn main() -> Result<(), Box> { let publisher = TypedPublisher::::new("hello")?; while Ecal::ok() { - let message = StringMessage { data: Arc::from("Hello from Rust") }; - publisher.send(&message); + let message = StringMessage { data: "Hello from Rust".into() }; + publisher.send(&message, Timestamp::Auto); std::thread::sleep(std::time::Duration::from_millis(500)); } diff --git a/rustecal/src/lib.rs b/rustecal/src/lib.rs index f0f88c3..bb57daf 100644 --- a/rustecal/src/lib.rs +++ b/rustecal/src/lib.rs @@ -15,8 +15,8 @@ //! //! fn main() { //! Ecal::initialize(Some("example node"), EcalComponents::DEFAULT, None).unwrap(); -//! let pub_ = TypedPublisher::::new("example_topic").unwrap(); -//! pub_.send(&StringMessage("Hello!".into())); +//! let pub_ = TypedPublisher::::new("hello topic").unwrap(); +//! pub_.send(&StringMessage{data: "Hello!".into()}, Timestamp::Auto); //! } //! ``` //!