From 2681074041b3ce0c1895b1c31ca3b456d860ee3b Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 12 Aug 2025 22:36:33 +0800 Subject: [PATCH 1/6] Wake up wait set when adding waitable Signed-off-by: Michael X. Grey --- rclrs/src/executor.rs | 9 +++++---- rclrs/src/executor/basic_executor.rs | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs index 24ac5ea50..3c67c91cf 100644 --- a/rclrs/src/executor.rs +++ b/rclrs/src/executor.rs @@ -241,7 +241,7 @@ impl ExecutorCommands { } pub(crate) fn add_to_wait_set(&self, waitable: Waitable) { - self.async_worker_commands.channel.add_to_waitset(waitable); + self.async_worker_commands.add_to_wait_set(waitable); } #[cfg(test)] @@ -275,7 +275,7 @@ impl ExecutorCommands { guard_condition: Arc::clone(&guard_condition), }); - worker_channel.add_to_waitset(waitable); + worker_channel.add_to_wait_set(waitable); Arc::new(WorkerCommands { channel: worker_channel, @@ -296,7 +296,8 @@ pub(crate) struct WorkerCommands { impl WorkerCommands { pub(crate) fn add_to_wait_set(&self, waitable: Waitable) { - self.channel.add_to_waitset(waitable); + self.channel.add_to_wait_set(waitable); + let _ = self.wakeup_wait_set.trigger(); } pub(crate) fn run_async(&self, f: F) @@ -327,7 +328,7 @@ pub trait WorkerChannel: Send + Sync { fn add_async_task(&self, f: BoxFuture<'static, ()>); /// Add new entities to the waitset of the executor. - fn add_to_waitset(&self, new_entity: Waitable); + fn add_to_wait_set(&self, new_entity: Waitable); /// Send a one-time task for the worker to run with its payload. fn send_payload_task(&self, f: PayloadTask); diff --git a/rclrs/src/executor/basic_executor.rs b/rclrs/src/executor/basic_executor.rs index 1ad9476b1..4df0db9a1 100644 --- a/rclrs/src/executor/basic_executor.rs +++ b/rclrs/src/executor/basic_executor.rs @@ -308,7 +308,7 @@ struct BasicWorkerChannel { } impl WorkerChannel for BasicWorkerChannel { - fn add_to_waitset(&self, new_entity: Waitable) { + fn add_to_wait_set(&self, new_entity: Waitable) { if let Err(err) = self.waitable_sender.unbounded_send(new_entity) { // This is a debug log because it is normal for this to happen while // an executor is winding down. From 4ca197aee0c639e4b141c28c67b0cf80d567ec9c Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 12 Aug 2025 23:02:21 +0800 Subject: [PATCH 2/6] Add test for subscribing while spinning Signed-off-by: Michael X. Grey --- rclrs/src/node.rs | 4 ++-- rclrs/src/publisher.rs | 23 ++++++++++++++++------ rclrs/src/subscription.rs | 40 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index dd01d0605..7f4780d35 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -385,13 +385,13 @@ impl NodeState { /// ``` /// pub fn create_publisher<'a, T>( - &self, + self: &Arc, options: impl Into>, ) -> Result, RclrsError> where T: Message, { - PublisherState::::create(options, Arc::clone(&self.handle)) + PublisherState::::create(options, Arc::clone(self)) } /// Creates a [`Service`] with an ordinary callback. diff --git a/rclrs/src/publisher.rs b/rclrs/src/publisher.rs index 29a7e1579..73967dd2e 100644 --- a/rclrs/src/publisher.rs +++ b/rclrs/src/publisher.rs @@ -11,7 +11,7 @@ use crate::{ error::{RclrsError, ToResult}, qos::QoSProfile, rcl_bindings::*, - IntoPrimitiveOptions, NodeHandle, ENTITY_LIFECYCLE_MUTEX, + IntoPrimitiveOptions, Node, Promise, ENTITY_LIFECYCLE_MUTEX, }; mod loaned_message; @@ -28,12 +28,14 @@ unsafe impl Send for rcl_publisher_t {} /// [1]: struct PublisherHandle { rcl_publisher: Mutex, - node_handle: Arc, + /// We store the whole node here because we use some of its user-facing API + /// in some of the Publisher methods. + node: Node, } impl Drop for PublisherHandle { fn drop(&mut self) { - let mut rcl_node = self.node_handle.rcl_node.lock().unwrap(); + let mut rcl_node = self.node.handle().rcl_node.lock().unwrap(); let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); // SAFETY: The entity lifecycle mutex is locked to protect against the risk of // global variables in the rmw implementation being unsafely modified during cleanup. @@ -97,7 +99,7 @@ where /// Node and namespace changes are always applied _before_ topic remapping. pub(crate) fn create<'a>( options: impl Into>, - node_handle: Arc, + node: Node, ) -> Result, RclrsError> where T: Message, @@ -117,7 +119,7 @@ where publisher_options.qos = qos.into(); { - let rcl_node = node_handle.rcl_node.lock().unwrap(); + let rcl_node = node.handle().rcl_node.lock().unwrap(); let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); unsafe { // SAFETY: @@ -142,7 +144,7 @@ where message: PhantomData, handle: PublisherHandle { rcl_publisher: Mutex::new(rcl_publisher), - node_handle, + node, }, })) } @@ -177,6 +179,15 @@ where Ok(subscription_count) } + /// Get a promise that will be fulfilled when at least one subscriber is + /// listening to this publisher. + pub fn notify_on_subscriber_ready(self: &Arc>) -> Promise<()> { + let publisher = Arc::clone(self); + self.handle + .node + .notify_on_graph_change(move || publisher.get_subscription_count().is_ok_and(|count| count > 0)) + } + /// Publishes a message. /// /// The [`MessageCow`] trait is implemented by any diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index e57c542f6..6c4bcd3b3 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -520,4 +520,44 @@ mod tests { assert!(start_time.elapsed() < std::time::Duration::from_secs(10)); } } + + #[test] + fn test_delayed_subscription() { + use crate::*; + use futures::channel::oneshot; + use example_interfaces::msg::Empty; + + let mut executor = Context::default().create_basic_executor(); + let node = executor.create_node(&format!("test_delayed_subscription_{}", line!())).unwrap(); + + let (sender, receiver) = oneshot::channel(); + let sender = Arc::new(Mutex::new(Some(sender))); + + let _ = executor.commands().run(async move { + let _subscription = node.create_subscription( + "test_delayed_subscription", + move |_: Empty| { + if let Some(sender) = sender.lock().unwrap().take() { + let _ = sender.send(()); + } + }, + ).unwrap(); + + let publisher = node.create_publisher("test_delayed_subscription").unwrap(); + + // Make sure the message doesn't get dropped + let _ = publisher.notify_on_subscriber_ready().await; + + // Publish the message, which should trigger the executor to stop spinning + publisher.publish(Empty::default()).unwrap(); + }); + + let r = executor.spin( + SpinOptions::default() + .until_promise_resolved(receiver) + .timeout(std::time::Duration::from_secs(10)) + ); + + assert!(r.is_empty()); + } } From af5794d36143cd82515ab4ce8d4691a307ec31ac Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 12 Aug 2025 23:28:50 +0800 Subject: [PATCH 3/6] prevent subscription from dropping Signed-off-by: Michael X. Grey --- rclrs/src/subscription.rs | 56 +++++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index 6c4bcd3b3..10ef464d2 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -524,32 +524,52 @@ mod tests { #[test] fn test_delayed_subscription() { use crate::*; - use futures::channel::oneshot; + use futures::{ + channel::{oneshot, mpsc}, + StreamExt, + }; use example_interfaces::msg::Empty; + use std::sync::atomic::{AtomicBool, Ordering}; let mut executor = Context::default().create_basic_executor(); let node = executor.create_node(&format!("test_delayed_subscription_{}", line!())).unwrap(); - let (sender, receiver) = oneshot::channel(); - let sender = Arc::new(Mutex::new(Some(sender))); + let (promise, receiver) = oneshot::channel(); + let promise = Arc::new(Mutex::new(Some(promise))); - let _ = executor.commands().run(async move { - let _subscription = node.create_subscription( - "test_delayed_subscription", - move |_: Empty| { - if let Some(sender) = sender.lock().unwrap().take() { - let _ = sender.send(()); - } - }, - ).unwrap(); + let success = Arc::new(AtomicBool::new(false)); + let send_success = Arc::clone(&success); + + let publisher = node.create_publisher("test_delayed_subscription").unwrap(); - let publisher = node.create_publisher("test_delayed_subscription").unwrap(); + let commands = Arc::clone(executor.commands()); + std::thread::spawn(move || { + // Wait a little while so the executor can start spinning + std::thread::sleep(std::time::Duration::from_millis(1)); - // Make sure the message doesn't get dropped - let _ = publisher.notify_on_subscriber_ready().await; + let _ = commands.run(async move { + let (sender, mut receiver) = mpsc::unbounded(); + let _subscription = node.create_subscription( + "test_delayed_subscription", + move |_: Empty| { + let _ = sender.unbounded_send(()); + }, + ).unwrap(); - // Publish the message, which should trigger the executor to stop spinning - publisher.publish(Empty::default()).unwrap(); + // Make sure the message doesn't get dropped due to the subscriber + // not being connected yet. + let _ = publisher.notify_on_subscriber_ready().await; + + // Publish the message, which should trigger the executor to stop spinning + publisher.publish(Empty::default()).unwrap(); + + if let Some(_) = receiver.next().await { + send_success.store(true, Ordering::Release); + if let Some(promise) = promise.lock().unwrap().take() { + promise.send(()).unwrap(); + } + } + }); }); let r = executor.spin( @@ -559,5 +579,7 @@ mod tests { ); assert!(r.is_empty()); + let message_was_received = success.load(Ordering::Acquire); + assert!(message_was_received); } } From 329c6f4b8ed3df9a7a9b9f024afb2ce896acc1fa Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 12 Aug 2025 23:42:23 +0800 Subject: [PATCH 4/6] Turn off parameter services to make the test more reliable Signed-off-by: Michael X. Grey --- rclrs/src/subscription.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index 10ef464d2..3e674f4ca 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -532,7 +532,12 @@ mod tests { use std::sync::atomic::{AtomicBool, Ordering}; let mut executor = Context::default().create_basic_executor(); - let node = executor.create_node(&format!("test_delayed_subscription_{}", line!())).unwrap(); + let node = executor.create_node( + format!("test_delayed_subscription_{}", line!()) + // We need to turn off parameter services because their activity will + // wake up the wait set, which defeats the purpose of this test. + .start_parameter_services(false) + ).unwrap(); let (promise, receiver) = oneshot::channel(); let promise = Arc::new(Mutex::new(Some(promise))); @@ -544,8 +549,9 @@ mod tests { let commands = Arc::clone(executor.commands()); std::thread::spawn(move || { - // Wait a little while so the executor can start spinning - std::thread::sleep(std::time::Duration::from_millis(1)); + // Wait a little while so the executor can start spinning and guard + // conditions can settle down. + std::thread::sleep(std::time::Duration::from_millis(10)); let _ = commands.run(async move { let (sender, mut receiver) = mpsc::unbounded(); @@ -578,7 +584,7 @@ mod tests { .timeout(std::time::Duration::from_secs(10)) ); - assert!(r.is_empty()); + assert!(r.is_empty(), "{r:?}"); let message_was_received = success.load(Ordering::Acquire); assert!(message_was_received); } From 9e5045d90f1f27717e4f0e0f510a037a84d77690 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 12 Aug 2025 23:58:21 +0800 Subject: [PATCH 5/6] Fix formatting Signed-off-by: Michael X. Grey --- rclrs/src/publisher.rs | 8 +++++--- rclrs/src/subscription.rs | 31 ++++++++++++++++--------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/rclrs/src/publisher.rs b/rclrs/src/publisher.rs index 73967dd2e..1a5ea3210 100644 --- a/rclrs/src/publisher.rs +++ b/rclrs/src/publisher.rs @@ -183,9 +183,11 @@ where /// listening to this publisher. pub fn notify_on_subscriber_ready(self: &Arc>) -> Promise<()> { let publisher = Arc::clone(self); - self.handle - .node - .notify_on_graph_change(move || publisher.get_subscription_count().is_ok_and(|count| count > 0)) + self.handle.node.notify_on_graph_change(move || { + publisher + .get_subscription_count() + .is_ok_and(|count| count > 0) + }) } /// Publishes a message. diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index 3e674f4ca..e094abfde 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -524,20 +524,22 @@ mod tests { #[test] fn test_delayed_subscription() { use crate::*; + use example_interfaces::msg::Empty; use futures::{ - channel::{oneshot, mpsc}, + channel::{mpsc, oneshot}, StreamExt, }; - use example_interfaces::msg::Empty; use std::sync::atomic::{AtomicBool, Ordering}; let mut executor = Context::default().create_basic_executor(); - let node = executor.create_node( - format!("test_delayed_subscription_{}", line!()) - // We need to turn off parameter services because their activity will - // wake up the wait set, which defeats the purpose of this test. - .start_parameter_services(false) - ).unwrap(); + let node = executor + .create_node( + format!("test_delayed_subscription_{}", line!()) + // We need to turn off parameter services because their activity will + // wake up the wait set, which defeats the purpose of this test. + .start_parameter_services(false), + ) + .unwrap(); let (promise, receiver) = oneshot::channel(); let promise = Arc::new(Mutex::new(Some(promise))); @@ -555,12 +557,11 @@ mod tests { let _ = commands.run(async move { let (sender, mut receiver) = mpsc::unbounded(); - let _subscription = node.create_subscription( - "test_delayed_subscription", - move |_: Empty| { + let _subscription = node + .create_subscription("test_delayed_subscription", move |_: Empty| { let _ = sender.unbounded_send(()); - }, - ).unwrap(); + }) + .unwrap(); // Make sure the message doesn't get dropped due to the subscriber // not being connected yet. @@ -580,8 +581,8 @@ mod tests { let r = executor.spin( SpinOptions::default() - .until_promise_resolved(receiver) - .timeout(std::time::Duration::from_secs(10)) + .until_promise_resolved(receiver) + .timeout(std::time::Duration::from_secs(10)), ); assert!(r.is_empty(), "{r:?}"); From 199a77507f6db36f12b7289c9ddc4f9d88d4eea8 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Wed, 13 Aug 2025 00:10:50 +0800 Subject: [PATCH 6/6] Trigger CI Signed-off-by: Michael X. Grey