Skip to content

Commit 8f0f192

Browse files
committed
Experimenting with only taking from services and clients in the same thread as the wait set
Signed-off-by: Michael X. Grey <[email protected]>
1 parent fe56cc9 commit 8f0f192

File tree

4 files changed

+178
-79
lines changed

4 files changed

+178
-79
lines changed

rclrs/src/client.rs

Lines changed: 116 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
use std::{
2-
boxed::Box,
32
ffi::CString,
43
sync::{Arc, Mutex, MutexGuard},
4+
collections::HashMap,
55
};
66

7-
use futures::channel::mpsc::{UnboundedSender, unbounded};
8-
97
use rosidl_runtime_rs::Message;
108

119
use crate::{
1210
error::ToResult,
1311
rcl_bindings::*,
14-
MessageCow, Node, RclrsError, Promise, ENTITY_LIFECYCLE_MUTEX,
15-
ExecutorCommands, Executable, QoSProfile, Waitable, WaitableLifecycle,
12+
MessageCow, Node, RclrsError, RclReturnCode, Promise, ENTITY_LIFECYCLE_MUTEX,
13+
Executable, QoSProfile, Waitable, WaitableLifecycle,
1614
ExecutableHandle, ExecutableKind, ServiceInfo,
1715
};
1816

@@ -25,9 +23,6 @@ pub use client_callback::*;
2523
mod client_output;
2624
pub use client_output::*;
2725

28-
mod client_task;
29-
use client_task::*;
30-
3126
/// Main class responsible for sending requests to a ROS service.
3227
///
3328
/// The only available way to instantiate clients is via [`Node::create_client`][1], this is to
@@ -40,7 +35,7 @@ where
4035
T: rosidl_runtime_rs::Service,
4136
{
4237
handle: Arc<ClientHandle>,
43-
action: UnboundedSender<ClientAction<T>>,
38+
board: Arc<Mutex<ClientRequestBoard<T>>>,
4439
#[allow(unused)]
4540
lifecycle: WaitableLifecycle,
4641
}
@@ -86,10 +81,9 @@ where
8681
}
8782
.ok()?;
8883

84+
println!("vvvvvvvvv Sent client request {sequence_number} vvvvvvvvvvvv");
8985
// TODO(@mxgrey): Log errors here when logging becomes available.
90-
self.action.unbounded_send(
91-
ClientAction::NewRequest { sequence_number, sender }
92-
).ok();
86+
self.board.lock().unwrap().new_request(sequence_number, sender);
9387

9488
Ok(promise)
9589
}
@@ -231,39 +225,39 @@ where
231225
});
232226

233227
let commands = node.commands();
234-
235-
let (action, receiver) = unbounded();
236-
let _ = commands.run(client_task(receiver, Arc::clone(&handle)));
228+
let board = Arc::new(Mutex::new(ClientRequestBoard::new()));
237229

238230
let (waitable, lifecycle) = Waitable::new(
239231
Box::new(ClientExecutable {
240232
handle: Arc::clone(&handle),
241-
action: action.clone(),
233+
board: Arc::clone(&board),
242234
}),
243235
Some(Arc::clone(&commands.get_guard_condition())),
244236
);
245237
commands.add_to_wait_set(waitable);
246238

247239
Ok(Arc::new(Self {
248240
handle,
249-
action,
241+
board,
250242
lifecycle,
251243
}))
252244
}
253245
}
254246

