Skip to content

Commit 03bdbb5

Browse files
bpwilcoxskyegalaxy
authored andcommitted
Allow for deferred responses with ipc (#135)
* allow for deferred responses with ipc * fix send response * fix use member service ipc process * add map to store CallbackInfoVariant * add send_response to base class, set function ptr to get handle * move service intra process outside base * copy response into shared pointer for ipc * add typename for service template * remove ref signature * try without std::ref * try without ref wrapper * try emplace * make pair with variant * some cleanup * erase callback info if client invalid * add post_init_setup for services * fix extra comma * add post init setup after lifecycle node services * add documentation for ServiceIntraProcess template change * use weak ptr to service in ServiceIntraProcess * move check for valid service handle to beginning of function * add comment for callback info map (cherry picked from commit aa95a48)
1 parent 2a4495a commit 03bdbb5

File tree

4 files changed

+90
-48
lines changed

4 files changed

+90
-48
lines changed

rclcpp/include/rclcpp/create_service.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ create_service(
7575

7676
auto serv = Service<ServiceT>::make_shared(
7777
node_base,
78-
service_name, any_service_callback, service_options, ipc_setting);
78+
service_name, any_service_callback, service_options);
79+
serv->post_init_setup(node_base, ipc_setting);
7980
auto serv_base_ptr = std::dynamic_pointer_cast<ServiceBase>(serv);
8081
node_services->add_service(serv_base_ptr, group);
8182
return serv;

rclcpp/include/rclcpp/experimental/service_intra_process.hpp

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,12 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
7070
using ClientIDtoRequest = std::pair<uint64_t, RequestCallbackPair>;
7171

7272
ServiceIntraProcess(
73+
std::weak_ptr<rclcpp::Service<ServiceT>> service_handle,
7374
AnyServiceCallback<ServiceT> callback,
7475
rclcpp::Context::SharedPtr context,
7576
const std::string & service_name,
7677
const rclcpp::QoS & qos_profile)
77-
: ServiceIntraProcessBase(context, service_name, qos_profile), any_callback_(callback)
78+
: ServiceIntraProcessBase(context, service_name, qos_profile), any_callback_(callback), service_handle_(service_handle)
7879
{
7980
// Create the intra-process buffer.
8081
buffer_ = rclcpp::experimental::create_service_intra_process_buffer<
@@ -107,40 +108,63 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
107108
return std::static_pointer_cast<void>(data);
108109
}
109110

111+
void send_response(uint64_t intra_process_client_id, SharedResponse & response)
112+
{
113+
std::unique_lock<std::recursive_mutex> lock(reentrant_mutex_);
114+
115+
auto client_it = clients_.find(intra_process_client_id);
116+
117+
if (client_it == clients_.end()) {
118+
RCLCPP_WARN(
119+
rclcpp::get_logger("rclcpp"),
120+
"Calling intra_process_service_send_response for invalid or no "
121+
"longer existing client id");
122+
123+
callback_info_.erase(intra_process_client_id);
124+
return;
125+
}
126+
127+
auto client_intra_process_base = client_it->second.lock();
128+
129+
if (client_intra_process_base) {
130+
auto client = std::dynamic_pointer_cast<
131+
rclcpp::experimental::ClientIntraProcess<ServiceT>>(
132+
client_intra_process_base);
133+
CallbackInfoVariant & value = callback_info_[intra_process_client_id];
134+
client->store_intra_process_response(
135+
std::make_pair(std::move(response), std::move(value)));
136+
} else {
137+
clients_.erase(client_it);
138+
}
139+
140+
callback_info_.erase(intra_process_client_id);
141+
}
142+
110143
void execute(const std::shared_ptr<void> & data)
111144
{
145+
auto serv_handle = service_handle_.lock();
146+
147+
// Return if the service handle is no longer valid
148+
if (!serv_handle) {
149+
return;
150+
}
151+
112152
auto ptr = std::static_pointer_cast<ClientIDtoRequest>(data);
113153

114154
uint64_t intra_process_client_id = ptr->first;
115155
SharedRequest & typed_request = ptr->second.first;
116156
CallbackInfoVariant & value = ptr->second.second;
157+
callback_info_.emplace(std::make_pair(intra_process_client_id, std::move(value)));
158+
159+
// To allow for the user callback to handle deferred responses for IPC in an ambiguous way,
160+
// we are overloading the rmw_request_id semantics to provide the intra process client ID.
161+
auto req_id = std::make_shared<rmw_request_id_t>();
162+
req_id->sequence_number = intra_process_client_id;
117163

118-
SharedResponse response = any_callback_.dispatch(nullptr, nullptr, std::move(typed_request));
164+
SharedResponse response = any_callback_.dispatch(serv_handle, req_id, std::move(typed_request));
119165

120166
if (response) {
121-
std::unique_lock<std::recursive_mutex> lock(reentrant_mutex_);
122-
123-
auto client_it = clients_.find(intra_process_client_id);
124-
125-
if (client_it == clients_.end()) {
126-
RCLCPP_WARN(
127-
rclcpp::get_logger("rclcpp"),
128-
"Calling intra_process_service_send_response for invalid or no "
129-
"longer existing client id");
130-
return;
131-
}
132-
133-
auto client_intra_process_base = client_it->second.lock();
134-
135-
if (client_intra_process_base) {
136-
auto client = std::dynamic_pointer_cast<
137-
rclcpp::experimental::ClientIntraProcess<ServiceT>>(
138-
client_intra_process_base);
139-
client->store_intra_process_response(
140-
std::make_pair(std::move(response), std::move(value)));
141-
} else {
142-
clients_.erase(client_it);
143-
}
167+
send_response(intra_process_client_id, response);
144168
}
145169
}
146170

@@ -152,6 +176,12 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
152176
BufferUniquePtr buffer_;
153177

154178
AnyServiceCallback<ServiceT> any_callback_;
179+
180+
std::weak_ptr<rclcpp::Service<ServiceT>> service_handle_;
181+
182+
// Store callback variants in a map to support deferred response
183+
// access by intra-process client id.
184+
std::unordered_map<uint64_t, CallbackInfoVariant> callback_info_;
155185
};
156186

