Skip to content

Commit 38e47c3

Browse files
committed
RECREATION OF Fixes for intra-process actions (#144)
1 parent 0b6ea6c commit 38e47c3

File tree

11 files changed

+473
-471
lines changed

11 files changed

+473
-471
lines changed

rclcpp/include/rclcpp/experimental/action_client_intra_process.hpp

Lines changed: 135 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -50,35 +50,52 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
5050

5151
// Useful aliases for the action client data types
5252
using ResponseCallback = std::function<void (std::shared_ptr<void>)>;
53+
54+
// Aliases for the GoalResponse ring buffer
5355
using GoalResponse = typename ActionT::Impl::SendGoalService::Response;
5456
using GoalResponseSharedPtr = typename std::shared_ptr<GoalResponse>;
57+
using GoalResponseDataPair = typename std::pair<uint64_t, GoalResponseSharedPtr>;
58+
using GoalResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
59+
using GoalResponsePairSharedPtr = typename std::shared_ptr<GoalResponseDataPair>;
60+
61+
// Aliases for the ResultResponse ring buffer
5562
using ResultResponse = typename ActionT::Impl::GetResultService::Response;
5663
using ResultResponseSharedPtr = typename std::shared_ptr<ResultResponse>;
64+
using ResultResponseDataPair = typename std::pair<uint64_t, ResultResponseSharedPtr>;
65+
using ResultResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
66+
using ResultResponsePairSharedPtr = typename std::shared_ptr<ResultResponseDataPair>;
67+
68+
// Aliases for the CancelResponse ring buffer
69+
using CancelResponse = typename ActionT::Impl::CancelGoalService::Response;
70+
using CancelResponseSharedPtr = typename std::shared_ptr<CancelResponse>;
71+
using CancelResponseDataPair = typename std::pair<uint64_t, CancelResponseSharedPtr>;
72+
using CancelResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
73+
using CancelResponsePairSharedPtr = typename std::shared_ptr<CancelResponseDataPair>;
74+
5775
using FeedbackMessage = typename ActionT::Impl::FeedbackMessage;
5876
using FeedbackSharedPtr = typename std::shared_ptr<FeedbackMessage>;
59-
using CancelGoalSharedPtr = typename std::shared_ptr<void>;
6077
using GoalStatusSharedPtr = typename std::shared_ptr<void>;
6178

6279
ActionClientIntraProcess(
6380
rclcpp::Context::SharedPtr context,
6481
const std::string & action_name,
6582
const rcl_action_client_depth_t & qos_history,
6683
ResponseCallback goal_status_callback,
67-
ResponseCallback feedback_callback)
68-
: goal_status_callback_(goal_status_callback),
69-
feedback_callback_(feedback_callback),
70-
ActionClientIntraProcessBase(
84+
ResponseCallback feedback_callback,
85+
std::recursive_mutex & reentrant_mutex)
86+
: ActionClientIntraProcessBase(
7187
context,
7288
action_name,
73-
QoS(qos_history.goal_service_depth))
89+
QoS(qos_history.goal_service_depth),
90+
reentrant_mutex)
7491
{
7592
// Create the intra-process buffers
7693
goal_response_buffer_ =
77-
rclcpp::experimental::create_service_intra_process_buffer<GoalResponseSharedPtr>(
94+
rclcpp::experimental::create_service_intra_process_buffer<GoalResponsePairSharedPtr>(
7895
QoS(qos_history.goal_service_depth));
7996

8097
result_response_buffer_ =
81-
rclcpp::experimental::create_service_intra_process_buffer<ResultResponseSharedPtr>(
98+
rclcpp::experimental::create_service_intra_process_buffer<ResultResponsePairSharedPtr>(
8299
QoS(qos_history.result_service_depth));
83100

84101
status_buffer_ =
@@ -90,8 +107,11 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
90107
QoS(qos_history.feedback_topic_depth));
91108

92109
cancel_response_buffer_ =
93-
rclcpp::experimental::create_service_intra_process_buffer<CancelGoalSharedPtr>(
110+
rclcpp::experimental::create_service_intra_process_buffer<CancelResponsePairSharedPtr>(
94111
QoS(qos_history.cancel_service_depth));
112+
113+
set_response_callback_to_event_type(EventType::FeedbackReady, feedback_callback);
114+
set_response_callback_to_event_type(EventType::StatusReady, goal_status_callback);
95115
}
96116

97117
virtual ~ActionClientIntraProcess() = default;
@@ -100,93 +120,109 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
100120
{
101121
(void) wait_set;
102122

123+
is_goal_response_ready_ = goal_response_buffer_->has_data();
124+
is_result_response_ready_ = result_response_buffer_->has_data();
125+
is_cancel_response_ready_ = cancel_response_buffer_->has_data();
126+
is_feedback_ready_ = feedback_buffer_->has_data();
127+
is_status_ready_ = status_buffer_->has_data();
128+
103129
return is_feedback_ready_ ||
104130
is_status_ready_ ||
105131
is_goal_response_ready_ ||
106132
is_cancel_response_ready_ ||
107133
is_result_response_ready_;
108134
}
109135

110-
// Store responses callbacks.
111-
// We don't use mutex to protect these callbacks since they
112-
// are called always after they are set.
113-
void store_goal_response_callback(ResponseCallback callback)
136+
137+
void store_goal_response_callback(size_t goal_id, ResponseCallback response_callback)
114138
{
115-
goal_response_callback_ = callback;
139+
set_response_callback_to_event_type(EventType::GoalResponse, response_callback, goal_id);
116140
}
117141

118-
void store_cancel_goal_callback(ResponseCallback callback)
142+
void store_cancel_goal_callback(size_t goal_id, ResponseCallback callback)
119143
{
120-
cancel_goal_callback_ = callback;
144+
set_response_callback_to_event_type(EventType::CancelResponse, callback, goal_id);
121145
}
122146

123-
void store_result_response_callback(ResponseCallback callback)
147+
void store_result_response_callback(size_t goal_id, ResponseCallback callback)
124148
{
125-
result_response_callback_ = callback;
149+
set_response_callback_to_event_type(EventType::ResultResponse, callback, goal_id);
126150
}
127151

128152
// Store responses from server
129-
void store_ipc_action_goal_response(GoalResponseSharedPtr goal_response)
153+
void store_ipc_action_goal_response(
154+
GoalResponseSharedPtr goal_response,
155+
size_t goal_id)
130156
{
131-
goal_response_buffer_->add(std::move(goal_response));
157+
goal_response_buffer_->add(
158+
std::make_shared<GoalResponseDataPair>(
159+
std::make_pair(goal_id, std::move(goal_response))));
160+
132161
gc_.trigger();
133-
is_goal_response_ready_ = true;
134-
invoke_on_ready_callback(EventType::GoalResponse);
162+
invoke_on_ready_callback(EventType::GoalResponse, goal_id);
135163
}
136164

137-
void store_ipc_action_result_response(ResultResponseSharedPtr result_response)
165+
void store_ipc_action_result_response(
166+
ResultResponseSharedPtr result_response,
167+
size_t goal_id)
138168
{
139-
result_response_buffer_->add(std::move(result_response));
169+
result_response_buffer_->add(
170+
std::make_shared<ResultResponseDataPair>(
171+
std::make_pair(goal_id, std::move(result_response))));
172+
140173
gc_.trigger();
141-
is_result_response_ready_ = true;
142-
invoke_on_ready_callback(EventType::ResultResponse);
174+
invoke_on_ready_callback(EventType::ResultResponse, goal_id);
143175
}
144176

145-
void store_ipc_action_cancel_response(CancelGoalSharedPtr cancel_response)
177+
void store_ipc_action_cancel_response(
178+
CancelResponseSharedPtr cancel_response,
179+
size_t goal_id)
146180
{
147-
cancel_response_buffer_->add(std::move(cancel_response));
181+
cancel_response_buffer_->add(
182+
std::make_shared<CancelResponseDataPair>(
183+
std::make_pair(goal_id, std::move(cancel_response))));
184+
148185
gc_.trigger();
149-
is_cancel_response_ready_ = true;
150-
invoke_on_ready_callback(EventType::CancelResponse);
186+
invoke_on_ready_callback(EventType::CancelResponse, goal_id);
151187
}
152188

153189
void store_ipc_action_feedback(FeedbackSharedPtr feedback)
154190
{
155191
feedback_buffer_->add(std::move(feedback));
156192
gc_.trigger();
157-
is_feedback_ready_ = true;
158193
invoke_on_ready_callback(EventType::FeedbackReady);
159194
}
160195

161196
void store_ipc_action_goal_status(GoalStatusSharedPtr status)
162197
{
163198
status_buffer_->add(std::move(status));
164199
gc_.trigger();
165-
is_status_ready_ = true;
166200
invoke_on_ready_callback(EventType::StatusReady);
167201
}
168202

169203
std::shared_ptr<void>
170204
take_data() override
171205
{
206+
std::shared_ptr<void> data;
207+
172208
if (is_goal_response_ready_) {
173-
auto data = std::move(goal_response_buffer_->consume());
174-
return std::static_pointer_cast<void>(data);
175-
} else if (is_result_response_ready_) {
176-
auto data = std::move(result_response_buffer_->consume());
177-
return std::static_pointer_cast<void>(data);
178-
} else if (is_cancel_response_ready_) {
179-
auto data = std::move(cancel_response_buffer_->consume());
180-
return std::static_pointer_cast<void>(data);
181-
} else if (is_feedback_ready_) {
182-
auto data = std::move(feedback_buffer_->consume());
183-
return std::static_pointer_cast<void>(data);
184-
} else if (is_status_ready_) {
185-
auto data = std::move(status_buffer_->consume());
186-
return std::static_pointer_cast<void>(data);
187-
} else {
188-
throw std::runtime_error("Taking data from intra-process action client but nothing is ready");
209+
data = std::move(goal_response_buffer_->consume());
210+
}
211+
else if (is_result_response_ready_) {
212+
data = std::move(result_response_buffer_->consume());
213+
}
214+
else if (is_cancel_response_ready_) {
215+
data = std::move(cancel_response_buffer_->consume());
216+
}
217+
else if (is_feedback_ready_) {
218+
data = std::move(feedback_buffer_->consume());
189219
}
220+
else if (is_status_ready_) {
221+
data = status_buffer_->consume();
222+
}
223+
224+
// Data could be null if there were more events than elements in the buffer
225+
return data;
190226
}
191227

192228
std::shared_ptr<void>
@@ -195,79 +231,91 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
195231
// Mark as ready the event type from which we want to take data
196232
switch (static_cast<EventType>(id)) {
197233
case EventType::ResultResponse:
198-
is_result_response_ready_ = true;
234+
is_result_response_ready_ = result_response_buffer_->has_data();
199235
break;
200236
case EventType::CancelResponse:
201-
is_cancel_response_ready_ = true;
237+
is_cancel_response_ready_ = cancel_response_buffer_->has_data();
202238
break;
203239
case EventType::GoalResponse:
204-
is_goal_response_ready_ = true;
240+
is_goal_response_ready_ = goal_response_buffer_->has_data();
205241
break;
206242
case EventType::FeedbackReady:
207-
is_feedback_ready_ = true;
243+
is_feedback_ready_ = feedback_buffer_->has_data();
208244
break;
209245
case EventType::StatusReady:
210-
is_status_ready_ = true;
246+
is_status_ready_ = status_buffer_->has_data();
211247
break;
212248
}
213249

214250
return take_data();
215251
}
216252

217-
218253
void execute(std::shared_ptr<void> & data)
219254
{
220-
// How to handle case when more than one flag is ready?
221-
// For example, feedback and status are both ready, guard condition triggered
222-
// twice, but we process a single entity here.
223-
// On the default executor using a waitset, waitables are checked twice if ready,
224-
// so that fixes the issue. Check if this is a problem with EventsExecutor.
225255
if (!data) {
226-
throw std::runtime_error("'data' is empty");
256+
// This can happen when there were more events than elements in the ring buffer
257+
return;
227258
}
228259

229-
if (is_goal_response_ready_) {
230-
is_goal_response_ready_ = false;
231-
goal_response_callback_(std::move(data));
232-
} else if (is_result_response_ready_) {
233-
is_result_response_ready_ = false;
234-
result_response_callback_(std::move(data));
235-
} else if (is_cancel_response_ready_) {
236-
is_cancel_response_ready_ = false;
237-
cancel_goal_callback_(std::move(data));
238-
} else if (is_feedback_ready_) {
239-
is_feedback_ready_ = false;
240-
feedback_callback_(std::move(data));
241-
} else if (is_status_ready_) {
242-
is_status_ready_ = false;
243-
goal_status_callback_(std::move(data));
244-
} else {
260+
if (is_goal_response_ready_.exchange(false)) {
261+
auto goal_response_pair = std::static_pointer_cast<GoalResponseVoidDataPair>(data);
262+
auto goal_id = goal_response_pair->first;
263+
auto & goal_response = goal_response_pair->second;
264+
265+
call_response_callback_and_erase(
266+
EventType::GoalResponse,
267+
goal_response,
268+
goal_id);
269+
}
270+
else if (is_result_response_ready_.exchange(false)) {
271+
auto result_response_pair = std::static_pointer_cast<ResultResponseVoidDataPair>(data);
272+
auto goal_id = result_response_pair->first;
273+
auto & result_response = result_response_pair->second;
274+
275+
call_response_callback_and_erase(
276+
EventType::ResultResponse,
277+
result_response,
278+
goal_id);
279+
}
280+
else if (is_cancel_response_ready_.exchange(false)) {
281+
auto cancel_response_pair = std::static_pointer_cast<CancelResponseVoidDataPair>(data);
282+
auto goal_id = cancel_response_pair->first;
283+
auto & cancel_response = cancel_response_pair->second;
284+
285+
call_response_callback_and_erase(
286+
EventType::CancelResponse,
287+
cancel_response,
288+
goal_id);
289+
}
290+
else if (is_feedback_ready_.exchange(false)) {
291+
call_response_callback_and_erase(
292+
EventType::FeedbackReady, data, 0, false);
293+
}
294+
else if (is_status_ready_.exchange(false)) {
295+
call_response_callback_and_erase(
296+
EventType::StatusReady, data, 0, false);
297+
}
298+
else {
245299
throw std::runtime_error("Executing intra-process action client but nothing is ready");
246300
}
247301
}
248302

249303
protected:
250-
ResponseCallback goal_response_callback_;
251-
ResponseCallback result_response_callback_;
252-
ResponseCallback cancel_goal_callback_;
253-
ResponseCallback goal_status_callback_;
254-
ResponseCallback feedback_callback_;
255-
256-
// Create buffers to store data coming from server
304+
// Declare buffers to store responses coming from action server
257305
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
258-
GoalResponseSharedPtr>::UniquePtr goal_response_buffer_;
306+
GoalResponsePairSharedPtr>::UniquePtr goal_response_buffer_;
259307

260308
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
261-
ResultResponseSharedPtr>::UniquePtr result_response_buffer_;
309+
ResultResponsePairSharedPtr>::UniquePtr result_response_buffer_;
262310

263311
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
264312
FeedbackSharedPtr>::UniquePtr feedback_buffer_;
265313

266314
rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
267315
GoalStatusSharedPtr>::UniquePtr status_buffer_;
268316

269-
rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
270-
CancelGoalSharedPtr>::UniquePtr cancel_response_buffer_;
317+
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
318+
CancelResponsePairSharedPtr>::UniquePtr cancel_response_buffer_;
271319

272320
std::atomic<bool> is_feedback_ready_{false};
273321
std::atomic<bool> is_status_ready_{false};

0 commit comments

Comments
 (0)