255-
struct ClientExecutable<T: rosidl_runtime_rs::Service> {
247+
struct ClientExecutable<T>
248+
where
249+
T: rosidl_runtime_rs::Service,
250+
{
256251
handle: Arc<ClientHandle>,
257-
action: UnboundedSender<ClientAction<T>>
252+
board: Arc<Mutex<ClientRequestBoard<T>>>
258253
}
259254

260255
impl<T> Executable for ClientExecutable<T>
261256
where
262257
T: rosidl_runtime_rs::Service,
263258
{
264259
fn execute(&mut self) -> Result<(), RclrsError> {
265-
self.action.unbounded_send(ClientAction::TakeResponse).ok();
266-
Ok(())
260+
self.board.lock().unwrap().execute(&self.handle)
267261
}
268262

269263
fn handle(&self) -> ExecutableHandle {
@@ -275,6 +269,107 @@ where
275269
}
276270
}
277271

272+
type SequenceNumber = i64;
273+
274+
/// This is used internally to monitor the state of active requests, as well as
275+
/// responses that have arrived without a known request.
276+
struct ClientRequestBoard<T>
277+
where
278+
T: rosidl_runtime_rs::Service,
279+
{
280+
// This stores all active requests that have not received a response yet
281+
active_requests: HashMap<SequenceNumber, AnyClientOutputSender<T::Response>>,
282+
// This holds responses that came in when no active request matched the
283+
// sequence number. This could happen if take_response is triggered before
284+
// the new_request for the same sequence number. That is extremely unlikely
285+
// to ever happen but is theoretically possible on systems that may exhibit
286+
// very strange CPU scheduling patterns, so we should account for it.
287+
loose_responses: HashMap<SequenceNumber, (T::Response, rmw_service_info_t)>,
288+
}
289+
290+
impl<T> ClientRequestBoard<T>
291+
where
292+
T: rosidl_runtime_rs::Service,
293+
{
294+
fn new() -> Self {
295+
Self {
296+
active_requests: Default::default(),
297+
loose_responses: Default::default(),
298+
}
299+
}
300+
301+
fn new_request(
302+
&mut self,
303+
sequence_number: SequenceNumber,
304+
sender: AnyClientOutputSender<T::Response>,
305+
) {
306+
if let Some((response, info)) = self.loose_responses.remove(&sequence_number) {
307+
// Weirdly the response for this request already arrived, so we'll
308+
// send it off immediately.
309+
sender.send_response(response, info);
310+
} else {
311+
self.active_requests.insert(sequence_number, sender);
312+
}
313+
}
314+
315+
fn execute(&mut self, handle: &Arc<ClientHandle>) -> Result<(), RclrsError> {
316+
match self.take_response(handle) {
317+
Ok((response, info)) => {
318+
let seq = info.request_id.sequence_number;
319+
if let Some(sender) = self.active_requests.remove(&seq) {
320+
dbg!();
321+
println!("Received response for {info:?}");
322+
// The active request is available, so send this response off
323+
sender.send_response(response, info);
324+
} else {
325+
dbg!();
326+
println!("Received loose response for {info:?}");
327+
// Weirdly there isn't an active request for this, so save
328+
// it in the loose responses map.
329+
self.loose_responses.insert(seq, (response, info));
330+
}
331+
}
332+
Err(err) => {
333+
match err {
334+
RclrsError::RclError { code: RclReturnCode::ClientTakeFailed, .. } => {
335+
// This is okay, it means a spurious wakeup happened
336+
dbg!();
337+
println!("Spurious wakeup for client");
338+
}
339+
err => {
340+
dbg!();
341+
// TODO(@mxgrey): Log the error here once logging is available
342+
eprintln!("Error while taking a response for a client: {err}");
343+
}
344+
}
345+
}
346+
}
347+
Ok(())
348+
}
349+
350+
fn take_response(
351+
&self,
352+
handle: &Arc<ClientHandle>,
353+
) -> Result<(T::Response, rmw_service_info_t), RclrsError> {
354+
let mut service_info_out = ServiceInfo::zero_initialized_rmw();
355+
let mut response_out = <T::Response as Message>::RmwMsg::default();
356+
let handle = &*handle.lock();
357+
unsafe {
358+
// SAFETY: The three pointers are all kept valid by the handle
359+
rcl_take_response_with_info(
360+
handle,
361+
&mut service_info_out,
362+
&mut response_out as *mut <T::Response as Message>::RmwMsg as *mut _,
363+
)
364+
}
365+
.ok()
366+
.map(|_| (
367+
T::Response::from_rmw_message(response_out),
368+
service_info_out,
369+
))
370+
}
371+
}
372+
278373
/// Manage the lifecycle of an `rcl_client_t`, including managing its dependencies
279374
/// on `rcl_node_t` and `rcl_context_t` by ensuring that these dependencies are
280375
/// [dropped after][1] the `rcl_client_t`.

rclrs/src/client/client_task.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,12 @@ pub(super) async fn client_task<T: Service>(
5454
let seq = info.request_id.sequence_number;
5555
if let Some(sender) = active_requests.remove(&seq) {
5656
dbg!();
57+
println!("Received response for {info:?}");
5758
// The active request is available, so send this response off
5859
sender.send_response(response, info);
5960
} else {
6061
dbg!();
62+
println!("Received loose response for {info:?}");
6163
// Weirdly there isn't an active request for this, so save
6264
// it in the loose responses map.
6365
loose_responses.insert(seq, (response, info));
@@ -68,6 +70,7 @@ pub(super) async fn client_task<T: Service>(
6870
RclrsError::RclError { code: RclReturnCode::ClientTakeFailed, .. } => {
6971
// This is okay, it means a spurious wakeup happened
7072
dbg!();
73+
println!("Spurious wakeup for client");
7174
}
7275
err => {
7376
dbg!();

rclrs/src/service.rs

Lines changed: 20 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@ use std::{
44
sync::{Arc, Mutex, MutexGuard},
55
};
66

7-
use futures::channel::mpsc::{unbounded, UnboundedSender};
8-
97
use crate::{
108
error::ToResult,
119
rcl_bindings::*,
12-
NodeHandle, RclrsError, Waitable, WaitableLifecycle, GuardCondition, QoSProfile,
10+
NodeHandle, RclrsError, Waitable, WaitableLifecycle, QoSProfile,
1311
Executable, ExecutableKind, ExecutableHandle, ENTITY_LIFECYCLE_MUTEX, ExecutorCommands,
1412
};
1513

@@ -25,9 +23,6 @@ pub use service_callback::*;
2523
mod service_info;
2624
pub use service_info::*;
2725

28-
mod service_task;
29-
use service_task::*;
30-
3126
/// Struct for responding to requests sent by ROS service clients.
3227
///
3328
/// The only way to instantiate a service is via [`Node::create_service()`][1]
@@ -45,11 +40,8 @@ where
4540
{
4641
/// This handle is used to access the data that rcl holds for this service.
4742
handle: Arc<ServiceHandle>,
48-
/// This allows us to replace the callback in the service task.
49-
///
50-
/// Holding onto this sender will keep the service task alive. Once this
51-
/// sender is dropped, the service task will end itself.
52-
action: UnboundedSender<ServiceAction<T>>,
43+
/// This is the callback that will be executed each time a request arrives.
44+
callback: Arc<Mutex<AnyServiceCallback<T>>>,
5345
/// Holding onto this keeps the waiter for this service alive in the wait
5446
/// set of the executor.
5547
lifecycle: WaitableLifecycle,
@@ -84,7 +76,7 @@ where
8476
) {
8577
let callback = callback.into_service_callback();
8678
// TODO(@mxgrey): Log any errors here when logging becomes available
87-
self.action.unbounded_send(ServiceAction::SetCallback(callback)).ok();
79+
*self.callback.lock().unwrap() = callback;
8880
}
8981

9082
/// Set the callback of this service, replacing the callback that was
@@ -96,8 +88,7 @@ where
9688
callback: impl ServiceAsyncCallback<T, Args>,
9789
) {
9890
let callback = callback.into_service_async_callback();
99-
// TODO(@mxgrey): Log any errors here when logging becomes available.
100-
self.action.unbounded_send(ServiceAction::SetCallback(callback)).ok();
91+
*self.callback.lock().unwrap() = callback;
10192
}
10293

10394
/// Used by [`Node`][crate::Node] to create a new service
@@ -108,35 +99,7 @@ where
10899
node_handle: &Arc<NodeHandle>,
109100
commands: &Arc<ExecutorCommands>,
110101
) -> Result<Arc<Self>, RclrsError> {
111-
let (sender, receiver) = unbounded();
112-
let (service, waitable) = Self::new(
113-
topic,
114-
qos,
115-
sender,
116-
Arc::clone(&node_handle),
117-
Arc::clone(commands.get_guard_condition()),
118-
)?;
119-
120-
let _ = commands.run(service_task(
121-
callback,
122-
receiver,
123-
Arc::clone(&service.handle),
124-
Arc::clone(commands),
125-
));
126-
127-
commands.add_to_wait_set(waitable);
128-
129-
Ok(service)
130-
}
131-
132-
/// Instantiate the service.
133-
fn new(
134-
topic: &str,
135-
qos: QoSProfile,
136-
action: UnboundedSender<ServiceAction<T>>,
137-
node_handle: Arc<NodeHandle>,
138-
guard_condition: Arc<GuardCondition>,
139-
) -> Result<(Arc<Self>, Waitable), RclrsError> {
102+
let callback = Arc::new(Mutex::new(callback));
140103
// SAFETY: Getting a zero-initialized value is always safe.
141104
let mut rcl_service = unsafe { rcl_get_zero_initialized_service() };
142105
let type_support = <T as rosidl_runtime_rs::Service>::get_type_support()
@@ -174,34 +137,40 @@ where
174137

175138
let handle = Arc::new(ServiceHandle {
176139
rcl_service: Mutex::new(rcl_service),
177-
node_handle,
140+
node_handle: Arc::clone(&node_handle),
178141
});
179142

180-
let (waiter, lifecycle) = Waitable::new(
143+
let (waitable, lifecycle) = Waitable::new(
181144
Box::new(ServiceExecutable {
182145
handle: Arc::clone(&handle),
183-
action: action.clone(),
146+
callback: Arc::clone(&callback),
147+
commands: Arc::clone(&commands),
184148
}),
185-
Some(guard_condition),
149+
Some(Arc::clone(commands.get_guard_condition())),
186150
);
187151

188-
let service = Arc::new(Self { handle, action, lifecycle });
152+
let service = Arc::new(Self { handle, callback, lifecycle });
153+
commands.add_to_wait_set(waitable);
189154

190-
Ok((service, waiter))
155+
Ok(service)
191156
}
192157
}
193158

194159
struct ServiceExecutable<T: rosidl_runtime_rs::Service> {
195160
handle: Arc<ServiceHandle>,
196-
action: UnboundedSender<ServiceAction<T>>,
161+
callback: Arc<Mutex<AnyServiceCallback<T>>>,
162+
commands: Arc<ExecutorCommands>,
197163
}
198164

199165
impl<T> Executable for ServiceExecutable<T>
200166
where
201167
T: rosidl_runtime_rs::Service,
202168
{
203169
fn execute(&mut self) -> Result<(), RclrsError> {
204-
self.action.unbounded_send(ServiceAction::Execute).ok();
170+
if let Err(err) = self.callback.lock().unwrap().execute(&self.handle, &self.commands) {
171+
// TODO(@mxgrey): Log the error here once logging is implemented
172+
eprintln!("Error while executing a service callback: {err}");
173+
}
205174
Ok(())
206175
}
207176

0 commit comments

Comments
 (0)