157187
} // namespace experimental

rclcpp/include/rclcpp/service.hpp

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,6 @@ class ServiceBase
292292
uint64_t intra_process_service_id,
293293
IntraProcessManagerWeakPtr weak_ipm);
294294

295-
std::shared_ptr<rclcpp::experimental::ServiceIntraProcessBase> service_intra_process_;
296-
297295
std::shared_ptr<rcl_node_t> node_handle_;
298296
std::shared_ptr<rclcpp::Context> context_;
299297

@@ -345,14 +343,12 @@ class Service
345343
* \param[in] service_name Name of the topic to publish to.
346344
* \param[in] any_callback User defined callback to call when a client request is received.
347345
* \param[in] service_options options for the subscription.
348-
* \param[in] ipc_setting Intra-process communication setting for the service.
349346
*/
350347
Service(
351348
std::shared_ptr<rclcpp::node_interfaces::NodeBaseInterface> node_base,
352349
const std::string & service_name,
353350
AnyServiceCallback<ServiceT> any_callback,
354-
rcl_service_options_t & service_options,
355-
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
351+
rcl_service_options_t & service_options)
356352
: ServiceBase(node_base), any_callback_(any_callback),
357353
srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
358354
{
@@ -398,7 +394,14 @@ class Service
398394
#ifndef TRACETOOLS_DISABLED
399395
any_callback_.register_callback_for_tracing();
400396
#endif
397+
// Setup continues in the post construction method, post_init_setup().
398+
}
401399

400+
/// Called post construction, so that construction may continue after shared_from_this() works.
401+
void post_init_setup(
402+
std::shared_ptr<rclcpp::node_interfaces::NodeBaseInterface> node_base,
403+
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
404+
{
402405
// Setup intra process if requested.
403406
if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) {
404407
create_intra_process_service();
@@ -414,13 +417,11 @@ class Service
414417
* \param[in] node_base NodeBaseInterface pointer that is used in part of the setup.
415418
* \param[in] service_handle service handle.
416419
* \param[in] any_callback User defined callback to call when a client request is received.
417-
* \param[in] ipc_setting Intra-process communication setting for the service.
418420
*/
419421
Service(
420422
std::shared_ptr<rclcpp::node_interfaces::NodeBaseInterface> node_base,
421423
std::shared_ptr<rcl_service_t> service_handle,
422-
AnyServiceCallback<ServiceT> any_callback,
423-
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
424+
AnyServiceCallback<ServiceT> any_callback)
424425
: ServiceBase(node_base), any_callback_(any_callback),
425426
srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
426427
{
@@ -440,11 +441,7 @@ class Service
440441
#ifndef TRACETOOLS_DISABLED
441442
any_callback_.register_callback_for_tracing();
442443
#endif
443-
444-
// Setup intra process if requested.
445-
if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) {
446-
create_intra_process_service();
447-
}
444+
// Setup continues in the post construction method, post_init_setup().
448445
}
449446

450447
/// Default constructor.
@@ -456,13 +453,11 @@ class Service
456453
* \param[in] node_base NodeBaseInterface pointer that is used in part of the setup.
457454
* \param[in] service_handle service handle.
458455
* \param[in] any_callback User defined callback to call when a client request is received.
459-
* \param[in] ipc_setting Intra-process communication setting for the service.
460456
*/
461457
Service(
462458
std::shared_ptr<rclcpp::node_interfaces::NodeBaseInterface> node_base,
463459
rcl_service_t * service_handle,
464-
AnyServiceCallback<ServiceT> any_callback,
465-
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
460+
AnyServiceCallback<ServiceT> any_callback)
466461
: ServiceBase(node_base), any_callback_(any_callback),
467462
srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
468463
{
@@ -484,10 +479,7 @@ class Service
484479
#ifndef TRACETOOLS_DISABLED
485480
any_callback_.register_callback_for_tracing();
486481
#endif
487-
// Setup intra process if requested.
488-
if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) {
489-
create_intra_process_service();
490-
}
482+
// Setup continues in the post construction method, post_init_setup().
491483
}
492484

