@@ -25,6 +25,9 @@ use std::vec::Vec;
2525
2626use parking_lot:: Mutex ;
2727
28+ mod exclusivity_guard;
29+ use exclusivity_guard:: * ;
30+
2831/// A struct for waiting on subscriptions and other waitable entities to become ready.
2932pub struct WaitSet {
3033 rcl_wait_set : rcl_wait_set_t ,
@@ -33,9 +36,9 @@ pub struct WaitSet {
3336 // The subscriptions that are currently registered in the wait set.
3437 // This correspondence is an invariant that must be maintained by all functions,
3538 // even in the error case.
36- subscriptions : Vec < Arc < dyn SubscriptionBase > > ,
37- clients : Vec < Arc < dyn ClientBase > > ,
38- services : Vec < Arc < dyn ServiceBase > > ,
39+ subscriptions : Vec < ExclusivityGuard < Arc < dyn SubscriptionBase > > > ,
40+ clients : Vec < ExclusivityGuard < Arc < dyn ClientBase > > > ,
41+ services : Vec < ExclusivityGuard < Arc < dyn ServiceBase > > > ,
3942}
4043
4144/// A list of entities that are ready, returned by [`WaitSet::wait`].
@@ -118,17 +121,22 @@ impl WaitSet {
118121
119122 /// Adds a subscription to the wait set.
120123 ///
121- /// It is possible, but not useful, to add the same subscription twice.
122- ///
123- /// This will return an error if the number of subscriptions in the wait set is larger than the
124- /// capacity set in [`WaitSet::new`].
124+ /// # Errors
125+ /// - If the subscription was already added to this wait set or another one,
126+ /// [`AlreadyAddedToWaitSet`][1] will be returned
127+ /// - If the number of subscriptions in the wait set is larger than the
128+ /// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
125129 ///
126- /// The same subscription must not be added to multiple wait sets, because that would make it
127- /// unsafe to simultaneously wait on those wait sets.
130+ /// [1]: crate::RclrsError
131+ /// [2]: crate::RclReturnCode
128132 pub fn add_subscription (
129133 & mut self ,
130134 subscription : Arc < dyn SubscriptionBase > ,
131135 ) -> Result < ( ) , RclrsError > {
136+ let exclusive_subscription = ExclusivityGuard :: new (
137+ Arc :: clone ( & subscription) ,
138+ Arc :: clone ( & subscription. handle ( ) . in_use_by_wait_set ) ,
139+ ) ?;
132140 unsafe {
133141 // SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid
134142 // for as long as the wait set exists, because it's stored in self.subscriptions.
@@ -140,20 +148,25 @@ impl WaitSet {
140148 )
141149 }
142150 . ok ( ) ?;
143- self . subscriptions . push ( subscription ) ;
151+ self . subscriptions . push ( exclusive_subscription ) ;
144152 Ok ( ( ) )
145153 }
146154
147155 /// Adds a client to the wait set.
148156 ///
149- /// It is possible, but not useful, to add the same client twice.
150- ///
151- /// This will return an error if the number of clients in the wait set is larger than the
152- /// capacity set in [`WaitSet::new`].
157+ /// # Errors
158+ /// - If the client was already added to this wait set or another one,
159+ /// [`AlreadyAddedToWaitSet`][1] will be returned
160+ /// - If the number of clients in the wait set is larger than the
161+ /// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
153162 ///
154- /// The same client must not be added to multiple wait sets, because that would make it
155- /// unsafe to simultaneously wait on those wait sets.
163+ /// [1]: crate::RclrsError
164+ /// [2]: crate::RclReturnCode
156165 pub fn add_client ( & mut self , client : Arc < dyn ClientBase > ) -> Result < ( ) , RclrsError > {
166+ let exclusive_client = ExclusivityGuard :: new (
167+ Arc :: clone ( & client) ,
168+ Arc :: clone ( & client. handle ( ) . in_use_by_wait_set ) ,
169+ ) ?;
157170 unsafe {
158171 // SAFETY: I'm not sure if it's required, but the client pointer will remain valid
159172 // for as long as the wait set exists, because it's stored in self.clients.
@@ -165,20 +178,25 @@ impl WaitSet {
165178 )
166179 }
167180 . ok ( ) ?;
168- self . clients . push ( client ) ;
181+ self . clients . push ( exclusive_client ) ;
169182 Ok ( ( ) )
170183 }
171184
172185 /// Adds a service to the wait set.
173186 ///
174- /// It is possible, but not useful, to add the same service twice.
175- ///
176- /// This will return an error if the number of services in the wait set is larger than the
177- /// capacity set in [`WaitSet::new`].
187+ /// # Errors
188+ /// - If the service was already added to this wait set or another one,
189+ /// [`AlreadyAddedToWaitSet`][1] will be returned
190+ /// - If the number of services in the wait set is larger than the
191+ /// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
178192 ///
179- /// The same service must not be added to multiple wait sets, because that would make it
180- /// unsafe to simultaneously wait on those wait sets.
193+ /// [1]: crate::RclrsError
194+ /// [2]: crate::RclReturnCode
181195 pub fn add_service ( & mut self , service : Arc < dyn ServiceBase > ) -> Result < ( ) , RclrsError > {
196+ let exclusive_service = ExclusivityGuard :: new (
197+ Arc :: clone ( & service) ,
198+ Arc :: clone ( & service. handle ( ) . in_use_by_wait_set ) ,
199+ ) ?;
182200 unsafe {
183201 // SAFETY: I'm not sure if it's required, but the service pointer will remain valid
184202 // for as long as the wait set exists, because it's stored in self.services.
@@ -190,7 +208,7 @@ impl WaitSet {
190208 )
191209 }
192210 . ok ( ) ?;
193- self . services . push ( service ) ;
211+ self . services . push ( exclusive_service ) ;
194212 Ok ( ( ) )
195213 }
196214
@@ -245,7 +263,9 @@ impl WaitSet {
245263 // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
246264 let wait_set_entry = unsafe { * self . rcl_wait_set . subscriptions . add ( i) } ;
247265 if !wait_set_entry. is_null ( ) {
248- ready_entities. subscriptions . push ( subscription. clone ( ) ) ;
266+ ready_entities
267+ . subscriptions
268+ . push ( Arc :: clone ( & subscription. waitable ) ) ;
249269 }
250270 }
251271 for ( i, client) in self . clients . iter ( ) . enumerate ( ) {
@@ -254,7 +274,7 @@ impl WaitSet {
254274 // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
255275 let wait_set_entry = unsafe { * self . rcl_wait_set . clients . add ( i) } ;
256276 if !wait_set_entry. is_null ( ) {
257- ready_entities. clients . push ( client . clone ( ) ) ;
277+ ready_entities. clients . push ( Arc :: clone ( & client . waitable ) ) ;
258278 }
259279 }
260280 for ( i, service) in self . services . iter ( ) . enumerate ( ) {
@@ -263,9 +283,48 @@ impl WaitSet {
263283 // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
264284 let wait_set_entry = unsafe { * self . rcl_wait_set . services . add ( i) } ;
265285 if !wait_set_entry. is_null ( ) {
266- ready_entities. services . push ( service . clone ( ) ) ;
286+ ready_entities. services . push ( Arc :: clone ( & service . waitable ) ) ;
267287 }
268288 }
269289 Ok ( ready_entities)
270290 }
271291}
292+
293+ #[ cfg( test) ]
294+ mod tests {
295+ use crate :: { Context , Node , RclrsError , WaitSet , QOS_PROFILE_DEFAULT } ;
296+ use std:: sync:: Arc ;
297+
298+ #[ test]
299+ fn test_adding_waitable_to_wait_sets ( ) -> Result < ( ) , RclrsError > {
300+ let context = Context :: new ( [ ] ) ?;
301+ let mut node = Node :: new ( & context, "test_adding_waitable_to_wait_sets" ) ?;
302+ let subscription = node. create_subscription (
303+ "test" ,
304+ QOS_PROFILE_DEFAULT ,
305+ move |_: std_msgs:: msg:: String | { } ,
306+ ) ?;
307+ let mut wait_set_1 = WaitSet :: new ( 1 , 0 , 0 , 0 , 0 , 0 , & context) ?;
308+ let mut wait_set_2 = WaitSet :: new ( 1 , 0 , 0 , 0 , 0 , 0 , & context) ?;
309+
310+ // Try to add the subscription to wait set 1 twice
311+ wait_set_1. add_subscription ( Arc :: clone ( & subscription) as _ ) ?;
312+ assert ! ( wait_set_1
313+ . add_subscription( Arc :: clone( & subscription) as _)
314+ . is_err( ) ) ;
315+
316+ // Try to add it to another wait set
317+ assert ! ( wait_set_2
318+ . add_subscription( Arc :: clone( & subscription) as _)
319+ . is_err( ) ) ;
320+
321+ // It works as soon as it is not anymore part of wait_set_1
322+ wait_set_1. clear ( ) ;
323+ wait_set_2. add_subscription ( Arc :: clone ( & subscription) as _ ) ?;
324+
325+ // Dropping the wait set also frees up the subscription
326+ drop ( wait_set_2) ;
327+ wait_set_1. add_subscription ( Arc :: clone ( & subscription) as _ ) ?;
328+ Ok ( ( ) )
329+ }
330+ }
0 commit comments