Skip to content

Commit d288280

Browse files
Configuration handling (#57)
* configuration access fixed * performance_receive and performance_receive benchmark samples added * commenting fixed * import fixed
1 parent 2baa834 commit d288280

File tree

22 files changed

+249
-37
lines changed

22 files changed

+249
-37
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ members = [
99
"rustecal-types-protobuf",
1010
"rustecal-types-serde",
1111
"rustecal-types-string",
12+
"rustecal-samples/benchmarks/performance_receive",
13+
"rustecal-samples/benchmarks/performance_send",
1214
"rustecal-samples/monitoring/logging_receive",
1315
"rustecal-samples/monitoring/monitoring_receive",
1416
"rustecal-samples/pubsub/blob_send",

rustecal-core/src/configuration.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
//! configuration.rs
22
//!
3-
//! Provides a safe Rust wrapper around the eCAL C Configuration API
3+
//! Provides a safe Rust wrapper around the eCAL C Configuration API.
44
//!
55
//! This module exposes a `Configuration` struct that manages an
66
//! `eCAL_Configuration` instance via FFI. It supports initializing
77
//! default settings or loading from a YAML file, and automatically
88
//! frees the underlying C object on drop.
99
10-
use std::{ffi::{CStr, CString}, path::Path};
10+
use std::{ffi::{CStr, CString}, path::Path, ops::{Deref, DerefMut}};
1111
use thiserror::Error;
1212
use rustecal_sys as sys;
1313

@@ -31,33 +31,25 @@ unsafe impl Sync for Configuration {}
3131
impl Configuration {
3232
/// Creates a new Configuration with default values loaded via eCAL_Configuration_InitFromConfig
3333
pub fn new() -> Result<Self, ConfigError> {
34-
// Allocate new eCAL_Configuration
3534
let cfg = unsafe { sys::eCAL_Configuration_New() };
3635
if cfg.is_null() {
3736
return Err(ConfigError::NullPointer);
3837
}
39-
// Initialize configuration with default settings
4038
unsafe { sys::eCAL_Configuration_InitFromConfig(cfg) };
4139
Ok(Configuration { inner: cfg })
4240
}
4341

4442
/// Loads a Configuration from a YAML file at the given path
4543
pub fn from_file(path: &str) -> Result<Self, ConfigError> {
46-
// Check that the file exists
4744
if !Path::new(path).exists() {
4845
return Err(ConfigError::InvalidPath(path.to_string()));
4946
}
50-
// Convert Rust &str to CString
5147
let c_path = CString::new(path).map_err(|_| ConfigError::InvalidPath(path.to_string()))?;
52-
53-
// Allocate new eCAL_Configuration
5448
let cfg = unsafe { sys::eCAL_Configuration_New() };
5549
if cfg.is_null() {
5650
return Err(ConfigError::NullPointer);
5751
}
58-
// Load configuration from file (void return type)
5952
unsafe { sys::eCAL_Configuration_InitFromFile(cfg, c_path.as_ptr()) };
60-
6153
Ok(Configuration { inner: cfg })
6254
}
6355

@@ -79,6 +71,21 @@ impl Configuration {
7971
}
8072
}
8173

74+
/// Allow transparent access to the underlying C struct
75+
impl Deref for Configuration {
76+
type Target = sys::eCAL_Configuration;
77+
fn deref(&self) -> &Self::Target {
78+
unsafe { &*self.inner }
79+
}
80+
}
81+
82+
/// Allow mutable access to the underlying C struct
83+
impl DerefMut for Configuration {
84+
fn deref_mut(&mut self) -> &mut Self::Target {
85+
unsafe { &mut *self.inner }
86+
}
87+
}
88+
8289
impl Drop for Configuration {
8390
/// Deletes the underlying eCAL_Configuration on drop
8491
fn drop(&mut self) {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/target
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "performance_receive"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
rustecal = { path = "../../../rustecal", features = ["pubsub"] }
8+
rustecal-types-bytes = { path = "../../../rustecal-types-bytes" }
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
//! A performance benchmark subscriber in Rust, modeled on the eCAL C++ sample.
2+
//!
3+
4+
use std::{sync::{Arc, Mutex, atomic::Ordering}, thread, time::{Duration, Instant}};
5+
use rustecal::{Ecal, EcalComponents, TypedSubscriber};
6+
use rustecal::pubsub::typed_subscriber::Received;
7+
use rustecal_types_bytes::BytesMessage;
8+
9+
fn main() -> Result<(), Box<dyn std::error::Error>> {
10+
// initialize eCAL
11+
Ecal::initialize(Some("performance receive rust"), EcalComponents::DEFAULT, None)?;
12+
13+
// create a typed subscriber for raw bytes
14+
let mut subscriber: TypedSubscriber<BytesMessage> = TypedSubscriber::new("Performance")?;
15+
16+
// shared counters & timer
17+
let msgs = Arc::new(std::sync::atomic::AtomicU64::new(0));
18+
let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
19+
let start = Arc::new(Mutex::new(Instant::now()));
20+
21+
// register the receive‐callback
22+
{
23+
let msgs = Arc::clone(&msgs);
24+
let bytes = Arc::clone(&bytes);
25+
let start = Arc::clone(&start);
26+
27+
subscriber.set_callback(move |msg: Received<BytesMessage>| {
28+
let buffer = &msg.payload.data;
29+
if buffer.is_empty() {
30+
// nothing to do
31+
return;
32+
}
33+
34+
// update counters
35+
msgs.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
36+
bytes.fetch_add(buffer.len() as u64, std::sync::atomic::Ordering::Relaxed);
37+
38+
// lock the timer, compute & maybe print
39+
let mut start_lock = start.lock().unwrap();
40+
let elapsed = start_lock.elapsed();
41+
if elapsed >= Duration::from_secs(1) {
42+
let m = msgs.swap(0, Ordering::Relaxed);
43+
let b = bytes.swap(0, Ordering::Relaxed);
44+
45+
let secs = elapsed.as_secs_f64();
46+
let kbyte_s = (b as f64 / 1024.0) / secs;
47+
let mbyte_s = kbyte_s / 1024.0;
48+
let gbyte_s = mbyte_s / 1024.0;
49+
let msg_s = (m as f64) / secs;
50+
let latency_us = (secs * 1e6) / (m as f64);
51+
52+
println!("Payload size : {} kB", buffer.len() / 1024);
53+
println!("Throughput (kB/s) : {:.0}", kbyte_s);
54+
println!("Throughput (MB/s) : {:.2}", mbyte_s);
55+
println!("Throughput (GB/s) : {:.3}", gbyte_s);
56+
println!("Messages/s : {:.0}", msg_s);
57+
println!("Latency (µs) : {:.2}", latency_us);
58+
println!();
59+
60+
// reset the timer
61+
*start_lock = Instant::now();
62+
}
63+
});
64+
}
65+
66+
// keep the thread alive so callbacks can run
67+
while Ecal::ok() {
68+
thread::sleep(Duration::from_millis(100));
69+
}
70+
71+
// clean up and finalize eCAL
72+
Ecal::finalize();
73+
Ok(())
74+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/target
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "performance_send"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
rustecal = { path = "../../../rustecal", features = ["pubsub"] }
8+
rustecal-types-bytes = { path = "../../../rustecal-types-bytes" }
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
//! A performance benchmark publisher in Rust, modeled on the eCAL C++ sample.
2+
//!
3+
//! This will send messages of the given size in a tight loop, logging
4+
//! throughput every second.
5+
6+
use std::{env, sync::Arc, time::{Duration, Instant}};
7+
use rustecal::{Ecal, EcalComponents, Configuration, TypedPublisher};
8+
use rustecal_types_bytes::BytesMessage;
9+
10+
// performance settings
11+
const ZERO_COPY: bool = true;
12+
const BUFFER_COUNT: u32 = 1;
13+
const ACKNOWLEDGE_TIMEOUT_MS: i32 = 50;
14+
const PAYLOAD_SIZE_DEFAULT: usize = 8 * 1024 * 1024;
15+
16+
fn main() -> Result<(), Box<dyn std::error::Error>> {
17+
// parse payload size from CLI (or use default)
18+
let args: Vec<String> = env::args().collect();
19+
let mut payload_size = if args.len() > 1 {
20+
args[1].parse::<usize>().unwrap_or(PAYLOAD_SIZE_DEFAULT)
21+
} else {
22+
PAYLOAD_SIZE_DEFAULT
23+
};
24+
if payload_size == 0 {
25+
payload_size = 1;
26+
}
27+
28+
// log performance settings
29+
println!("Zero copy mode: {}", ZERO_COPY);
30+
println!("Number of write buffers: {}", BUFFER_COUNT);
31+
println!("Acknowledge timeout: {} ms", ACKNOWLEDGE_TIMEOUT_MS);
32+
println!("Payload size: {} bytes", payload_size);
33+
println!();
34+
35+
// prepare and tweak eCAL Configuration
36+
let mut cfg = Configuration::new()?;
37+
cfg.publisher.layer.shm.zero_copy_mode = ZERO_COPY as i32;
38+
cfg.publisher.layer.shm.memfile_buffer_count = BUFFER_COUNT;
39+
cfg.publisher.layer.shm.acknowledge_timeout_ms = ACKNOWLEDGE_TIMEOUT_MS as u32;
40+
41+
// initialize eCAL with custom config
42+
Ecal::initialize(
43+
Some("performance send rust"),
44+
EcalComponents::DEFAULT,
45+
Some(&cfg),
46+
)
47+
.expect("eCAL initialization failed");
48+
49+
// create payload buffer and publisher
50+
let payload_vec: Vec<u8> = vec![0u8; payload_size];
51+
let mut payload: Arc<[u8]> = Arc::from(payload_vec);
52+
let publisher: TypedPublisher<BytesMessage> = TypedPublisher::new("Performance")?;
53+
54+
// benchmark loop
55+
let mut msgs_sent = 0u64;
56+
let mut bytes_sent = 0u64;
57+
let mut iterations = 0u64;
58+
let mut last_log = Instant::now();
59+
60+
// wait for at least one subscriber to be ready
61+
while publisher.get_subscriber_count() == 0 {
62+
println!("Waiting for performance receive to start ...");
63+
std::thread::sleep(Duration::from_millis(1000));
64+
}
65+
66+
// send loop
67+
while Ecal::ok() {
68+
// modify the payload for each message
69+
{
70+
let buf: &mut [u8] = Arc::make_mut(&mut payload);
71+
let chr = (msgs_sent % 9 + 48) as u8;
72+
buf[0..16].fill(chr);
73+
}
74+
75+
let wrapped = BytesMessage { data: payload.clone() };
76+
publisher.send(&wrapped);
77+
78+
msgs_sent += 1;
79+
bytes_sent += payload_size as u64;
80+
iterations += 1;
81+
82+
// every second, print statistics
83+
if iterations % 2000 == 0 {
84+
let elapsed = last_log.elapsed();
85+
if elapsed >= Duration::from_secs(1) {
86+
let secs = elapsed.as_secs_f64();
87+
let kbyte_s = (bytes_sent as f64 / 1024.0) / secs;
88+
let mbyte_s = kbyte_s / 1024.0;
89+
let gbyte_s = mbyte_s / 1024.0;
90+
let msg_s = (msgs_sent as f64) / secs;
91+
let latency_us = (secs * 1e6) / (msgs_sent as f64);
92+
93+
println!("Payload size : {} kB", payload_size / 1024);
94+
println!("Throughput (kB/s) : {:.0}", kbyte_s);
95+
println!("Throughput (MB/s) : {:.2}", mbyte_s);
96+
println!("Throughput (GB/s) : {:.3}", gbyte_s);
97+
println!("Messages/s : {:.0}", msg_s);
98+
println!("Latency (µs) : {:.2}", latency_us);
99+
println!();
100+
101+
msgs_sent = 0;
102+
bytes_sent = 0;
103+
last_log = Instant::now();
104+
}
105+
}
106+
}
107+
108+
// clean up and finalize eCAL
109+
Ecal::finalize();
110+
Ok(())
111+
}

rustecal-samples/monitoring/logging_receive/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@ use rustecal_core::log::Log;
33
use std::{thread, time::Duration};
44

55
fn main() -> Result<(), Box<dyn std::error::Error>> {
6-
// Initialize eCAL (only the logging component)
6+
// initialize eCAL (only the logging component)
77
Ecal::initialize(Some("logging receive sample"), EcalComponents::LOGGING, None)?;
88
println!("eCAL initialized. Entering logging loop…");
99

1010
while Ecal::ok() {
11-
// Fetch whatever log entries are available
11+
// fetch whatever log entries are available
1212
let entries = Log::get_logging()?;
1313

1414
println!("=== Logging Snapshot ===\n");
1515
println!("Entries:\n{:#?}", entries);
1616

17-
// Sleep before next poll
17+
// sleep before next poll
1818
thread::sleep(Duration::from_secs(1));
1919
}
2020

rustecal-samples/monitoring/monitoring_receive/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use rustecal_core::monitoring::Monitoring;
33
use std::{thread, time::Duration};
44

55
fn main() -> Result<(), Box<dyn std::error::Error>> {
6-
// Initialize eCAL (only the monitoring component)
6+
// initialize eCAL (only the monitoring component)
77
Ecal::initialize(Some("monitoring receive sample"), EcalComponents::MONITORING, None)?;
88
println!("eCAL initialized. Entering monitoring loop…");
99

@@ -18,7 +18,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
1818
println!("\nServers:\n{:#?}", snap.servers);
1919
println!("\nClients:\n{:#?}", snap.clients);
2020

21-
// Sleep before next poll
21+
// sleep before next poll
2222
thread::sleep(Duration::from_secs(1));
2323
}
2424

0 commit comments

Comments
 (0)