493485
Service() = delete;
@@ -553,6 +545,14 @@ class Service
553545
void
554546
send_response(rmw_request_id_t & req_id, typename ServiceT::Response & response)
555547
{
548+
if (use_intra_process_)
549+
{
550+
auto intra_response = std::make_shared<typename ServiceT::Response>(response);
551+
// The sequence number here is used as a proxy for the intra-process client ID
552+
service_intra_process_->send_response(req_id.sequence_number, intra_response);
553+
return;
554+
}
555+
556556
rcl_ret_t ret = rcl_send_response(get_service_handle().get(), &req_id, &response);
557557

558558
if (ret == RCL_RET_TIMEOUT) {
@@ -591,6 +591,7 @@ class Service
591591
using ServiceIntraProcessT = rclcpp::experimental::ServiceIntraProcess<ServiceT>;
592592

593593
service_intra_process_ = std::make_shared<ServiceIntraProcessT>(
594+
this->shared_from_this(),
594595
any_callback_,
595596
context_,
596597
this->get_service_name(),
@@ -635,6 +636,12 @@ class Service
635636
AnyServiceCallback<ServiceT> any_callback_;
636637

637638
const rosidl_service_type_support_t * srv_type_support_handle_;
639+
640+
// In order to mirror the send_response signature with a SharedResponse
641+
// of the appropriate ServiceT type, the template class is stored
642+
// as opposed to the base class which has no knowledge of ServiceT.
643+
std::shared_ptr<rclcpp::experimental::ServiceIntraProcess<ServiceT>> service_intra_process_;
644+
638645
};
639646

640647
} // namespace rclcpp

rclcpp_lifecycle/src/lifecycle_node_interface_impl.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf
128128
srv_change_state_ = std::make_shared<rclcpp::Service<ChangeStateSrv>>(
129129
node_base_interface_,
130130
&state_machine_.com_interface.srv_change_state,
131-
any_cb,
132-
ipc_setting);
131+
any_cb);
132+
srv_change_state_->post_init_setup(node_base_interface_, ipc_setting);
133133
node_services_interface_->add_service(
134134
std::dynamic_pointer_cast<rclcpp::ServiceBase>(srv_change_state_),
135135
nullptr);
@@ -146,6 +146,7 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf
146146
node_base_interface_,
147147
&state_machine_.com_interface.srv_get_state,
148148
any_cb);
149+
srv_get_state_->post_init_setup(node_base_interface_, ipc_setting);
149150
node_services_interface_->add_service(
150151
std::dynamic_pointer_cast<rclcpp::ServiceBase>(srv_get_state_),
151152
nullptr);
@@ -162,6 +163,7 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf
162163
node_base_interface_,
163164
&state_machine_.com_interface.srv_get_available_states,
164165
any_cb);
166+
srv_get_available_states_->post_init_setup(node_base_interface_, ipc_setting);
165167
node_services_interface_->add_service(
166168
std::dynamic_pointer_cast<rclcpp::ServiceBase>(srv_get_available_states_),
167169
nullptr);
@@ -179,6 +181,7 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf
179181
node_base_interface_,
180182
&state_machine_.com_interface.srv_get_available_transitions,
181183
any_cb);
184+
srv_get_available_transitions_->post_init_setup(node_base_interface_, ipc_setting);
182185
node_services_interface_->add_service(
183186
std::dynamic_pointer_cast<rclcpp::ServiceBase>(srv_get_available_transitions_),
184187
nullptr);
@@ -196,6 +199,7 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf
196199
node_base_interface_,
197200
&state_machine_.com_interface.srv_get_transition_graph,
198201
any_cb);
202+
srv_get_transition_graph_->post_init_setup(node_base_interface_, ipc_setting);
199203
node_services_interface_->add_service(
200204
std::dynamic_pointer_cast<rclcpp::ServiceBase>(srv_get_transition_graph_),
201205
nullptr);

0 commit comments

Comments
 (0)