Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ This crate will fail to compile if the native libraries are not found.
### Publisher

```rust
use std::sync::Arc;
use std::time::Duration;
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_string::StringMessage;
Expand All @@ -44,11 +43,11 @@ fn main() {
let publisher = TypedPublisher::<StringMessage>::new("hello").unwrap();

// prepare the message to send
let message = StringMessage { data: Arc::from("Hello from Rust") };
let message = StringMessage { data: "Hello from Rust".into() };

// publish until eCAL shuts down
while Ecal::ok() {
publisher.send(&message);
publisher.send(&message, Timestamp::Auto);
std::thread::sleep(Duration::from_millis(500));
}

Expand Down
5 changes: 2 additions & 3 deletions docs/src/api/publisher.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ The `Publisher<T>` allows you to publish messages of type `T` on a topic.
## Example

```rust
use std::sync::Arc;
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_string::StringMessage;

let publisher = TypedPublisher::<StringMessage>::new("hello").unwrap();

let message = StringMessage { data: Arc::from("Hello from Rust") };
publisher.send(&message);
let message = StringMessage { data: "Hello from Rust".into() }
publisher.send(&message, Timestamp::Auto);
```
5 changes: 2 additions & 3 deletions docs/src/examples/binary.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
## Publisher

```rust
use std::sync::Arc;
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_bytes::BytesMessage;

Expand All @@ -17,8 +16,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let buf = vec![counter; 1024];
counter = counter.wrapping_add(1);

let message = BytesMessage { data: Arc::from(buf) };
publisher.send(&message);
let message = BytesMessage { data: buf.into() };
publisher.send(&message, Timestamp::Auto);

std::thread::sleep(std::time::Duration::from_millis(500));
}
Expand Down
4 changes: 1 addition & 3 deletions docs/src/examples/json.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
## Publisher

```rust
use std::sync::Arc;
use serde::{Serialize, Deserialize};
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_serde::JsonMessage;
Expand All @@ -21,7 +20,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
while Ecal::ok() {
let payload = MyData { msg: "Hello from Rust".into() };
let message = JsonMessage::new(payload);
publisher.send(&message);
publisher.send(&message, Timestamp::Auto);

std::thread::sleep(std::time::Duration::from_millis(500));
}
Expand All @@ -34,7 +33,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
## Subscriber

```rust
use std::sync::Arc;
use serde::{Serialize, Deserialize};
use rustecal::{Ecal, EcalComponents, TypedSubscriber};
use rustecal_types_serde::JsonMessage;
Expand Down
2 changes: 1 addition & 1 deletion docs/src/examples/protobuf.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let person = Person { id: 1, name: "Alice".into(), ..Default::default() };

let message = ProtobufMessage { data : Arc::from(person) };
publisher.send(&message);
publisher.send(&message, Timestamp::Auto);

std::thread::sleep(std::time::Duration::from_millis(500));
}
Expand Down
5 changes: 2 additions & 3 deletions docs/src/examples/string.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
## Publisher

```rust
use std::sync::Arc;
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_string::StringMessage;

Expand All @@ -13,8 +12,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let publisher = TypedPublisher::<StringMessage>::new("hello")?;

while Ecal::ok() {
let message = StringMessage { data: Arc::from("Hello from Rust") };
publisher.send(&message);
let message = StringMessage { data: "Hello from Rust".into() };
publisher.send(&message, Timestamp::Auto);

std::thread::sleep(std::time::Duration::from_millis(500));
}
Expand Down
5 changes: 2 additions & 3 deletions rustecal-pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ rustecal-pubsub = "0.1"
### Typed Publisher Example

```rust
use std::sync::Arc;
use rustecal::{Ecal, EcalComponents, TypedPublisher};
use rustecal_types_string::StringMessage;

Expand All @@ -38,8 +37,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let publisher = TypedPublisher::<StringMessage>::new("hello")?;

while Ecal::ok() {
let message = StringMessage { data: Arc::from("Hello from Rust") };
publisher.send(&message);
let message = StringMessage { data: "Hello from Rust".into() };
publisher.send(&message, Timestamp::Auto);

std::thread::sleep(std::time::Duration::from_millis(500));
}
Expand Down
2 changes: 2 additions & 0 deletions rustecal-pubsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod publisher;
pub mod subscriber;
pub mod typed_publisher;
pub mod typed_subscriber;
pub mod payload_writer;

// Public API
pub use publisher::Publisher;
Expand All @@ -30,3 +31,4 @@ pub use typed_publisher::TypedPublisher;
pub use typed_publisher::PublisherMessage;
pub use typed_subscriber::TypedSubscriber;
pub use typed_subscriber::SubscriberMessage;
pub use payload_writer::PayloadWriter;
68 changes: 68 additions & 0 deletions rustecal-pubsub/src/payload_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// payload_writer.rs
//
// Defines the `PayloadWriter` trait for zero-copy shared-memory payload writes,
// plus thread-local storage and C-style callback functions to integrate
// with eCAL's `SendPayloadWriter` API, using mutable references rather than owning values.

use std::cell::RefCell;
use std::os::raw::{c_void, c_int};

/// A zero‐copy payload writer: you fill the shared‐memory buffer in place.
pub trait PayloadWriter {
/// Called once when the memory is first allocated (or resized).
/// Return `true` on success.
fn write_full(&mut self, buf: &mut [u8]) -> bool;

/// Called on subsequent sends to modify only parts of the buffer.
/// By default this falls back to `write_full`.
fn write_modified(&mut self, buf: &mut [u8]) -> bool {
self.write_full(buf)
}

/// Must return the exact number of bytes you’ll write.
fn get_size(&self) -> usize;
}

