Skip to content

Commit 96fb6b4

Browse files
author
Alberto Soragna
committed
shortcircuit events executor callback invocation in rclcpp for ipc subscriptions
1 parent 53aa9fd commit 96fb6b4

File tree

9 files changed

+60
-22
lines changed

9 files changed

+60
-22
lines changed

rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class EventsExecutorNotifyWaitable final : public EventWaitable
5959
void
6060
set_events_executor_callback(
6161
const rclcpp::executors::EventsExecutor * executor,
62-
rmw_listener_cb_t executor_callback) const override
62+
rmw_listener_cb_t executor_callback) override
6363
{
6464
for (auto gc : notify_guard_conditions_) {
6565
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(

rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,14 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
101101
return size_ == capacity_;
102102
}
103103

104-
void clear() {}
104+
void clear()
105+
{
106+
std::lock_guard<std::mutex> lock(mutex_);
107+
ring_buffer_ = std::vector<BufferT>(capacity_);
108+
write_index_ = capacity_ - 1;
109+
read_index_ = 0;
110+
size_ = 0;
111+
}
105112

106113
private:
107114
size_t capacity_;

rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,26 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
118118
provide_intra_process_message(ConstMessageSharedPtr message)
119119
{
120120
buffer_->add_shared(std::move(message));
121-
trigger_guard_condition();
121+
122+
std::lock_guard<std::mutex> lock(executor_callback_mutex_);
123+
if (executor_callback_) {
124+
invoke_executor_callback();
125+
} else {
126+
trigger_guard_condition();
127+
}
122128
}
123129

124130
void
125131
provide_intra_process_message(MessageUniquePtr message)
126132
{
127133
buffer_->add_unique(std::move(message));
128-
trigger_guard_condition();
134+
135+
std::lock_guard<std::mutex> lock(executor_callback_mutex_);
136+
if (executor_callback_) {
137+
invoke_executor_callback();
138+
} else {
139+
trigger_guard_condition();
140+
}
129141
}
130142

131143
bool
@@ -134,6 +146,18 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
134146
return buffer_->use_take_shared_method();
135147
}
136148

149+
void
150+
set_events_executor_callback(
151+
const rclcpp::executors::EventsExecutor * executor,
152+
rmw_listener_cb_t executor_callback) override
153+
{
154+
std::lock_guard<std::mutex> lock(executor_callback_mutex_);
155+
executor_ = executor;
156+
executor_callback_ = executor_callback;
157+
// Buffer must be cleared under the executor callback lock to make sure that other threads wait for this
158+
buffer_->clear();
159+
}
160+
137161
private:
138162
void
139163
trigger_guard_condition()
@@ -142,6 +166,17 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
142166
(void)ret;
143167
}
144168

169+
void
170+
invoke_executor_callback()
171+
{
172+
static const rmw_listener_event_t this_event = {
173+
.entity = this,
174+
.type = WAITABLE_EVENT
175+
};
176+
177+
executor_callback_(executor_, this_event);
178+
}
179+
145180
template<typename T>
146181
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
147182
execute_impl()

rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,16 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
7474
void
7575
set_events_executor_callback(
7676
const rclcpp::executors::EventsExecutor * executor,
77-
rmw_listener_cb_t executor_callback) const override;
77+
rmw_listener_cb_t executor_callback) override;
7878

7979
protected:
8080
std::recursive_mutex reentrant_mutex_;
8181
rcl_guard_condition_t gc_;
8282

83+
const rclcpp::executors::EventsExecutor * executor_;
84+
rmw_listener_cb_t executor_callback_;
85+
std::mutex executor_callback_mutex_;
86+
8387
private:
8488
virtual void
8589
trigger_guard_condition() = 0;

rclcpp/include/rclcpp/qos_event.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class QOSEventHandlerBase : public Waitable
107107
void
108108
set_events_executor_callback(
109109
const rclcpp::executors::EventsExecutor * executor,
110-
rmw_listener_cb_t executor_callback) const override;
110+
rmw_listener_cb_t executor_callback) override;
111111

112112
protected:
113113
rcl_event_t event_handle_;

rclcpp/include/rclcpp/waitable.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ class Waitable
175175
void
176176
set_events_executor_callback(
177177
const rclcpp::executors::EventsExecutor * executor,
178-
rmw_listener_cb_t executor_callback) const;
178+
rmw_listener_cb_t executor_callback);
179179

180180
private:
181181
std::atomic<bool> in_use_by_wait_set_{false};

rclcpp/src/rclcpp/qos_event.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set)
7171
void
7272
QOSEventHandlerBase::set_events_executor_callback(
7373
const rclcpp::executors::EventsExecutor * executor,
74-
rmw_listener_cb_t executor_callback) const
74+
rmw_listener_cb_t executor_callback)
7575
{
7676
rcl_ret_t ret = rcl_event_set_listener_callback(
7777
&event_handle_,

rclcpp/src/rclcpp/subscription_intra_process_base.cpp

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,12 @@ SubscriptionIntraProcessBase::get_actual_qos() const
3737
return qos_profile_;
3838
}
3939

40-
4140
void
4241
SubscriptionIntraProcessBase::set_events_executor_callback(
4342
const rclcpp::executors::EventsExecutor * executor,
44-
rmw_listener_cb_t executor_callback) const
43+
rmw_listener_cb_t executor_callback)
4544
{
46-
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
47-
&gc_,
48-
executor_callback,
49-
executor,
50-
this,
51-
true /*Use previous events*/);
52-
53-
if (RCL_RET_OK != ret) {
54-
throw std::runtime_error("Couldn't set guard condition callback");
55-
}
56-
}
45+
(void)executor;
46+
(void)executor_callback;
47+
assert(0);
48+
}

rclcpp/src/rclcpp/waitable.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state)
6161
void
6262
Waitable::set_events_executor_callback(
6363
const rclcpp::executors::EventsExecutor * executor,
64-
rmw_listener_cb_t executor_callback) const
64+
rmw_listener_cb_t executor_callback)
6565
{
6666
(void)executor;
6767
(void)executor_callback;

0 commit comments

Comments
 (0)