Skip to content

Commit 9f969f8

Browse files
zero copy publisher / subscriber demo (#65)
* zero copy subscriber implemented * performance samples cleaned up
1 parent 11823db commit 9f969f8

File tree

10 files changed

+136
-153
lines changed

10 files changed

+136
-153
lines changed

rustecal-pubsub/src/typed_publisher.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,6 @@ pub trait PublisherMessage {
1818
///
1919
/// Wraps an untyped [`Publisher`] and enforces that only compatible messages
2020
/// (implementing [`PublisherMessage`]) are published.
21-
///
22-
/// # Examples
23-
///
24-
/// ```no_run
25-
/// use rustecal::TypedPublisher;
26-
/// use rustecal_types_string::StringMessage;
27-
///
28-
/// let pub_ = TypedPublisher::<StringMessage>::new("hello topic").unwrap();
29-
/// pub_.send(&StringMessage{data: "Hello!".into()}, Timestamp::Auto);
30-
/// ```
3121
pub struct TypedPublisher<T: PublisherMessage> {
3222
publisher: Publisher,
3323
_phantom: PhantomData<T>,
@@ -38,7 +28,7 @@ impl<T: PublisherMessage> TypedPublisher<T> {
3828
///
3929
/// # Arguments
4030
///
41-
/// * `topic_name` The topic name to publish to.
31+
/// * `topic_name` - The topic name to publish to.
4232
///
4333
/// # Errors
4434
///
@@ -57,8 +47,8 @@ impl<T: PublisherMessage> TypedPublisher<T> {
5747
///
5848
/// # Arguments
5949
///
60-
/// * `message` The typed message to send.
61-
/// * `timestamp` When to timestamp the message.
50+
/// * `message` - The typed message to send.
51+
/// * `timestamp` - When to timestamp the message.
6252
///
6353
/// # Returns
6454
///
@@ -75,8 +65,8 @@ impl<T: PublisherMessage> TypedPublisher<T> {
7565
///
7666
/// # Arguments
7767
///
78-
/// * `writer` A mutable reference to a `PayloadWriter`.
79-
/// * `timestamp` When to timestamp the message.
68+
/// * `writer` - A mutable reference to a `PayloadWriter`.
69+
/// * `timestamp` - When to timestamp the message.
8070
///
8171
/// # Returns
8272
///

rustecal-pubsub/src/typed_subscriber.rs

Lines changed: 55 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,27 @@ use crate::subscriber::Subscriber;
22
use crate::types::TopicId;
33
use rustecal_core::types::DataTypeInfo;
44
use rustecal_sys::{eCAL_SDataTypeInformation, eCAL_SReceiveCallbackData, eCAL_STopicId};
5-
use std::ffi::{c_void, CStr};
6-
use std::sync::Arc;
7-
use std::marker::PhantomData;
8-
use std::slice;
5+
use std::{ffi::{c_void, CStr}, marker::PhantomData, slice};
96

107
/// A trait for message types that can be deserialized by [`TypedSubscriber`].
118
///
129
/// Implement this trait for any type `T` that needs to be reconstructed
13-
/// from raw bytes plus metadata in a typed subscriber.
14-
pub trait SubscriberMessage: Sized {
10+
/// from a zero-copy byte slice plus metadata in a typed subscriber.
11+
pub trait SubscriberMessage<'a>: Sized {
1512
/// Returns metadata (encoding, type name, descriptor) for this message type.
1613
fn datatype() -> DataTypeInfo;
1714

18-
/// Deserializes a message instance from a byte buffer and its metadata.
15+
/// Deserializes a message instance from a zero-copy byte slice and its metadata.
1916
///
2017
/// # Arguments
2118
///
22-
/// * `bytes` A shared byte buffer containing the payload.
23-
/// * `data_type_info` The corresponding `DataTypeInfo` describing the payload format.
19+
/// * `bytes` - A shared byte buffer containing the payload.
20+
/// * `data_type_info` - The corresponding `DataTypeInfo` describing the payload format.
2421
///
2522
/// # Returns
2623
///
2724
/// `Some(T)` on success, or `None` on failure.
28-
fn from_bytes(bytes: Arc<[u8]>, data_type_info: &DataTypeInfo) -> Option<Self>;
25+
fn from_bytes(bytes: &'a [u8], data_type_info: &DataTypeInfo) -> Option<Self>;
2926
}
3027

3128
/// A received message, with payload and metadata.
@@ -45,18 +42,17 @@ pub struct Received<T> {
4542
}
4643

4744
/// Wrapper to store a boxed callback for `Received<T>`
48-
struct CallbackWrapper<T: SubscriberMessage> {
49-
callback: Box<dyn Fn(Received<T>) + Send + Sync>,
45+
struct CallbackWrapper<'buf, T: SubscriberMessage<'buf>> {
46+
callback: Box<dyn Fn(Received<T>) + Send + Sync + 'static>,
47+
_phantom: PhantomData<&'buf T>,
5048
}
5149

52-
impl<T: SubscriberMessage> CallbackWrapper<T> {
50+
impl<'buf, T: SubscriberMessage<'buf>> CallbackWrapper<'buf, T> {
5351
fn new<F>(f: F) -> Self
5452
where
5553
F: Fn(Received<T>) + Send + Sync + 'static,
5654
{
57-
Self {
58-
callback: Box::new(f),
59-
}
55+
Self { callback: Box::new(f), _phantom: PhantomData }
6056
}
6157

6258
fn call(&self, received: Received<T>) {
@@ -68,23 +64,13 @@ impl<T: SubscriberMessage> CallbackWrapper<T> {
6864
///
6965
/// Wraps a lower-level [`Subscriber`] and provides automatic deserialization
7066
/// plus typed callbacks.
71-
///
72-
/// # Examples
73-
///
74-
/// ```no_run
75-
/// use rustecal::TypedSubscriber;
76-
/// use rustecal_types_string::StringMessage;
77-
///
78-
/// let mut sub = TypedSubscriber::<StringMessage>::new("topic").unwrap();
79-
/// sub.set_callback(|msg| println!("Got: {}", msg.payload.0));
80-
/// ```
81-
pub struct TypedSubscriber<T: SubscriberMessage> {
67+
pub struct TypedSubscriber<'buf, T: SubscriberMessage<'buf>> {
8268
subscriber: Subscriber,
83-
user_data: *mut CallbackWrapper<T>,
84-
_phantom: PhantomData<T>,
69+
user_data: *mut CallbackWrapper<'buf, T>,
70+
_phantom: PhantomData<&'buf T>,
8571
}
8672

87-
impl<T: SubscriberMessage> TypedSubscriber<T> {
73+
impl<'buf, T: SubscriberMessage<'buf>> TypedSubscriber<'buf, T> {
8874
/// Creates a new typed subscriber for the specified topic.
8975
///
9076
/// # Arguments
@@ -97,40 +83,29 @@ impl<T: SubscriberMessage> TypedSubscriber<T> {
9783
pub fn new(topic_name: &str) -> Result<Self, String> {
9884
let datatype = T::datatype();
9985

100-
// Set dummy callback for construction, real callback will be assigned later
101-
let boxed: Box<CallbackWrapper<T>> = Box::new(CallbackWrapper::new(|_| {}));
86+
// dummy callback for construction
87+
let boxed = Box::new(CallbackWrapper::new(|_| {}));
10288
let user_data = Box::into_raw(boxed);
10389

104-
let subscriber = Subscriber::new(topic_name, datatype, trampoline::<T>)?;
105-
106-
Ok(Self {
107-
subscriber,
108-
user_data,
109-
_phantom: PhantomData,
110-
})
90+
let subscriber = Subscriber::new(topic_name, datatype, trampoline::< 'buf, T>)?;
91+
Ok(Self { subscriber, user_data, _phantom: PhantomData })
11192
}
11293

11394
/// Registers a user callback that receives a deserialized message with metadata.
114-
///
115-
/// # Arguments
116-
///
117-
/// * `callback` - A closure accepting a [`Received<T>`] message.
11895
pub fn set_callback<F>(&mut self, callback: F)
11996
where
12097
F: Fn(Received<T>) + Send + Sync + 'static,
12198
{
99+
// drop the old callback
122100
unsafe {
123-
// Drop the old callback
124101
let _ = Box::from_raw(self.user_data);
125102
}
126-
127-
let boxed = Box::new(CallbackWrapper::new(callback));
103+
let boxed = Box::new(CallbackWrapper::new(callback));
128104
self.user_data = Box::into_raw(boxed);
129-
130105
unsafe {
131106
rustecal_sys::eCAL_Subscriber_SetReceiveCallback(
132107
self.subscriber.raw_handle(),
133-
Some(trampoline::<T>),
108+
Some(trampoline::< 'buf, T>),
134109
self.user_data as *mut _,
135110
);
136111
}
@@ -164,7 +139,7 @@ impl<T: SubscriberMessage> TypedSubscriber<T> {
164139
}
165140
}
166141

167-
impl<T: SubscriberMessage> Drop for TypedSubscriber<T> {
142+
impl<'buf, T: SubscriberMessage<'buf>> Drop for TypedSubscriber<'buf, T> {
168143
/// Cleans up and removes the callback, releasing any boxed closures.
169144
fn drop(&mut self) {
170145
unsafe {
@@ -174,44 +149,48 @@ impl<T: SubscriberMessage> Drop for TypedSubscriber<T> {
174149
}
175150
}
176151

177-
/// Internal trampoline for dispatching incoming messages to the registered user closure.
178-
///
179-
/// Converts C FFI types into Rust-safe [`Received<T>`] values and passes them to the callback.
180-
extern "C" fn trampoline<T: SubscriberMessage>(
181-
topic_id: *const eCAL_STopicId,
152+
/// Internal trampoline for dispatching incoming messages to the registered user callback.
153+
extern "C" fn trampoline<'buf, T: SubscriberMessage<'buf> + 'buf>(
154+
topic_id: *const eCAL_STopicId,
182155
data_type_info: *const eCAL_SDataTypeInformation,
183-
data: *const eCAL_SReceiveCallbackData,
184-
user_data: *mut c_void,
156+
data: *const eCAL_SReceiveCallbackData,
157+
user_data: *mut c_void,
185158
) {
186159
unsafe {
187160
if data.is_null() || user_data.is_null() {
188161
return;
189162
}
190-
// Raw payload buffer
191-
let msg_slice = slice::from_raw_parts((*data).buffer as *const u8, (*data).buffer_size);
192-
let msg_arc: Arc<[u8]> = Arc::from(msg_slice);
193-
// Build Rust DataTypeInfo from eCAL metadata
194-
let encoding = CStr::from_ptr((*data_type_info).encoding).to_string_lossy().into_owned();
195-
let type_name = CStr::from_ptr((*data_type_info).name).to_string_lossy().into_owned();
196-
let descriptor = if (*data_type_info).descriptor.is_null() || (*data_type_info).descriptor_length == 0 {
163+
164+
// zero-copy view of the shared-memory payload
165+
let rd = &*data;
166+
let payload = slice::from_raw_parts(rd.buffer as *const u8, rd.buffer_size as usize);
167+
168+
// rebuild DataTypeInfo
169+
let info = &*data_type_info;
170+
let encoding = CStr::from_ptr(info.encoding).to_string_lossy().into_owned();
171+
let type_name = CStr::from_ptr(info.name).to_string_lossy().into_owned();
172+
let descriptor = if info.descriptor.is_null() || info.descriptor_length == 0 {
197173
Vec::new()
198174
} else {
199-
slice::from_raw_parts((*data_type_info).descriptor as *const u8, (*data_type_info).descriptor_length as usize).to_vec()
175+
slice::from_raw_parts(info.descriptor as *const u8, info.descriptor_length as usize).to_vec()
200176
};
201-
let dt_info = DataTypeInfo { encoding, type_name, descriptor };
202-
// Deserialize with access to datatype information
203-
if let Some(decoded) = T::from_bytes(msg_arc.clone(), &dt_info) {
204-
let cb_wrapper = &*(user_data as *const CallbackWrapper<T>);
205-
let topic_name = CStr::from_ptr((*topic_id).topic_name).to_string_lossy().into_owned();
206-
let metadata = Received {
207-
payload: decoded,
177+
let dt_info = DataTypeInfo { encoding: encoding.clone(), type_name: type_name.clone(), descriptor };
178+
179+
// direct-borrow deserialization
180+
if let Some(decoded) = T::from_bytes(payload, &dt_info) {
181+
let cb_wrapper = &*(user_data as *const CallbackWrapper<'buf, T>);
182+
let topic_name = CStr::from_ptr((*topic_id).topic_name)
183+
.to_string_lossy()
184+
.into_owned();
185+
let received = Received {
186+
payload: decoded,
208187
topic_name,
209-
encoding: dt_info.encoding.clone(),
210-
type_name: dt_info.type_name.clone(),
211-
timestamp: (*data).send_timestamp,
212-
clock: (*data).send_clock,
188+
encoding: encoding.clone(),
189+
type_name: type_name.clone(),
190+
timestamp: rd.send_timestamp,
191+
clock: rd.send_clock,
213192
};
214-
cb_wrapper.call(metadata);
193+
cb_wrapper.call(received);
215194
}
216195
}
217196
}

rustecal-samples/benchmarks/performance_receive/src/main.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
//! A performance benchmark subscriber in Rust, modeled on the eCAL C++ sample.
2-
//!
1+
//! A performance benchmark subscriber in Rust, using the typed `BytesMessage` subscriber
2+
//! to demonstrate zero-copy payload support.
33
44
use std::{sync::{Arc, Mutex, atomic::Ordering}, thread, time::{Duration, Instant}};
5+
use std::thread::sleep;
56
use rustecal::{Ecal, EcalComponents, TypedSubscriber};
67
use rustecal::pubsub::typed_subscriber::Received;
78
use rustecal_types_bytes::BytesMessage;
@@ -12,7 +13,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
1213
.expect("eCAL initialization failed");
1314

1415
// create a typed subscriber for raw bytes
15-
let mut subscriber: TypedSubscriber<BytesMessage> = TypedSubscriber::new("Performance")?;
16+
let mut subscriber: TypedSubscriber<'_, BytesMessage<'_>> = TypedSubscriber::new("Performance")?;
1617

1718
// shared counters & timer
1819
let msgs = Arc::new(std::sync::atomic::AtomicU64::new(0));
@@ -26,7 +27,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
2627
let start = Arc::clone(&start);
2728

2829
subscriber.set_callback(move |msg: Received<BytesMessage>| {
29-
let buffer = &msg.payload.data;
30+
let buffer: &[u8] = msg.payload.data.as_ref();
3031
if buffer.is_empty() {
3132
// nothing to do
3233
return;
@@ -50,6 +51,14 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
5051
let msg_s = (m as f64) / secs;
5152
let latency_us = (secs * 1e6) / (m as f64);
5253

54+
println!("Topic name : {}", msg.topic_name);
55+
let slice = &buffer[..16];
56+
let spaced = slice
57+
.iter()
58+
.map(|&b| (b as char).to_string())
59+
.collect::<Vec<String>>()
60+
.join(" ");
61+
println!("Message [0 - 15] : {:?}", spaced);
5362
println!("Payload size (kB) : {:.0}", buffer.len() / 1024);
5463
println!("Throughput (kB/s) : {:.0}", kbyte_s);
5564
println!("Throughput (MB/s) : {:.2}", mbyte_s);
@@ -64,6 +73,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
6473
});
6574
}
6675

76+
// wait for publisher
77+
while subscriber.get_publisher_count() == 0 {
78+
println!("Waiting for publisher …");
79+
sleep(Duration::from_secs(1));
80+
}
81+
println!();
82+
6783
// keep the thread alive so callbacks can run
6884
while Ecal::ok() {
6985
thread::sleep(Duration::from_millis(100));

rustecal-samples/benchmarks/performance_send/src/main.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//! A performance benchmark publisher in Rust, using the typed `BytesMessage` publisher
2-
//! with zero-copy payload support.
2+
//! to demonstrate zero-copy payload support.
33
//!
44
//! Sends messages of the given size in a tight loop, logging throughput every second.
55
@@ -47,7 +47,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
4747
)?;
4848

4949
// create a typed publisher for raw bytes
50-
let typed_pub: TypedPublisher<BytesMessage> =
50+
let publisher: TypedPublisher<BytesMessage> =
5151
TypedPublisher::new("Performance")?;
5252

5353
// prepare our zero-copy payload writer
@@ -60,7 +60,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
6060
let mut last_log = Instant::now();
6161

6262
// wait for subscriber
63-
while typed_pub.get_subscriber_count() == 0 {
63+
while publisher.get_subscriber_count() == 0 {
6464
println!("Waiting for receiver …");
6565
sleep(Duration::from_secs(1));
6666
}
@@ -69,7 +69,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
6969
// send loop
7070
while Ecal::ok() {
7171
// zero-copy send via PayloadWriter
72-
typed_pub.send_payload_writer(&mut payload, Timestamp::Auto);
72+
publisher.send_payload_writer(&mut payload, Timestamp::Auto);
7373

7474
msgs_sent += 1;
7575
bytes_sent += payload_size as u64;

0 commit comments

Comments
 (0)