// Thread-local slot for the currently active writer reference during a send call
thread_local! {
/// Holds a raw pointer to the active PayloadWriter while eCAL invokes callbacks
pub(crate) static CURRENT_WRITER: RefCell<Option<*mut dyn PayloadWriter>> = RefCell::new(None);
}

/// C callback: perform a full write into the shared-memory buffer
pub(crate) unsafe extern "C" fn write_full_cb(buffer: *mut c_void, size: usize) -> c_int {
CURRENT_WRITER.with(|cell| {
if let Some(writer_ptr) = *cell.borrow() {
let writer: &mut dyn PayloadWriter = &mut *writer_ptr;
let buf = std::slice::from_raw_parts_mut(buffer as *mut u8, size);
if writer.write_full(buf) { 0 } else { -1 }
} else {
-1
}
})
}

/// C callback: perform a partial modification of the shared-memory buffer
pub(crate) unsafe extern "C" fn write_mod_cb(buffer: *mut c_void, size: usize) -> c_int {
CURRENT_WRITER.with(|cell| {
if let Some(writer_ptr) = *cell.borrow() {
let writer: &mut dyn PayloadWriter = &mut *writer_ptr;
let buf = std::slice::from_raw_parts_mut(buffer as *mut u8, size);
if writer.write_modified(buf) { 0 } else { -1 }
} else {
-1
}
})
}

/// C callback: return the size of the payload buffer needed
pub(crate) unsafe extern "C" fn get_size_cb() -> usize {
CURRENT_WRITER.with(|cell| {
if let Some(writer_ptr) = *cell.borrow() {
let writer: &mut dyn PayloadWriter = &mut *writer_ptr;
writer.get_size()
} else {
0
}
})
}
83 changes: 65 additions & 18 deletions rustecal-pubsub/src/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
use rustecal_sys::*;
use rustecal_core::types::DataTypeInfo;
use crate::types::TopicId;
use crate::payload_writer::{PayloadWriter, CURRENT_WRITER, write_full_cb, write_mod_cb, get_size_cb};
use std::ffi::{CStr, CString};
use std::ptr;

/// When to assign a timestamp to an outgoing message.
pub enum Timestamp {
/// Let eCAL assign its internal send timestamp.
Auto,
/// Use this custom timestamp (microseconds since epoch).
Custom(i64),
}

/// A safe and ergonomic wrapper around the eCAL C publisher API.
///
/// This struct provides a high-level interface for sending serialized messages to
Expand Down Expand Up @@ -66,43 +75,81 @@ impl Publisher {
/// # Arguments
///
/// * `data` - A byte buffer containing the serialized message payload.
/// * `timestamp` - When to timestamp the message.
///
/// # Returns
///
/// `1` on success, `0` on failure.
pub fn send(&self, data: &[u8]) -> i32 {
unsafe {
/// `true` on success, `false` on failure.
pub fn send(&self, data: &[u8], timestamp: Timestamp) -> bool {
let ts_ptr = match timestamp {
Timestamp::Auto => ptr::null(),
Timestamp::Custom(t) => &t as *const i64 as *const _,
};
let ret = unsafe {
eCAL_Publisher_Send(
self.handle,
data.as_ptr() as *const _,
data.len(),
ptr::null(),
ts_ptr,
)
}
};
// eCAL returns 0 on success
ret == 0
}

/// Sends a serialized message with a custom timestamp.
/// Sends a zero-copy payload using a [`PayloadWriter`].
///
/// # Arguments
///
/// * `data` - A byte buffer containing the message.
/// * `timestamp` - Timestamp in microseconds (use `-1` to let eCAL determine the time).
/// * `writer` - A mutable reference to a `PayloadWriter` implementation.
/// * `timestamp` - When to timestamp the message.
///
/// # Returns
///
/// `1` on success, `0` on failure.
pub fn send_with_timestamp(&self, data: &[u8], timestamp: i64) -> i32 {
unsafe {
eCAL_Publisher_Send(
/// `true` on success, `false` on failure.
pub fn send_payload_writer<W: PayloadWriter>(
&self,
writer: &mut W,
timestamp: Timestamp,
) -> bool {
// stash the writer pointer in TLS
let ptr = writer as *mut W as *mut dyn PayloadWriter;
CURRENT_WRITER.with(|cell| {
*cell.borrow_mut() = Some(ptr);
});

// build the C payload writer struct
let c_writer = eCAL_PayloadWriter {
WriteFull: Some(write_full_cb),
WriteModified: Some(write_mod_cb),
GetSize: Some(get_size_cb),
};

// prepare timestamp pointer
let ts_ptr = match timestamp {
Timestamp::Auto => ptr::null(),
Timestamp::Custom(t) => &t as *const i64 as *const _,
};

// call into the FFI
let result = unsafe {
eCAL_Publisher_SendPayloadWriter(
self.handle,
data.as_ptr() as *const _,
data.len(),
&timestamp as *const _ as *const _,
&c_writer as *const _,
ts_ptr,
)
}
}
};

/// Returns the number of currently connected subscribers.
// clear the slot
CURRENT_WRITER.with(|cell| {
cell.borrow_mut().take();
});

// eCAL returns 0 on success
result == 0
}

/// Retrieves the number of currently connected subscribers.
pub fn get_subscriber_count(&self) -> usize {
unsafe { eCAL_Publisher_GetSubscriberCount(self.handle) }
}
Expand Down
2 changes: 1 addition & 1 deletion rustecal-pubsub/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Subscriber {
self.handle
}

/// Returns the number of currently connected publishers.
/// Retrieves the number of currently connected publishers.
pub fn get_publisher_count(&self) -> usize {
unsafe { eCAL_Subscriber_GetPublisherCount(self.handle) }
}
Expand Down
Loading