Skip to content

Commit fdc4a39

Browse files
committed
RECREATION OF Fixes for intra-process actions (#144)
action client / server ipc decrustification
1 parent e78215e commit fdc4a39

File tree

12 files changed

+456
-465
lines changed

12 files changed

+456
-465
lines changed

rclcpp/include/rclcpp/experimental/action_client_intra_process.hpp

Lines changed: 121 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -50,35 +50,52 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
5050

5151
// Useful aliases for the action client data types
5252
using ResponseCallback = std::function<void (std::shared_ptr<void>)>;
53+
54+
// Aliases for the GoalResponse ring buffer
5355
using GoalResponse = typename ActionT::Impl::SendGoalService::Response;
5456
using GoalResponseSharedPtr = typename std::shared_ptr<GoalResponse>;
57+
using GoalResponseDataPair = typename std::pair<uint64_t, GoalResponseSharedPtr>;
58+
using GoalResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
59+
using GoalResponsePairSharedPtr = typename std::shared_ptr<GoalResponseDataPair>;
60+
61+
// Aliases for the ResultResponse ring buffer
5562
using ResultResponse = typename ActionT::Impl::GetResultService::Response;
5663
using ResultResponseSharedPtr = typename std::shared_ptr<ResultResponse>;
64+
using ResultResponseDataPair = typename std::pair<uint64_t, ResultResponseSharedPtr>;
65+
using ResultResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
66+
using ResultResponsePairSharedPtr = typename std::shared_ptr<ResultResponseDataPair>;
67+
68+
// Aliases for the CancelResponse ring buffer
69+
using CancelResponse = typename ActionT::Impl::CancelGoalService::Response;
70+
using CancelResponseSharedPtr = typename std::shared_ptr<CancelResponse>;
71+
using CancelResponseDataPair = typename std::pair<uint64_t, CancelResponseSharedPtr>;
72+
using CancelResponseVoidDataPair = typename std::pair<uint64_t, std::shared_ptr<void>>;
73+
using CancelResponsePairSharedPtr = typename std::shared_ptr<CancelResponseDataPair>;
74+
5775
using FeedbackMessage = typename ActionT::Impl::FeedbackMessage;
5876
using FeedbackSharedPtr = typename std::shared_ptr<FeedbackMessage>;
59-
using CancelGoalSharedPtr = typename std::shared_ptr<void>;
6077
using GoalStatusSharedPtr = typename std::shared_ptr<void>;
6178

6279
ActionClientIntraProcess(
6380
rclcpp::Context::SharedPtr context,
6481
const std::string & action_name,
6582
const rcl_action_client_depth_t & qos_history,
6683
ResponseCallback goal_status_callback,
67-
ResponseCallback feedback_callback)
68-
: goal_status_callback_(goal_status_callback),
69-
feedback_callback_(feedback_callback),
70-
ActionClientIntraProcessBase(
84+
ResponseCallback feedback_callback,
85+
std::recursive_mutex & reentrant_mutex)
86+
: ActionClientIntraProcessBase(
7187
context,
7288
action_name,
73-
QoS(qos_history.goal_service_depth))
89+
QoS(qos_history.goal_service_depth),
90+
reentrant_mutex)
7491
{
7592
// Create the intra-process buffers
7693
goal_response_buffer_ =
77-
rclcpp::experimental::create_service_intra_process_buffer<GoalResponseSharedPtr>(
94+
rclcpp::experimental::create_service_intra_process_buffer<GoalResponsePairSharedPtr>(
7895
QoS(qos_history.goal_service_depth));
7996

8097
result_response_buffer_ =
81-
rclcpp::experimental::create_service_intra_process_buffer<ResultResponseSharedPtr>(
98+
rclcpp::experimental::create_service_intra_process_buffer<ResultResponsePairSharedPtr>(
8299
QoS(qos_history.result_service_depth));
83100

84101
status_buffer_ =
@@ -90,8 +107,11 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
90107
QoS(qos_history.feedback_topic_depth));
91108

92109
cancel_response_buffer_ =
93-
rclcpp::experimental::create_service_intra_process_buffer<CancelGoalSharedPtr>(
110+
rclcpp::experimental::create_service_intra_process_buffer<CancelResponsePairSharedPtr>(
94111
QoS(qos_history.cancel_service_depth));
112+
113+
set_response_callback_to_event_type(EventType::FeedbackReady, feedback_callback);
114+
set_response_callback_to_event_type(EventType::StatusReady, goal_status_callback);
95115
}
96116

97117
virtual ~ActionClientIntraProcess() = default;
@@ -100,93 +120,105 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
100120
{
101121
(void) wait_set;
102122

123+
is_goal_response_ready_ = goal_response_buffer_->has_data();
124+
is_result_response_ready_ = result_response_buffer_->has_data();
125+
is_cancel_response_ready_ = cancel_response_buffer_->has_data();
126+
is_feedback_ready_ = feedback_buffer_->has_data();
127+
is_status_ready_ = status_buffer_->has_data();
128+
103129
return is_feedback_ready_ ||
104130
is_status_ready_ ||
105131
is_goal_response_ready_ ||
106132
is_cancel_response_ready_ ||
107133
is_result_response_ready_;
108134
}
109135

110-
// Store responses callbacks.
111-
// We don't use mutex to protect these callbacks since they
112-
// are called always after they are set.
113-
void store_goal_response_callback(ResponseCallback callback)
136+
137+
void store_goal_response_callback(size_t goal_id, ResponseCallback response_callback)
114138
{
115-
goal_response_callback_ = callback;
139+
set_response_callback_to_event_type(EventType::GoalResponse, response_callback, goal_id);
116140
}
117141

118-
void store_cancel_goal_callback(ResponseCallback callback)
142+
void store_cancel_goal_callback(size_t goal_id, ResponseCallback callback)
119143
{
120-
cancel_goal_callback_ = callback;
144+
set_response_callback_to_event_type(EventType::CancelResponse, callback, goal_id);
121145
}
122146

123-
void store_result_response_callback(ResponseCallback callback)
147+
void store_result_response_callback(size_t goal_id, ResponseCallback callback)
124148
{
125-
result_response_callback_ = callback;
149+
set_response_callback_to_event_type(EventType::ResultResponse, callback, goal_id);
126150
}
127151

128152
// Store responses from server
129-
void store_ipc_action_goal_response(GoalResponseSharedPtr goal_response)
153+
void store_ipc_action_goal_response(
154+
GoalResponseSharedPtr goal_response,
155+
size_t goal_id)
130156
{
131-
goal_response_buffer_->add(std::move(goal_response));
157+
goal_response_buffer_->add(
158+
std::make_shared<GoalResponseDataPair>(
159+
std::make_pair(goal_id, std::move(goal_response))));
160+
132161
gc_.trigger();
133-
is_goal_response_ready_ = true;
134-
invoke_on_ready_callback(EventType::GoalResponse);
162+
invoke_on_ready_callback(EventType::GoalResponse, goal_id);
135163
}
136164

137-
void store_ipc_action_result_response(ResultResponseSharedPtr result_response)
165+
void store_ipc_action_result_response(
166+
ResultResponseSharedPtr result_response,
167+
size_t goal_id)
138168
{
139-
result_response_buffer_->add(std::move(result_response));
169+
result_response_buffer_->add(
170+
std::make_shared<ResultResponseDataPair>(
171+
std::make_pair(goal_id, std::move(result_response))));
172+
140173
gc_.trigger();
141-
is_result_response_ready_ = true;
142-
invoke_on_ready_callback(EventType::ResultResponse);
174+
invoke_on_ready_callback(EventType::ResultResponse, goal_id);
143175
}
144176

145-
void store_ipc_action_cancel_response(CancelGoalSharedPtr cancel_response)
177+
void store_ipc_action_cancel_response(
178+
CancelResponseSharedPtr cancel_response,
179+
size_t goal_id)
146180
{
147-
cancel_response_buffer_->add(std::move(cancel_response));
181+
cancel_response_buffer_->add(
182+
std::make_shared<CancelResponseDataPair>(
183+
std::make_pair(goal_id, std::move(cancel_response))));
184+
148185
gc_.trigger();
149-
is_cancel_response_ready_ = true;
150-
invoke_on_ready_callback(EventType::CancelResponse);
186+
invoke_on_ready_callback(EventType::CancelResponse, goal_id);
151187
}
152188

153189
void store_ipc_action_feedback(FeedbackSharedPtr feedback)
154190
{
155191
feedback_buffer_->add(std::move(feedback));
156192
gc_.trigger();
157-
is_feedback_ready_ = true;
158193
invoke_on_ready_callback(EventType::FeedbackReady);
159194
}
160195

161196
void store_ipc_action_goal_status(GoalStatusSharedPtr status)
162197
{
163198
status_buffer_->add(std::move(status));
164199
gc_.trigger();
165-
is_status_ready_ = true;
166200
invoke_on_ready_callback(EventType::StatusReady);
167201
}
168202

169203
std::shared_ptr<void>
170204
take_data() override
171205
{
206+
std::shared_ptr<void> data;
207+
172208
if (is_goal_response_ready_) {
173-
auto data = std::move(goal_response_buffer_->consume());
174-
return std::static_pointer_cast<void>(data);
209+
data = std::move(goal_response_buffer_->consume());
175210
} else if (is_result_response_ready_) {
176-
auto data = std::move(result_response_buffer_->consume());
177-
return std::static_pointer_cast<void>(data);
211+
data = std::move(result_response_buffer_->consume());
178212
} else if (is_cancel_response_ready_) {
179-
auto data = std::move(cancel_response_buffer_->consume());
180-
return std::static_pointer_cast<void>(data);
213+
data = std::move(cancel_response_buffer_->consume());
181214
} else if (is_feedback_ready_) {
182-
auto data = std::move(feedback_buffer_->consume());
183-
return std::static_pointer_cast<void>(data);
215+
data = std::move(feedback_buffer_->consume());
184216
} else if (is_status_ready_) {
185-
auto data = std::move(status_buffer_->consume());
186-
return std::static_pointer_cast<void>(data);
187-
} else {
188-
throw std::runtime_error("Taking data from intra-process action client but nothing is ready");
217+
data = status_buffer_->consume();
189218
}
219+
220+
// Data could be null if there were more events than elements in the buffer
221+
return data;
190222
}
191223

192224
std::shared_ptr<void>
@@ -195,79 +227,86 @@ class ActionClientIntraProcess : public ActionClientIntraProcessBase
195227
// Mark as ready the event type from which we want to take data
196228
switch (static_cast<EventType>(id)) {
197229
case EventType::ResultResponse:
198-
is_result_response_ready_ = true;
230+
is_result_response_ready_ = result_response_buffer_->has_data();
199231
break;
200232
case EventType::CancelResponse:
201-
is_cancel_response_ready_ = true;
233+
is_cancel_response_ready_ = cancel_response_buffer_->has_data();
202234
break;
203235
case EventType::GoalResponse:
204-
is_goal_response_ready_ = true;
236+
is_goal_response_ready_ = goal_response_buffer_->has_data();
205237
break;
206238
case EventType::FeedbackReady:
207-
is_feedback_ready_ = true;
239+
is_feedback_ready_ = feedback_buffer_->has_data();
208240
break;
209241
case EventType::StatusReady:
210-
is_status_ready_ = true;
242+
is_status_ready_ = status_buffer_->has_data();
211243
break;
212244
}
213245

214246
return take_data();
215247
}
216248

217-
218249
void execute(std::shared_ptr<void> & data)
219250
{
220-
// How to handle case when more than one flag is ready?
221-
// For example, feedback and status are both ready, guard condition triggered
222-
// twice, but we process a single entity here.
223-
// On the default executor using a waitset, waitables are checked twice if ready,
224-
// so that fixes the issue. Check if this is a problem with EventsExecutor.
225251
if (!data) {
226-
throw std::runtime_error("'data' is empty");
252+
// This can happen when there were more events than elements in the ring buffer
253+
return;
227254
}
228255

229-
if (is_goal_response_ready_) {
230-
is_goal_response_ready_ = false;
231-
goal_response_callback_(std::move(data));
232-
} else if (is_result_response_ready_) {
233-
is_result_response_ready_ = false;
234-
result_response_callback_(std::move(data));
235-
} else if (is_cancel_response_ready_) {
236-
is_cancel_response_ready_ = false;
237-
cancel_goal_callback_(std::move(data));
238-
} else if (is_feedback_ready_) {
239-
is_feedback_ready_ = false;
240-
feedback_callback_(std::move(data));
241-
} else if (is_status_ready_) {
242-
is_status_ready_ = false;
243-
goal_status_callback_(std::move(data));
256+
if (is_goal_response_ready_.exchange(false)) {
257+
auto goal_response_pair = std::static_pointer_cast<GoalResponseVoidDataPair>(data);
258+
auto goal_id = goal_response_pair->first;
259+
auto & goal_response = goal_response_pair->second;
260+
261+
call_response_callback_and_erase(
262+
EventType::GoalResponse,
263+
goal_response,
264+
goal_id);
265+
} else if (is_result_response_ready_.exchange(false)) {
266+
auto result_response_pair = std::static_pointer_cast<ResultResponseVoidDataPair>(data);
267+
auto goal_id = result_response_pair->first;
268+
auto & result_response = result_response_pair->second;
269+
270+
call_response_callback_and_erase(
271+
EventType::ResultResponse,
272+
result_response,
273+
goal_id);
274+
} else if (is_cancel_response_ready_.exchange(false)) {
275+
auto cancel_response_pair = std::static_pointer_cast<CancelResponseVoidDataPair>(data);
276+
auto goal_id = cancel_response_pair->first;
277+
auto & cancel_response = cancel_response_pair->second;
278+
279+
call_response_callback_and_erase(
280+
EventType::CancelResponse,
281+
cancel_response,
282+
goal_id);
283+
} else if (is_feedback_ready_.exchange(false)) {
284+
call_response_callback_and_erase(
285+
EventType::FeedbackReady, data, 0, false);
286+
} else if (is_status_ready_.exchange(false)) {
287+
call_response_callback_and_erase(
288+
EventType::StatusReady, data, 0, false);
244289
} else {
245290
throw std::runtime_error("Executing intra-process action client but nothing is ready");
246291
}
247292
}
248293

249294
protected:
250-
ResponseCallback goal_response_callback_;
251-
ResponseCallback result_response_callback_;
252-
ResponseCallback cancel_goal_callback_;
253-
ResponseCallback goal_status_callback_;
254-
ResponseCallback feedback_callback_;
255-
256-
// Create buffers to store data coming from server
295+
// Declare buffers to store responses coming from action server
257296
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
258-
GoalResponseSharedPtr>::UniquePtr goal_response_buffer_;
297+
GoalResponsePairSharedPtr>::UniquePtr goal_response_buffer_;
259298

260299
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
261-
ResultResponseSharedPtr>::UniquePtr result_response_buffer_;
300+
ResultResponsePairSharedPtr>::UniquePtr result_response_buffer_;
262301

263302
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
264303
FeedbackSharedPtr>::UniquePtr feedback_buffer_;
265304

266305
rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
267306
GoalStatusSharedPtr>::UniquePtr status_buffer_;
268307

269-
rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
270-
CancelGoalSharedPtr>::UniquePtr cancel_response_buffer_;
308+
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
309+
CancelResponsePairSharedPtr>::UniquePtr cancel_response_buffer_;
271310

272311
std::atomic<bool> is_feedback_ready_{false};
273312
std::atomic<bool> is_status_ready_{false};

0 commit comments

Comments
 (0)