Skip to content

Commit d2b4bce

Browse files
committed
Commit: aeacde9
Parents: 58d2a04 Author: Mauro Passerino <[email protected]> Author Date: Mon Dec 20 2021 11:50:38 GMT-0800 (Pacific Standard Time) Committer: Alexis Pojomovsky <[email protected]> Committer Date: Fri Jun 21 2024 13:41:50 GMT-0700 (Pacific Daylight Time) Add action client/server IPC support Signed-off-by: Mauro Passerino <[email protected]>
1 parent 1b76b77 commit d2b4bce

17 files changed

+2454
-36
lines changed

rclcpp/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
3939
endif()
4040

4141
set(${PROJECT_NAME}_SRCS
42+
src/rclcpp/action_client_intra_process_base.cpp
43+
src/rclcpp/action_server_intra_process_base.cpp
4244
src/rclcpp/any_executable.cpp
4345
src/rclcpp/callback_group.cpp
4446
src/rclcpp/client.cpp
Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
// Copyright 2022 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef RCLCPP__EXPERIMENTAL__ACTION_CLIENT_INTRA_PROCESS_HPP_
16+
#define RCLCPP__EXPERIMENTAL__ACTION_CLIENT_INTRA_PROCESS_HPP_
17+
18+
#include <functional>
19+
#include <future>
20+
#include <memory>
21+
#include <string>
22+
#include <tuple>
23+
#include <utility>
24+
#include <variant> // NOLINT, cpplint doesn't think this is a cpp std header
25+
26+
#include "rcutils/logging_macros.h"
27+
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
28+
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
29+
#include "rclcpp/experimental/action_client_intra_process_base.hpp"
30+
31+
typedef struct rcl_action_client_depth_s
32+
{
33+
size_t goal_service_depth;
34+
size_t result_service_depth;
35+
size_t cancel_service_depth;
36+
size_t feedback_topic_depth;
37+
size_t status_topic_depth;
38+
} rcl_action_client_depth_t;
39+
40+
namespace rclcpp
41+
{
42+
namespace experimental
43+
{
44+
45+
template<typename ActionT>
46+
class ActionClientIntraProcess : public ActionClientIntraProcessBase
47+
{
48+
public:
49+
RCLCPP_SMART_PTR_DEFINITIONS(ActionClientIntraProcess)
50+
51+
// Useful aliases for the action client data types
52+
using ResponseCallback = std::function<void (std::shared_ptr<void>)>;
53+
using GoalResponse = typename ActionT::Impl::SendGoalService::Response;
54+
using GoalResponseSharedPtr = typename std::shared_ptr<GoalResponse>;
55+
using ResultResponse = typename ActionT::Impl::GetResultService::Response;
56+
using ResultResponseSharedPtr = typename std::shared_ptr<ResultResponse>;
57+
using FeedbackMessage = typename ActionT::Impl::FeedbackMessage;
58+
using FeedbackSharedPtr = typename std::shared_ptr<FeedbackMessage>;
59+
using CancelGoalSharedPtr = typename std::shared_ptr<void>;
60+
using GoalStatusSharedPtr = typename std::shared_ptr<void>;
61+
62+
ActionClientIntraProcess(
63+
rclcpp::Context::SharedPtr context,
64+
const std::string & action_name,
65+
const rcl_action_client_depth_t & qos_history,
66+
ResponseCallback goal_status_callback,
67+
ResponseCallback feedback_callback)
68+
: goal_status_callback_(goal_status_callback),
69+
feedback_callback_(feedback_callback),
70+
ActionClientIntraProcessBase(
71+
context,
72+
action_name,
73+
QoS(qos_history.goal_service_depth))
74+
{
75+
// Create the intra-process buffers
76+
goal_response_buffer_ =
77+
rclcpp::experimental::create_service_intra_process_buffer<GoalResponseSharedPtr>(
78+
QoS(qos_history.goal_service_depth));
79+
80+
result_response_buffer_ =
81+
rclcpp::experimental::create_service_intra_process_buffer<ResultResponseSharedPtr>(
82+
QoS(qos_history.result_service_depth));
83+
84+
status_buffer_ =
85+
rclcpp::experimental::create_service_intra_process_buffer<GoalStatusSharedPtr>(
86+
QoS(qos_history.status_topic_depth));
87+
88+
feedback_buffer_ =
89+
rclcpp::experimental::create_service_intra_process_buffer<FeedbackSharedPtr>(
90+
QoS(qos_history.feedback_topic_depth));
91+
92+
cancel_response_buffer_ =
93+
rclcpp::experimental::create_service_intra_process_buffer<CancelGoalSharedPtr>(
94+
QoS(qos_history.cancel_service_depth));
95+
}
96+
97+
virtual ~ActionClientIntraProcess() = default;
98+
99+
bool is_ready(const rcl_wait_set_t & wait_set)
100+
{
101+
(void) wait_set;
102+
103+
return is_feedback_ready_ ||
104+
is_status_ready_ ||
105+
is_goal_response_ready_ ||
106+
is_cancel_response_ready_ ||
107+
is_result_response_ready_;
108+
}
109+
110+
// Store responses callbacks.
111+
// We don't use mutex to protect these callbacks since they
112+
// are called always after they are set.
113+
void store_goal_response_callback(ResponseCallback callback)
114+
{
115+
goal_response_callback_ = callback;
116+
}
117+
118+
void store_cancel_goal_callback(ResponseCallback callback)
119+
{
120+
cancel_goal_callback_ = callback;
121+
}
122+
123+
void store_result_response_callback(ResponseCallback callback)
124+
{
125+
result_response_callback_ = callback;
126+
}
127+
128+
// Store responses from server
129+
void store_ipc_action_goal_response(GoalResponseSharedPtr goal_response)
130+
{
131+
goal_response_buffer_->add(std::move(goal_response));
132+
gc_.trigger();
133+
is_goal_response_ready_ = true;
134+
invoke_on_ready_callback(EventType::GoalResponse);
135+
}
136+
137+
void store_ipc_action_result_response(ResultResponseSharedPtr result_response)
138+
{
139+
result_response_buffer_->add(std::move(result_response));
140+
gc_.trigger();
141+
is_result_response_ready_ = true;
142+
invoke_on_ready_callback(EventType::ResultResponse);
143+
}
144+
145+
void store_ipc_action_cancel_response(CancelGoalSharedPtr cancel_response)
146+
{
147+
cancel_response_buffer_->add(std::move(cancel_response));
148+
gc_.trigger();
149+
is_cancel_response_ready_ = true;
150+
invoke_on_ready_callback(EventType::CancelResponse);
151+
}
152+
153+
void store_ipc_action_feedback(FeedbackSharedPtr feedback)
154+
{
155+
feedback_buffer_->add(std::move(feedback));
156+
gc_.trigger();
157+
is_feedback_ready_ = true;
158+
invoke_on_ready_callback(EventType::FeedbackReady);
159+
}
160+
161+
void store_ipc_action_goal_status(GoalStatusSharedPtr status)
162+
{
163+
status_buffer_->add(std::move(status));
164+
gc_.trigger();
165+
is_status_ready_ = true;
166+
invoke_on_ready_callback(EventType::StatusReady);
167+
}
168+
169+
std::shared_ptr<void>
170+
take_data() override
171+
{
172+
if (is_goal_response_ready_) {
173+
auto data = std::move(goal_response_buffer_->consume());
174+
return std::static_pointer_cast<void>(data);
175+
} else if (is_result_response_ready_) {
176+
auto data = std::move(result_response_buffer_->consume());
177+
return std::static_pointer_cast<void>(data);
178+
} else if (is_cancel_response_ready_) {
179+
auto data = std::move(cancel_response_buffer_->consume());
180+
return std::static_pointer_cast<void>(data);
181+
} else if (is_feedback_ready_) {
182+
auto data = std::move(feedback_buffer_->consume());
183+
return std::static_pointer_cast<void>(data);
184+
} else if (is_status_ready_) {
185+
auto data = std::move(status_buffer_->consume());
186+
return std::static_pointer_cast<void>(data);
187+
} else {
188+
throw std::runtime_error("Taking data from intra-process action client but nothing is ready");
189+
}
190+
}
191+
192+
std::shared_ptr<void>
193+
take_data_by_entity_id(size_t id) override
194+
{
195+
// Mark as ready the event type from which we want to take data
196+
switch (static_cast<EventType>(id)) {
197+
case EventType::ResultResponse:
198+
is_result_response_ready_ = true;
199+
break;
200+
case EventType::CancelResponse:
201+
is_cancel_response_ready_ = true;
202+
break;
203+
case EventType::GoalResponse:
204+
is_goal_response_ready_ = true;
205+
break;
206+
case EventType::FeedbackReady:
207+
is_feedback_ready_ = true;
208+
break;
209+
case EventType::StatusReady:
210+
is_status_ready_ = true;
211+
break;
212+
}
213+
214+
return take_data();
215+
}
216+
217+
218+
void execute(std::shared_ptr<void> & data)
219+
{
220+
// How to handle case when more than one flag is ready?
221+
// For example, feedback and status are both ready, guard condition triggered
222+
// twice, but we process a single entity here.
223+
// On the default executor using a waitset, waitables are checked twice if ready,
224+
// so that fixes the issue. Check if this is a problem with EventsExecutor.
225+
if (!data) {
226+
throw std::runtime_error("'data' is empty");
227+
}
228+
229+
if (is_goal_response_ready_) {
230+
is_goal_response_ready_ = false;
231+
goal_response_callback_(std::move(data));
232+
} else if (is_result_response_ready_) {
233+
is_result_response_ready_ = false;
234+
result_response_callback_(std::move(data));
235+
} else if (is_cancel_response_ready_) {
236+
is_cancel_response_ready_ = false;
237+
cancel_goal_callback_(std::move(data));
238+
} else if (is_feedback_ready_) {
239+
is_feedback_ready_ = false;
240+
feedback_callback_(std::move(data));
241+
} else if (is_status_ready_) {
242+
is_status_ready_ = false;
243+
goal_status_callback_(std::move(data));
244+
} else {
245+
throw std::runtime_error("Executing intra-process action client but nothing is ready");
246+
}
247+
}
248+
249+
protected:
250+
ResponseCallback goal_response_callback_;
251+
ResponseCallback result_response_callback_;
252+
ResponseCallback cancel_goal_callback_;
253+
ResponseCallback goal_status_callback_;
254+
ResponseCallback feedback_callback_;
255+
256+
// Create buffers to store data coming from server
257+
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
258+
GoalResponseSharedPtr>::UniquePtr goal_response_buffer_;
259+
260+
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
261+
ResultResponseSharedPtr>::UniquePtr result_response_buffer_;
262+
263+
typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
264+
FeedbackSharedPtr>::UniquePtr feedback_buffer_;
265+
266+
rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
267+
GoalStatusSharedPtr>::UniquePtr status_buffer_;
268+
269+
rclcpp::experimental::buffers::ServiceIntraProcessBuffer<
270+
CancelGoalSharedPtr>::UniquePtr cancel_response_buffer_;
271+
272+
std::atomic<bool> is_feedback_ready_{false};
273+
std::atomic<bool> is_status_ready_{false};
274+
std::atomic<bool> is_goal_response_ready_{false};
275+
std::atomic<bool> is_cancel_response_ready_{false};
276+
std::atomic<bool> is_result_response_ready_{false};
277+
};
278+
279+
} // namespace experimental
280+
} // namespace rclcpp
281+
282+
#endif // RCLCPP__EXPERIMENTAL__ACTION_CLIENT_INTRA_PROCESS_HPP_

0 commit comments

Comments
 (0)